LangGraph实战:构建可控、可调试的复杂AI工作流
LangChain 适合快速原型,但生产级 AI 工作流需要更精细的状态管理和流程控制。LangGraph 的图结构为复杂 Agent 提供了清晰的架构范式。本文通过实战案例,掌握 LangGraph 的核心工程模式。
LangChain 适合快速原型,但生产级 AI 工作流需要更精细的状态管理和流程控制。LangGraph 的图结构为复杂 Agent 提供了清晰的架构范式。本文通过实战案例,掌握 LangGraph 的核心工程模式。
pythonfrom typing import TypedDict, Annotated, Listfrom langgraph.graph import add_messagesclass AgentState(TypedDict): messages: Annotated[List, add_messages] # 对话历史(自动追加) task: str # 当前任务描述 artifacts: List[dict] # 生成的产出物 error: str # 错误信息(如果有) retry_count: int # 重试次数Node(节点):处理状态的函数。每个节点接收状态,返回更新后的状态片段。Edge(边):节点间的连接。可以是固定连接(无条件流转)或条件边(根据状态决定下一个节点)。## 基础:Research Agent 实战下面实现一个研究助手 Agent,能搜索、分析、生成报告:pythonfrom langgraph.graph import StateGraph, ENDfrom langchain_openai import ChatOpenAIfrom langchain_community.tools.tavily_search import TavilySearchResultsfrom langchain_core.messages import HumanMessage, AIMessage, ToolMessageimport json# 初始化工具llm = ChatOpenAI(model="gpt-4o", temperature=0)search_tool = TavilySearchResults(max_results=5)tools = [search_tool]llm_with_tools = llm.bind_tools(tools)# ──── 节点定义 ────def research_agent(state: AgentState) -> dict: """主 Agent 节点:分析任务,决定下一步行动""" system_prompt = """你是一个研究助手。分析用户的研究任务, 使用搜索工具收集信息,然后综合生成高质量报告。 当你认为收集的信息足够生成报告时,直接生成报告内容。""" messages = [{"role": "system", "content": system_prompt}] + state["messages"] response = llm_with_tools.invoke(messages) return {"messages": [response]}def tool_executor(state: AgentState) -> dict: """工具执行节点:执行 Agent 调用的工具""" last_message = state["messages"][-1] tool_results = [] for tool_call in last_message.tool_calls: if tool_call["name"] == "tavily_search_results_json": result = search_tool.invoke(tool_call["args"]) tool_results.append( ToolMessage( content=json.dumps(result, ensure_ascii=False), tool_call_id=tool_call["id"] ) ) return {"messages": tool_results}def generate_report(state: AgentState) -> dict: """报告生成节点:将研究结果整理为结构化报告""" report_prompt = f"""基于以下研究任务和收集的信息,生成一份结构化报告:任务:{state["task"]}请生成包含以下部分的报告:1. 执行摘要(200字)2. 主要发现(3-5点)3. 详细分析4. 结论与建议""" messages = state["messages"] + [HumanMessage(content=report_prompt)] response = llm.invoke(messages) artifact = { "type": "report", "content": response.content, "task": state["task"] } return { "messages": [response], "artifacts": state.get("artifacts", []) + [artifact] }# ──── 条件边函数 ────def should_use_tools(state: AgentState) -> str: """判断是否需要调用工具""" last_message = state["messages"][-1] # 如果最后一条消息有工具调用,进入工具执行节点 if hasattr(last_message, "tool_calls") and last_message.tool_calls: return "use_tools" # 否则,进入报告生成节点 return "generate_report"# ──── 构建图 ────workflow = StateGraph(AgentState)# 添加节点workflow.add_node("research_agent", research_agent)workflow.add_node("tool_executor", tool_executor)workflow.add_node("generate_report", generate_report)# 设置入口workflow.set_entry_point("research_agent")# 添加条件边:research_agent 根据是否有工具调用决定走向workflow.add_conditional_edges( "research_agent", should_use_tools, { "use_tools": "tool_executor", "generate_report": "generate_report" })# 工具执行后回到 research_agentworkflow.add_edge("tool_executor", "research_agent")# 报告生成后结束workflow.add_edge("generate_report", END)# 编译图app = workflow.compile()# 运行示例initial_state = { "messages": [HumanMessage(content="请研究2026年LLM推理优化的最新进展")], "task": "2026年LLM推理优化最新进展", "artifacts": [], "error": "", "retry_count": 0}result = app.invoke(initial_state)print(result["artifacts"][-1]["content"])## 进阶:带人工介入的审批流程生产级 Agent 往往需要"暂停并等待人工确认"的能力:pythonfrom langgraph.checkpoint.memory import MemorySaverfrom langgraph.graph import interrupt_before# 使用 Checkpointer 支持暂停恢复memory = MemorySaver()def draft_action(state: AgentState) -> dict: """生成拟执行的操作(高风险操作,需审批)""" response = llm.invoke(state["messages"]) return { "messages": [response], "pending_action": response.content }def execute_action(state: AgentState) -> dict: """执行已审批的操作""" action = state.get("pending_action", "") # 实际执行逻辑... return {"messages": [AIMessage(content=f"已执行:{action}")]}# 构建带审批的工作流approval_workflow = StateGraph(AgentState)approval_workflow.add_node("draft", draft_action)approval_workflow.add_node("execute", execute_action)approval_workflow.set_entry_point("draft")approval_workflow.add_edge("draft", "execute") # draft → execute(中间会暂停)approval_workflow.add_edge("execute", END)# interrupt_before 指定在哪个节点前暂停app_with_approval = approval_workflow.compile( checkpointer=memory, interrupt_before=["execute"] # 在 execute 前暂停等待人工确认)# 运行到暂停点thread_id = "task-001"config = {"configurable": {"thread_id": thread_id}}result = app_with_approval.invoke(initial_state, config)# 此时图已暂停在 execute 节点之前# 人工审查后,恢复执行print("请审查以下操作:", result["pending_action"])user_approve = input("是否批准?(y/n): ")if user_approve == "y": # 从断点继续执行 final_result = app_with_approval.invoke(None, config)## 并行执行:多任务同时处理pythonfrom langgraph.graph import Senddef parallel_research_router(state: AgentState): """将研究任务拆分为多个并行子任务""" topics = state.get("topics", []) # 使用 Send 将任务分发给并行执行的节点 return [ Send("research_subtask", {"topic": topic, "messages": []}) for topic in topics ]def research_subtask(state: dict) -> dict: """单个子任务的研究执行""" topic = state["topic"] result = search_tool.invoke({"query": topic}) return {"subtask_result": {"topic": topic, "data": result}}def aggregate_results(state: AgentState) -> dict: """聚合所有子任务结果""" # subtask_results 会包含所有并行子任务的输出 all_results = state.get("subtask_results", []) aggregated = "\n\n".join([ f"## {r['topic']}\n{r['data']}" for r in all_results ]) return {"aggregated_research": aggregated}## 错误处理与重试pythondef robust_agent_node(state: AgentState) -> dict: """带错误处理的健壮 Agent 节点""" max_retries = 3 retry_count = state.get("retry_count", 0) if retry_count >= max_retries: return { "error": f"超过最大重试次数({max_retries})", "status": "failed" } try: response = llm_with_tools.invoke(state["messages"]) return { "messages": [response], "error": "", "retry_count": 0 # 成功后重置重试计数 } except Exception as e: return { "error": str(e), "retry_count": retry_count + 1 }def should_retry_or_fail(state: AgentState) -> str: if state.get("error") and state.get("retry_count", 0) < 3: return "retry" elif state.get("error"): return "fail" else: return "continue"## 可观测性:调试复杂图LangGraph 提供内置的追踪能力:python# 启用 LangSmith 追踪import osos.environ["LANGCHAIN_TRACING_V2"] = "true"os.environ["LANGCHAIN_API_KEY"] = "your-api-key"os.environ["LANGCHAIN_PROJECT"] = "my-agent-project"# 流式输出每个节点的状态变化for event in app.stream(initial_state): for node_name, node_output in event.items(): print(f"\n{'='*40}") print(f"节点: {node_name}") if "messages" in node_output: last_msg = node_output["messages"][-1] print(f"输出: {last_msg.content[:200]}...")# 获取图的可视化from IPython.display import ImageImage(app.get_graph().draw_mermaid_png())## 生产部署:LangGraph ServerLangGraph 提供 Server 模式用于生产部署:bash# 安装 LangGraph CLIpip install langgraph-cli# 创建配置文件 langgraph.jsoncat > langgraph.json << 'EOF'{ "dependencies": ["."], "graphs": { "research_agent": "./my_agent.py:app" }}EOF# 启动服务langgraph up服务器提供 REST API 和 WebSocket 接口,支持:- 创建和管理 Thread(对话线程)- 流式获取执行状态- 暂停和恢复执行- 完整的执行历史查询## 最佳实践总结1.状态设计先行:花时间设计好 State 结构,它是整个图的基础2.节点保持单一职责:每个节点只做一件事,便于测试和复用3.条件边逻辑外置:路由逻辑放在独立的判断函数中,不要嵌入节点4.使用 Checkpointer:生产环境必须启用,支持暂停恢复和错误恢复5.流式输出:用app.stream()代替app.invoke(),提升用户体验6.错误不应静默失败:在状态中维护 error 字段,在条件边中处理错误路径LangGraph 的学习曲线比 LangChain 陡,但一旦掌握,你会发现复杂 Agent 系统的开发效率和可维护性都有质的飞跃。