当前位置: 首页 > news >正文

[拆解LangChain执行引擎]一个实例理解LangChain的几种流模式

invoke/ainvoke方法看起来是采用简单的请求/回复消息交换模式,客户端需等待整个流程执行完毕后才能得到结果,其实方法背后还是会调用stream/astream方法以流的方式进行交互。如果我们直接调用调用这两个方法,并采用相应的流模式,我们就能有效解决客户端长时间无响应的问题,实时地得到对方的反馈。

class Pregel(PregelProtocol[StateT, ContextT, InputT, OutputT],Generic[StateT, ContextT, InputT, OutputT]): def stream(self,input: InputT | Command | None,config: RunnableConfig | None = None,*,context: ContextT | None = None,stream_mode: StreamMode | Sequence[StreamMode] | None = None,print_mode: StreamMode | Sequence[StreamMode] = (),output_keys: str | Sequence[str] | None = None,interrupt_before: All | Sequence[str] | None = None,interrupt_after: All | Sequence[str] | None = None,durability: Durability | None = None,subgraphs: bool = False,debug: bool | None = None,**kwargs: Unpack[DeprecatedKwargs],
) -> Iterator[dict[str, Any] | Any]  async def astream(self,input: InputT | Command | None,config: RunnableConfig | None = None,*,context: ContextT | None = None,stream_mode: StreamMode | Sequence[StreamMode] | None = None,print_mode: StreamMode | Sequence[StreamMode] = (),output_keys: str | Sequence[str] | None = None,interrupt_before: All | Sequence[str] | None = None,interrupt_after: All | Sequence[str] | None = None,durability: Durability | None = None,subgraphs: bool = False,debug: bool | None = None,**kwargs: Unpack[DeprecatedKwargs],) -> AsyncIterator[dict[str, Any] | Any]

stream/astream方法的众多参数中,表示流模式的stream_mode参数最为重要,其对应类型StreamMode以字符串字面量的形式定义了七个选项。流是Pregel引擎向调用者提供数据的基本工作方法,它采用订阅发布的形式。Pregel对象发布的内容由对它订阅决定,因为发布客户端不敢兴趣的内容不但毫无意义,而且还会对影响造成极大的影响。

StreamMode = Literal["values", "updates", "checkpoints", "tasks", "debug", "messages", "custom"
]

在调用stream/astream方法时,我们可以根据需要指定一个或者多个流模式(以StreamMode序列的形式)。如果没有显式设置(None),Pregel对象自身的stream_mode字段会作为兜底,该字典的默认值为“values。如果当前Pregel对象以子图的形式被调用,会默认使用values模式,subgraphs参数用于控制是否希望得到子图的输出。下面列出了七种流模式对应的输出内容:

  • values:在每个Superstep结束后输出全部Channel的值;
  • updates:针对每个Node输出由它更新的Channel值;
  • checkpoints:在创建新的Checkpoint的时候,输出与get_state方法返回值具有相同结构的内容;
  • tasks:在Node任务开始和结束的时候输出任务ID、Node名称和其他相关信息;
  • debug:可以简单认为是tasks + checkpoints;
  • messages:输出语言模型产生的Token和相关元数据;
  • 开发者在Node处理函数中利用StreamWriter自行输出的内容;

