当前位置: 首页 > news >正文

[拆解LangChain执行引擎]三种持久化模式的差异

当我们在调用Pregel对象的invoke方法的时候,可以利用参数(durability)指定采用的持久化模式,它决定了在Superstep N成功结束之后,针对Checkpoint的持久化与开始Superstep N+1之间的关系。具体具有如下三种选择:

  • sync:完成了针对Superstep N的Checkpoint持久化后才执行Superstep N+1,这是最“安全”的执行方式。如果在执行过程中发生崩溃,由于状态已同步写入,系统可以确保从最新的Checkpoint完全恢复。但由于需要等待磁盘或网络写入完成,其执行延迟相对较高。
  • async: Superstep N的Checkpoint持久化和Superstep N+1同时开始。这种异步方式达到了性能与安全性的平衡。它通过重叠计算和 I/O 来提高吞吐量,减少等待时间。这是系统的默认配置。
  • exit:不在每个Superstep完成后持久化Checkpoint,而是等到整个调用结束或者遇到中断是才进行持久化。这种方式能够带来最高的性能,因为中间步骤没有持久化开销。但其风险最大,如果程序在图执行结束前崩溃,该次运行的所有中间状态和最终结果都将丢失。

1. Checkpoint持久化

接下来通过几个简单的实例演示来进一步加强对上述三种持久化模式的理解。为了确定Checkpoint持久化的时机,我们定义了如下这个派生于InMemorySaver的ExtendedInMemorySaver类。重写的方法在返回基类的同名方法的调用结果前,模拟了一秒的演示,并做了相应的输出。

from langgraph.checkpoint.memory import InMemorySaver
import time
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.base import Checkpoint, CheckpointMetadata, ChannelVersionsclass ExtendedInMemorySaver(InMemorySaver):   def put(self,config: RunnableConfig,checkpoint: Checkpoint,metadata: CheckpointMetadata,new_versions: ChannelVersions,) -> RunnableConfig:time.sleep(1)  # Simulate some delayprint(f"put called with checkpoint for step {metadata['step']}")return super().put(config, checkpoint, metadata, new_versions)

我们构建了如下这个由四个Node组成的Pregel,它的Checkpointer使用的正是上面这个ExtendedInMemorySaver。我们通过写入通道foo驱动节点foo1和foo2并行执行,foo1和foo2在执行结束分别写入对应的Channel驱动bar1和bar2执行。为了确定Node执行的时机,我们也在对应的处理函数中做了相应的输出。

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.channels import LastValue, BinaryOperatorAggregate
import operator,time
from functools import partial
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.base import Checkpoint, CheckpointMetadata, ChannelVersionsdef handle(node: str, arg: dict):print(f"node '{node}' is called.")return [node]foo1 = (NodeBuilder().subscribe_to("foo",read = False).do(partial(handle, "foo1")).write_to("bar1")
)
foo2 = (NodeBuilder().subscribe_to("foo",read = False).do(partial(handle, "foo2")).write_to("bar2")
)
bar1 = (NodeBuilder().subscribe_to("bar1",read = False).do(partial(handle, "bar1")).write_to("output")
)
bar2 = (NodeBuilder().subscribe_to("bar2",read = False).do(partial(handle, "bar2")).write_to("output")
)
app = Pregel(nodes={"foo1": foo1, "foo2": foo2, "bar1": bar1, "bar2": bar2},channels={"foo": LastValue(str),"bar1": LastValue(str),"bar2": LastValue(str),"output": BinaryOperatorAggregate(list, operator.add),},  input_channels=["foo"],output_channels=["output"],checkpointer=ExtendedInMemorySaver(),
)config = {"configurable": {"thread_id": "123"}}
result = app.invoke(input={"foo": "start"}, config=config, durability="sync")
assert result["output"] == ["bar1", "bar2"]

我们调用Pregel对象的invoke方法时显式地将durability参数设置为sync。从如下所示的输出可以看出,当节点foo1和foo2完成执行后,对应Superstep 0的Checkpoint被持久化之后,Superstep 1中的bar1和bar2才开始执行。输出结果还反映了另一个现象:虽然我们采用了sync持久化模式,但是针对Superstep -1针对原始输入的持久化并不能保证在Superstep 0(最先驱动的节点foo1和foo2执行所在的Superstep)开始之前完成。

node 'foo1' is called.
node 'foo2' is called.
put called with checkpoint for step -1
put called with checkpoint for step 0
node 'bar1' is called.
node 'bar2' is called.
put called with checkpoint for step 1

如下所示的是采用async持久化模式的输出结果,可以看出Superstep 1针对节点bar1和bar2的执行和针对Superstep 0的基于Checkpoint持久化是同步进行的。由于put方法模拟了1秒的延时,所以持久化最后才结束。

node 'foo1' is called.
node 'foo2' is called.
node 'bar1' is called.
node 'bar2' is called.
put called with checkpoint for step -1
put called with checkpoint for step 0
put called with checkpoint for step 1

如果将持久化模式设置为exit,将会产生如下的输出结果。可以看出,这种模式仅在整个调用结束后对Checkpoint作一次持久化。

