当前位置: 首页 > news >正文

《AI大模型应用开发实战从入门到精通共60篇》029、流式输出:实现类ChatGPT的逐字回复效果

029 流式输出:实现类ChatGPT的逐字回复效果

从一次线上事故说起

去年冬天凌晨两点,我被值班电话吵醒。用户反馈我们刚上线的AI助手“卡住了”——点击发送后,页面白屏整整30秒,然后突然吐出一整段文字。更糟的是,如果网络抖动,用户会看到半个句子就断掉,刷新后对话记录里只剩残缺的JSON。

排查后发现,问题出在API调用方式上。我们当时用的是最朴素的同步请求:response = requests.post(url, json=payload),等模型推理完所有token才返回。对于GPT-3.5-turbo这种模型,生成500个token大约需要5-8秒,用户就得盯着旋转菊花干等。更致命的是,如果后端超时设置是10秒,生成到一半就被nginx截断了。

那晚我改了三行代码,把stream=True加上,问题解决了80%。剩下的20%,是前端逐字渲染、中断恢复、错误处理这些坑。今天这篇笔记,就把这些坑的坐标和填坑方法写清楚。

流式输出的核心逻辑

大模型API的流式输出,本质上是HTTP长连接下的Server-Sent Events(SSE)。模型每生成一个token,就通过chunked transfer encoding推送到客户端。客户端收到后立即渲染,而不是等全部生成完。

以OpenAI API为例,关键参数就一个:

# 别这样写:response = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=messages)# 这样写才是流式:response=openai.ChatCompletion.create(model="gpt-3.5-turbo",messages=messages,stream=True# 这里踩过坑:忘记加这个参数,前端等得骂娘)

返回的response对象不再是完整的JSON,而是一个迭代器。每个chunk长这样:

{"choices":[{"delta":{"content":"你"},"index":0}]}{"choices":[{"delta":{"content":"好"},"index":0}]}{"choices":[{"delta":{"content":"!"},"index":0}]}

注意看,每个chunk只包含一个token的增量(delta),而不是完整句子。最后一个chunk会包含"finish_reason":"stop"

后端实现:别用同步循环

很多新手会这样写:

# 反面教材,别这样写defchat_stream(messages):response=openai.ChatCompletion.create(stream=True,...)full_text=""forchunkinresponse:token=chunk["choices"][0]["delta"].get("content","")full_text+=tokenreturnfull_text# 等全部生成完才返回,流式了个寂寞

正确的做法是使用Python生成器(generator),配合Flask或FastAPI的StreamingResponse:

# 正确的流式后端(FastAPI示例)fromfastapiimportFastAPIfromfastapi.responsesimportStreamingResponseimportopenai app=FastAPI()defgenerate_stream(messages):response=openai.ChatCompletion.create(model="gpt-3.5-turbo",messages=messages,stream=True,temperature=0.7# 这里踩过坑:temperature和stream不冲突,但max_tokens要设大一点)forchunkinresponse:# 注意:chunk可能没有delta字段(比如第一个chunk是角色信息)delta=chunk["choices"][0].get("delta",{})token=delta.get("content","")iftoken:yieldf"data:{token}\n\n"# SSE格式要求:data: 内容\n\n# 检查是否结束ifchunk["choices"][0].get("finish_reason"):yield"data: [DONE]\n\n"# 结束标记,前端靠这个判断break@app.post("/chat")asyncdefchat(messages:list):returnStreamingResponse(generate_stream(messages),media_type="text/event-stream"# 别写错成application/json)

关键细节

  • SSE协议要求每条消息以data:开头,双换行结尾
  • 结束标记[DONE]是OpenAI的约定,其他模型可能不同
  • 生成器函数里不要用try...except吞掉异常,否则前端永远收不到错误信号

前端实现:从EventSource到fetch的进化

早期我直接用EventSource,但发现它不支持POST请求(只能GET),而且无法自定义headers。对于需要传API Key的场景,必须用fetch+ReadableStream

