当前位置: 首页 > news >正文

[拆解LangChain执行引擎]非常规Pending Write的持久化

PendingWrite三元组的第二部分表示写入的Channel,但是对于一些特殊的场景,比如出错、无写入、中断和恢复,它们的值不再是一个普通的Channel名称,而是使用如下的值:

  • __error__:执行Node对应的任务出现异常;
  • __no_writes__:Node任务成功执行,但是没有执行针对Channel的输出;
  • __interrupt__:任务中断;
  • __resume__:表示恢复执行提供的数据;

接下来我们两个例子来产生上述这几种特殊的Pending Write。我们先来模拟出错的场景,如下面的代码片段所示,我们执行的Pregel对象具有一个唯一的Node,它的处理函数直接抛出一个异常。

from langgraph.pregel import Pregel, NodeBuilder
from langgraph.channels import LastValue
from langgraph.checkpoint.memory import InMemorySaver
from typing import Anydef handle(args: dict[str, Any])->None:raise Exception("manllually raised exception")node = NodeBuilder().subscribe_to("start").do(handle)
app = Pregel(nodes={"body": node},channels={"start": LastValue(str)},checkpointer=InMemorySaver(),input_channels=["start"],output_channels=[],
)
config = {"configurable": {"thread_id": "123"}}
try:result = app.invoke({"start": "begin"}, config=config)
except Exception as ex:print(f"Caught exception:{ex}" )(_, _, _, _, pending_writes) = app.checkpointer.get_tuple(config)
print(pending_writes)

我们在try/except块中完成针对Pregel的调用,并捕捉和输出得到的异常信息。接下来我们调用Checkpointer(一个InMemorySaver对象)的get_tuple方法得到对应的CheckpointTuple元组,然后将pending_writes部分输出出来。从如下所示的输出结果可以看出,这个Pending Write三元组的Channel名称被设置为 __error__ ,整个Exception对象成为了写入的内容。

Caught exception:manllually raised exception
[('f9ff1e88-4d82-f417-ad11-8fd870bfe647', '__error__', "Exception('manllually raised exception')")]

由于并不是所有的Node都有向Channel写入执行结果的需求,所以只要处理函数成功执行,即使没有Channel输出的行为,该任务的状态也会被视为成功,Checkpointer只是采用不同的形式来记录这种不需要写入的Pending Write。如下的这个程序不仅仅演示了这种无输出写入的场景,还同时模拟了中断和恢复。

from langgraph.pregel import Pregel, NodeBuilder
from langgraph.channels import LastValue
from langgraph.checkpoint.memory import InMemorySaver
from typing import Any
from langgraph.types import Command, interruptdef foo(args: dict[str, Any]) -> list[str]:resume1 = interrupt("1st interrupt")assert resume1 == "1st resume"resume2 = interrupt("2nd interrupt")assert resume2 == "2nd resume"resume3 = interrupt("3rd interrupt")assert resume3 == "3rd resume"return [resume1, resume2, resume3]def bar(args: dict[str, Any]) -> None:passapp = Pregel(nodes={"foo": NodeBuilder().subscribe_only("start").do(foo).write_to("output"),"bar": NodeBuilder().subscribe_only("start").do(bar),},channels={"start": LastValue(str),"output": LastValue(list[str]),},input_channels=["start"],output_channels=["output"],checkpointer=InMemorySaver(),
)config = {"configurable": {"thread_id": "123"}}
result = app.invoke(input={"start": "begin"}, config=config, stream_mode="tasks")
(_, _, _, _, pending_writes) = app.checkpointer.get_tuple(config)
print(f"After invoke:\n{pending_writes}")app.invoke(input=Command(resume="1st resume"), config=config)
(_, _, _, _, pending_writes) = app.checkpointer.get_tuple(config)
print(f"\nAfter resume 1:\n{pending_writes}")app.invoke(input=Command(resume="2nd resume"), config=config)
(_, _, _, _, pending_writes) = app.checkpointer.get_tuple(config)
print(f"\nAfter resume 2:\n{pending_writes}")result = app.invoke(input=Command(resume="3rd resume"), config=config)
assert result == {"output": ["1st resume", "2nd resume", "3rd resume"]}
(_, _, _, _, pending_writes) = app.checkpointer.get_tuple(config)
print(f"\nAfter resume 3:\n{pending_writes}")