如果混合使用多种流模式,stream/astream方法会返回一个字典,自带的Key表示当前输出采用的流模式。对于单一模式的调用,会直接返回输出的内容。如果采用custom模式,Node处理方法可以利用SteamWriter向客户端实时输出自定义的内容。StreamWriter和静态上下文一样,都属于当前Runtime的一部分,后者可以利用注入Node处理函数的RunnableConfig提取。下面演示程序会使用所有的流模式,我们在每个Node的处理函数中利用StreamWriter输出当前的Node名称。

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.channels import LastValue, BinaryOperatorAggregate
import operator
from functools import partial
from langchain_core.runnables import RunnableConfig
from typing import Any,Sequence
from langgraph.runtime import Runtime
from langgraph.types import StreamWriter,StreamMode
from collections import defaultdictdef handle(node: str, inputs: dict[str, Any], config: RunnableConfig) -> list[str]:  runtime:Runtime = config["configurable"].get("__pregel_runtime")writer:StreamWriter = runtime.stream_writerwriter(f"node '{node}' is called.")return [node]foo = (NodeBuilder().subscribe_to("foo",read = False).do(partial(handle, "foo")).write_to(bar="triggered by foo")
)
bar1 = (NodeBuilder().subscribe_to("bar",read = False).do(partial(handle, "bar1")).write_to("output")
)
bar2 = (NodeBuilder().subscribe_to("bar",read = False).do(partial(handle, "bar2")).write_to("output"))app = Pregel(nodes={"foo": foo, "bar1": bar1, "bar2": bar2},channels={"foo": LastValue(str),"bar": LastValue(str),"output": BinaryOperatorAggregate(list, operator.add),},  input_channels=["foo"],output_channels=["output"],checkpointer=InMemorySaver(),
)config={"configurable": {"thread_id": "123"}}
stream_mode: Sequence[StreamMode] = ["values", "updates","checkpoints","tasks","debug","custom"]
result: defaultdict[str, list[str]] = defaultdict(list)
for (mode,chunk) in app.stream(input={"foo": None}, stream_mode= stream_mode, config=config):result[mode].append(chunk)for mode,chunks in result.items():index = 1for chunk in chunks:print(f"{index}.[{mode}] {chunk}")index += 1print()

创建的Pregel由节点foo、bar1和bar2构成。节点foo率先执行,bar1和bar2随后并行执行。我们将调用stream方法收集到的内容根据流模式分组进行输出。我们来分析一下如下的输出结果:

  • 由于三个Node都会涉及到针对Channel的更新,所以会有三个updates模式的输出。
  • 整个流程涉及三个Supperstep,两个values模式的输出的全量的状态对应于后两个Superstep。
  • 三个Node对应三个任务,所以具有六个tasks模式的输出反映这三个任务的开始和结束。
  • 经历的三个Superstep对应三次Checkpoint的创建,所以我们能看到三个checkpoints模式的输出。
  • 六个tasks加三个checkpoints,所以有九个debug模式的输出。
  • 三个Node中针对StreamWriter的调用对应三个custom模式的输出。
