当前位置: 首页 > news >正文

AI聊天助手的SSE流式输出实现过程

SSE流式输出的实现过程

后端处理

在创建流式会话时,我们要对这个请求设置好SSE所需要的请求头,然后再创建会话,返回会话ID,紧接着就把会话ID传给前端,让前端绑定这个会话,开始准备流式输出

controller层处理新建会话请求

如果是对已存在的会话就不需要新建,从请求体里获取会话ID进行绑定

funcCreateStreamSessionAndSendMessage(c*gin.Context){req:=new(CreateSessionAndSendMessageRequest)userName:=c.GetString("userName")// From JWT middlewareiferr:=c.ShouldBindJSON(req);err!=nil{c.JSON(http.StatusOK,gin.H{"error":"Invalid parameters"})return}// 设置SSE头c.Header("Content-Type","text/event-stream")// SSE协议的标识,告诉客户端这个是流式输出c.Header("Cache-Control","no-cache, no-transform")// 让浏览器不缓存响应,每次都获取新的内容,禁止代理/CDN对响应内容做转换c.Header("Connection","keep-alive")// 保持长连接c.Header("Access-Control-Allow-Origin","*")// 允许跨域c.Header("X-Accel-Buffering","no")// 禁止Nginx的缓存区,如果不设置这个请求头就会出现响应数据被堆积随后再发送给前端的情况,设置为no的话就能让数据实时透传c.Header("Content-Encoding","identity")// 禁止对响应内容进行编码或者压缩,没有这个也会导致消息被堆积// 先创建会话并立即把 sessionId 下发给前端,随后再开始流式输出sessionID,code_:=session.CreateStreamSessionOnly(userName,req.UserQuestion)ifcode_!=code.CodeSuccess{c.SSEvent("error",gin.H{"message":"Failed to create session"})return}// 先把 sessionId 通过 data 事件发送给前端,前端据此绑定当前会话,侧边栏即可出现新标签c.Writer.WriteString(fmt.Sprintf("data: {\"sessionId\": \"%s\"}\n\n",sessionID))c.Writer.Flush()// 然后开始把本次回答进行流式发送(包含最后的 [DONE])code_=session.StreamMessageToExistingSession(userName,sessionID,req.UserQuestion,req.ModelType,http.ResponseWriter(c.Writer))ifcode_!=code.CodeSuccess{c.SSEvent("error",gin.H{"message":"Failed to send message"})return}}
service层处理逻辑

利用Flush把服务端响应的数据直接推送给客户端,不需要等待缓冲区满了才推送。

funcStreamMessageToExistingSession(userNamestring,sessionIDstring,userQuestionstring,modelTypestring,writer http.ResponseWriter)code.Code{log.Printf("[Service] StreamMessageToExistingSession Start. User=%s, Session=%s, Model=%s",userName,sessionID,modelType)// 确保writer支持Flushflusher,ok:=writer.(http.Flusher)// 类型断言if!ok{log.Println("不支持Flush")returncode.CodeServerBusy}//2:获取AIHelper并通过其管理消息manager:=aihelper.GetGlobalManager()config:=map[string]interface{}{"apiKey":"your-api-key",// TODO: 从配置中获取"username":userName,// 用于 RAG 模型获取用户文档}log.Println("[Service] Getting AIHelper...")helper,err:=manager.GetOrCreateAIHelper(userName,sessionID,modelType,config)iferr!=nil{log.Println("StreamMessageToExistingSession GetOrCreateAIHelper error:",err)returncode.AIModelFail}log.Println("[Service] AIHelper Obtained. Starting StreamResponse...")// 定义callback函数来实时flush推送消息cb:=func(msgstring){log.Printf("[SSE] Sending chunk: %s (len=%d)\n",msg,len(msg))payload,er:=json.Marshal(map[string]string){"type":"delta","content":msg,}iferr!=nil{log.Println("[SSE] Marshal error:",err)return}_,err=writer.Write([]byte("data: "+string(payload)+"\n\n"))iferr!=nil{log.Println("[SSE] Write error:",err)return}flusher.Flush()log.Println("[SSE] Flushed")}_,err_:=helper.StreamResponse(userName,ctx,cb,userQuestion)iferr_!=nil{log.Println("StreamMessageToExistingSession StreamResponse error:",err_)returncode.AIModelFail}_,err=writer.Write([]byte("data: [DONE]\n\n"))iferr!=nil{log.Println("StreamMessageToExistingSession write DONE error:",err)returncode.AIModelFail}flusher.Flush()returncode.CodeSuccess}

这里manager和aihelper的执行逻辑就不展示了,这篇blog的目的是要了解流式传输的逻辑。每次生成响应的时候回调cb来将AI生成的消息实时推送给客户端,流响应结束后会额外发一条data:[DONE],前端根据这个来判断此次回答是否结束。

前端处理

关键是通过fetch建立一个长连接,后面从response.body里流式读取数据,直到读到done后break

asyncfunctionhandleStreaming(question){// 占一个位置表示正在回答constaiMessage={role:'assistant',content:'',meta:{status:'streaming'}// mark streaming}constaiMessageIndex=currentMessages.value.length currentMessages.value.push(aiMessage)// 决定用哪个URLconstisDev=window.location.hostname==='localhost'||window.location.hostname==='127.0.0.1'constbackendBase=isDev?`http://${window.location.hostname}:9090/api/v1/AI`:'/api/AI'consturl=tempSession.value?`${backendBase}/chat/send-stream-new-session`:`${backendBase}/chat/send-stream`// 构建请求头请求体constheaders={'Content-Type':'application/json','Authorization':`Bearer${localStorage.getItem('token')||''}`}constbody=tempSession.value?{question:question,modelType:selectedModel.value}:{question:question,modelType:selectedModel.value,sessionId:currentSessionId.value}}try{// 创建 fetch 连接读取 SSE 流constresponse=awaitfetch(url,{method:'POST',headers,body:JSON.stringify(body)})if(!response.ok){loading.value=falsethrownewError('Network response was not ok')}constreader=response.body.getReader()constdecoder=newTextDecoder()letbuffer=''// 读取流数据// eslint-disable-next-line no-constant-conditionwhile(true){const{done,value}=awaitreader.read()if(done)breakconstchunk=decoder.decode(value,{stream:true})buffer+=chunk// 按行分割constlines=buffer.split('\n')buffer=lines.pop()||''// 保留未完成的行for(constlineoflines){constnormalizedLine=line.endsWith('\r')?line.slice(0,-1):lineif(!normalizedLine)continue// 处理 SSE 格式:data: <content>if(normalizedLine.startsWith('data:')){letdata=normalizedLine.slice(5)if(data.startsWith(' '))data=data.slice(1)console.log('[SSE] Received:',data)// 调试日志if(data==='[DONE]'){// 流结束console.log('[SSE] Stream done')loading.value=falsecurrentMessages.value[aiMessageIndex].meta={status:'done'}currentMessages.value=[...currentMessages.value]}elseif(data.startsWith('{')){// 尝试解析 JSON(如 sessionId)try{constparsed=JSON.parse(data)if(parsed.sessionId){constnewSid=String(parsed.sessionId)console.log('[SSE] Session ID:',newSid)if(tempSession.value){lettitle=(question||'').trim()if(!title){title=`会话${newSid}`}elseif(title.length>30){title=`${title.slice(0,30)}...`}sessions.value[newSid]={id:newSid,name:title,messages:[...currentMessages.value]}currentSessionId.value=newSid tempSession.value=false}}elseif(parsed.type==='delta'&&typeofparsed.content==='string'){currentMessages.value[aiMessageIndex].content+=parsed.content}}catch(e){// 不是 JSON,当作普通文本处理currentMessages.value[aiMessageIndex].content+=dataconsole.log('[SSE] Content updated:',currentMessages.value[aiMessageIndex].content.length)}}else{// 普通文本数据,直接追加// 使用数组索引直接更新,强制 Vue 响应式系统检测变化currentMessages.value[aiMessageIndex].content+=dataconsole.log('[SSE] Content updated:',currentMessages.value[aiMessageIndex].content.length)}// 每收到一条数据就立即更新 DOM// 强制更新整个数组以触发响应式currentMessages.value=[...currentMessages.value]// 使用 requestAnimationFrame 强制浏览器重排awaitnewPromise(resolve=>{requestAnimationFrame(()=>{scrollToBottom()resolve()})})}}}// 流读取完成后的处理loading.value=falsecurrentMessages.value[aiMessageIndex].meta={status:'done'}currentMessages.value=[...currentMessages.value]// 同步到 sessions 存储if(!tempSession.value&&currentSessionId.value&&sessions.value[currentSessionId.value]){constsessMsgs=sessions.value[currentSessionId.value].messagesif(Array.isArray(sessMsgs)&&sessMsgs.length){constlastIndex=sessMsgs.length-1if(sessMsgs[lastIndex]&&sessMsgs[lastIndex].role==='assistant'){sessMsgs[lastIndex].content=currentMessages.value[aiMessageIndex].content}}}}catch(err){console.error('Stream error:',err)loading.value=falsecurrentMessages.value[aiMessageIndex].meta={status:'error'}currentMessages.value=[...currentMessages.value]ElMessage.error('流式传输出错')}}
http://www.jsqmd.com/news/354867/

相关文章:

  • 如何高效回收永辉超市购物卡?最全的变现指南来了! - 团团收购物卡回收
  • 互联网大厂Java面试实战:Spring Boot、微服务与Kafka在电商场景中的应用
  • 赶deadline必备!顶流之选的降AI率软件 —— 千笔·降AI率助手
  • 2026年度权威发布:最新库存管理系统厂家实力榜单与选型深度解析 - 十大品牌推荐
  • 2026年度库存管理系统厂家推荐榜单:技术适配与降本增效双维度综合评估。 - 十大品牌推荐
  • 2026年广州婚纱影楼公司口碑推荐榜/婚纱影楼优选,婚纱影楼找便宜的,婚纱影楼服务到位婚纱摄影,拍婚纱照 - 品牌策略师
  • 详细介绍:uni-app 原生 App 打包全攻略:Android/iOS 从配置到发布完整流程
  • 2026年度库存管理系统厂家推荐榜单:技术适配与降本增效双维度综合评估 - 十大品牌推荐
  • 2026年中国库存管理系统厂家发布:以网上管家婆为代表的标杆企业深度解析 - 十大品牌推荐
  • 揭秘永辉超市购物卡回收最优选择,教你如何快速变现! - 团团收购物卡回收
  • 《计算机网络》深入学:移动 IP 技术原理与应用
  • 告别付费困扰!免费PPT生成工具大盘点|高效办公必备 - 品牌测评鉴赏家
  • 2026年分析高性能桥梁PVC排水管价格及靠谱企业 - mypinpai
  • 分享一套优质的微信小程序校园志愿者系统(SpringBoot后端+Vue3管理端)
  • 告别繁琐排版!5款操作零门槛的AI PPT生成工具推荐 - 品牌测评鉴赏家
  • 口碑好的桥梁PVC排水管源头厂家有哪些 - 工业推荐榜
  • 2026年广州婚纱工作室公司口碑推荐:实力强婚纱工作室/稳定的婚纱工作室/声誉好的婚纱工作室婚纱影楼/拍婚纱照 - 品牌策略师
  • 手写mybatis
  • 新手PPT救星!5款零门槛生成工具实测:教育党职场人闭眼入,省时80% - 品牌测评鉴赏家
  • 分析2026年别墅门厂家十大排名,看哪家费用更合理 - 工业品牌热点
  • 分析法式装修设计费用多少钱,重庆哪家性价比高 - 工业品网
  • Java 中使用 Alibaba Fastjson 解析泛型类型 JsonResult<SysUserDTO> 的问题
  • 讲讲风机进风口价格合理的供应商,怎么选择有攻略 - mypinpai
  • 完整教程:基于python新闻数据分析可视化系统 Hadoop 新闻平台 爬虫 情感分析 舆情分析 可视化 Django框架 vue框架 机器学习 大数据毕业设计✅
  • 解读全国拉力试验机优质生产商,桌上型拉力试验机怎么收费 - 工业设备
  • 2026重庆装修公司费用对比,十二分装饰能免费上门量房设计吗 - 工业品牌热点
  • 分享ROHS2.0检测仪推荐厂家,高灵敏度产品盘点 - 工业品牌热点
  • 2025年-2026年全屋定制品牌推荐:基于长期稳定性评测,涵盖居家与办公场景痛点分析 - 十大品牌推荐
  • 2026年高新企业审计外部审计推荐,靠谱品牌供应商盘点 - 工业品网
  • 永辉超市购物卡回收变现攻略:快速兑换现金的方法! - 团团收购物卡回收