当前位置: 首页 > news >正文

LangChain - 流式传输(Streaming)

一、先搞懂一件事:流式 vs 普通调用

invoke 是"等模型把整段话说完,一次性返回给你"。问题是大模型生成慢,你要干等好几秒才看到结果,体验差。

**流式传输(stream)**就是改成"模型一边生成、一边把字往你这儿送",像打字机一样一个字一个字蹦出来,你立刻就能看到。

▎ 比喻:invoke = 等整桌菜做好才端上;stream = 火锅边涮边吃。

用法就是把 invoke 换成 stream,然后在循环里一段段接收:

forchunkinagent.stream({"messages":[...]},...):# chunk 就是模型刚吐出的一小段处理(chunk)

二、核心难点:stream_mode 是什么?

stream_mode 不是"开不开流式",而是"你想看哪种类型的实时信息"。

一个智能体运行时,同时有好几种信息在产生:

  • 进度信息:现在跑到第几步了?调工具了吗?工具返回啥了?
  • 文字信息:模型正在一个字一个字地说什么?
  • 自定义信息:工具干活时想主动汇报点进度(“已查到 10 条”)

你不可能一次全看,所以让你选要看哪种。这就是 stream_mode。

页面给了三种模式:

stream_mode通俗名字你能看到什么
"updates"看进度每一步结束后的“小结”:模型要调工具了 / 工具返回了 / 模型最终回答了
"messages"看吐字模型正在一个字一个字生成的过程(连工具参数都是一点点拼出来的)
"custom"看自定义你自己在代码里用 writer 主动发的任意消息

三、三种模式逐个用大白话讲

模式 1:“updates” —— 看"步骤进度"

就像看一个任务清单:智能体每完成一步,就给你发一条"这步干完了,结果是xxx"。

比如问"旧金山天气",智能体调了一次天气工具,你会收到三条进度:

step: model → 模型说:我要调 get_weather 工具
step: tools → 工具说:旧金山天气是晴的
step: model → 模型说:最终回答——旧金山天气晴朗

代码:

