SSE流式响应:从Reactor Flux到生产级AI聊天的工程实践——5分钟超时、线程隔离、背压处理全解析
大家好,我是程序员小策。
首先给大家去一个例子:凌晨两点,P0 告警炸了。
AI 聊天接口全部超时,用户消息发出去转圈转了 120 秒然后报错。你打开监控一看:Tomcat 线程池满了,200 个工作线程全部卡在"等待 AI 回复"这个状态上。新的请求进来直接 reject。
原因很简单——OpenAI 那边的 API 延迟突然飙到了 90 秒。而你的后端用的是同步阻塞调用:httpClient.post(url, body),线程就一直干等着。一个用户等 90 秒,十个用户就是 900 秒·线程,两百个线程全部耗尽只用了不到三分钟。
这就是同步调用大模型在生产上最经典的翻车场景。
今天这篇文章,我就带你看一个生产级的 AI 聊天系统是怎么解决这个问题的——用 Reactor Flux + SSE 把同步阻塞变成异步流式,用独立的线程池把 AI 调用和 Web 请求线程隔离开,用 CountDownLatch 守住最后一道超时防线。
问题定义
AI 聊天的核心矛盾:LLM 响应时间极度不确定(5 秒到 180 秒),而 HTTP 请求线程是稀缺资源(Tomcat 默认 200 个)。
朴素方案是同步阻塞调用:请求进来 → 调 LLM → 等返回 → 返回给前端。在 LLM 响应稳定的情况下(比如 5 秒以内),这个方案没问题。但 LLM 的响应时间取决于模型负载、输入长度、输出长度,波动巨大。一旦延迟飙上去,线程池就被打满。
那必然的改进方向是异步 + 流式。
但这不是简单的"把同步改成异步"就完事了。你还要面对:
- 流式数据如何一帧一帧推给前端?用什么协议?
- AI 调用在哪个线程里执行?会不会影响主线程?
- 如果 AI 调用一直不返回,谁来兜底超时?
- 流式过程中用户刷新页面断开连接,后端怎么感知?
核心概念
SSE(Server-Sent Events):基于 HTTP 协议的单向流式传输,服务端可以持续向客户端推送数据,客户端通过
EventSourceAPI 接收。相比 WebSocket,SSE 更轻量,不需要协议升级,网关/CDN 兼容性更好。相比短轮询,SSE 实时性更高,不需要客户端反复请求。
Flux:Reactor 框架中的响应式流发布者,代表一个包含 0 到 N 个元素的异步序列。
Flux<String>可以逐个推送字符串给订阅者。
FluxSink:Flux 的编程式发射器——你不需要预先准备好所有数据,而是在运行时按需调用
sink.next(data)把数据推出去。
用交通类比理解这个架构:
想象一条高速公路收费站。同步阻塞就是"一辆车到了收费窗口,等它交完钱、找完零、拿完票据、开走,下一辆车才能上来"。高峰期必堵。
异步流式就是"ETC 不停车收费"——车开过去,交易在后台异步完成。多车道并行处理,一辆车堵了不影响其他车道。
在这个系统里:
- 收费站 = HTTP 请求线程(稀缺资源,快速释放)
- ETC 后台 = 独立线程池
ThreadPoolTaskExecutor(专门执行 AI 调用) - 车道 =
FluxSink(每个请求一条车道,互不干扰) - 超时栏杆 =
CountDownLatch.await(5, TimeUnit.MINUTES)(5 分钟没通过就拦截)
实现
看代码。第一步,Controller 层——SSE 的入口:
@RestController@RequestMapping("/api/xunzhi/v1/ai")@RequiredArgsConstructorpublicclassAiMessageController{privatefinalAiMessageServiceaiMessageService;@PostMapping(value="/sessions/{sessionId}/chat",produces=MediaType.TEXT_EVENT_STREAM_VALUE)// ← 告诉浏览器这是SSE流publicFlux<String>chat(@PathVariableStringsessionId,@RequestBodyAiMessageReqDTOrequestParam,@CurrentUserStringusername,HttpServletResponseresponse){response.setHeader("Cache-Control","no-cache");// ← 禁止浏览器缓存SSE数据response.setHeader("Connection","keep-alive");// ← 保持连接response.setHeader("Access-Control-Allow-Origin","*");response.setHeader("Access-Control-Allow-Headers","Cache-Control");requestParam.setSessionId(sessionId);returnaiMessageService.aiChatFlux(requestParam,username);// ← 返回Flux,Spring自动转SSE}}注意produces = MediaType.TEXT_EVENT_STREAM_VALUE——这一句告诉 Spring:“这个接口的返回值不要一次性序列化成 JSON 返回,而是以text/event-stream格式逐条推送。”
第二步,核心——Service 层的流式编排。这是整个系统最重要的一段代码:
@Service@RequiredArgsConstructorpublicclassAiMessageServiceImplimplementsAiMessageService{privatestaticfinalStringDEFAULT_ERROR_CONTENT="Sorry, an error occurred while processing your request.";privatestaticfinalStringUNSUPPORTED_AI_TYPE="Current AI type is not supported";privatefinalAiPropertiesServiceaiPropertiesService;privatefinalAiConversationServiceaiConversationService;privatefinalAiChatHandlerFactoryaiChatHandlerFactory;privatefinalConversationMessageHistoryServiceconversationMessageHistoryService;privatefinalConversationMessagePersistenceServiceconversationMessagePersistenceService;privatefinalConversationStreamingSupportconversationStreamingSupport;privatefinalThreadPoolTaskExecutorthreadPoolTaskExecutor;// ← 独立线程池!@OverridepublicFlux<String>aiChatFlux(AiMessageReqDTOrequestParam,Stringusername){// 参数校验 + 权限校验if(requestParam==null){returnFlux.error(newClientException("request body cannot be empty"));}if(StrUtil.isBlank(requestParam.getSessionId())){returnFlux.error(newClientException("sessionId cannot be empty"));}aiConversationService.requireOwnedConversation(requestParam.getSessionId(),username);requestParam.setUserName(username);returnaiChatFluxInternal(requestParam);}privateFlux<String>aiChatFluxInternal(AiMessageReqDTOrequestParam){returnFlux.create(sink->{// ← Flux.create():编程式创建FluxStringuserMessage=StrUtil.blankToDefault(requestParam.getInputMessage(),"No input");LongaiId=requestParam.getAiId();AIContentAccumulatoraccumulator=newAIContentAccumulator();// ← 内容累积器// 核心!AI调用提交到独立线程池,不占用Tomcat线程threadPoolTaskExecutor.submit(()->processChat(sessionId,aiId,userMessage,sink,accumulator));// 监听前端断开连接事件sink.onCancel(()->log.warn("AI chat flux cancelled, sessionId={}",sessionId));sink.onDispose(()->log.info("AI chat flux disposed, sessionId={}",sessionId));});}}这段代码的设计精髓在三个点:
第一个:Flux.create()而不是Flux.just()。Flux.just("a", "b", "c")是你事先知道所有数据,一次性列出来。但 AI 聊天是流式的——你不可能事先知道 AI 会说什么。Flux.create()给你一个FluxSink,你可以在任何时候调用sink.next(data)推数据,非常灵活。
第二个:threadPoolTaskExecutor.submit()把 AI 调用隔离到独立线程池。这一步极其关键。Tomcat 的工作线程拿到请求后,校验参数、校验权限,然后把 AI 调用丢给独立线程池,自己立刻返回去处理下一个请求。Tomcat 线程不会被阻塞 90 秒。
第三个:sink.onCancel()和sink.onDispose()监听的是前端断开连接。用户刷新页面或关闭标签页时,SSE 连接会断开,后端需要知道并及时释放资源。
第三步,Handler 层的流式发射和超时兜底。这是UniversalAiChatHandler.streamToSink()的核心:
@OverridepublicvoidstreamToSink(AiPropertiesDOaiProperties,StringuserMessage,List<AiMessageHistoryRespDTO>historyMessages,FluxSink<String>sink,AIContentAccumulatoraccumulator)throwsException{ChatClientchatClient=createChatClient(aiProperties);List<Message>messages=buildMessages(aiProperties,userMessage,historyMessages);CountDownLatchlatch=newCountDownLatch(1);// ← 超时防线!finalThrowable[]streamError=newThrowable[1];// ← 异常收集器chatClient.prompt().messages(messages).stream().chatResponse().subscribe(chatResponse->{// 正常收到一个token → 推送给前端Generationgeneration=chatResponse.getResult();if(generation!=null){Stringcontent=generation.getOutput().getText();if(StrUtil.isNotEmpty(content)){AiChatStreamRespDTOresp=AiChatStreamRespDTO.builder().type("content").content(content).build();sink.next(JSON.toJSONString(resp));// ← 推给SSEaccumulator.appendSimpleContent(content);// ← 累积完整回复}// DeepSeek R1 的 reasoning_content 处理 ← 关键!Stringreasoning=null;try{MethodgetReasoningContent=generation.getOutput().getClass().getMethod("getReasoningContent");ObjectreasoningVal=getReasoningContent.invoke(generation.getOutput());if(reasoningVal!=null)reasoning=reasoningVal.toString();}catch(Exceptionignore){}if(reasoning==null){ObjectreasoningObj=generation.getOutput().getMetadata().get("reasoningContent");if(reasoningObj!=null)reasoning=reasoningObj.toString();}if(StrUtil.isNotEmpty(reasoning)){AiChatStreamRespDTOresp=AiChatStreamRespDTO.builder().type("reasoning_content").content(reasoning).build();sink.next(JSON.toJSONString(resp));// ← 深度思考推给前端accumulator.appendReasoningChunk(reasoning.getBytes());}}},error->{log.error("流式响应发生错误",error);streamError[0]=error;sink.error(error);latch.countDown();},latch::countDown// ← 正常完成时countDown);// 核心!最多等5分钟 ← 超时兜底if(!latch.await(5,TimeUnit.MINUTES)){thrownewRuntimeException("AI 响应超时");}if(streamError[0]!=null){thrownewRuntimeException(streamError[0]);}}这段代码里有三个生产级的实践细节:
第一:CountDownLatch作为同步屏障。Spring AI 的stream().subscribe()是异步的——你发起流式调用后,代码会立刻继续往下走。如果不加等待,方法直接返回了,FluxSink 还没收到任何数据。latch.await(5, MINUTES)的意思是:“我等你最多 5 分钟,5 分钟后还没完成就抛超时异常”。
第二:streamError[]数组收集异常。为什么不是直接throw?因为subscribe()的 error 回调是在另一个线程里执行的,直接 throw 不会传播到当前线程。用数组做中转,当前线程在latch.await()返回后检查。
第三:DeepSeek R1 的reasoningContent处理用了反射。因为 Spring AI 的通用AssistantMessage类没有直接暴露reasoningContent字段,但 DeepSeek 的返回里有。代码先用反射去取getReasoningContent()方法,取不到再 fallback 到 metadata 字段——兼容的优雅写法。
边界情况与陷阱
陷阱一:用户刷新页面前端断开连接,FluxSink继续写数据会怎样?
会抛异常。但如果你的代码在收到onCancel信号后还在向 AI 发请求,这个请求就成了"孤儿请求"——白白消耗 Token,结果没人接收。这个系统的防御是:在sink.onCancel()里打日志,并在上层processChat里的sink.isCancelled()检查中跳过推送。
陷阱二:CountDownLatch设 5 分钟够不够?
这取决于你的业务场景。对于 GPT-4 生成一篇长文章,5 分钟可能不够。但注意——CountDownLatch的超时是整个流式过程的总时长,不是单个 token 的等待时间。只要 AI 一直在输出 token(即使很慢),latch就不会超时,因为latch.countDown()只有在流式完成后才会调用。所以 5 分钟超时真正罩住的是"流式调用发起了但一直没有返回任何数据"或者"中间网络断了导致无限等待"的场景。
陷阱三:线程池的拒绝策略。
ThreadPoolTaskExecutor如果配置不当(核心线程数太小、队列太大),高峰期任务会被堆积。默认的AbortPolicy会直接抛异常。这个系统用的是ThreadPoolTaskExecutor,需要去看实际配置有没有设置合理的拒绝策略——比如CallerRunsPolicy(让调用线程自己执行,虽然慢但不会丢任务)。
高级考量:SSE vs WebSocket vs gRPC Stream
当你的 AI 聊天不只是文本,还要支持图片、语音、文件传输时,SSE 还够不够用?
SSE 只支持服务端到客户端的单向推送。如果未来你需要客户端上传语音流、服务端实时转文字并流式返回,那 WebSocket 的双向通信就更合适。但 WebSocket 有它的代价——需要协议升级(HTTP → WS),某些网关/CDN 不兼容,调试也比 SSE 复杂。
gRPC Stream 是另一个选择——双向流,性能比 WebSocket 好,但需要客户端支持 gRPC(通常是后端服务之间通信用,浏览器端不太方便)。
本项目选 SSE 的原因:AI 聊天就是单向的(用户发一条消息 → AI 流式回复),SSE 正好够用,不引入额外复杂度。
对比表格
| 方案 | 实现方式 | 线程模型 | 超时控制 | 前端断开感知 | 适用场景 |
|---|---|---|---|---|---|
| 同步阻塞 | httpClient.post()等返回 | Tomcat线程直接等待 | HTTP超时(粗糙) | HTTP断开抛异常 | 响应<3秒的低延迟API |
| DeferredResult | Spring MVC 异步 | 业务线程池 + Servlet3.0异步 | Future.get(timeout) | onTimeout回调 | 传统Spring MVC项目改异步 |
| WebFlux SSE(本项目) | Flux.create()+FluxSink | 独立线程池,Tomcat线程立刻释放 | CountDownLatch.await(5min) | sink.onCancel() | AI流式聊天、实时推送 |
| WebSocket | WebSocketHandler | 长连接线程池 | 心跳检测 | onClose事件 | 双向实时通信 |
| gRPC Stream | StreamObserver | gRPC线程池 | deadline | onCompleted/onError | 微服务间流式通信 |
面试追问
面试追问 1:为什么用Flux.create()而不是Flux.push()或Flux.generate()?
→ 回答方向:Flux.create()最适合"外部异步源推数据"的场景——你拿到一个FluxSink,可以在任何地方(包括其他线程)调用sink.next()。Flux.generate()是同步的、一次一个地生成数据,不适合这里。Flux.push()是create()的单线程版本,但 AI 回调可能在多个线程里触发。所以Flux.create()是正确选择。
面试追问 2:CountDownLatch等待 5 分钟,如果主线程是 Tomcat 线程,不还是阻塞了吗?
→ 回答方向:问得好,这就是threadPoolTaskExecutor.submit()的关键作用。streamToSink()不是在 Tomcat 线程里执行的——它被提交到了独立线程池。Tomcat 线程在Flux.create()返回后就已经释放了。CountDownLatch.await()阻塞的是独立线程池里的工作线程,不影响 Tomcat 接收新请求。
面试追问 3:AiChatStreamRespDTO有两种 type——content和reasoning_content,前端怎么区分和渲染?
→ 回答方向:前端收到 SSE 事件后,按type字段分流。content类型的渲染到聊天气泡的主区域,reasoning_content类型的渲染到一个可折叠的"思考过程"区域(类似 DeepSeek 官网那种灰色小字)。两种类型交替推送——可能在思考过程中间夹杂正式回复,前端要做好顺序拼接。
总结
流式输出的关键是两个隔离:线程隔离(AI 调用不占用 Web 线程)和时间隔离(超时不依赖 HTTP 超时)。
读完这篇你应该能:
- 用
Flux.create()+FluxSink实现一个 SSE 流式推送接口 - 用
CountDownLatch给异步流式调用加超时兜底 - 理解为什么需要独立线程池来执行 AI 调用
- 在面试时说出"FluxSink 编程式发射 + 线程池隔离 + CountDownLatch 超时"而不只是"用了 WebFlux"
