当前位置: 首页 > news >正文

流式输出(Streaming)原理与踩坑经验

本人在日常开发中,遇到流式输出相关的问题,一般都需要靠大模型协助定位问题,归其根本是因为我对流式输出的原理认识不足。所以本篇文章记录我学习流式输出的原理,以及在实际开发中遇到的问题。

整体流程:

大模型生成 token
打包成 chunk(一个或多个 token)
SSE 协议封装(data: {...}\n\n)
FastAPI StreamingResponse 转发
前端 fetch + ReadableStream

有时候为了统一调用方式,还需要把非openai 的调用封装成统一格式。流式输出的本质,是看清大模型生成的数据如何被’切块’和’封装’,像流水线一样在网络中流转,最终在屏幕上实时解析、逐字显现,形成我们看到的连贯对话。下面从底层协议开始,逐个环节拆开讲。

流式输出的原理

流式输出(Streaming Output)指数据不是一次性全部返回,而是像水流一样逐段、逐部分传输到接收端。大模型生成内容时,通常按"token-by-token"方式逐步生成,每生成一部分就可以立即发送给客户端,而非等待整个内容生成完成。这种方式通常依赖持久化 HTTP 连接或Server-Sent Events (SSE) 协议,服务端在生成每个文本块(chunk)后立即推送,客户端可实时处理和展示,适用于长文本生成、实时交互场景。

token 与 chunk 的区别:token 是模型生成的最小单位,chunk 是网络传输的最小单位。模型逐个 token 生成,但服务端会把若干 token 攒成一个 chunk 再发送,减少网络请求次数。

模型逐个生成 token:
[你] [好] [世] [界] [,] [今] [天]
服务端攒成 chunk 发送(每次发送的 token 数量不固定):
┌─────────┐ ┌──────┐ ┌──────────────┐
│ 你 好 世 │ → │ 界 ,│ → │ 今 天 │ → ...
└─────────┘ └──────┘ └──────────────┘
chunk 1 chunk 2 chunk 3
(3个token) (2个token) (2个token)

一个 chunk 可能包含 1 个 token,也可能包含多个 token,取决于服务端的缓冲策略。

SSE 协议详解

上面讲的 chunk 要在网络上传输,需要一套协议来承载,这就是 SSE。流式输出的底层协议一般是 SSE(Server-Sent Events),它是 W3C 标准,基于 HTTP 长连接实现服务端推送。

SSE 数据格式标准

SSE 的消息由若干行组成,每行以key: value的格式组织,支持的字段:

字段含义示例
data数据内容(可多行拼接)data: {"text": "你好"}
event事件类型,默认为messageevent: error
id事件 ID,用于断线重连id: 12345
retry重连间隔(毫秒)retry: 3000

消息之间用空行\n\n分隔。标准 SSE 格式示例:

data: {"choices":[{"delta":{"content":"你好"}}]}
data: {"choices":[{"delta":{"content":",世"}}]}
data: [DONE]

最后一行data: [DONE]是流结束标记,但要注意某些非标准平台可能不发这个标记,或者把它拼到前一条数据的末尾。


核心 1:大模型 API 的流式输出格式详解

要理解整条链路,首先要搞清楚"大模型到底返回了什么"。

这部分以 OpenAI 格式为主线讲解,原因有三:GPT-3.5/4 是最早大规模商用的 LLM,其 API 格式被开发者最先熟悉;LangChain、LlamaIndex 等主流框架默认以 OpenAI 格式设计,生态绑定深;后来的云厂商(阿里通义、火山引擎、DeepSeek、智谱)为降低开发者迁移成本,主动提供 OpenAI 兼容接口,进一步巩固了其事实标准的地位。OpenAI 格式本身没有技术上不可替代的优势,只是市场选择了它作为"最大公约数"。

OpenAI 标准格式

当设置stream=True调用 OpenAI 兼容 API 时,服务端返回的是 SSE 格式的流,每个 chunk 被包在data: ...\n\n里,data:后面是一个 JSON 对象:

data: {"choices":[{"delta":{"content":"你好"},"index":0,"finish_reason":null}],"id":"chatcmpl-xxx","object":"chat.completion.chunk"}
data: {"choices":[{"delta":{"content":",世"},"index":0,"finish_reason":null}],"id":"chatcmpl-xxx","object":"chat.completion.chunk"}
data: [DONE]

其中每个 JSON 的结构如下:

