当前位置: 首页 > news >正文

LangGraph实战:构建可控、可调试的复杂AI工作流

LangChain 适合快速原型,但生产级 AI 工作流需要更精细的状态管理和流程控制。LangGraph 的图结构为复杂 Agent 提供了清晰的架构范式。本文通过实战案例,掌握 LangGraph 的核心工程模式。

为什么需要 LangGraph传统 LangChain Chain 的执行是线性的:A → B → C。这对简单场景够用,但真实的 AI 应用往往需要:-条件分支:根据判断结果走不同路径-循环重试:失败时重试,成功时跳出-并行执行:多个子任务同时进行-人工介入:关键决策时暂停等待人工确认这些需求用线性 Chain 实现会让代码极其复杂且难以维护。LangGraph 用有向图替代线性链,让复杂工作流的实现变得清晰。## 核心概念LangGraph 的三个核心概念:State(状态):图中流转的数据结构。所有节点共享同一个状态,每个节点读取并更新它。pythonfrom typing import TypedDict, Annotated, Listfrom langgraph.graph import add_messagesclass AgentState(TypedDict): messages: Annotated[List, add_messages] # 对话历史(自动追加) task: str # 当前任务描述 artifacts: List[dict] # 生成的产出物 error: str # 错误信息(如果有) retry_count: int # 重试次数Node(节点):处理状态的函数。每个节点接收状态,返回更新后的状态片段。Edge(边):节点间的连接。可以是固定连接(无条件流转)或条件边(根据状态决定下一个节点)。## 基础:Research Agent 实战下面实现一个研究助手 Agent,能搜索、分析、生成报告:pythonfrom langgraph.graph import StateGraph, ENDfrom langchain_openai import ChatOpenAIfrom langchain_community.tools.tavily_search import TavilySearchResultsfrom langchain_core.messages import HumanMessage, AIMessage, ToolMessageimport json# 初始化工具llm = ChatOpenAI(model="gpt-4o", temperature=0)search_tool = TavilySearchResults(max_results=5)tools = [search_tool]llm_with_tools = llm.bind_tools(tools)# ──── 节点定义 ────def research_agent(state: AgentState) -> dict: """主 Agent 节点:分析任务,决定下一步行动""" system_prompt = """你是一个研究助手。分析用户的研究任务, 使用搜索工具收集信息,然后综合生成高质量报告。 当你认为收集的信息足够生成报告时,直接生成报告内容。""" messages = [{"role": "system", "content": system_prompt}] + state["messages"] response = llm_with_tools.invoke(messages) return {"messages": [response]}def tool_executor(state: AgentState) -> dict: """工具执行节点:执行 Agent 调用的工具""" last_message = state["messages"][-1] tool_results = [] for tool_call in last_message.tool_calls: if tool_call["name"] == "tavily_search_results_json": result = search_tool.invoke(tool_call["args"]) tool_results.append( ToolMessage( content=json.dumps(result, ensure_ascii=False), tool_call_id=tool_call["id"] ) ) return {"messages": tool_results}def generate_report(state: AgentState) -> dict: """报告生成节点:将研究结果整理为结构化报告""" report_prompt = f"""基于以下研究任务和收集的信息,生成一份结构化报告:任务:{state["task"]}请生成包含以下部分的报告:1. 执行摘要(200字)2. 主要发现(3-5点)3. 详细分析4. 结论与建议""" messages = state["messages"] + [HumanMessage(content=report_prompt)] response = llm.invoke(messages) artifact = { "type": "report", "content": response.content, "task": state["task"] } return { "messages": [response], "artifacts": state.get("artifacts", []) + [artifact] }# ──── 条件边函数 ────def should_use_tools(state: AgentState) -> str: """判断是否需要调用工具""" last_message = state["messages"][-1] # 如果最后一条消息有工具调用,进入工具执行节点 if hasattr(last_message, "tool_calls") and last_message.tool_calls: return "use_tools" # 否则,进入报告生成节点 return "generate_report"# ──── 构建图 ────workflow = StateGraph(AgentState)# 添加节点workflow.add_node("research_agent", research_agent)workflow.add_node("tool_executor", tool_executor)workflow.add_node("generate_report", generate_report)# 设置入口workflow.set_entry_point("research_agent")# 添加条件边:research_agent 根据是否有工具调用决定走向workflow.add_conditional_edges( "research_agent", should_use_tools, { "use_tools": "tool_executor", "generate_report": "generate_report" })# 工具执行后回到 research_agentworkflow.add_edge("tool_executor", "research_agent")# 报告生成后结束workflow.add_edge("generate_report", END)# 编译图app = workflow.compile()# 运行示例initial_state = { "messages": [HumanMessage(content="请研究2026年LLM推理优化的最新进展")], "task": "2026年LLM推理优化最新进展", "artifacts": [], "error": "", "retry_count": 0}result = app.invoke(initial_state)print(result["artifacts"][-1]["content"])## 进阶:带人工介入的审批流程生产级 Agent 往往需要"暂停并等待人工确认"的能力:pythonfrom langgraph.checkpoint.memory import MemorySaverfrom langgraph.graph import interrupt_before# 使用 Checkpointer 支持暂停恢复memory = MemorySaver()def draft_action(state: AgentState) -> dict: """生成拟执行的操作(高风险操作,需审批)""" response = llm.invoke(state["messages"]) return { "messages": [response], "pending_action": response.content }def execute_action(state: AgentState) -> dict: """执行已审批的操作""" action = state.get("pending_action", "") # 实际执行逻辑... return {"messages": [AIMessage(content=f"已执行:{action}")]}# 构建带审批的工作流approval_workflow = StateGraph(AgentState)approval_workflow.add_node("draft", draft_action)approval_workflow.add_node("execute", execute_action)approval_workflow.set_entry_point("draft")approval_workflow.add_edge("draft", "execute") # draft → execute(中间会暂停)approval_workflow.add_edge("execute", END)# interrupt_before 指定在哪个节点前暂停app_with_approval = approval_workflow.compile( checkpointer=memory, interrupt_before=["execute"] # 在 execute 前暂停等待人工确认)# 运行到暂停点thread_id = "task-001"config = {"configurable": {"thread_id": thread_id}}result = app_with_approval.invoke(initial_state, config)# 此时图已暂停在 execute 节点之前# 人工审查后,恢复执行print("请审查以下操作:", result["pending_action"])user_approve = input("是否批准?(y/n): ")if user_approve == "y": # 从断点继续执行 final_result = app_with_approval.invoke(None, config)## 并行执行:多任务同时处理pythonfrom langgraph.graph import Senddef parallel_research_router(state: AgentState): """将研究任务拆分为多个并行子任务""" topics = state.get("topics", []) # 使用 Send 将任务分发给并行执行的节点 return [ Send("research_subtask", {"topic": topic, "messages": []}) for topic in topics ]def research_subtask(state: dict) -> dict: """单个子任务的研究执行""" topic = state["topic"] result = search_tool.invoke({"query": topic}) return {"subtask_result": {"topic": topic, "data": result}}def aggregate_results(state: AgentState) -> dict: """聚合所有子任务结果""" # subtask_results 会包含所有并行子任务的输出 all_results = state.get("subtask_results", []) aggregated = "\n\n".join([ f"## {r['topic']}\n{r['data']}" for r in all_results ]) return {"aggregated_research": aggregated}## 错误处理与重试pythondef robust_agent_node(state: AgentState) -> dict: """带错误处理的健壮 Agent 节点""" max_retries = 3 retry_count = state.get("retry_count", 0) if retry_count >= max_retries: return { "error": f"超过最大重试次数({max_retries})", "status": "failed" } try: response = llm_with_tools.invoke(state["messages"]) return { "messages": [response], "error": "", "retry_count": 0 # 成功后重置重试计数 } except Exception as e: return { "error": str(e), "retry_count": retry_count + 1 }def should_retry_or_fail(state: AgentState) -> str: if state.get("error") and state.get("retry_count", 0) < 3: return "retry" elif state.get("error"): return "fail" else: return "continue"## 可观测性:调试复杂图LangGraph 提供内置的追踪能力:python# 启用 LangSmith 追踪import osos.environ["LANGCHAIN_TRACING_V2"] = "true"os.environ["LANGCHAIN_API_KEY"] = "your-api-key"os.environ["LANGCHAIN_PROJECT"] = "my-agent-project"# 流式输出每个节点的状态变化for event in app.stream(initial_state): for node_name, node_output in event.items(): print(f"\n{'='*40}") print(f"节点: {node_name}") if "messages" in node_output: last_msg = node_output["messages"][-1] print(f"输出: {last_msg.content[:200]}...")# 获取图的可视化from IPython.display import ImageImage(app.get_graph().draw_mermaid_png())## 生产部署:LangGraph ServerLangGraph 提供 Server 模式用于生产部署:bash# 安装 LangGraph CLIpip install langgraph-cli# 创建配置文件 langgraph.jsoncat > langgraph.json << 'EOF'{ "dependencies": ["."], "graphs": { "research_agent": "./my_agent.py:app" }}EOF# 启动服务langgraph up服务器提供 REST API 和 WebSocket 接口,支持:- 创建和管理 Thread(对话线程)- 流式获取执行状态- 暂停和恢复执行- 完整的执行历史查询## 最佳实践总结1.状态设计先行:花时间设计好 State 结构,它是整个图的基础2.节点保持单一职责:每个节点只做一件事,便于测试和复用3.条件边逻辑外置:路由逻辑放在独立的判断函数中,不要嵌入节点4.使用 Checkpointer:生产环境必须启用,支持暂停恢复和错误恢复5.流式输出:用app.stream()代替app.invoke(),提升用户体验6.错误不应静默失败:在状态中维护 error 字段,在条件边中处理错误路径LangGraph 的学习曲线比 LangChain 陡,但一旦掌握,你会发现复杂 Agent 系统的开发效率和可维护性都有质的飞跃。