1.[checkpoints] {'config': {'configurable': {'checkpoint_ns': '', 'thread_id': '123', 'checkpoint_id': '1f0fb144-d9ea-6dad-bfff-aaf158b4ced6'}}, 'parent_config': None, 'values': {'foo': None, 'output': []}, 'metadata': {'source': 'input', 'step': -1, 'parents': {}}, 'next': ['foo'], 'tasks': [{'id': 'd40d3fbc-0f70-33fe-1b31-51a3e12846fe', 'name': 'foo', 'interrupts': (), 'state': None}]}
2.[checkpoints] {'config': {'configurable': {'checkpoint_ns': '', 'thread_id': '123', 'checkpoint_id': '1f0fb144-d9ef-658f-8000-fe11b210fb05'}}, 'parent_config': {'configurable': {'checkpoint_ns': '', 'thread_id': '123', 'checkpoint_id': '1f0fb144-d9ea-6dad-bfff-aaf158b4ced6'}}, 'values': {'foo': None, 'bar': 'triggered by foo', 'output': []}, 'metadata': {'source': 'loop', 'step': 0, 'parents': {}}, 'next': ['bar1', 'bar2'], 'tasks': [{'id': 'ca9f30fb-ebea-87bf-074c-9a397616125a', 'name': 'bar1', 'interrupts': (), 'state': None}, {'id': 'c4540722-5a7a-4df3-14e1-abe0fddc9d73', 'name': 'bar2', 'interrupts': (), 'state': None}]}  
3.[checkpoints] {'config': {'configurable': {'checkpoint_ns': '', 'thread_id': '123', 'checkpoint_id': '1f0fb144-d9f3-6cbf-8001-e4219a2f4b58'}}, 'parent_config': {'configurable': {'checkpoint_ns': '', 'thread_id': '123', 'checkpoint_id': '1f0fb144-d9ef-658f-8000-fe11b210fb05'}}, 'values': {'foo': None, 'bar': 'triggered by foo', 'output': ['bar1', 'bar2']}, 'metadata': {'source': 'loop', 'step': 1, 'parents': {}}, 'next': [], 'tasks': []}1.[debug] {'step': -1, 'timestamp': '2026-01-27T00:08:26.865918+00:00', 'type': 'checkpoint', 'payload': {'config': {'configurable': {'checkpoint_ns': '', 'thread_id': '123', 'checkpoint_id': '1f0fb144-d9ea-6dad-bfff-aaf158b4ced6'}}, 'parent_config': None, 'values': {'foo': None, 'output': []}, 'metadata': {'source': 'input', 'step': -1, 'parents': {}}, 'next': ['foo'], 'tasks': [{'id': 'd40d3fbc-0f70-33fe-1b31-51a3e12846fe', 'name': 'foo', 'interrupts': (), 'state': None}]}}
2.[debug] {'step': 0, 'timestamp': '2026-01-27T00:08:26.865933+00:00', 'type': 'task', 'payload': {'id': 'd40d3fbc-0f70-33fe-1b31-51a3e12846fe', 'name': 'foo', 'input': {}, 'triggers': ('foo',)}}
3.[debug] {'step': 0, 'timestamp': '2026-01-27T00:08:26.866516+00:00', 'type': 'task_result', 'payload': {'id': 'd40d3fbc-0f70-33fe-1b31-51a3e12846fe', 'name': 'foo', 'error': None, 'result': {'bar': 'triggered by foo'}, 'interrupts': []}}
4.[debug] {'step': 0, 'timestamp': '2026-01-27T00:08:26.867326+00:00', 'type': 'checkpoint', 'payload': {'config': {'configurable': {'checkpoint_ns': '', 'thread_id': '123', 'checkpoint_id': '1f0fb144-d9ef-658f-8000-fe11b210fb05'}}, 'parent_config': {'configurable': {'checkpoint_ns': '', 'thread_id': '123', 'checkpoint_id': '1f0fb144-d9ea-6dad-bfff-aaf158b4ced6'}}, 'values': {'foo': None, 'bar': 'triggered by foo', 'output': []}, 'metadata': {'source': 'loop', 'step': 0, 'parents': {}}, 'next': ['bar1', 'bar2'], 'tasks': [{'id': 'ca9f30fb-ebea-87bf-074c-9a397616125a', 'name': 'bar1', 'interrupts': (), 'state': None}, {'id': 'c4540722-5a7a-4df3-14e1-abe0fddc9d73', 'name': 'bar2', 'interrupts': (), 'state': None}]}}
5.[debug] {'step': 1, 'timestamp': '2026-01-27T00:08:26.867338+00:00', 'type': 'task', 'payload': {'id': 'ca9f30fb-ebea-87bf-074c-9a397616125a', 'name': 'bar1', 'input': {}, 'triggers': ('bar',)}}
6.[debug] {'step': 1, 'timestamp': '2026-01-27T00:08:26.867343+00:00', 'type': 'task', 'payload': {'id': 'c4540722-5a7a-4df3-14e1-abe0fddc9d73', 'name': 'bar2', 'input': {}, 'triggers': ('bar',)}}
7.[debug] {'step': 1, 'timestamp': '2026-01-27T00:08:26.868043+00:00', 'type': 'task_result', 'payload': {'id': 'c4540722-5a7a-4df3-14e1-abe0fddc9d73', 'name': 'bar2', 'error': None, 'result': {'output': ['bar2']}, 'interrupts': []}}
8.[debug] {'step': 1, 'timestamp': '2026-01-27T00:08:26.868117+00:00', 'type': 'task_result', 'payload': {'id': 'ca9f30fb-ebea-87bf-074c-9a397616125a', 'name': 'bar1', 'error': None, 'result': {'output': ['bar1']}, 'interrupts': []}}
9.[debug] {'step': 1, 'timestamp': '2026-01-27T00:08:26.868533+00:00', 'type': 'checkpoint', 'payload': {'config': {'configurable': {'checkpoint_ns': '', 'thread_id': '123', 'checkpoint_id': '1f0fb144-d9f3-6cbf-8001-e4219a2f4b58'}}, 'parent_config': {'configurable': {'checkpoint_ns': '', 'thread_id': '123', 'checkpoint_id': '1f0fb144-d9ef-658f-8000-fe11b210fb05'}}, 'values': {'foo': None, 'bar': 'triggered by foo', 'output': ['bar1', 'bar2']}, 'metadata': {'source': 'loop', 'step': 1, 'parents': {}}, 'next': [], 'tasks': []}}1.[tasks] {'id': 'd40d3fbc-0f70-33fe-1b31-51a3e12846fe', 'name': 'foo', 'input': {}, 'triggers': ('foo',)}
2.[tasks] {'id': 'd40d3fbc-0f70-33fe-1b31-51a3e12846fe', 'name': 'foo', 'error': None, 'result': {'bar': 'triggered by foo'}, 'interrupts': []}
3.[tasks] {'id': 'ca9f30fb-ebea-87bf-074c-9a397616125a', 'name': 'bar1', 'input': {}, 'triggers': ('bar',)}
4.[tasks] {'id': 'c4540722-5a7a-4df3-14e1-abe0fddc9d73', 'name': 'bar2', 'input': {}, 'triggers': ('bar',)}
5.[tasks] {'id': 'c4540722-5a7a-4df3-14e1-abe0fddc9d73', 'name': 'bar2', 'error': None, 'result': {'output': ['bar2']}, 'interrupts': []}
6.[tasks] {'id': 'ca9f30fb-ebea-87bf-074c-9a397616125a', 'name': 'bar1', 'error': None, 'result': {'output': ['bar1']}, 'interrupts': []}1.[custom] node 'foo' is called.
2.[custom] node 'bar2' is called.
3.[custom] node 'bar1' is called.1.[updates] {'foo': {'bar': 'triggered by foo'}}
2.[updates] {'bar2': {'output': ['bar2']}}
3.[updates] {'bar1': {'output': ['bar1']}}1.[values] {'foo': None, 'bar': 'triggered by foo', 'output': []}
2.[values] {'foo': None, 'bar': 'triggered by foo', 'output': ['bar1', 'bar2']}
http://www.jsqmd.com/news/413067/

