# Http Long Polling

关键词: HTTP长轮询 Spring Web MVC Asynchronous Requests(异步请求) DeferredResult Callable

# 数据交互两种模式

# Push(推模式)

推模式指的是客户端与服务端建立好网络长连接,服务方有相关数据,直接通过长连接通道推送到客户端。其优点是及时,一旦有数据变更,客户端立马能感知到;另外对客户端来说逻辑简单,不需要关心有无数据这些逻辑处理。缺点是不知道客户端的数据消费能力,可能导致数据积压在客户端,来不及处理。

# Pull(拉模式)

拉模式指的是客户端主动向服务端发出请求,拉取相关数据。其优点是此过程由客户端发起请求,故不存在推模式中数据积压的问题。缺点是可能不够及时,对客户端来说需要考虑数据拉取相关逻辑,何时去拉,拉的频率怎么控制等等。

# 异步请求

# Servlet异步请求处理的非常简洁的概述:(Nacos)

  • 一个servlet请求ServletRequest可以通过调用request.startAsync() 方法而进入异步模式。这样做的主要效果是Servlet(以及任何过滤器)都可以退出,但是响应(response)仍然是开放的,以便稍后处理完成。
  • request.startAsync()返回一个AsyncContext对象,可用它进一步控制异步处理。例如,它提供了dispatch方法,类似于Servlet API的转发(forward),只是它允许应用程序在Servlet容器线程上恢复请求处理。
  • ServletRequest提供了获取当前DispatcherType的方式,可以使用它来区分处理初始请求、异步调度、转发和其他调度程序类型。

# DeferredResult 处理流程如下:(Apollo)

  • Controller先返回一个DeferredResult对象,并将其保存在内存队列或列表中,以便访问它。
  • Spring MVC 调用request.startAsync()
  • 同时,DispatcherServlet和所有配置的过滤器退出请求处理线程,但响应保持打开状态。
  • 在应用程序中的某一个线程设置DeferredResult,然后Spring MVC 调度request请求返回到Servlet容器中。
  • DispatcherServlet再次调用,恢复对该异步返回结果的处理。

# Callable 处理流程如下:

  • Controller先返回一个Callable对象。
  • Spring MVC调用request.startAsync()并把该Callable对象提交给独立线程的执行器TaskExecutor处理
  • 同时,DispatcherServlet和所有过滤器退出Servlet容器线程,但响应保持打开状态。
  • 最终,Callable生成一个返回结果,此时Spring MVC将重新把请求发送回Servlet容器,恢复处理。
  • DispatcherServlet再次被调用,恢复对该异步返回结果的处理。

# polling 和 long polling 的区别

这里暂时抛开某些场景 webSocket 的解决方案

举一个栗子来说明长轮询的好处,例如携程Apollo配置中心,怎么实时查询配置中心有数据更新呢?pollinglong polling的方式分别如下:

  • polling: 轮询会每隔1s去向服务器发起一次查询请求,返回是否有数据更新,数据最长有1s的延时。
  • long polling: 首先发起查询请求,服务端没有更新的话就不回复,直到60s后或者有数据变更时立即返回给客户端,客户端收到服务器响应后,立即发起下一次请求。长轮询保证了数据变更获取的实时性,也极大的较少了与服务器的交互,基于web异步处理技术,大大的提升了服务性能。

延伸思考

long polling的方式和发布订阅的模式的不同点有哪些?

# 携程Apollo配置中心 Http Long Polling 的具体实现

服务端:

// com.ctrip.framework.apollo.configservice.controller.NotificationControllerV2.java

@RestController
@RequestMapping("/notifications/v2")
public class NotificationControllerV2 implements ReleaseMessageListener {
    
    // ...

    @GetMapping
    public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
            @RequestParam(value = "appId") String appId,
            @RequestParam(value = "cluster") String cluster,
            @RequestParam(value = "notifications") String notificationsAsString,
            @RequestParam(value = "dataCenter", required = false) String dataCenter,
            @RequestParam(value = "ip", required = false) String clientIp) {
        List<ApolloConfigNotification> notifications = null;

        try {
            notifications = gson.fromJson(notificationsAsString, notificationsTypeReference);
        } catch (Throwable ex) {
            Tracer.logError(ex);
        }

        if (CollectionUtils.isEmpty(notifications)) {
            throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
        }

        Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);

        if (CollectionUtils.isEmpty(filteredNotifications)) {
            throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
        }

        DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper(bizConfig.longPollingTimeoutInMilli());
        Set<String> namespaces = Sets.newHashSetWithExpectedSize(filteredNotifications.size());
        Map<String, Long> clientSideNotifications = Maps.newHashMapWithExpectedSize(filteredNotifications.size());

        for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet()) {
            String normalizedNamespace = notificationEntry.getKey();
            ApolloConfigNotification notification = notificationEntry.getValue();
            namespaces.add(normalizedNamespace);
            clientSideNotifications.put(normalizedNamespace, notification.getNotificationId());
            if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace)) {
                deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);
            }
        }

        Multimap<String, String> watchedKeysMap = watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);
        
        // ...

        return deferredResultWrapper.getResult();
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

