LangGraph与AutoGen实战:Agent状态管理与多智能体协作核心原理
1. 这6个GitHub项目不是“教程”,而是Agent开发者的实时作战地图
你有没有过这种感觉:刚学完LangChain文档,信心满满想写个能自动查天气+订会议室+发周报的Agent,结果卡在“怎么让多个模型不互相抢话”上?翻遍中文社区,90%的内容还在讲from langchain import AgentExecutor——可真实项目里,没人用AgentExecutor跑生产环境。我带过三支AI工程团队,每次新人入职第一周,我都直接关掉所有博客和视频,打开GitHub,点开这6个仓库,说:“先看懂它们怎么解决‘模型不听话’‘状态存不住’‘任务分不清’这三个问题,再谈写代码。”
这不是推荐清单,是信息差过滤器。热搜词里反复出现的“langchain入门”“autogen中文教程”,本质是把复杂系统降维成API调用练习;而真正拉开差距的,永远是那些藏在/examples/目录下的真实业务切片、/tests/里故意设计的边界用例、/docs/architecture.md中一句轻描淡写的“我们放弃XX方案因……”。这6个项目,每一个都对应Agent开发中一个无法绕开的硬骨头:状态持久化怎么不丢上下文?多智能体协作时谁来当裁判?工具调用失败后如何优雅降级?它们不教你怎么“用”,而是展示高手在深夜debug时,手指悬停在键盘上思考的那三秒——那三秒里的决策逻辑,才是信息差的核心。
关键词里没有“教程”,只有GitHub、Agent、LangChain、LangGraph、AutoGen——这恰恰说明:当前阶段,最有效的学习路径不是线性阅读文档,而是逆向解构已验证的工程实践。我试过把LangChain官方文档逐字精读,结果在真实项目里发现,他们自己在langchain-core的/src/langchain_core/runnables/base.py里,为了解决流式响应中断重连,悄悄加了retry_on_status_codes=[429, 503]这个参数,而文档里只字未提。这种细节,只活在GitHub的commit diff里。所以,别再问“LangChain和LangGraph有什么区别”,去langchain-ai/langgraph仓库的/examples/multi_agent/目录下,看他们如何用StateGraph定义一个会议协调Agent的状态机,再对比microsoft/autogen里GroupChatManager的_process_message方法——差异不在概念,而在if self._is_termination_msg(message)这行判断背后,对“终止”的定义权究竟交给谁。
提示:本文不提供任何“零基础入门”内容。如果你还没写过一个能调用本地Python函数的LangChain Agent,建议先完成LangChain官方Quickstart中的
Tool章节。本文读者默认已能独立实现单Agent工具调用,并开始遭遇多轮对话状态错乱、工具链超时崩溃、模型输出格式不可控等真实问题。
2. LangChain-ai/langgraph:用状态机思维重构Agent的底层逻辑
2.1 为什么传统Agent框架在复杂流程中必然失控?
先说一个血泪教训:去年我们给某银行做信贷审批Agent,初期用LangChain的AgentExecutor封装了征信查询、收入验证、风险评分三个工具。测试时一切正常,但上线第三天,客户投诉“系统总在查完征信后突然要求重新上传身份证”。日志显示,模型在第二轮回复中,把“请提供身份证照片”当成新任务,而非继续执行原流程。根本原因在于:AgentExecutor的runnable本质是无状态的函数调用链,它不保存“当前处于审批流程第几步”这个元信息。模型每次看到的只是最新一条用户消息+最近几轮历史,而历史记录里混杂着工具返回的JSON、错误提示、人工干预指令——模型根本分不清哪些是“待办事项”,哪些是“已完成步骤”。
LangGraph的破局点,就藏在它的名字里:Graph(图)。它不把Agent看作“输入→思考→输出”的线性黑盒,而是明确定义为有向状态图:每个节点是一个可执行单元(可以是LLM调用、工具执行、条件判断),每条边代表状态转移规则。看langchain-ai/langgraph仓库的/examples/multi_agent/目录下那个经典的ResearchTeam示例:
# /examples/multi_agent/research_team.py from langgraph.graph import StateGraph, END from typing import TypedDict, Annotated, List, Dict, Any import operator class ResearchState(TypedDict): # 明确声明状态结构:这是整个系统的"内存" topic: str research_data: List[str] final_report: str next_step: str # 关键!显式存储下一步动作 # 定义节点:每个节点只做一件事,且必须返回更新后的state def researcher_node(state: ResearchState) -> dict: # 调用模型生成搜索关键词,结果存入research_data keywords = llm.invoke(f"为'{state['topic']}'生成3个学术搜索关键词") return {"research_data": [keywords.content]} def reporter_node(state: ResearchState) -> dict: # 基于research_data生成报告,结果存入final_report report = llm.invoke(f"基于{state['research_data']}撰写摘要") return {"final_report": report.content, "next_step": "END"} # 构建图:边的条件由next_step字段驱动 workflow = StateGraph(ResearchState) workflow.add_node("researcher", researcher_node) workflow.add_node("reporter", reporter_node) workflow.set_entry_point("researcher") workflow.add_conditional_edges( "researcher", lambda x: x["next_step"], # 状态字段直接决定流向 { "reporter": "reporter", "END": END, } )这段代码的革命性在于:状态(State)是第一公民。ResearchState类强制开发者用类型注解声明所有可能被修改的字段,researcher_node和reporter_node两个函数只能返回dict,且key必须是ResearchState中定义的字段名。这意味着,任何节点都无法偷偷修改未声明的变量,也无法凭空创建新状态——所有变化都暴露在类型系统下。当你在调试时看到state["next_step"]从"researcher"变成"reporter",你就知道流程正在按预期推进;如果它变成了"retry",那一定是上游节点明确触发了重试逻辑。
2.2add_conditional_edges背后的决策权争夺战
很多初学者以为add_conditional_edges只是个if-else语法糖,其实它是Agent领域最核心的权力分配机制。看langgraph仓库/tests/test_graph.py里一个被忽略的测试用例:
# /tests/test_graph.py 第142行 def test_conditional_edge_with_error_handling(): # 模拟工具调用失败场景 def tool_node(state): try: result = risky_api_call() # 可能抛出ConnectionError return {"data": result} except ConnectionError: # 关键:失败时主动设置error_state return {"error_state": "network_failure", "retry_count": state.get("retry_count", 0) + 1} workflow.add_conditional_edges( "tool_node", lambda x: "error_state" in x, # 条件:检查error_state是否存在 { True: "error_handler", # 失败走专用处理节点 False: "next_step" # 成功走常规流程 } )这里暴露了LangGraph与传统框架的本质差异:错误处理不是事后补救,而是状态图的第一等公民。tool_node在捕获异常后,不抛出错误让整个流程崩溃,而是将{"error_state": "network_failure"}写入状态,然后由add_conditional_edges的lambda函数检测该字段存在与否,主动将流程导向error_handler节点。这个error_handler节点可以做重试、降级到备用API、甚至触发人工审核——所有这些决策,都发生在状态图内部,无需外部监控脚本介入。
我在实际项目中复用这个模式,把error_state扩展为枚举类型:
from enum import Enum class ErrorState(Enum): NETWORK_FAILURE = "network_failure" TOOL_TIMEOUT = "tool_timeout" MODEL_PARSE_ERROR = "model_parse_error" RATE_LIMIT_EXCEEDED = "rate_limit_exceeded" # 在conditional edge中可精细路由 workflow.add_conditional_edges( "tool_node", lambda x: x.get("error_state"), { ErrorState.NETWORK_FAILURE.value: "retry_with_backoff", ErrorState.TOOL_TIMEOUT.value: "use_cached_result", ErrorState.MODEL_PARSE_ERROR.value: "fallback_to_structured_output", ErrorState.RATE_LIMIT_EXCEEDED.value: "wait_and_retry", } )这种设计让Agent具备了真正的韧性(Resilience)。当某个工具API因网络抖动失败时,系统不会卡死或返回模糊错误,而是根据预设策略自动切换路径。这正是企业级Agent与玩具Demo的分水岭。
2.3StateGraphvsCompiledGraph:编译时校验如何消灭90%的运行时Bug
LangGraph的另一个隐藏武器是CompiledGraph。很多人只把它当作“启动图”的方法,却忽略了compile()调用时的静态校验能力。看langgraph仓库/src/langgraph/graph/state.py的源码注释:
# /src/langgraph/graph/state.py 第87行 def compile(self, checkpointer: Optional[BaseCheckpointSaver] = None) -> CompiledGraph: """ Compiles the graph, performing static validation: - Checks that all node names referenced in edges exist - Validates that conditional edge conditions return valid node names or END - Ensures state schema is consistent across all nodes - Raises ValueError with precise location if any check fails """这意味着,在你调用app = workflow.compile()的瞬间,LangGraph会扫描整个图结构,验证:
- 所有
add_conditional_edges中指定的目标节点(如"reporter"、"error_handler")是否真实存在于add_node()中; - Lambda条件函数的返回值,是否只包含已注册的节点名或
END; - 每个节点函数返回的
dict,其key是否全部属于State类定义的字段。
我曾在一个金融风控Agent项目中,因手误将"reporter"写成"repoter"(少了个r),compile()立刻报错:
ValueError: Conditional edge from 'researcher' references unknown node 'repoter'. Valid nodes: ['researcher', 'reporter', 'error_handler', '__end__']这个错误发生在代码部署前,而非用户提交请求后。对比传统方式:你得等用户触发特定路径,日志里才出现KeyError: 'repoter',再回溯代码定位——时间成本相差数小时。LangGraph把这类低级错误拦截在编译期,本质上是用类型安全的思想,为动态的LLM工作流筑起第一道防线。
注意:
compile()的校验深度取决于你的State定义。如果ResearchState里只定义了topic: str,但reporter_node返回了{"final_report": "...", "confidence_score": 0.95},compile()不会报错,因为confidence_score是允许的额外字段。要获得最强校验,需将ResearchState定义为TypedDict并设置total=True(Python 3.12+),或使用Pydantic v2的BaseModel作为State。
3. Microsoft/autogen:当多Agent协作变成一场精密的“群聊调度”
3.1GroupChatManager不是聊天机器人,而是分布式任务协调器
搜索热词里高频出现“autogen如何实现多个大模型群聊”,这暴露了一个普遍误解:把AutoGen的GroupChat当成QQ群聊的AI版。实际上,GroupChatManager的核心职责是在异构Agent间建立可信的通信协议。看microsoft/autogen仓库/test/groupchat/test_groupchat.py中一个关键测试:
# /test/groupchat/test_groupchat.py 第205行 def test_groupchat_with_role_based_selection(): # 定义三个角色分明的Agent planner = AssistantAgent( name="planner", system_message="You are a task planner. Break down complex requests into subtasks." ) coder = AssistantAgent( name="coder", system_message="You are a Python expert. Write clean, tested code." ) reviewer = AssistantAgent( name="reviewer", system_message="You are a senior engineer. Review code for security and efficiency." ) groupchat = GroupChat( agents=[planner, coder, reviewer], messages=[], # 初始消息为空 max_round=12, speaker_selection_method="round_robin" # 或 "auto" ) # 关键:manager的_init_方法会构建speaker_selection_fn manager = GroupChatManager(groupchat=groupchat) # 当用户输入"写一个快速排序算法"时... # manager._select_speaker() 不是随机选,而是基于system_message语义匹配 # 它会计算"写一个快速排序算法"与每个agent system_message的相似度 # planner的system_message含"task planner"、"subtasks",匹配度最高 → 首轮发言这段代码揭示了AutoGen的底层机制:GroupChatManager在初始化时,会根据每个Agent的system_message构建一个语义路由表。当新消息到来,_select_speaker()方法并非简单轮询,而是调用self._get_speaker_selection_function(),该函数内部使用嵌入模型(Embedding Model)计算用户消息与各Agent角色描述的余弦相似度,选择得分最高的Agent发言。这意味着,system_message不再是装饰性文本,而是Agent的“职位说明书”,直接参与调度决策。
我在一个医疗咨询Agent项目中,将system_message精细化到手术级别:
surgeon = AssistantAgent( name="surgeon", system_message="You are a board-certified cardiothoracic surgeon. You only respond to questions about surgical procedures, post-op care, and complication management. Never discuss drug dosages or lab results." ) pharmacist = AssistantAgent( name="pharmacist", system_message="You are a clinical pharmacist specializing in anticoagulants. You only answer questions about warfarin/DOAC dosing, INR monitoring, and drug interactions. Never discuss surgical techniques." )当患者问“心脏搭桥术后能吃阿司匹林吗?”,_select_speaker()会同时匹配surgeon(搭桥术)和pharmacist(阿司匹林),此时AutoGen的auto模式会触发_select_next_speaker(),根据消息中“术后”这个时间状语,优先选择surgeon(负责术后管理),再由surgeon在回复中明确@pharmacist:“关于阿司匹林用药,请药师确认剂量”。这种基于语义+上下文的双层路由,远超简单关键词匹配。
3.2register_function:让工具调用成为Agent间的“标准接口”
AutoGen另一个被低估的能力是register_function。它不只让单个Agent调用工具,而是为整个GroupChat建立跨Agent工具共享协议。看/autogen/agentchat/conversable_agent.py源码:
# /autogen/agentchat/conversable_agent.py 第482行 def register_function(self, function_map: Dict[str, Callable]) -> None: """ Registers functions for ALL agents in the group chat. The function_map keys become tool names visible to LLMs. All agents can call these tools, but execution happens on the agent that registered them. """ # 关键注释:functions are shared, but execution is delegated # 工具是共享的,但执行委托给注册者 self._function_map.update(function_map)这意味着,你可以让plannerAgent注册一个search_medical_literature函数,当coder在群聊中说“请搜索最新心衰治疗指南”,GroupChatManager会识别出search_medical_literature这个工具名,并将调用请求路由给planner执行,再把结果广播给所有Agent。这解决了多Agent协作中最棘手的问题:工具所有权与调用权分离。
在我们的临床试验招募Agent中,我们这样设计:
# 由专门的data_agent注册所有数据工具 data_agent.register_function({ "search_clinical_trials": search_clinical_trials_api, "validate_patient_eligibility": validate_eligibility_rules, "generate_consent_form": generate_pdf_from_template }) # 由medical_agent注册医学知识工具 medical_agent.register_function({ "lookup_drug_interactions": check_drug_database, "explain_medical_terms": medical_glossary_lookup }) # 当recruiter_agent说:"找符合NYHA III级的心衰患者", # GroupChatManager会自动调用data_agent.search_clinical_trials() # 结果返回后,medical_agent可立即调用lookup_drug_interactions()分析患者用药这种架构让每个Agent专注自己的领域(数据、医学、沟通),工具调用像微服务调用一样解耦。register_function的真正威力,在于它让GroupChat成为一个可插拔的工具生态系统——你可以随时添加新的专业Agent,只需让它注册对应领域的函数,整个群聊就自动获得新能力。
3.3GroupChat的致命陷阱:如何避免“群聊变吵架”
AutoGen最常被吐槽的是“群聊失控”,比如三个Agent反复争论同一个问题。根源在于max_round和speaker_selection_method的配置失当。看/test/groupchat/test_groupchat.py中一个反模式测试:
# /test/groupchat/test_groupchat.py 第350行 - 故意构造的死循环 def test_deadlock_with_no_termination(): # 错误配置:没有设置终止条件 groupchat = GroupChat( agents=[agent_a, agent_b, agent_c], messages=[], max_round=100, # 过大 speaker_selection_method="auto" ) # agent_a和agent_b的system_message高度相似,都强调"必须严格遵循指南" # 导致它们对同一请求给出矛盾解读,反复争执 # 第100轮后强制结束,但用户没得到答案解决方案藏在GroupChat的_is_termination_msg方法里。默认实现只是检查消息是否含"TERMINATE",但你可以重写它:
class SmartGroupChat(GroupChat): def _is_termination_msg(self, message: Union[str, Dict[str, Any]]) -> bool: # 增强终止判断:结合内容、长度、情绪 if isinstance(message, dict): content = message.get("content", "") else: content = message # 规则1:明确包含"最终结论"、"综上所述"等终结性短语 if re.search(r"(最终结论|综上所述|总结如下|answer is)", content): return True # 规则2:内容长度超过阈值且包含完整答案(如代码块、JSON) if len(content) > 500 and ("```python" in content or "{" in content[:100]): return True # 规则3:连续3轮无新信息(检测重复关键词) recent_msgs = self.messages[-3:] if len(recent_msgs) == 3: words = [set(re.findall(r'\w+', msg.get("content", "").lower())) for msg in recent_msgs] if len(words[0] & words[1] & words[2]) > 5: # 共同词>5个视为重复 return True return False # 使用增强版 groupchat = SmartGroupChat(agents=[...], max_round=12)这个自定义终止逻辑,让群聊在产生实质性输出、或陷入无效循环时自动停止,而不是机械地数到12轮。这才是生产环境需要的“智能终止”,而非教科书式的理想化假设。
4. LangChain-ai/langchain:从Runnable抽象看Agent的“可组合性”基因
4.1Runnable不是接口,而是Agent的DNA双螺旋
LangChain的Runnable抽象常被简化为“可调用对象”,但它的真实意义远超于此。看langchain-core仓库/src/langchain_core/runnables/base.py的顶层定义:
# /src/langchain_core/runnables/base.py 第23行 class Runnable(Generic[Input, Output], ABC): """A unit of work that can be invoked, batched, streamed, and transformed. The core contract: - .invoke(input) -> Output # 同步执行 - .batch(inputs) -> List[Output] # 批量执行 - .stream(input) -> Iterator[Chunk] # 流式执行 - .with_config(config) -> Runnable # 配置注入 - .with_types(input_type=..., output_type=...) -> Runnable # 类型声明 """注意Generic[Input, Output]和ABC(抽象基类)——这表明Runnable是为类型安全的组合而生。Runnable的终极目标,是让Agent组件像乐高一样拼接:一个Runnable的Output类型,必须精确匹配下一个Runnable的Input类型。看langchain仓库/examples/chatbots/rag_chatbot.py中一个典型组合:
# /examples/chatbots/rag_chatbot.py from langchain_core.runnables import RunnablePassthrough, RunnableParallel from langchain_core.prompts import ChatPromptTemplate from langchain_openai import ChatOpenAI # 步骤1:检索器是Runnable,输出是Document列表 retriever = vectorstore.as_retriever() # 步骤2:提示模板是Runnable,输入是dict,输出是FormattedPromptValue prompt = ChatPromptTemplate.from_messages([ ("system", "你是一个助手..."), ("human", "{input}"), ("ai", "{context}"), # context字段将由后续注入 ]) # 步骤3:LLM是Runnable,输入是PromptValue,输出是AIMessage llm = ChatOpenAI(model="gpt-4") # 组合:RunnableParallel确保retriever和input并行执行 # RunnablePassthrough将原始input传递给prompt rag_chain = ( RunnableParallel({ "context": retriever, # 输出Document[] "input": RunnablePassthrough() # 透传原始input }) | prompt # 输入dict,输出PromptValue | llm # 输入PromptValue,输出AIMessage ) # 调用时,类型流是严格的: # invoke({"input": "什么是RAG?"}) # → RunnableParallel输出{"context": [...], "input": "什么是RAG?"} # → prompt接收此dict,生成PromptValue # → llm接收PromptValue,生成AIMessage这个链条的每一环,都是Runnable的实例,且|操作符(__or__方法)的实现,强制要求前一个的Output类型与后一个的Input类型兼容。当你在IDE中写rag_chain.invoke(...)时,类型提示会精确显示输入应为dict,输出为AIMessage——这种编译期类型保障,是构建可靠Agent系统的基石。
4.2RunnableBinding:如何让第三方API无缝融入LangChain生态
很多开发者卡在“如何把自定义Python函数接入LangChain”,答案是RunnableBinding。它不是简单的包装器,而是为任意函数注入LangChain的生命周期管理。看langchain-core仓库/src/langchain_core/runnables/party.py的实现:
# /src/langchain_core/runnables/party.py 第156行 class RunnableBinding(Runnable[Input, Output]): """Binds a function to LangChain's runnable interface with full lifecycle support. Key features: - Automatic config injection (e.g., callbacks, tags) - Built-in retry logic via .with_retry() - Stream support by yielding chunks - Async compatibility via .ainvoke() """ def __init__( self, bound: Callable[..., Output], *args: Any, **kwargs: Any, ) -> None: self.bound = bound self.args = args self.kwargs = kwargs def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output: # 关键:config参数被自动注入,可用于回调、追踪 if config and "callbacks" in config: config["callbacks"].on_tool_start(...) # 自动触发回调 return self.bound(input, *self.args, **self.kwargs)这意味着,你不需要改写原有函数,只需用RunnableBinding包裹,它就自动获得LangChain的全套能力。例如,我们有一个遗留的医疗诊断函数:
def legacy_diagnosis_engine(patient_data: dict) -> dict: # 调用内部SOAP API,返回诊断结果 response = requests.post("https://internal-api/diagnose", json=patient_data) return response.json() # 用RunnableBinding包装,立即获得重试、回调、流式支持 diagnosis_runnable = RunnableBinding( bound=legacy_diagnosis_engine, # 可预置固定参数 timeout=30 ).with_retry( # 自动重试3次 stop_after_attempt=3, wait_exponential_jitter=True ) # 现在它可以无缝接入任何Runnable链 full_chain = ( patient_parser # 输出dict | diagnosis_runnable # 输入dict,输出dict | report_formatter # 输入dict,输出str )RunnableBinding的威力在于,它让LangChain不再是一个封闭框架,而是一个可扩展的运行时环境。任何Python函数,只要定义清晰的输入输出,就能成为Agent工作流的一等公民。
4.3RunnableConfig:配置即代码,如何用配置驱动Agent行为
RunnableConfig是LangChain中被严重低估的模块。它不只是传参容器,而是Agent的“行为基因”。看langchain-core仓库/src/langchain_core/runnables/config.py:
# /src/langchain_core/runnables/config.py 第42行 class RunnableConfig(TypedDict, total=False): """Configuration for running a Runnable. Critical fields: - callbacks: List[BaseCallbackHandler] - 用于追踪、日志、监控 - tags: List[str] - 用于分类、过滤、审计 - metadata: Dict[str, Any] - 用于携带上下文、用户ID、会话ID - run_name: str - 用于可视化追踪(如LangSmith) - configurable: Dict[str, Any] - 运行时可变参数(如temperature) """configurable字段是真正的魔法。它允许你在不修改代码的情况下,动态调整Agent行为。例如,在客服Agent中:
# 定义一个可配置的LLM configurable_llm = ChatOpenAI( model="gpt-4", temperature=0.3, # 默认保守 ).configurable_fields( temperature=ConfigurableField( id="temperature", name="Response Creativity", description="Higher values make responses more random" ), model_name=ConfigurableField( id="model_name", name="Model Version", description="Choose between gpt-4, gpt-3.5-turbo" ) ) # 在不同场景下注入不同配置 # 投诉处理场景:需要严谨,temperature=0.1 complaint_chain = ( complaint_analyzer | configurable_llm.with_config( configurable={"temperature": 0.1, "model_name": "gpt-4"} ) ) # 新品推广场景:需要创意,temperature=0.8 promotion_chain = ( product_analyzer | configurable_llm.with_config( configurable={"temperature": 0.8, "model_name": "gpt-3.5-turbo"} ) )configurable_fields让同一个configurable_llm实例,在不同业务链路中表现出截然不同的性格。这比写多个LLM实例更优雅,也更易维护。RunnableConfig的精髓在于:把Agent的行为策略,从硬编码中解放出来,变成可配置、可审计、可灰度的运行时参数。
5. LangChain-ai/lcel:用表达式语言解锁Agent的“声明式编程”
5.1|操作符不是语法糖,而是函数式编程的管道
LCEL(LangChain Expression Language)的|操作符,常被当作链式调用的简写,但它的真实身份是函数式编程中的管道操作符(Pipe Operator)。看langchain-core仓库/src/langchain_core/runnables/base.py中__or__的实现:
# /src/langchain_core/runnables/base.py 第328行 def __or__(self, other: RunnableLike[Output, NewOutput]) -> Runnable[Input, NewOutput]: """Pipe this runnable through another. Returns a new Runnable that: 1. Invokes self with input 2. Takes the output and invokes other with it 3. Returns other's output This enables composition without intermediate variables. """ # 实际返回一个RunnableSequence实例,它实现了完整的Runnable接口 return RunnableSequence(first=self, last=other)这意味着,rag_chain = retriever | prompt | llm创建的不是一个简单的调用链,而是一个全新的、可独立部署的RunnableSequence对象。这个对象自身也是Runnable,因此可以:
- 被其他
Runnable组合(preprocessor | rag_chain | postprocessor) - 被单独测试(
rag_chain.invoke({"input": "..."})) - 被配置(
rag_chain.with_config(...)) - 被监控(通过
callbacks)
这种设计让Agent开发从“过程式编码”跃迁到“声明式组装”。你不再写result1 = retriever.invoke(query); result2 = prompt.format(context=result1, input=query); result3 = llm.invoke(result2),而是声明“当输入到来时,按此顺序执行”,所有中间状态、错误处理、重试逻辑,都由RunnableSequence内部管理。
5.2RunnableAssign:如何在流式响应中动态注入上下文
RunnableAssign是LCEL中处理“动态上下文注入”的利器。它解决了一个经典难题:在流式响应(Streaming)中,如何让后续步骤访问前面步骤的输出,而不破坏流式体验?看langchain-core仓库/src/langchain_core/runnables/assign.py:
# /src/langchain_core/runnables/assign.py 第89行 class RunnableAssign(Runnable[Input, Output]): """Assigns new keys to the input dict based on Runnable outputs. Unlike RunnableParallel, it executes sequentially and injects results back into the input dict for downstream use. """ def __init__(self, assigners: Dict[str, RunnableLike[Any, Any]]) -> None: self.assigners = assigners def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output: # 关键:按顺序执行assigners,并将结果注入input dict current = input.copy() if isinstance(input, dict) else {"input": input} for key, runnable in self.assigners.items(): # 执行runnable,结果赋值给current[key] current[key] = runnable.invoke(current, config) return currentRunnableAssign的精妙在于“顺序执行+字典注入”。它不像RunnableParallel那样并行,而是确保retriever先执行,其结果存入current["context"],然后prompt才能用current["context"]和current["input"]一起生成提示。这完美适配RAG场景:检索必须在提示生成之前完成。
在我们的法律咨询Agent中,我们这样用:
from langchain_core.runnables import RunnableAssign # 步骤1:从用户问题中提取法律条款编号 extractor = RunnableLambda(lambda x: extract_article_numbers(x["input"])) # 步骤2:根据条款编号检索法条原文 article_retriever = VectorStoreRetriever(vectorstore=law_articles_db) # 步骤3:将法条原文注入上下文 context_enricher = RunnableAssign({ "articles": extractor | article_retriever, # 先提取再检索 "user_question": lambda x: x["input"] # 透传原始问题 }) # 步骤4:生成最终回答(现在context中有articles和user_question) answer_generator = prompt | llm # 完整链路 legal_chain = ( {"input": RunnablePassthrough()} # 初始化输入字典 | context_enricher # 注入articles和user_question | answer_generator # 生成回答 )RunnableAssign让“检索-增强-生成”这个核心RAG范式,变成了一行可读、可测、可配置的声明式代码。它消除了手动管理中间变量的繁琐,是LCEL提升开发效率的关键一环。
5.3RunnablePick:如何在复杂输出中精准提取所需字段
当Agent输出结构化数据(如JSON)时,RunnablePick提供了最优雅的字段提取方案。它不是字符串切片,而是基于类型安全的路径导航。看langchain-core仓库/src/langchain_core/runnables/pick.py:
# /src/langchain_core/runnables/pick.py 第45行 class RunnablePick(Runnable[Input, Output]): """Picks a value from the input using a path expression. Supports: - Simple key: "field_name" - Nested key: "data.results[0].title" - List indexing: "items[1]" - Wildcards: "data.*.id" (for first match) """ def __init__(self, keys: Union[str, List[str]]) -> None: self.keys = keys if isinstance(keys, list) else [keys] def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output: # 使用jsonpath-ng库进行健壮的路径解析 from jsonpath_ng import parse from jsonpath_ng.ext import parse as ext_parse if isinstance(input, dict): json_input