{
"choices": [
{
"delta": {"content": "你好"},
"index": 0,
"finish_reason": null
}
],
"id": "chatcmpl-xxx",
"object": "chat.completion.chunk"
}
  • delta.content:当前 chunk 的增量文本。第一个 chunk 的delta通常不带content,而是带role: "assistant",后续 chunk 才有content。前端或后端拼接文本时如果直接取delta.content,第一个 chunk 拿到空值是正常的,不要因此跳过。
  • finish_reason:最后一个 chunk 为"stop"(模型自然结束)或"length"(max_tokens 截断),其余为null。排查问题时注意区分这两种情况:stop说明模型正常输出完,length说明输出被截断了。

非标准格式的常见差异点

有些平台的 SSE 格式不严格遵循 OpenAI 规范,常见的坑:

  1. data:后面的空格差异

    • 标准:data: {"choices":...}(有空格)
    • 某些平台:data:{"choices":...}(无空格)
    • 解析时若按if line.startswith("data: ")判断,无空格的格式会被跳过
  2. [DONE]标记位置

    • OpenAI:单独一行data: [DONE]
    • 某些平台:可能放在最后一条数据的末尾,或根本不发
  3. chunk 内字段结构不同

    • 非 OpenAI 平台可能使用text而非delta.content,或直接返回纯文本

打印原始 SSE 数据进行对比

import requests
response = requests.post(url, json=payload, stream=True)
for line in response.iter_lines():
if line:
print(repr(line)) # 打印原始字节
  • repr() 是 Python 内置函数,返回对象的精确字符串表示,会把不可见字符(换行、空格等)显示出来

这是我调试流式问题时最常用的手段,直接看原始数据比猜结构要快得多。


核心 2:LangChain 的流式输出原理

LangChain 的ChatOpenAI内部调用 OpenAI API,然后把原始 JSON chunk 转成AIMessageChunk对象,每次 yield 对应 OpenAI 返回的一个 chunk。理解这层映射关系,排查问题时才能知道 LangChain 返回的字段对应原始数据的哪个位置。

AIMessageChunk 的结构

在用 LangChain 处理流式输出时,每个 chunk 是AIMessageChunk对象。它的核心字段:

chunk.content # 文本内容(str),大多数场景直接取这个
chunk.tool_calls # 工具调用信息(list),模型调用 function calling 时用
chunk.additional_kwargs # 额外元信息(dict),如部分平台的特殊字段

一般场景下只用.content就够了。如果模型同时返回工具调用(function calling),就需要处理tool_calls。另外,现在很多模型的输出会将思考内容与非思考内容分开,思考内容存放在additional_kwargs中。

OpenAI 原始 chunk 与 AIMessageChunk 的映射关系:

OpenAI 原始 chunkLangChainAIMessageChunk
文本内容choices[0].delta.content.content
工具调用choices[0].delta.tool_calls.tool_calls(已解析为结构化对象)
额外字段JSON 顶层字段统一放入.additional_kwargs(如思考内容)
结束标记finish_reason: "stop"流结束后单独处理,chunk 本身无标记
跨平台兼容只适用于 OpenAI 格式统一接口,切换模型不改业务代码

AIMessageChunk支持__add__累加,可以用sum_chunk += chunk把所有 chunk 累加得到完整的AIMessage

统一模型调用接口

项目中同时依赖多个大模型平台时,需要统一调用接口,业务代码只写一套,切换模型改配置就行。根据平台是否提供 OpenAI 兼容接口,有两种方案:

方案一:平台提供 OpenAI 兼容接口(推荐)

直接替换base_url即可,无需额外封装:

from langchain_openai import ChatOpenAI
model = ChatOpenAI(
model="qwen-plus",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
api_key="your-key",
streaming=True
)

很多云厂商(阿里云、火山引擎、DeepSeek)都提供 OpenAI 兼容接口,能直接这样用。

方案二:平台不提供兼容接口

需要封装自定义 LangChain LLM 类,本质是格式转换——把平台返回的自定义 chunk 格式转为 LangChain 的GenerationChunk

from langchain_core.language_models.llms import BaseLLM
from langchain_core.outputs import GenerationChunk
import httpx
class MyCustomLLM(BaseLLM):
async def _astream(self, prompt, **kwargs):
# 1. 调用平台的流式接口(异步调用防阻塞)
async with httpx.AsyncClient() as client:
async with client.stream("POST", api_url, json={"prompt": prompt}) as resp:
async for line in resp.aiter_lines():
if line:
# 2. 解析平台自定义格式,提取文本
chunk_text = self._parse_custom_format(line)
# 3. 转为 LangChain 标准格式
yield GenerationChunk(text=chunk_text)
async def _agenerate(self, prompt, **kwargs):
# 非流式走这里,可以聚合流式结果
return self._astream(prompt, **kwargs)