客户端:

com.ctrip.framework.apollo.internals.RemoteConfigLongPollService.java

public class RemoteConfigLongPollService {
    
    // ...

    private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {
        final Random random = new Random();
        ServiceDTO lastServiceDto = null;
        while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
            if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
                //wait at most 5 seconds
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                }
            }
            Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
            String url = null;
            try {
                if (lastServiceDto == null) {
                    List<ServiceDTO> configServices = getConfigServices();
                    lastServiceDto = configServices.get(random.nextInt(configServices.size()));
                }

                url = assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter, m_notifications);

                logger.debug("Long polling from {}", url);

                HttpRequest request = new HttpRequest(url);
                request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
                if (!StringUtils.isBlank(secret)) {
                    Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
                    request.setHeaders(headers);
                }

                transaction.addData("Url", url);

                final HttpResponse<List<ApolloConfigNotification>> response = m_httpUtil.doGet(request, m_responseType);

                logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
                if (response.getStatusCode() == 200 && response.getBody() != null) {
                    updateNotifications(response.getBody());
                    updateRemoteNotifications(response.getBody());
                    transaction.addData("Result", response.getBody().toString());
                    notify(lastServiceDto, response.getBody());
                }

                //try to load balance
                if (response.getStatusCode() == 304 && random.nextBoolean()) {
                    lastServiceDto = null;
                }

                m_longPollFailSchedulePolicyInSecond.success();
                transaction.addData("StatusCode", response.getStatusCode());
                transaction.setStatus(Transaction.SUCCESS);
            } catch (Throwable ex) {
                lastServiceDto = null;
                Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
                transaction.setStatus(ex);
                long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
                logger.warn(
                        "Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",
                        sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
                try {
                    TimeUnit.SECONDS.sleep(sleepTimeInSecond);
                } catch (InterruptedException ie) {
                    //ignore
                }
            } finally {
                transaction.complete();
            }
        }
    }

    // ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77

Apollo客户端和服务端保持了一个长连接,从而能第一时间获得配置更新的推送。

长连接实际上我们是通过Http Long Polling实现的,具体而言:

  1. 客户端发起一个Http请求到服务端
  2. 服务端会保持住这个连接60秒
    • 如果在60秒内有客户端关心的配置变化,被保持住的客户端请求会立即返回,并告知客户端有配置变化的namespace信息,客户端会据此拉取对应namespace的最新配置
    • 如果在60秒内没有客户端关心的配置变化,那么会返回Http状态码304给客户端
  3. 客户端在收到服务端请求后会立即重新发起连接,回到第一步

考虑到会有数万客户端向服务端发起长连,在服务端我们使用了async servlet(Spring DeferredResult)来服务Http Long Polling请求。

# Nacos 配置中心 Http Long Polling 的具体实现

https://nacos.io/zh-cn/ (opens new window) | Nacos 架构 (opens new window)

Nacos 配置中心的几个核心概念:dataIdgroupnamespace,它们的层级关系如下图: Nacos数据模型

  • dataId:是配置中心里最基础的单元,它是一种key-value结构,key通常是我们的配置文件名称,比如:application.ymlmybatis.xml,而value是整个文件下的内容。
  • groupdataId配置的分组管理,比如同在dev环境下开发,但同环境不同分支需要不同的配置数据,这时就可以用分组隔离,默认分组DEFAULT_GROUP
  • namespace:项目开发过程中肯定会有devtestpro等多个不同环境,namespace则是对不同环境进行隔离,默认所有配置都在public里。

TIP

下边以Nacos 2.0.1 版本源码分析,2.0以后的版本改动较多,和网上的很多资料略有些不同 地址:https://github.com/alibaba/nacos/releases/tag/2.0.1 (opens new window)

# PC 端实时查询用户个人消息 Http Long Polling 的具体实现 💯

定义 DeferredResult 封装器

public class UserNewMessageDeferredResultWrapper implements Comparable<UserNewMessageDeferredResultWrapper> {


    private DeferredResult<UserNewMessageDTO> result;

    public UserNewMessageDeferredResultWrapper(long timeoutInMilli) {
        result = new DeferredResult<>(timeoutInMilli);
    }

