当前位置: 首页 > news >正文

[拆解LangChain执行引擎] __pregel_tasks通道——成就“PUSH任务”的功臣

除了我们显式声明的用于存储业务数据或驱动信号的Channel之外,Pregel自身也会维护一些系统Channel,其中最重要的莫过于一个名为__pregel_tasks的Channel。通过前面针对BSP的介绍,我们知道当Superstep进入同步屏障并应用所有更新后,引擎会根据Node针对Channel的订阅情况和Channel自身的更新状态生成下一步待执行的任务,其实待执行的任务不限于此。

1. 两种任务创建方式

我们将根据Node针对Channel的订阅来驱动任务执行的模式称为 “Pull模式”,与之相对的则是解决“__pregel_tasks”这个Channel实现的“Push模式”。具体来说,这是一个关闭“累积模式”的Topic类型的Channel,它存储的“Topic”体现为具有如下定义的Send对象。当某个Node执行之后,可以像这个Channel中写入一个Send来驱动某个Node在下一Superstep中执行。除了利用Send对象的node字段指定待执行的Node名称外,还可以利用arg字段提供输入参数。

class Send:   node: strarg: Anydef __init__(self, /, node: str, arg: Any) -> None

由于关闭了“累积模式”,在Topic类型Channel中写入的内容只会在下一个Superstep中生效,并且“阅后即焚”。对于执行引擎来说,这个名为“__pregel_tasks”的Channel存储的就是下一Superstep以“Push模式”驱动执行的任务列表,两者完美契合。

2. 确认__pregel_tasks通道的存在

__pregel_tasks通道的存在可以通过如下的演示实例来验证。如代码片段所示,在采用常规方式将Pregel对象创建出来后,我们根据Channel名称从它的channels字段中将此Channel提取出来。断言揭示了该Channel自身的类型、存储的数据类型和“累积模式”开关。

from langgraph.channels import LastValue, Topic
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.types import Send, Sequencenode = (NodeBuilder().subscribe_only("input_channel").do(lambda args: args).write_to("output_channel")
)
app = Pregel(nodes={"node": node},channels={"input_channel": LastValue(str), "output_channel": LastValue(str)},input_channels=["input_channel"],output_channels=["output_channel"],
)tasks: Topic[Send] = app.channels["__pregel_tasks"]
assert isinstance(tasks, Topic)
assert tasks.ValueType == Sequence[Send]
assert tasks.accumulate == False

3. 被保护起来的通道

虽然“__pregel_tasks”就是一个普通的Topic类型的Channel,但是它并未开发对外部使用,Pregel把它“保护”的非常好。我们不能声明一个与之同名的Channel,否则就会像如下的方式一样抛出一个ValueError,并提示“Channel '__pregel_tasks' is reserved and cannot be used in the graph.”。

from langgraph.channels import Topic
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.types import Send, Sequencetry:app = Pregel(nodes={"node": NodeBuilder().subscribe_only("__pregel_tasks")},channels={"__pregel_tasks": Topic[Sequence[Send]]},input_channels=["input_channel"],output_channels=["output_channel"],)assert False, "Expected an error due to reserved channel name"
except Exception as e:assert isinstance(e, ValueError)assert str(e) == "Channel '__pregel_tasks' is reserved and cannot be used in the graph."

我们也不能采用常规的方式将向其发送Send对象。比如在如下的演示程序中,节点foo试图向此Channel发送一个驱节点bar执行的Send对象,最终抛出一个InvalidUpdateError异常,并提示“Cannot write to the reserved channel TASKS”。除此之外,由于Pregel在利用它将基于“PUSH模式”的任务创建出来后就会将其清空,所以我们也无法读取其中的任务。

from langgraph.channels import LastValue
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.types import Send
from langgraph.errors import InvalidUpdateErrorfoo = (NodeBuilder().subscribe_to("start",read= False).do(lambda _: Send(node="bar", arg="foobar")).write_to("__pregel_tasks"))
bar = (NodeBuilder().do(lambda args:args).write_to("output"))app = Pregel(nodes={ "foo": foo, "bar": bar},channels={"start": LastValue(str),"output": LastValue(str),},input_channels=["start"],output_channels=["output"])
try: app.invoke({"start": None})assert False, "Should have raised InvalidUpdateError"
except Exception as e:assert isinstance(e, InvalidUpdateError)assert str(e) == "Cannot write to the reserved channel TASKS"

4. 唯一的解决方案