关键就是三步:调平台接口 -> 解析自定义格式 -> yield 标准GenerationChunk

解析流式输出的 AIMessageChunk 数据

LangChain 的流式调用非常简洁,不需要手动解析 SSE 格式:

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
model = ChatOpenAI(model="gpt-4o-mini", streaming=True)
messages = [HumanMessage(content="你好,介绍一下自己")]
# 同步流式
for chunk in model.stream(messages):
print(chunk.content, end="", flush=True)
# 异步流式(FastAPI 等异步框架中使用)
async for chunk in model.astream(messages):
print(chunk.content, end="", flush=True)

每个chunkAIMessageChunk对象,.content是本次增量文本。如果需要拿到完整响应,可以累加:

full_response = ""
async for chunk in model.astream(messages):
full_response += chunk.content
print(full_response)

核心 3:FastAPI 服务的流式输出原理

FastAPI 是一个基于 Python 的 Web 框架,底层依赖Starlette(一个轻量级 ASGI 框架)。简单理解:FastAPI = Starlette + 参数校验 + API 文档生成。StreamingResponse 实际上是 Starlette 提供的,FastAPI 直接拿过来用。

基本实现

from fastapi.responses import StreamingResponse
async def generate():
async for chunk in model.astream(messages):
yield f"data: {chunk.json()}\n\n" # 注意这里需要手动封装成SSE格式
@app.post("/chat")
async def chat():
return StreamingResponse(generate(), media_type="text/event-stream")

StreamingResponse 的完整流转过程(简单理解一下就行)

为了理解 StreamingResponse 底层发生了什么,我们从头到尾看一条请求的完整路径:

客户端发起 POST /chat
Uvicorn(ASGI 服务器) 接收 TCP 连接,解析 HTTP 请求
http://www.jsqmd.com/news/1086975/

相关文章:

  • VSCODE下verilog-format插件配置全攻略:从零到优雅排版
  • 5个实用技巧让EhViewer漫画阅读体验全面升级
  • macOS NVIDIA显卡驱动终极指南:一键安装与智能管理全解析
  • 世界杯一粒进球被吹掉,背后可能有多少 AI?
  • 如何解决AMD Ryzen硬件调试中的5大难题:高级工具实战指南
  • Translumo:Windows平台终极实时屏幕翻译神器,3分钟开启无障碍游戏体验
  • 分布式系统故障排查自动化实践与DrP平台解析
  • Radeon Software Slimmer:重构AMD显卡驱动的智能精简革新
  • Keccak哈希引擎的轻量级统一架构与容错设计
  • 终极PT站一键转载神器:告别繁琐操作,3分钟快速上手
  • 如何用项目经验打动Java面试官
  • 2026年揭秘!市面上热门的伺服电力测功机工厂口碑究竟如何?
  • 离线漫画收藏的艺术:picacomic-downloader如何重新定义你的数字阅读体验
  • Appium Android自动化测试环境搭建:从原理到实战的完整指南
  • 3个方法有效解决Windows窗口尺寸锁定问题:WindowResizer让你重新掌控屏幕布局
  • RH850/U2C评估板原理图深度解析:从电源设计到调试实战
  • 3分钟颠覆你的聊天记忆管理:让微信对话成为永久数字资产
  • 如何高效使用ACOLITE大气校正工具:完整实战指南
  • 5分钟免费绕过iPhone激活锁:applera1n实用指南
  • WebAssembly AI 推理插件——让浏览器跑起轻量模型的工程方案
  • ChatGPT中文版即将迎来重大更新?内部信源证实:Qwen-ChatGPT双引擎融合计划启动(首批接入试点单位仅剩3个名额)
  • C语言学习笔记20260628:字符串子串查找的三种解法
  • 3步搞定HS2-HF Patch安装:解锁HoneySelect2完整游戏体验的终极指南
  • Playnite游戏库管理器:跨平台游戏统一管理的终极解决方案
  • BetterNCM安装器:让你的网易云音乐秒变智能播放器
  • 3分钟免费AI视频生成:零基础打造专业数字内容
  • SHA-3/SHAKE统一架构设计与容错优化
  • 抖音无水印下载终极指南:5步轻松获取高清视频的完整教程
  • CookieCloud与Playwright集成:实现自动化测试登录态持久化
  • MagicSkin触觉传感器:半透明标记设计实现高精度力与纹理感知