// 前端流式接收(React示例)asyncfunctionfetchStream(messages){constresponse=awaitfetch('/api/chat',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({messages})});// 这里踩过坑:忘记检查response.ok,网络错误时直接崩溃if(!response.ok){thrownewError(`HTTP error! status:${response.status}`);}constreader=response.body.getReader();constdecoder=newTextDecoder();letbuffer='';while(true){const{done,value}=awaitreader.read();if(done)break;buffer+=decoder.decode(value,{stream:true});// SSE消息可能被拆分成多个chunk,需要按\n\n分割constlines=buffer.split('\n\n');buffer=lines.pop()||'';// 最后一个不完整,留到下次for(constlineoflines){if(line.startsWith('data: ')){constdata=line.slice(6);if(data==='[DONE]'){// 流结束,清理工作return;}// 这里直接追加到显示区域appendToken(data);}}}}

血泪教训

  1. TextDecoderstream: true参数必须加,否则中文会乱码(UTF-8多字节字符被拆分时)
  2. buffer处理不能少,TCP分包可能导致一条SSE消息被切成两半
  3. 不要用JSON.parse解析每个chunk,因为SSE的data字段就是纯文本token,不是JSON

中断恢复:用户点击停止怎么办

用户等得不耐烦点了“停止”,前端需要发送中断信号。后端收到后要立即停止模型推理,而不是等它自然结束。

# 后端中断处理(使用asyncio)importasyncioasyncdefgenerate_stream_with_cancel(messages,cancel_event):response=openai.ChatCompletion.create(stream=True,...)forchunkinresponse:ifcancel_event.is_set():# 检查中断信号# 这里踩过坑:直接break会导致openai连接泄漏response.close()# 显式关闭连接breakyieldchunk

前端通过AbortController发送中断:

constcontroller=newAbortController();// 点击停止按钮时functionstopGeneration(){controller.abort();}// fetch时传入signalconstresponse=awaitfetch('/api/chat',{signal:controller.signal,// ...其他参数});

注意:AbortController中断的是HTTP连接,后端需要配合cancel_event才能真正停止模型推理。否则模型会在后台继续生成直到用完max_tokens,浪费算力。

错误处理:流中断了怎么办

网络不稳定时,流可能在中途断开。用户看到一半的回复,需要能恢复。

我的方案是:后端每生成一个token,就写入Redis缓存(key=session_id+message_id,value=已生成的文本)。前端检测到连接断开后,自动重连,并带上last_token_index参数,后端从断点处继续生成。

# 断点续传逻辑defgenerate_with_resume(messages,session_id,message_id,last_index=0):# 先从缓存读取已生成的内容cache_key=f"stream:{session_id}:{message_id}"cached=redis.get(cache_key)or""# 跳过已生成的部分response=openai.ChatCompletion.create(stream=True,...)fori,chunkinenumerate(response):ifi<last_index:continuetoken=chunk["choices"][0]["delta"].get("content","")# 写入缓存redis.append(cache_key,token)redis.expire(cache_key,3600)# 1小时过期yieldtoken

这个方案有个坑:如果用户连续点击“停止-重试”,Redis里会积累多个不完整的缓存。需要加一个版本号,每次重试递增,旧版本自动失效。

性能优化:别让流式变成卡顿

流式输出的核心体验是“逐字显示”,但如果每个token都触发一次DOM更新,浏览器会卡死。需要做节流(throttle)。

// 节流渲染,每50ms更新一次DOMletrenderTimer=null;letpendingTokens=[];functionappendToken(token){pendingTokens.push(token);if(!renderTimer){renderTimer=setTimeout(()=>{consttext=pendingTokens.join('');updateDisplay(text);pendingTokens=[];renderTimer=null;},50);}}

对于长文本,还可以考虑虚拟滚动,只渲染可视区域内的内容。不过大多数对话场景下,50ms的节流已经足够流畅。