    public void onTimeout(Runnable timeoutCallback) {
        result.onTimeout(timeoutCallback);
    }

    public void onCompletion(Runnable completionCallback) {
        result.onCompletion(completionCallback);
    }

    public void setResult(UserNewMessageDTO userNewMessage) {
        result.setResult(userNewMessage);
    }


    public DeferredResult<UserNewMessageDTO> getResult() {
        return result;
    }

    @Override
    public int compareTo(@NonNull UserNewMessageDeferredResultWrapper deferredResultWrapper) {
        return Integer.compare(this.hashCode(), deferredResultWrapper.hashCode());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

Controller接口定义返回DeferredResult,具体实现如下:

    // ...

    @ApiOperation("新消息 - HTTP LONG POLLING")
    @GetMapping("/new")
    public DeferredResult<UserNewMessageDTO> pollUserNewMessage(
            @RequestAttribute("userId") Long userId,
            @RequestParam(value = "newMessageId", required = false) Long newMessageId) {
        
        // `DeferredResult` 封装器,设置超时时间
        UserNewMessageDeferredResultWrapper deferredResultWrapper = new UserNewMessageDeferredResultWrapper(CommonConstants.DEFAULT_LONG_POLLING_TIMEOUT);
        
        //查询当前用户的新消息
        UserNewMessageDTO userNewMessage = this.qaUserMessageServiceFacade.findUserNewMessage(userId);
        //比较当前新消息是否最新的消息,如果是立即返回,如果不是,则等待, 其中setResult方法是关键
        if (Objects.nonNull(userNewMessage.getId()) && ObjectUtil.notEqual(userNewMessage.getId(), newMessageId)) {
            deferredResultWrapper.setResult(userNewMessage);
        } else {
            // 没有新消息,进入等待中的状态,保存当前请求的 UserNewMessageDeferredResultWrapper
            this.qaUserMessageServiceFacade.process(newMessageId, userId, deferredResultWrapper);
        }

        return deferredResultWrapper.getResult();
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

process 方法处理逻辑如下:

     // ...

    @Override
    public void process(@Nullable Long newMessageId, Long userId, UserNewMessageDeferredResultWrapper deferredResultWrapper) {
        String watchedKey = CacheConstants.getUserMessageWatchedKey(userId);
        
        // 设置超时后,接口的返回值
        deferredResultWrapper.onTimeout(() -> deferredResultWrapper.setResult(new UserNewMessageDTO(HttpStatus.NOT_MODIFIED.value(), newMessageId)));
        
        // 设置完成后,移除缓存中的deferredResultWrapper
        deferredResultWrapper.onCompletion(() -> deferredResults.remove(watchedKey, deferredResultWrapper));
            
        // 缓存deferredResultWrapper,当有新消息时会根据watchedKey来获取deferredResultWrapper
        deferredResults.put(watchedKey, deferredResultWrapper);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

用户有新消息的处理逻辑如下:

     // ...
     @Override
    public void settingResult(@NonNull Long userId, @NonNull UserNewMessageDTO userNewMessage) {
        String watchedKey = CacheConstants.getUserMessageWatchedKey(userId);
        if (deferredResults.containsKey(watchedKey)) {
            // 获取到UserNewMessageDeferredResultWrapper,在调用setResult方法设置值,前面的接口立即返回值
            List<UserNewMessageDeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(watchedKey));
            if (CollectionUtil.isNotEmpty(results)) {
                results.forEach(result -> result.setResult(userNewMessage));
            }
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12

由于我们日常工作中,所有的服务都是集群部署的,那么有可能缓存的deferredResultWrapper和处理新消息的线程不在一个服务中,导致无法调用setResult方法设置值。 所以当有新消息的情况下,需要使用广播的方式通知集群下所有的服务都执行新消息处理逻辑。

# Long Polling 的实现为什么需要设置超时时间

主要原因是网络传输层主要走的是tcp协议tcp协议是可靠面向连接的协议,通过三次握手建立连接。但是所建立的连接是虚拟的, 可能存在某段时间网络不通,或者服务端程序非正常关闭,亦或服务端机器非正常关机,面对这些情况客户端根本不知道服务端此时已经不能互通, 还在傻傻的等服务端发数据过来,而这一等一般都是很长时间。

当然tcp协议在实现上有保活计时器来保证的,但是等到保活计时器发现连接已经断开需要很长时间,如果没有专门配置过相关的tcp参数,一般需要2个小时,而且这些参数是机器操作系统层面。 所以,以此方式来保活不太靠谱,故Long Polling的实现上一般是需要设置超时时间的。


参考文档

Last Updated: 20 days ago