当前位置: 首页 > news >正文

Spring AI实战:实现流式对话中的会话终止功能

前言

在AI对话系统中,流式响应(Streaming)已成为提升用户体验的重要技术。然而,当用户面对长时间生成的回复时,往往希望能够在中途终止对话。本文将详细介绍如何在基于Spring AI的项目中实现流式对话的会话终止功能,完整保存对话上下文,确保系统资源得到合理释放。

一、需求背景

在我们的AI对话平台中,当用户发起一个复杂问题时,AI可能需要较长时间才能生成完整回复。如果用户在等待过程中发现提问不准确或已获得足够信息,希望能够随时终止当前会话。这个功能需要解决以下挑战:

  1. 安全中断正在生成的AI响应流
  2. 保存已生成的部分内容到对话历史
  3. 释放相关系统资源
  4. 保持对话上下文的完整性

二、整体架构设计

我们采用了分层架构来实现会话终止功能:

  • Controller层:提供/stop接口,接收会话终止请求
  • Service层:处理业务逻辑,包括会话状态检查、资源清理和记忆保存
  • Handler层ChatSessionHandler负责管理活动会话和流式响应
  • 存储层:Redis存储会话状态和消息内容,ChatMemory管理对话上下文

三、核心实现代码与解析

1. 会话管理器:ChatSessionHandler

ChatSessionHandler是整个功能的核心,它负责跟踪活动会话、收集部分响应并处理取消操作:

@Slf4j @Component public class ChatSessionHandler { // 存储活动会话 private final Map<String, Disposable> activeSessions = new ConcurrentHashMap<>(); // 存储SSE连接 private final Map<String, SseEmitter> emitterMap = new ConcurrentHashMap<>(); // 会话取消标志 private final Map<String, AtomicBoolean> cancellationFlags = new ConcurrentHashMap<>(); // 存储部分响应 private final Map<String, List<String>> partialResponses = new ConcurrentHashMap<>(); // 注册新会话 public void registerSession(String sessionId, Disposable disposable, SseEmitter emitter) { activeSessions.put(sessionId, disposable); emitterMap.put(sessionId, emitter); cancellationFlags.put(sessionId, new AtomicBoolean(false)); partialResponses.put(sessionId, new ArrayList<>()); emitter.onTimeout(() -> cleanupSession(sessionId)); emitter.onCompletion(() -> cleanupSession(sessionId)); } // 收集AI的部分响应 public void collectPartialResponse(String sessionId, String chunk) { if (partialResponses.containsKey(sessionId)) { partialResponses.get(sessionId).add(chunk); } } // 取消会话 public void cancelSession(String sessionId, String userId, UserMessage userMessage) { // 标记会话已取消 if (cancellationFlags.containsKey(sessionId)) { cancellationFlags.get(sessionId).set(true); } // 保存聊天记忆 saveChatMemoryOnCancel(sessionId, userId, userMessage); // 取消流处理 Disposable disposable = activeSessions.remove(sessionId); if (disposable != null && !disposable.isDisposed()) { disposable.dispose(); } // 关闭SSE连接并发送终止消息 SseEmitter emitter = emitterMap.remove(sessionId); if (emitter != null) { try { emitter.send(SseEmitter.event().data(SseResponse.interrupted(sessionId, "[对话已终止]"))); emitter.complete(); } catch (IOException e) { log.error("SSE连接关闭失败: {}", sessionId, e); try { emitter.completeWithError(e); } catch (Exception ex) { log.error("无法完成错误状态", ex); } } } // 清理会话 cleanupSession(sessionId); } // 保存取消时的聊天记忆 private void saveChatMemoryOnCancel(String sessionId, String userId, UserMessage userMessage) { try { // redis key String memoryIdWithUserId = userId + ":u:" + sessionId; // 1. 获取AI已生成的部分响应 String aiPartialResponse = getCompleteResponse(sessionId); log.info("终止会话时AI已生成的响应: {}", aiPartialResponse); // 2. 构造消息序列 List<Message> messagesToAdd = new ArrayList<>(); log.info("添加用户消息到记忆: {}", userMessage.getText()); // 添加AI的已生成内容 if (StrUtil.isNotBlank(aiPartialResponse)) { AssistantMessage aiMessage = new AssistantMessage(aiPartialResponse + "[对话被用户终止]"); messagesToAdd.add(aiMessage); log.info("添加AI部分响应到记忆: {}", aiPartialResponse); } else { // 即使没有AI响应,也要添加一个终止标记 AssistantMessage aiMessage = new AssistantMessage("[对话被用户终止,无AI响应]"); messagesToAdd.add(aiMessage); log.info("添加终止标记到记忆"); } // 调用ChatMemory保存记忆 chatMemory.add(memoryIdWithUserId, messagesToAdd); log.info("成功保存终止会话的记忆,会话ID: {}, 消息数量: {}", sessionId, messagesToAdd.size()); } catch (Exception e) { log.error("保存终止会话记忆失败,会话ID: {}", sessionId, e); } } }

关键点解析:

  • 使用ConcurrentHashMap保证线程安全,避免并发问题
  • 通过Disposable对象管理响应式流的生命周期
  • partialResponses集合实时收集AI生成的部分内容
  • 会话取消时,添加特殊标记[对话被用户终止],明确标识对话状态

2. 服务层:会话终止业务逻辑

Service层的stop方法处理会话终止的核心业务流程:

@Override public String stop(String requestId) { AssertUtil.notNull("会话id不能为空!", requestId); try { // 1. 从Redis获取会话信息 String sessionData = stringRedisTemplate.opsForValue().get("session:" + requestId); if (StrUtil.isBlank(sessionData)) { throw new ServiceException("会话不存在或已过期"); } // 2. 获取当前用户 LoginUser currentUser = UserUtil.getUser(); if (currentUser == null) { throw new ServiceException("用户未登录"); } Integer userId = currentUser.getId(); // 3. 获取用户消息 UserMessage userMessage = getUserMessageFromSession(userId, requestId); // 4. 检查并取消会话 if (chatSessionHandler.isSessionActive(requestId)) { chatSessionHandler.cancelSession(requestId, userId.toString(), userMessage); } else { log.info("会话 {} 不处于活动状态,直接清理资源", requestId); // 即使会话不活跃,也尝试保存记忆 try { String memoryId = userId + ":u:" + requestId; String aiPartialResponse = chatSessionHandler.getCompleteResponse(requestId); if (StrUtil.isNotBlank(aiPartialResponse)) { List<Message> messagesToAdd = new ArrayList<>(); messagesToAdd.add(userMessage); messagesToAdd.add(new AssistantMessage(aiPartialResponse)); chatMemory.add(memoryId, messagesToAdd); log.info("为非活动会话保存了部分响应,会话ID: {}", requestId); } } catch (Exception e) { log.warn("保存非活动会话记忆失败,会话ID: {}", requestId, e); } } // 5. 清理Redis中的会话数据 stringRedisTemplate.delete("session:" + requestId); stringRedisTemplate.delete("chat:message:" + requestId); log.info("会话已成功终止,ID: {}", requestId); return sessionData; } catch (Exception e) { log.error("终止会话失败,ID: {}", requestId, e); return null; } }

关键点解析:

  • 全面的参数校验和空值处理
  • 智能判断会话状态,处理已结束会话的边缘情况
  • 完整的资源清理策略,防止内存泄漏
  • 详细的日志记录,便于问题追踪

3. 获取用户消息:处理复杂上下文

在会话终止时,需要获取原始用户消息,这需要从多个可能的数据源检索:

private UserMessage getUserMessageFromSession(Integer userId, String sessionId) { try { // 1. 从聊天记忆中获取该会话的完整历史 String memoryId = userId + ":u:" + sessionId; List<Message> messages = chatMemory.get(memoryId, -1); // -1 表示获取所有历史 // 2. 从历史记录中找到最后一条用户消息 if (messages != null && !messages.isEmpty()) { // 逆序遍历,找到最后一条用户消息 for (int i = messages.size() - 1; i >= 0; i--) { Message message = messages.get(i); if (message instanceof UserMessage) { return (UserMessage) message; } } } // 3. 如果没有找到用户消息,尝试从Redis中获取原始请求 String messageKey = "chat:message:" + sessionId; String messageData = stringRedisTemplate.opsForValue().get(messageKey); if (StrUtil.isNotBlank(messageData)) { try { // 尝试解析存储的原始消息 return new ObjectMapper().readValue(messageData, UserMessage.class); } catch (Exception e) { log.warn("解析Redis中存储的用户消息失败,使用默认消息", e); } } // 4. 作为最后的备选,返回一个通用的用户消息 UserMessage defaultMessage = new UserMessage("[已终止的对话]"); return defaultMessage; } catch (Exception e) { log.error("获取用户消息失败,会话ID: {}", sessionId, e); // 出错时返回一个安全的默认消息 UserMessage fallbackMessage = new UserMessage("对话在" + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + "被终止"); return fallbackMessage; } }

关键点解析:

  • 多层回退机制确保消息可用性
  • 从对话历史中精准定位最后一条用户消息
  • 异常情况下的优雅降级策略
  • 详细的时间戳记录增强可追溯性

4. Controller端点:提供终止接口

最后,我们在Controller中暴露一个简单的REST端点:

@Operation(summary = "会话终止") @GetMapping(value = "/stop") public Result<?> stop(@RequestParam(value = "chatId") String requestId) { String emitter = chatService.stop(requestId); return R.successWithData("会话已成功终止", emitter); }

四、实现难点与解决方案

1. 并发安全问题

挑战:多个请求可能同时操作同一个会话。解决方案:使用ConcurrentHashMapAtomicBoolean确保线程安全,避免并发修改异常。

2. 资源泄漏风险

挑战:流式响应不正确关闭会导致资源泄漏。解决方案:在SSE emitter上注册onTimeoutonCompletion回调,确保会话清理;显式调用dispose()方法释放响应式流。

3. 对话上下文完整性

挑战:会话中断后,对话历史不完整,影响后续对话质量。解决方案:收集并保存已生成的部分响应,添加明确的终止标记,保证上下文连贯性。

4. 边界情况处理

挑战:会话可能已自然结束,或用户多次触发终止。解决方案:通过isSessionActive()方法检查会话状态,针对不同状态采用不同处理策略。

五、优化建议

  1. 超时机制增强:为会话设置合理的超时时间,自动清理长时间无活动的会话
  2. 前端集成:设计优雅的UI反馈,让用户明确知道会话已终止
  3. 监控指标:添加会话终止率等监控指标,分析用户行为
  4. 资源回收优化:定期扫描并清理陈旧的会话数据,减少内存占用

六、总结

本文详细介绍了基于Spring AI实现流式对话中会话终止功能的技术方案。通过合理设计会话生命周期管理、部分响应收集和资源清理机制,我们能够为用户提供流畅的对话体验,同时确保系统资源的高效利用。这个功能的实现不仅提升了用户体验,也为构建更智能、更响应式的AI对话系统奠定了基础。

在AI应用日益普及的今天,关注用户体验的每个细节,包括如何优雅地结束对话,都是构建成功产品的关键。希望本文的分享能为同样在AI应用开发道路上探索的开发者提供有价值的经验。


技术栈:Spring AI、Spring Boot 3.x、Redis、Reactor、SSE
适用场景:AI聊天应用、智能客服系统、对话式AI产品

http://www.jsqmd.com/news/249244/

相关文章:

  • 数据 “活” 起来!宏智树 AI 解锁论文数据分析零门槛通关秘籍
  • AI质检驱动质量革命:从被动救火到主动免疫的体系重构
  • 宏智树 AI:ChatGPT 学术版驱动的智能论文写作全流程解决方案
  • 量子计算+AI测试:质量保障的降维打击时代来临
  • 基于深度学习的过敏原食品检测系统(YOLOv8+YOLO数据集+UI界面+Python项目+模型)
  • 当测试用例撞上伦理高墙:AI质量保障体系的致命缺口
  • 投稿不再石沉大海!宏智树 AI 解锁期刊论文录用密码
  • 独家解读:OpenAI内部测试体系的致命缺陷
  • 【论文自动阅读】X-VLA: Soft-Prompted Transformer as Scalable Cross-Embodiment Vision-Language-Action Model
  • 当DevOps遇上AI:持续测试的核动力引擎已点火
  • 基于深度学习的条形码检测系统(YOLOv8+YOLO数据集+UI界面+Python项目+模型)
  • LU,智能冷板仪 冷板仪 大小鼠冷热板仪
  • LabVIEW实现网口TCP通讯西门子PLC全系列,超神玩法
  • Java毕设项目:基于Java+SpringBoot的药店药品库存销售管理系统设计与实现基于SpringBoot的药店管理系统设计与实现(源码+文档,讲解、调试运行,定制等)
  • Java计算机毕设之基于SpringBoot的药店药品管理系统的设计与实现基于SpringBoot的药店管理系统设计与实现(完整前后端代码+说明文档+LW,调试定制等)
  • 电荷流分析
  • LDO补偿方法学习
  • 基于深度学习的学生课堂行为检测系统(YOLOv8+YOLO数据集+UI界面+Python项目+模型)
  • 知识图谱(二)之doccano的使用
  • 别等被攻击才重视!不懂黑客技术也能下手,SQL 注入 + ARP 防护实操指南!
  • 【毕业设计】基于SpringBoot的药店销售管理系统设计与实现基于SpringBoot的药店管理系统设计与实现(源码+文档+远程调试,全bao定制等)
  • 知识图谱(三)之知识查询语言
  • 救命!挖到零基础转网安捷径!超详细建议 + 分步骤教学,从入门到精通不踩坑!
  • 【毕业设计】基于Java的小区旧衣物回收与捐赠系统设计与实现基于SpringBoot的社区旧衣物回收与捐赠系统设计与实现(源码+文档+远程调试,全bao定制等)
  • 学霸同款2026 10款一键生成论文工具测评:本科生毕业论文必备清单
  • 【课程设计/毕业设计】基于SpringBoot+Vue的西医药店药品管理系统的设计与实现基于SpringBoot的药店管理系统设计与实现【附源码、数据库、万字文档】
  • 无人机降噪技术及应用分析
  • 恐怖!不懂黑客技术也能发起攻击?SQL 注入 / ARP 防护等关键防御必学!
  • 2026大模型完全指南:从入门到实战,程序员必备AI学习资源包
  • 收藏!AI小白程序员必看:构建可靠AI Agent系统全指南+大模型学习路线