如上面的代码片段所示,我们为Pregel提供了两个并行执行的节点foo和bar,其中bar对应的函数并未执行任何有效操作,也没有任何的输出。我们为节点foo对应的处理函数制造了三次人为中断,所以需要至少四次调用才能结束。

我们在创建的RunnableConfig对象中提供了统一的Thread ID,并将它作为后续方法调用的参数。针对Pregel的三次调用,第一次是为常规调用,后面两次分别是针对两次中断的恢复调用。我们在每次调用后,得到并输出Checkpointer记录下来的Pending Write。

从如下的输出结果可以看出,第一次常规调用后, 节点foo停在第一个中断处,节点bar成功执行但没有输出,所以Checkpointer将它们作为Pending Write记录下来,Channel名称分别是__interrupt____no_writes__,前者的写入内容是一个Interrupt对象,它具有我们指定的值“1st interrupt”。我们也看到了Interrupt对象具有一个唯一标识,在恢复调用时我们可以利用此标识为其指定针对性的恢复数据(Command(resume={"id":"resume value"))。

After invoke:
[('8d407c25-02f6-9101-d1b8-5a99c247edde', '__interrupt__', [Interrupt(value='1st interrupt', id='5603cdf275d8b8ba0633d272fa176fd3')]), ('22507855-e257-1b5b-eb1a-3c3fb0a071e9', '__no_writes__', None)]After resume 1:
[('8d407c25-02f6-9101-d1b8-5a99c247edde', '__interrupt__', [Interrupt(value='2nd interrupt', id='5603cdf275d8b8ba0633d272fa176fd3')]), ('22507855-e257-1b5b-eb1a-3c3fb0a071e9', '__no_writes__', None), ('00000000-0000-0000-0000-000000000000', '__resume__', '1st resume'), ('8d407c25-02f6-9101-d1b8-5a99c247edde', '__resume__', ['1st resume'])]After resume 2:
[('8d407c25-02f6-9101-d1b8-5a99c247edde', '__interrupt__', [Interrupt(value='3rd interrupt', id='5603cdf275d8b8ba0633d272fa176fd3')]), ('22507855-e257-1b5b-eb1a-3c3fb0a071e9', '__no_writes__', None), ('00000000-0000-0000-0000-000000000000', '__resume__', '2nd resume'), ('8d407c25-02f6-9101-d1b8-5a99c247edde', '__resume__', ['1st resume', '2nd resume'])]After resume 3:
[]

针对第一个中断的恢复调用后,节点foo停在第二个中断处,此时Checkpointer会创建两个新的Pending Write持久化我们提供的Resume Value(“1st resume”),它的Channel名称就是__resume__,但为什么是两个呢?

这实际上反映了 Pregel 处理外部指令注入Node内部消费的同步机制。第一个被称为全局Resume Value(Global Resume Value), 它代表从外部(通过Command(resume=...))注入到图中的原始指令。由于它不是由图内Node产生的,因此 Task ID 为空,它是唤醒整个暂停状态的“总开关”。第二个节点foo对全局Resume Value的消费记录,所以具有一个明确的Taks ID。当节点foo被唤醒并执行到interrupt行时,它会从全局Resume Value读取数据。为了保证幂等性和可回溯性,系统会将拿走了哪个Resume Value记录在它的任务路径下。

针对Resume的冗余设置是为了解决重入与回溯问题。全局记录证明了用户确实提供了这个值。Node记录证明了这个值确实被这个特定的interrupt函数调用消费了。一个Node内部可以连续调用多次interrupt函数,系统需要按顺序记录该Node消费过的所有Resume Value,以便在“时间旅行”或重试时能够精确对齐。

当我们调用interrupt函数实施人为中断时,底层实际上会抛出一个GraphInterrupt异常,Pregel通过捕获这个异常进而生成针对性的PendingWrite,所以针对同一个任务有可能有一个唯一的中断类型的PendingWrite。由于恢复执行总是会从头执行Node函数,所以基于中断的PendingWrite并不会恢复执行造成任何影响。所以当我们完成第二次恢复调用后,持久化的中断PendingWrite反映的是针对第二次interrupt函数的调用,对应Interrupt对象的值为2nd interrupt

Resume Value必须按照顺序提供,因为每遇到一个interrpt函数的调用,都会利用前面介绍过的计算器提供的索引,从Resume Value列表中读取Resume Value作为该调用的返回值,所以持久化的第二个基于恢复的PendingWrite对应的值变成了包含两个Resume Value的列表(['1st resume', '2nd resume'])。

在针对第三个中断的恢复执行结束后,fooNode完成了它的执行任务,而bar对应的任务本身就是成功状态,所以整个Superstep顺利结束,自然也就不存在Pending Write了。

http://www.jsqmd.com/news/395988/

相关文章:

  • 电商运营必备:AI净界RMBG-1.4商品主图优化方案
  • 【复现】基于双向反激变换器锂电池SOC主动均衡控制 1、拓扑:双向反激变换器 2、目标:六节电...
  • 2026年专业的购物网站谷歌优化/谷歌优化服务精选推荐 - 品牌宣传支持者
  • 2026年靠谱的试剂级乙醚/试剂乙醚生产商采购建议怎么选 - 品牌宣传支持者
  • Face Analysis WebUI部署教程:systemd服务化管理WebUI启停与异常自恢复
  • 中文语义检索神器BGE-Large-Zh:开箱即用的向量化工具
  • gemma-3-12b-it效果实测:128K上下文下多轮图像分析与逻辑推理展示
  • Qwen2-VL-2B-Instruct多场景落地:政务服务平台用其匹配政策文件与办事流程示意图
  • 教学视频必备!QWEN-AUDIO语音讲解快速生成
  • Pi0具身智能实战:无需硬件实现烤面包机取物模拟
  • 超越维度存在(能力)
  • OFA图像语义蕴含模型入门:从安装到推理的完整指南
  • LoRA训练助手实战案例:为100张角色图自动生成多维度训练标签
  • 2026年评价高的KNX智能家居控制系统/KNX智能家居解决方案哪家强生产厂家实力参考 - 品牌宣传支持者
  • nlp_gte_sentence-embedding_chinese-large在舆情分析系统中的应用
  • Super Qwen实时变声效果:基于Token的声纹转换技术
  • 2026年降AI率工具安全性评测:你的论文数据安全吗
  • Fish Speech 1.5音色克隆功能实测:效果惊艳的语音合成体验
  • 实用指南:八段锦练习注意要点
  • Git-RSCLIP遥感AI应用:国土空间规划中用地类型文本辅助判读
  • 答辩老师真的会看AI检测报告吗?知情人告诉你真相
  • 2026年质量好的中心供氧站房/中心供氧直销厂家价格参考怎么选 - 品牌宣传支持者
  • 弦音墨影步骤详解:视频上传→关键帧采样→Qwen2.5-VL编码→Grounding解码全流程
  • 千问图像生成16Bit(Qwen-Turbo-BF16)多场景落地:AIGC工作室降本提效实践
  • 万象熔炉 | Anything XLGPU优化:max_split_size_mb=128减少OOM概率实测报告
  • Agent Skills:让 Agent 具备真实世界能力
  • 一文讲透|继续教育必备AI论文工具 —— 千笔写作工具
  • 2026年知名的自闭症特教设备/特教设备感统教室销售厂家推荐哪家好(真实参考) - 品牌宣传支持者
  • 上下文工程:Agent 的记忆与注意力管理
  • 2026年知名的弥散供氧分子筛制氧机/弥散供氧制氧系统哪家质量好厂家推荐(实用) - 品牌宣传支持者