http://www.jsqmd.com/news/866694/

相关文章:

  • 抖音下载器:如何永久保存你喜欢的短视频内容?
  • 开源项目功能扩展技术方案:实现多账户管理与配置优化的完整指南
  • 抖音无水印下载终极指南:douyin-downloader让内容保存变得如此简单
  • 深入Linux调度器心跳:scheduler_tick原理、性能影响与调优实践
  • 网盘直链下载助手实战指南:八大平台免登录高速下载完整方案
  • 基于Linux内核list.h思想实现高效C语言单向链表
  • 专业鼠标加速配置指南:Raw Accel内核级驱动深度解析与实战优化策略
  • OpenRGB终极指南:一个软件统一控制所有RGB设备,告别厂商软件依赖
  • iOS 17.6.1系统更新深度解析:错误修复、安全加固与升级指南
  • Windows 10 21H1更新解析:聚焦混合办公安全与IT管理优化
  • Windows下OpenCore引导盘制作:5步打造完美Hackintosh启动盘
  • Python 爬虫实战:京东商品价格监控爬取与分析
  • 短剧出海AI工具推荐:翻译配音一站搞定
  • C语言字符串与指针核心函数手写实现与底层原理剖析
  • 深入解析Linux system()调用:从原理到安全实践
  • 汽车电子高效模型测试驱动开发:从需求到合规的零缺陷实践
  • 树莓派CM5工业应用实战:从核心模块到边缘AI系统构建
  • Barlow字体终极指南:用54种样式打造专业设计
  • KMS智能激活终极指南:一键永久激活Windows和Office的完整教程
  • 基于模型的测试驱动开发:实现功能安全与ASPICE合规的高效实践
  • 通过用量看板与成本管理功能精细化控制AI支出
  • 大麦网自动化抢票脚本:高效抢票解决方案指南
  • 外包项目的知识产权归属:甲方和乙方都该知道的底线
  • SpringBoot核心原理与实践:从配置地狱到约定大于配置的救赎
  • 模拟IC设计实战:误差放大器失调电压对带隙基准精度的影响与优化
  • 用if…else…end语句计算分段函数
  • MultiHighlight插件深度解析:JetBrains IDE智能代码高亮实战指南
  • 嵌入式开发板100g/2000Hz振动试验:工业可靠性验证与加固实战
  • 在企业内部知识库问答系统中集成大模型搜索增强
  • 3分钟掌握:B站缓存视频永久保存的完整免费方案