forchunkinagent.stream({"messages":[{"role":"user","content":"What is the weather in SF?"}]},stream_mode="updates",version="v2",# 推荐用 v2,输出格式统一):ifchunk["type"]=="updates":forstep,datainchunk["data"].items():print(f"step:{step}")

▎ 特点:粒度粗,每步一条,不是逐字。适合"我想知道现在进行到哪了"。

模式 2:“messages” —— 看"逐字吐出"

这就是真正的打字机效果。模型生成的每一个字(token)都立刻送给你,连工具调用的参数都是一块块拼出来的(先 ‘{"’,再 ‘city’,再 ‘“:”’,再 ‘San’…)。

代码:

forchunkinagent.stream({"messages":[{"role":"user","content":"What is the weather in SF?"}]},stream_mode="messages",version="v2",):ifchunk["type"]=="messages":token,metadata=chunk["data"]# token 是那一小段,metadata 是来源信息print(token.content_blocks)

▎ 特点:粒度细,逐字逐块。适合"我想让用户看到字一个个蹦出来"。

模式 3:“custom” —— 看"你自己发的消息"

前两种是系统自动产生的信息,custom 是你自己想发什么就发什么。在工具里用 get_stream_writer() 主动喊话:

fromlanggraph.configimportget_stream_writerdefget_weather(city:str)->str:"""Get weather for a given city."""writer=get_stream_writer()writer(f"开始查{city}的天气...")# 你主动发的进度writer(f"查到了{city}的数据!")returnf"It's always sunny in{city}!"外面接收:forchunkinagent.stream(...,stream_mode="custom",version="v2"):ifchunk["type"]=="custom":print(chunk["data"])# 就会打印你 writer() 发的那些话

▎ 特点:完全自定义。适合"工具干耗时的活,想给用户报进度"。

▎ 注意:用了 get_stream_writer 的工具,不能脱离 LangGraph 执行环境单独调用。


四、能同时开多个模式吗?

能。stream_mode=[“updates”, “custom”] 这种传列表就行。

但记住:不是每种模式各把全部结果吐一遍,而是几种模式的事件按时间顺序"交织"着来,每个事件只出现一次。

就像一条河里混着三种颜色的鱼(updates、custom、messages),它们按游过来的先后顺序一条条到你面前,你用鱼的颜色(chunk[“type”])区分是哪种:

forchunkinagent.stream(...,stream_mode=["updates","custom"],version="v2"):ifchunk["type"]=="updates":...# 处理进度elifchunk["type"]=="custom":...# 处理自定义

实际输出顺序(页面示例)就是交替的:
updates(模型要调工具)→ custom(开始查)→ custom(查到了)→ updates(工具完成)→ updates(最终回答)


五、每个块长啥样?看 type 区分

不管哪种模式,v2 格式下每个块都是这种结构:

{"type":"updates""messages""custom",# 这块是哪种模式来的"data":...# 具体内容}

所以循环里永远先看 chunk[“type”],判断这是哪类信息,再去取 chunk[“data”]。这是看懂流式代码的万能套路。


六、几个进阶用法(知道有这些就行)

  1. 看"模型的思考过程"

有些模型(如 Claude)回答前会先"思考"。用 stream_mode=“messages”,然后从内容块里挑出 type==“reasoning” 的就是思考过程,type==“text” 的才是正式回答:

fortoken,metadatainagent.stream(...,stream_mode="messages"):reasoning=[bforbintoken.content_blocksifb["type"]=="reasoning"]text=[bforbintoken.content_blocksifb["type"]=="text"]ifreasoning:print(f"[思考]{reasoning[0]['reasoning']}")iftext:print(text[0]["text"])

▎ 不管 Anthropic 的 thinking 还是 OpenAI 的 reasoning,LangChain 都统一成 “reasoning” 块,换厂商代码不用改。

  1. 既看逐字、又看完整的工具调用

messages 给你的是"半个半个的 JSON 片段",你想看"拼好的完整工具调用"就再配上 updates:

stream_mode=[“messages”, “updates”]
messages 看逐字片段,updates 看拼好的完整结果

  1. 多智能体时区分"谁在说话"

给每个智能体起 name,流式时开 subgraphs=True,用 metadata[“lc_agent_name”] 就知道当前是哪个智能体在吐字:

weather_agent=create_agent(...,name="weather_agent")supervisor=create_agent(...,name="supervisor",tools=[call_weather_agent])forchunkinsupervisor.stream(...,stream_mode=["messages","updates"],subgraphs=True):ifagent_name:=metadata.get("lc_agent_name"):
  1. 人工审批中断

某些工具调用前先暂停问人(approve/edit),用 HumanInTheLoopMiddleware + checkpointer。流式时收集中断 → 给决策 → 用 Command(resume=decisions) 恢复:

agent=create_agent(...,middleware=[HumanInTheLoopMiddleware(interrupt_on={"get_weather":True})],checkpointer=...)# 第一次流式:会中断,收集 interrupts# 给出决策后,用 Command 恢复:agent.stream(Command(resume=decisions),config=config,...)

七、两个小开关

  • version=“v2”:强烈推荐。让所有模式的输出格式统一成 {type, data},不用再分情况解包。需要 LangGraph ≥ 1.1。
  • 禁用流式:某模型不想流式,设 streaming=False(或 disable_streaming=True)。多智能体里可用来控制"只让某个智能体流式"。

八、一句话总结

▎ 流式传输 = 把 invoke 换成 stream,让结果逐段实时送出。stream_mode 决定"看哪种信息":updates 看步骤进度、messages 看逐字吐出、custom 看自己发的消息。
▎可多选,事件交织着来,每个块靠 chunk[“type”] 区分。


九、看懂流式代码的万能三步

不管页面里哪段代码,都套这个套路:

  1. 看 stream_mode:他开了哪些模式?
  2. 循环里看 chunk[“type”]:这块是哪个模式来的?
  3. 取 chunk[“data”]:这个模式的实际内容是什么?

记住这三步,所有例子都能看懂。


十、和前面学的串起来

  • 流式拿到的"字"就是 AIMessageChunk(消息页讲过),可以累加拼成完整 AIMessage。
  • content_blocks 把各厂商的思考/推理统一成 “reasoning” 块(消息页讲过)。
  • get_stream_writer() 就是工具页讲的 runtime.stream_writer 的全局版本。
  • 人工审批用的 checkpointer + thread_id 是短期记忆页讲的那套。
http://www.jsqmd.com/news/1109036/

相关文章:

  • STM32与IS31FL3731 LED驱动芯片应用指南
  • 基于KMR221与STM32F334R8的高精度电压监测系统设计
  • 零代码是什么?零代码应用平台能干什么?
  • 多工位扫码组网优化方案:XT6202-2 系列多收发器无线扫码枪数据分发技术研究
  • 【JAVA毕设源码分享】基于springboot公园综合服务系统设计与实现小程序的设计与实现(程序+文档+代码讲解+一条龙定制)
  • TranslucentTB:用透明任务栏解锁Windows桌面的无限可能
  • AIMP工具安装教程(附安装包)AIMP音频播放环境配置图文教程
  • 3分钟视频转PPT:智能识别,告别手动截图的繁琐
  • PIC18F8722驱动WS2812打造动态LED系统
  • 终极卡牌批量生成神器:3分钟制作100张专业桌游卡牌,效率提升300%
  • Linux 【01- chmod命令超详细教程】
  • 12款开源渗透测试工具实战指南:从零搭建安全工程师核心能力栈
  • LinkSwift:九大网盘直链下载助手的完整使用指南
  • 手机变身万能键盘鼠标:无需安装软件的跨设备输入方案
  • 【观止·诗史汇 HarmonyOS 实战系列 10】文试默写:从诗词内容包动态生成练习题
  • 微信/QQ 打不开先测什么:网络层与合规层的标准分工
  • GTA5线上小助手:免费开源的终极游戏增强神器
  • 3分钟实现视频PPT智能提取:告别手动截图的效率革命
  • 徐州门店 适合开业的 徐州礼品促销 礼盒厂家 能不能定制
  • 半导体百科 | 湿法清洗与干法清洗详解:金属污染去除实战
  • 6DOF IMU与PIC18微控制器的运动追踪系统设计
  • C++20:理解Concepts:C++泛型编程
  • 双芯片协同信号转换方案:PCF8591与PIC18LF47K42的嵌入式应用
  • 绝地求生罗技鼠标宏完整配置指南:从基础设置到高级优化
  • 10分钟掌握GTA5线上小助手:终极免费游戏增强工具完全指南
  • 刚刚,Anthropic 发布 Claude Sonnet 5:最能「打」的 Sonnet,性能一路逼近 Opus 4.8
  • 如何用extract-video-ppt实现3倍效率提升:视频内容智能提取的终极指南
  • AiToEarn 多平台接入架构深度分析
  • Linux iptables 端口转发:让外部访问内网 SQL Server
  • 终极指南:3步使用免费工具找回遗忘的压缩包密码