node 'foo1' is called.
node 'foo2' is called.
node 'bar1' is called.
node 'bar2' is called.
put called with checkpoint for step 2

2. Pending Write持久化

持久化包括在Superstep完成后针对Checkpoint的持久化和Superstep过程中针对Pending Write的持久化,但是syncasync持久化模式对后者没有任何区别,当Node执行结束或者遇到中断都会针对当前产生的Pending Write作及时的持久化。为了确认我们的想法,我们修改了ExtendedInMemorySaver,按照如下的方式重写了put_writes方法。

class ExtendedInMemorySaver(InMemorySaver):       def put_writes(self,config: RunnableConfig,writes: Sequence[tuple[str, Any]],task_id: str,task_path: str = "",) -> None:time.sleep(1)  # Simulate some delayprint(f"put_writes called with writes: {writes}")

即使我们采用sync持久化模式,针对四个Node任务的Pending Write都是以异步方式执行的,所以会产生如下的输出结果。

node 'foo1' is called.
node 'foo2' is called.
node 'bar1' is called.
node 'bar2' is called.
put_writes called with writes: deque([('bar2', ['foo2'])])
put_writes called with writes: deque([('bar1', ['foo1'])])
put_writes called with writes: deque([('output', ['bar1'])])
put_writes called with writes: deque([('output', ['bar2'])])

但是如果持久化模式设置成exit,在不产生中断的情况下,不会有任何的Pending Write被持久化,输出将会是如下的结果。

node 'foo1' is called.
node 'foo2' is called.
node 'bar1' is called.
node 'bar2' is called.

为了模拟exit持久化模式下的中断,我们修改了四个Node最终调用的handle方法,让它在节点bar1中模拟一个人为中断。

def handle(node: str, arg: dict):print(f"node '{node}' is called.")if node == "bar1":interrupt("manual interrupt")return [node]

虽然遇到中断的bar2是在Superstep 1中执行的,但是整个过程中没有任何一个Checkpoint被持久化,这种情况下不得不对整个过程实施回滚。此时会写入如下所示的四个Pending Write,除了针对bar1的中断类型的Pending Write,其他三个都是针对成功执行任务的Channel写入。

node 'foo1' is called.
node 'foo2' is called.
node 'bar1' is called.
node 'bar2' is called.
put_writes called with writes: deque([('bar1', ['foo1'])])
put_writes called with writes: deque([('bar2', ['foo2'])])
put_writes called with writes: deque([('output', ['bar2'])])
put_writes called with writes: [('__interrupt__', (Interrupt(value='manual interrupt', id='f10d2458e1d1ff38c6b55d008907af52'),))]
http://www.jsqmd.com/news/402444/

相关文章:

  • Vue+python的农副产品商城交易平台的设计与开发_8r0k4x95
  • Vue+python的每日鲜牛奶订购系统的设计与实现 商家
  • Redis数据恢复实战:从RDB/AOF备份文件完整恢复指南
  • 实测对比后 10个降AIGC工具:研究生降AI率必备测评与推荐
  • 2026.2.22:微调resnet50模型训练CIFAR-10,准确率达0.9349
  • 运筹学-运输问题
  • 智能客服选型指南:如何评估比MaxKB更优的解决方案
  • 运筹学-运输问题(伏格尔法)
  • ComfyUI Prompt Outputs Failed Validation:新手避坑指南与解决方案
  • 基于安卓智能家电的毕业设计:从零构建可扩展的 IoT 控制应用
  • 深度测评 9个AI论文工具:继续教育毕业论文写作全攻略
  • Vue+python的毕业生招聘职位推荐系统设计与实现_j3yts8xh
  • Vue+python的图书阅读分享系统的设计与实现_qgl1ls3u
  • 用过才敢说 9个一键生成论文工具:研究生毕业论文+科研写作必备测评
  • CF1578L Labyrinth题解
  • 如何判断盒马鲜生礼品卡回收平台是否正规? - 京顺回收
  • 基本dos操作
  • Vue+python的在线个性化电影推荐与观影社交平台的设计与实现_wl88o05e
  • VS Code中cl.exe编译调试的开发者命令提示符依赖问题解析与解决方案
  • 拖延症福音 10个AI论文网站深度测评,专科生毕业论文写作必备!
  • ChatGPT Exporter 实战:如何高效导出和管理对话数据
  • Conda Prompt界面定位与实战指南:从环境管理到高效开发
  • Chatbot Arena实战入门:从零构建综合AI领域的对话系统
  • 实战指南:如何安全高效地下载与部署 chattts model.safetensors 模型
  • 人工智能 - AI重构企业数字化格局
  • 五金店管理系统毕设:从单体架构到模块化解耦的技术实践
  • Vue+python的旅游信息网站的设计与实现_x0p96alf
  • 城市空气质量预测毕设:从数据获取到模型部署的新手实战指南
  • AI辅助开发实战:如何优化CosyVoice在CPU上的运行效率
  • 基于DeepSeek智能客服的AI辅助开发实战:从对话管理到系统集成