我们能够想到的常规方法针对此Channel的写入基本都绕不开引擎针对它的保护机制。我们将在下部分介绍Pregel另一个核心组成部分Node,Node会利用ChannelWriter对象实现针对Channel的写入,我们可以将针对Channel的写入意图封装成ChannelWriteTupleEntry,并以此来创建ChannelWriter,这应该是唯一能够“欺骗”引擎写入验证的唯一手段。如代码片段所示,率先执行的节点foo会返回一个驱动节点bar指定的Send对象,为了将它写入“__pregel_tasks”,我们创建了一个ChannelWriter,针对该Channel的写入定义在ChannelWriteTupleEntry对象中,具体体现在调用构造函数指定的mapper参数上,它提供一个映射将Node的执行结果转成成Channel名称和值的映射关系。

from langgraph.pregel import Pregel, NodeBuilder
from langgraph.channels import LastValue
from langgraph.pregel._read import PregelNode
from langgraph.pregel._write import ChannelWrite, ChannelWriteTupleEntry
from langgraph.types import Sendfoo: PregelNode = (NodeBuilder().subscribe_to("foo").do(lambda _: Send(node="bar", arg="foo"))
).build()
entry = ChannelWriteTupleEntry(mapper= lambda args: [("__pregel_tasks", args)])
foo.writers.append(ChannelWrite(writes=[entry]))bar = (NodeBuilder().do(lambda args: f"bar is triggered by {args}.").write_to("output"))app = Pregel(nodes={"foo": foo, "bar": bar},channels={"foo": LastValue(None),"output": LastValue(str),},input_channels=["foo"],output_channels=["output"],
)result = app.invoke(input={"foo": None})
assert result == {"output": "bar is triggered by foo."}
http://www.jsqmd.com/news/377135/

相关文章:

  • 2026深圳留学机构推荐:如何选择专业的留学规划服务 - 品牌排行榜
  • 告别设计烦恼!漫画脸描述生成让你的角色创作更简单
  • 固生堂调理鼻炎效果好吗?从诊疗细节看实际体验 - 品牌排行榜
  • SenseVoice Small轻量模型部署成本测算:A10/A100/T4显卡性价比对比
  • 2026动态膜过滤公司哪家好?行业实力品牌推荐 - 品牌排行榜
  • 2026上海用友代理商哪家靠谱?行业服务能力对比参考 - 品牌排行榜
  • 固生堂中医是正规机构吗?从诊疗规范看其专业资质 - 品牌排行榜
  • 2026年广州看中医调理鼻炎去哪看?中医调理指南 - 品牌排行榜
  • 澜起科技行使超额配售权:额外募资10亿港元 预计2025年利润超20亿
  • 2026鼻炎中医门诊哪家好?中西医结合诊疗机构推荐 - 品牌排行榜
  • 人形机器人Apptronik完成5.2亿美元融资:谷歌与奔驰加持
  • 零代码体验:AI股票分析师镜像快速入门指南
  • 2026上海用友代理推荐:企业软件服务合作方选择参考 - 品牌排行榜
  • 2026鼻炎专业调理中心推荐:中医辨证施治新方向 - 品牌排行榜
  • 上海用友服务哪家好?2026年企业用户真实反馈指南 - 品牌排行榜
  • 2026全屋定制板材品牌哪家靠谱?环保性能与品质解析 - 品牌排行榜
  • 2026最有效的防脱生发精华液怎么选?真实测评推荐 - 品牌排行榜
  • 2026防脱育发精华液哪个牌子好?真实使用体验分享 - 品牌排行榜
  • 2026上海用友代理商选哪家?综合实力与服务能力解析 - 品牌排行榜
  • 2026市场比较好的徐州全包装修企业排名参考 - 品牌排行榜
  • 2026昆明做白内障哪家最好?本地眼科机构实力参考 - 品牌排行榜
  • 2026板材品牌怎么选?从环保技术到全球认证全攻略 - 品牌排行榜
  • 嘿!您的“马年红包皮肤”已上线,快来领取呀~
  • 双料破圈! H131综艺《我的爱播出圈啦》来袭,7天10部短剧铸就成长传奇!
  • AIIA:人工智能赋能应用实践指南 2026
  • AI编程画马(含AI辅助创作)
  • 【SpringCloud】注册中心 服务注册 服务发现 Eureka
  • 2026年私募产品券商推荐:基于专业筛选与长期陪伴维度的服务机构榜单 - 品牌推荐
  • 深度解析消费电子领域安卓开发工程师:技术栈、职责剖析与面试指南
  • 面向物联网领域的 iOS 开发高级工程师核心能力与实战考察