当前位置: 首页 > news >正文

【LangGraph】LangGraph 协调者-工作者模式完全解析:从零构建一个智能报告生成系统

目录

  • LangGraph 协调者-工作者模式完全解析:从零构建一个智能报告生成系统
    • 一、这个系统是做什么的?
    • 二、整体流程图
    • 三、核心概念:协调者-工作者模式
      • 3.1 和普通并行化的区别
      • 3.2 生活例子
    • 四、状态定义(State)
      • 4.1 字段说明
      • 4.2 重要:`operator.add` 的作用
    • 五、节点详解
      • 5.1 节点1:`orchestrator`(协调者)
      • 5.2 分配函数:`assign_workers`(关键!)(边)
        • 重点:`Send` 是什么?
      • 5.3 节点2:`llm_call`(工作者)
      • 5.4 节点3:`synthesizer`(合成器)
    • 六、图的构建(核心难点)
      • 6.1 完整代码
      • 6.2 关键:`add_conditional_edges` 的两种用法
    • 七、执行过程详解
    • 八、完整可运行代码
    • 九、总结

LangGraph 协调者-工作者模式完全解析:从零构建一个智能报告生成系统


一、这个系统是做什么的?

一句话:输入一个主题,自动生成一份完整的报告。

比如你输入“中国近代史”,系统会自动:

  1. 分析主题,决定报告分几个章节
  2. 为每个章节分配一个写作任务
  3. 多个“员工”同时写作不同章节
  4. 把所有章节拼成完整报告

二、整体流程图

用户输入: {"topic": "中国近代史"} │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 节点1: orchestrator(协调者/领导) │ │ │ │ 作用:分析主题,制定章节计划 │ │ 输入:{"topic": "中国近代史"} │ │ 输出:{"sections": [章1, 章2, 章3]} │ │ │ │ 内部逻辑:调用 LLM,强制输出固定格式的章节列表 │ └─────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 函数: assign_workers(分配任务)(边) │ │ │ │ 作用:为每个章节创建一个 Send 对象 │ │ 输入:{"sections": [章1, 章2, 章3]} │ │ 输出:[Send("llm_call", {"section": 章1}), │ │ Send("llm_call", {"section": 章2}), │ │ Send("llm_call", {"section": 章3})] │ └─────────────────────────────────────────────────────────────┘ │ │ 动态创建 3 条边 │ ┌───────────┼───────────┐ │ │ │ ▼ ▼ ▼ ┌────────┐ ┌────────┐ ┌────────┐ │ 节点2 │ │ 节点2 │ │ 节点2 │ │llm_call│ │llm_call│ │llm_call│ │ (员工) │ │ (员工) │ │ (员工) │ │ │ │ │ │ │ │ 写章1 │ │ 写章2 │ │ 写章3 │ └───┬────┘ └───┬────┘ └───┬────┘ │ │ │ └──────────┼──────────┘ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 节点3: synthesizer(合成器/文员) │ │ │ │ 作用:把所有章节拼成完整报告 │ │ 输入:{"completed_sections": ["章1内容", "章2内容", ...]} │ │ 输出:{"final_report": "章1内容\n---\n章2内容\n---\n..."} │ └─────────────────────────────────────────────────────────────┘ │ ▼ 输出报告

三、核心概念:协调者-工作者模式

3.1 和普通并行化的区别

对比普通并行化协调者-工作者模式
任务数量设计时就固定运行时动态决定
任务内容预先知道协调者分析后才知道
代码中的体现add_edge写死Send动态创建

3.2 生活例子

角色类比代码中的节点
协调者领导:决定写哪几章orchestrator
工作者员工:每人写一章llm_call
合成器文员:把章拼成书synthesizer

关键:领导可能决定写3章,也可能写5章。员工数量取决于领导的决定。


四、状态定义(State)

classState(TypedDict):topic:str# 用户输入的主题sections:list# 协调者生成的章节列表completed_sections:Annotated[List,operator.add]# 员工写好的内容final_report:str# 最终报告

4.1 字段说明

字段类型作用谁写入
topicstr用户输入的主题调用者
sectionslist章节计划orchestrator
completed_sectionslist员工写好的内容llm_call(多个)
final_reportstr最终报告synthesizer

4.2 重要:operator.add的作用

completed_sections:Annotated[List,operator.add]
没有operator.addoperator.add
后返回的结果覆盖前面的所有结果追加到列表
只有一个员工的结果所有员工的结果都在列表里

如果不加这个,多个员工同时交稿,只有最后一个会被保留。


五、节点详解

5.1 节点1:orchestrator(协调者)

deforchestrator(state:State):response=planner.invoke(f"为主题'{state['topic']}'制定报告大纲,包含3个章节")return{"sections":response.sections}
项目说明
输入{"topic": "中国近代史"}
输出{"sections": [Section对象, Section对象, ...]}
作用分析主题,制定章节计划

