Spring Boot中使用长轮询推送消息
Spring Boot中使用长轮询推送消息
长轮询是对上边短轮询的一种改进版本,在尽可能减少对服务器资源浪费的同时,保证消息的相对实时性。长轮询在中间件中应用的很广泛,比如Nacos
和apollo
配置中心,消息队列kafka
、RocketMQ
中都有用到长轮询。
这次我使用apollo
配置中心实现长轮询的方式,应用了一个类DeferredResult
,它是在servelet3.0
后经过Spring封装提供的一种异步请求机制,直意就是延迟结果。
DeferredResult
可以允许容器线程快速释放占用的资源,不阻塞请求线程,以此接受更多的请求提升系统的吞吐量,然后启动异步工作线程处理真正的业务逻辑,处理完成调用DeferredResult.setResult(xxx)
提交响应结果。
controller
@Controller
@RequestMapping("/polling")
public class PollingController {
// 存放监听某个Id的长轮询集合
// 线程同步结构
public static Multimap<String, DeferredResult<String>> watchRequests = Multimaps.synchronizedMultimap(HashMultimap.create());
/**
* 超时时间 10秒, 实际根据业务情况设置
*/
private static final long TIME_OUT = 10000L;
/**
* 设置监听
*/
@GetMapping(path = "watch/{id}")
@ResponseBody
public DeferredResult<String> watch(@PathVariable String id) {
// 延迟对象设置超时时间
DeferredResult<String> deferredResult = new DeferredResult<>(TIME_OUT);
// 异步请求完成时移除 key,防止内存溢出
deferredResult.onCompletion(() -> {
watchRequests.remove(id, deferredResult);
});
// 注册长轮询请求
watchRequests.put(id, deferredResult);
return deferredResult;
}
/**
* 变更数据
*/
@GetMapping(path = "publish/{id}")
@ResponseBody
public String publish(@PathVariable String id) {
// 数据变更 取出监听ID的所有长轮询请求,并一一响应处理
if (watchRequests.containsKey(id)) {
Collection<DeferredResult<String>> deferredResults = watchRequests.get(id);
for (DeferredResult<String> deferredResult : deferredResults) {
deferredResult.setResult("我更新了" + LocalDateTime.now());
}
return "success";
}
return "fail";
}
}
请求超过设置的超时时间,会抛出AsyncRequestTimeoutException
异常,这里直接用@ControllerAdvice
全局捕获统一返回即可,前端获取约定好的状态码后再次发起长轮询请求,如此往复调用。
@Slf4j
@ControllerAdvice
public class AsyncRequestTimeoutHandler {
@ResponseStatus(HttpStatus.NOT_MODIFIED)
@ResponseBody
@ExceptionHandler(AsyncRequestTimeoutException.class)
public String asyncRequestTimeoutHandler(AsyncRequestTimeoutException e) {
log.error("AsyncRequestTimeoutException", e);
return "304";
}
}
前端使用:
首先发起长轮询请求/polling/watch/10086
监听消息更变,请求被挂起,不变更数据直至超时,再次发起了长轮询请求;紧接着手动变更数据/polling/publish/10086
,长轮询得到响应,前端处理业务逻辑完成后再次发起请求,如此循环往复。
本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。