个人经验总结

  1. 流式输出不是银弹。如果模型推理速度极慢(比如本地跑7B模型),流式输出只会让用户看到“一个字等两秒”的糟糕体验。这时候应该先优化推理速度,再考虑流式。

  2. SSE vs WebSocket。很多人问我为什么不用WebSocket。我的回答是:对于“服务器单向推送”的场景,SSE更轻量,浏览器原生支持,不需要额外库。WebSocket适合双向实时通信,比如协同编辑、游戏。别杀鸡用牛刀。

  3. 测试要覆盖边界。写单元测试时,一定要模拟网络中断、chunk拆分、中文乱码这些场景。我见过太多项目上线后才发现“用户说日语时流式输出会卡住”,因为日语UTF-8编码更长,更容易被拆分。

  4. 监控流式质量。在日志里记录每个流的持续时间、token数、中断次数。如果某个用户的流频繁中断,可能是他网络差,也可能是你的服务器扛不住了。用这些数据指导扩容。

  5. 最后一条,也是最重要的:流式输出改变了用户对“等待”的感知。即使总耗时一样,逐字显示也比白屏等待感觉快得多。这是心理学,不是技术。但技术是实现心理学的基础。

下次遇到用户抱怨“AI回复太慢”,先检查stream=True有没有写对。这个坑,我替你踩过了。

http://www.jsqmd.com/news/718542/

相关文章:

  • 人生如戏-让短板变成优势-让长处变得更赚钱
  • ELN 升级:π 级数自动生成器全域数理架构
  • 2026年5月国内十大GEO厂商和服务商和公司综合竞争力全景扫描 - 速递信息
  • PyWxDump技术剖析:数据解密工具的合规边界与安全启示
  • 2026年免费降AI率工具实测:多款降AI工具对比,哪款效果最佳? - 降AI实验室
  • 桂林瓷砖空鼓修复全百科:工艺、材料与靠谱服务商指引 - 奔跑123
  • 超白熊保暖材料常见问题解答(2026最新专家版) - 速递信息
  • 多模态大模型评估:挑战、框架与实战策略
  • 抖音下载终极指南:5分钟搞定无水印批量采集的免费神器
  • 网络安全学习第97天
  • 2026年全国对讲机十大优选品牌:工业/户外/商用场景采购推荐指南 - 速递信息
  • 培训机构可以包就业的真相来了
  • 终极指南:5分钟掌握KMS智能激活工具,永久告别Windows和Office激活烦恼
  • Java向量化编程进阶必修课(硬件加速失效的7个隐性陷阱全曝光)
  • 数字孪生数控螺旋槽铣床状态监测与故障诊断【附代码】
  • 你用一个正确的方式做扭曲的市场-只会失败
  • 外卖有什么新出的奶茶好喝?上美团外卖必点榜一键get当季爆款 - 资讯焦点
  • 碰见事儿-千万不要用自己的逻辑去思考-反思-容易内耗
  • 桂林防水补漏公司选购指南:资质工艺售后全维度解析 - 奔跑123
  • 五月全新升级!2026GEO 优化服务商 TOP5 实力排名,多维度专业深度分析 - 速递信息
  • 你在做商业-但其实还在打工-还在赚辛苦钱的原因是什么
  • 《AI大模型应用开发实战从入门到精通共60篇》030、Function Calling:让大模型调用外部函数与数据库
  • **发散创新:用Julia实现高性能科学计算的矩阵分解实战与优化技巧**在现代科学计算领域,**高效、简洁且
  • SpringBoot 消息顺序性保证:分区与顺序消费
  • 屁股决定脑袋-不同的视角看到的落地是不同的
  • 2026年家用呼吸机怎么选?这三点教你避坑找专业 - 天涯视角
  • 一分钟看懂!塑料管浮子流量计生产厂家怎么选?(附TOP3名单) - 品牌推荐大师
  • 你真的理解盈利这个事儿么
  • 青岛婚纱摄影排名:拍摄婚纱照定制、透明与品质的决策时代 - charlieruizvin
  • 多模态AI与哈密顿力学的融合:Akasha 2架构解析