相关文章:

  • 员工界面显示,以及关键字查询功能
  • RethinkFun深度学习笔记
  • 2026优质FFU厂家有哪些?行业实力企业推荐 - 品牌排行榜
  • Android 语音应用开发工程师(智能座舱方向):核心技术解析与面试宝典
  • 2026年FFU品牌选择建议:技术指标与品牌实力综合分析 - 品牌排行榜
  • 2026高效过滤器哪家最好用?行业热门选择推荐 - 品牌排行榜
  • 2026推荐中效袋式过滤器哪个品牌好 - 品牌排行榜
  • 2026高效过滤器选哪家?行业热门品牌推荐 - 品牌排行榜
  • 2026中效过滤器哪个品牌好?行业实力品牌推荐 - 品牌排行榜
  • 2026手工砖哪家好用又实惠?高性价比品牌推荐 - 品牌排行榜
  • 2026年好的手工瓷砖品牌推荐:精选优质品牌盘点 - 品牌排行榜
  • 2026年HEPA过滤器有哪些品牌?国内外优质品牌推荐 - 品牌排行榜
  • 2026好的手工瓷砖品牌有哪些?这份精选推荐值得关注 - 品牌排行榜
  • 2026佛山复古手工砖工厂盘点:探寻本土特色工艺之选 - 品牌排行榜
  • 2026中暑口服液哪个牌子好一些?口碑推荐 - 品牌排行榜
  • 2026年推荐适合工装设计的马赛克手工砖品牌 - 品牌排行榜
  • 2026年在哪个平台订机票更省心?实测体验分享 - 品牌排行榜
  • 2026哪个平台有特价机票?高性价比选择指南 - 品牌排行榜
  • 2026年中暑口服液哪个牌子好?成分与效果实测参考 - 品牌排行榜
  • 孕妇可以吃金银花口服液吗?2026年科学饮用指南 - 品牌排行榜
  • 2026细胞培养液过滤机构有哪些?行业推荐榜单 - 品牌排行榜
  • 2026年什么中暑口服液效果好?成分与功效解析 - 品牌排行榜
  • 2026年机票在哪个平台买划算?实用购票指南 - 品牌排行榜
  • 金银花口服液有没有用?2026年真实使用场景解析 - 品牌排行榜
  • LuoTianyi and the Floating Islands (Hard Version)
  • 2026哪个平台买机票便宜?实用省钱攻略及平台推荐 - 品牌排行榜
  • Sentence-Transformers 深度解析:超越基础嵌入,构筑现代化语义智能应用
  • 双环节快递分拣流水线——串联排队系统模拟仿真(2+3)
  • 基于Java+SpringBoot+SSM,SpringCloud宠物社区(源码+LW+调试文档+讲解等)/宠物交流平台app/宠物爱好者社区app/宠物社交应用app/宠物互动社区app
  • 移动端开发工程师职位全面解析与面试准备指南