5.2 分配函数:assign_workers(关键!)(边)

defassign_workers(state:State):worker_tasks=[]forsectioninstate["sections"]:worker_tasks.append(Send("llm_call",{"section":section}))returnworker_tasks
项目说明
输入{"sections": [章1, 章2, 章3]}
输出[Send(...), Send(...), Send(...)]
作用为每个章节创建一个工作者任务
重点:Send是什么?
Send("llm_call",{"section":section})
参数含义
第一个参数目标节点的名字
第二个参数传给该节点的state

Send的作用:动态创建一条边,并携带数据给目标节点。
注意:

  • llm_call收到的state只有Send传递的{"section": ...}

5.3 节点2:llm_call(工作者)

defllm_call(state:dict):section=state["section"]result=model.invoke(f"编写报告章节:{section.name}, 内容要求:{section.description}")return{"completed_sections":[result.content]}
项目说明
输入{"section": 章1}(来自Send
输出{"completed_sections": ["章1内容"]}
作用写一个章节的内容

5.4 节点3:synthesizer(合成器)

defsynthesizer(state:State):completed_sections=state["completed_sections"]final_report="\n\n---\n\n".join(completed_sections)return{"final_report":final_report}
项目说明
输入{"completed_sections": ["章1内容", "章2内容", ...]}
输出{"final_report": "章1内容\n---\n章2内容\n---\n..."}
作用把所有章节拼成完整报告

"\n\n---\n\n".join(completed_sections):把列表里的多个字符串,用 \n\n—\n\n 作为分隔符,拼接成一个字符串。


六、图的构建(核心难点)

6.1 完整代码

builder=StateGraph(State)# 添加节点builder.add_node("orchestrator",orchestrator)builder.add_node("llm_call",llm_call)builder.add_node("synthesizer",synthesizer)# 边1:开始 → 协调者builder.add_edge(START,"orchestrator")# 边2:条件边(关键!)builder.add_conditional_edges("orchestrator",assign_workers,["llm_call"]# 用于类型检查,声明可能去的节点。实际去多少次、每次带什么数据,由 `assign_workers` 返回的 `Send` 列表决定。)# 边3:工作者 → 合成器builder.add_edge("llm_call","synthesizer")builder.add_edge("synthesizer",END)

6.2 关键:add_conditional_edges的两种用法

普通用法本代码的用法
路由函数返回一个节点名assign_workers返回多个Send对象
只去一个节点可以去多个节点(同一个节点多次)
不能携带数据每个Send可以携带不同数据
# 普通用法defroute(state):ifstate["type"]=="A":return"node_A"else:return"node_B"builder.add_conditional_edges("node",route,["node_A","node_B"])# 本代码的用法(动态创建多个)defassign_workers(state):return[Send("llm_call",{"section":s})forsinstate["sections"]]builder.add_conditional_edges("orchestrator",assign_workers,["llm_call"])

七、执行过程详解

第1步:用户输入

result=workflow.invoke({"topic":"中国近代史"})

第2步:orchestrator执行

# 输入state={"topic":"中国近代史"}# 调用 LLM,生成3个章节response=planner.invoke("为主题'中国近代史'制定报告大纲,包含3个章节")# response 是 Sections 对象# response.sections = [章1, 章2, 章3]# 返回return{"sections":response.sections}

第3步:assign_workers执行

# 输入state={"topic":"中国近代史","sections":[1,2,3]}# 为每个章节创建 Sendreturn[Send("llm_call",{"section":1}),Send("llm_call",{"section":2}),Send("llm_call",{"section":3})]

第4步:llm_call执行(3次并行)

# 第一次调用state={"section":1}# 写第一章内容return{"completed_sections":["第一章内容..."]}# 第二次调用state={"section":2}return{"completed_sections":["第二章内容..."]}# 第三次调用state={"section":3}return{"completed_sections":["第三章内容..."]}

由于completed_sections使用了operator.add,三个结果自动合并成:

{"completed_sections":["第一章内容...","第二章内容...","第三章内容..."]}

第5步:synthesizer执行

# 输入state={"completed_sections":["第一章内容...","第二章内容...","第三章内容..."]}# 拼接final_report="第一章内容...\n\n---\n\n第二章内容...\n\n---\n\n第三章内容..."# 返回return{"final_report":final_report}

第6步:输出

print(result["final_report"])# 输出完整的报告

八、完整可运行代码

fromtypingimportAnnotated,List,TypedDictimportoperatorfromlangchain.chat_modelsimportinit_chat_modelfromlanggraph.constantsimportSTART,ENDfromlanggraph.graphimportStateGraphfromlanggraph.typesimportSendfrompydanticimportBaseModel# ============================================================# 1. 定义状态# ============================================================classState(TypedDict):topic:strsections:listcompleted_sections:Annotated[List,operator.add]final_report:str# ============================================================# 2. 定义结构化输出# ============================================================classSection(BaseModel):name:strdescription:strclassSections(BaseModel):sections:List[Section]# ============================================================# 3. 创建模型# ============================================================model=init_chat_model("gpt-4o-mini")# 普通模型,用于生成内容planner=model.with_structured_output(Sections)# 结构化模型,用于生成大纲# ============================================================# 4. 协调者节点# ============================================================deforchestrator(state:State):response=planner.invoke(f"为主题'{state['topic']}'制定报告大纲,包含3个章节")return{"sections":response.sections}# ============================================================# 5. 工作者节点# ============================================================defllm_call(state:dict):section=state["section"]result=model.invoke(f"编写报告章节:{section.name}, 内容要求:{section.description}")return{"completed_sections":[result.content]}# ============================================================# 6. 合成器节点# ============================================================defsynthesizer(state:State):completed_sections=state["completed_sections"]final_report="\n\n---\n\n".join(completed_sections)return{"final_report":final_report}# ============================================================# 7. 任务分配函数# ============================================================defassign_workers(state:State):worker_tasks=[]forsectioninstate["sections"]:worker_tasks.append(Send("llm_call",{"section":section}))returnworker_tasks# ============================================================# 8. 构建图# ============================================================builder=StateGraph(State)builder.add_node("orchestrator",orchestrator)builder.add_node("llm_call",llm_call)builder.add_node("synthesizer",synthesizer)builder.add_edge(START,"orchestrator")builder.add_conditional_edges("orchestrator",assign_workers,["llm_call"])builder.add_edge("llm_call","synthesizer")builder.add_edge("synthesizer",END)workflow=builder.compile()# ============================================================# 9. 运行# ============================================================if__name__=="__main__":result=workflow.invoke({"topic":"中国近代史"})print("\n"+"="*50)print("最终报告:")print("="*50)print(result["final_report"])

九、总结

概念解释
协调者-工作者模式领导动态分配任务,员工并行执行
Send动态创建边,可以携带不同数据
operator.add让多个结果自动合并到列表
with_structured_output强制 LLM 输出固定格式
add_conditional_edges可以返回Send列表,动态创建多条边
工作者的state不是全局状态,只是Send传递的数据

一句话总结:协调者动态决定任务数量,Send动态创建边并携带数据,工作者并行执行,结果自动合并,最终合成完整报告。

http://www.jsqmd.com/news/913456/

相关文章:

  • BeeWorks:以安全专属与AI原生,重新定义企业即时通讯的智能入口
  • 杆塔型太阳能供电系统亲测分享:哪家公司最靠谱?
  • Agent Teams 多代理协作
  • CRNN中文文字识别完整工程包:含360CC数据集、训练模型与PyTorch可运行源码
  • LIO-SAM 优化方向综述:从因子图到多模态SLAM
  • 模型幻觉频发、收敛极慢、资源耗尽——Claude优化问题全链路诊断,今天必须修复的4个致命配置
  • 用 AI 写自媒体文案,再也不用熬夜
  • 业主做门窗定制,到底在定制什么?从安全、舒适到交付的真实需求分析
  • DOM ProcessingInst: 深入解析与高效实践
  • 选装修公司别瞎跑,靠谱张工教你几招辨好坏
  • vue3 + ts reactive方式清空表单对象
  • Unity里用WebView插件播放WebRTC视频流,我踩过的坑和完整配置流程
  • 微信如何群发文件与PDF?2026合规批量分发完整解决方案
  • Uni-Dock批量对接实战:从SMILES到结果分析,一条龙避坑指南(附完整Python脚本)
  • 从“增程之王”到“纯电标杆”,理想汽车击碎偏见
  • Claude头脑风暴辅助实战手册(企业级思维加速器):覆盖创意发散、逻辑收敛、方案落地全链路
  • LSD-SLAM 完整安装教程(Ubuntu 20.04 + ROS Noetic + OpenCV)
  • 网络工程- 如何组件一个小型办公室网络
  • macOS微信防撤回终极指南:3分钟搞定WeChatIntercept完整安装教程
  • 如何用WeChatMsg打造个人专属的微信聊天记录档案馆
  • Wechatsync CSDN 草稿同步源码分析:为什么当前只能保存草稿,不能自动公开发布
  • 开福区标书制作哪家靠谱
  • MongoDB事务处理实战
  • 别再死记硬背了!用这3个方法,让你的Mac快捷键记忆效率翻倍(附实用工具推荐)
  • AI智能日志异常检测告警平台:告别人工排查,秒级定位线上故障
  • 打破Windows与Linux数据壁垒:5分钟掌握NTFS-3G跨平台文件互通
  • Claude情感响应失真诊断手册(工业级情感熵值测算首次公开)
  • 在HermesAgent项目中自定义Provider接入Taotoken多模型服务
  • 2026最新华为OD机试新系统 机考真题考点分类 + 备考策略
  • 2026年AI服务统一入口横评,主流平台技术硬实力究竟谁领先?