LangGraph 12节点智能体工作流编排:从状态设计到条件路由的实战拆解
写在前面
LangChain/LangGraph生态里最常被问到的问题是:"Agent和工作流到底怎么选?"
我的答案是:大多数生产级NL2SQL场景,不需要自由的Agent Loop,需要的是结构化的状态图工作流。Agent(ReAct模式)适合开放性工具调用场景,但NL2SQL的流程是确定的——关键词提取、召回、过滤、生成、校验,步骤固定,只是每步的"决策"由LLM完成。
这篇文章完整拆解小欧问数项目中12节点LangGraph工作流的设计:状态怎么定义、节点怎么拆分、并行怎么编排、条件路由怎么做。
一、整体拓扑:一眼看懂12节点
START│▼
extract_keywords(关键词提取)│├──▶ recall_column(字段召回)────┐│ │├──▶ recall_value(值召回)────────┼──▶ merge_retrieved_info(信息融合)│ │└──▶ recall_metric(指标召回)──┘│▼┌──────────────────────┐│ filter_table(表过滤)──┐│ filter_metric(指标过滤)┼──▶ add_extra_context(注入上下文)└──────────────────────┘│▼generate_sql(SQL生成)│▼validate_sql(SQL校验)/ \error=None error!=None│ │▼ ▼execute_sql correct_sql(SQL修正)│ ││ ▼│ execute_sql▼END
核心特征:
- 前三路(column/value/metric)并行执行
- filter_table和filter_metric 并行执行
- validate_sql后接条件路由:成功直接执行,失败走修正节点
二、状态设计:DataAgentState
LangGraph的核心是状态(State)——每个节点的输入和输出都是State的一部分。设计State的原则是:节点之间只通过State通信,不直接调用。
class DataAgentState(TypedDict):query: str # 原始查询(全程只读)keywords: list[str] # 关键词(extract_keywords写,三路召回读)retrieved_columns: list[ColumnInfo] # 召回字段(三路召回写,merge读)retrieved_values: list[ValueInfo] # 召回值retrieved_metrics: list[MetricInfo] # 召回指标table_infos: list[TableInfoState] # 融合后的表结构(merge写,filter读)metric_infos: list[MetricInfoState] # 融合后的指标(merge写,filter读)date_info: DateInfoState # 日期上下文(add_extra_context写,generate_sql读)db_info: DBInfoState # 数据库环境sql: str # 生成的SQL(generate_sql写,validate读)error: str # 校验错误(validate写,correct读)
设计原则:
- 字段命名即数据流:从字段名就能看出哪个节点写、哪个节点读
- TypedDict而非dataclass:LangGraph要求State是TypedDict,支持部分更新
- 节点只返回增量:每个节点只返回它负责更新的字段,LangGraph自动合并
三、Context设计:依赖注入
State是"流转的数据",Context是"固定的依赖"。所有Repository、Client通过Context注入,节点不直接创建连接。
class DataAgentContext(TypedDict):embedding_client: HuggingFaceEndpointEmbeddingscolumn_qdrant_repository: ColumnQdrantRepositoryvalue_es_repository: ValueESRepositorymetric_qdrant_repository: MetricQdrantRepositorymeta_mysql_repository: MetaMySQLRepositorydw_mysql_repository: DWMySQLRepository
这种设计的好处:
- 测试时可替换为Mock
- 节点不关心连接管理,生命周期由外层控制
- 符合LangGraph的
context_schema设计范式
四、节点设计原则
4.1 单职责
每个节点只做一件事,通过函数签名就能看出它读什么、写什么:
async def extract_keywords(state: DataAgentState, runtime: Runtime[DataAgentContext]):query = state["query"] # 读:query# ...分词逻辑return {"keywords": keywords} # 写:keywordsasync def recall_column(state: DataAgentState, runtime: Runtime[DataAgentContext]):query = state["query"] # 读:querykeywords = state["keywords"] # 读:keywords# ...召回逻辑return {"retrieved_columns": columns} # 写:retrieved_columns
4.2 统一错误处理模式
每个节点内部try-except,错误向上抛出(由LangGraph框架处理),同时通过SSE通知前端:
async def recall_column(state, runtime):writer = runtime.stream_writerwriter({"type": "progress", "step": "召回字段", "status": "running"})try:# ...业务逻辑writer({"type": "progress", "step": "召回字段", "status": "success"})return {"retrieved_columns": retrieved_columns}except Exception as e:writer({"type": "progress", "step": "召回字段", "status": "error"})raise
4.3 SSE实时推送
每个节点通过runtime.stream_writer推送进度,客户端收到的是实时的事件流:
{"type": "progress", "step": "抽取关键字", "status": "running"}
{"type": "progress", "step": "抽取关键字", "status": "success"}
{"type": "progress", "step": "召回字段", "status": "running"}
{"type": "progress", "step": "召回字段", "status": "success"}
...
{"type": "result", "data": [...]}
这让前端可以展示"正在召回字段...正在过滤表格...正在生成SQL..."的实时进度条。
五、并行编排:LangGraph的并行边
LangGraph支持扇出(Fan-out)/ 扇入(Fan-in)模式,实现多路并行:
# 关键词提取完成后,三路并行
graph_builder.add_edge("extract_keywords", "recall_column")
graph_builder.add_edge("extract_keywords", "recall_value")
graph_builder.add_edge("extract_keywords", "recall_metric")# 三路汇入同一个merge节点(扇入)
graph_builder.add_edge("recall_column", "merge_retrieved_info")
graph_builder.add_edge("recall_value", "merge_retrieved_info")
graph_builder.add_edge("recall_metric", "merge_retrieved_info")
LangGraph的行为:
- 当
extract_keywords完成后,三个recall节点同时启动 merge_retrieved_info会等待三个前驱节点全部完成后才执行- 并行节点的State更新是原子合并的,不存在竞态
同样的模式用在filter阶段:
graph_builder.add_edge("merge_retrieved_info", "filter_table")
graph_builder.add_edge("merge_retrieved_info", "filter_metric")
graph_builder.add_edge("filter_table", "add_extra_context")
graph_builder.add_edge("filter_metric", "add_extra_context")
六、条件路由:validate_sql后的分支
这是整个工作流里唯一的"动态决策点":
graph_builder.add_conditional_edges("validate_sql",lambda state: "execute_sql" if state["error"] is None else "correct_sql",{"execute_sql": "execute_sql", "correct_sql": "correct_sql"}
)
路由逻辑:
validate_sql节点用EXPLAIN {sql}验证SQL语法和表字段存在性- 验证通过:
error=None,直接走execute_sql - 验证失败:
error=具体错误信息,走correct_sql让LLM根据错误信息修正
# validate_sql.py
async def validate_sql(state, runtime):try:await dw_mysql_repository.validate_sql(sql) # EXPLAINreturn {"error": None}except Exception as e:return {"error": str(e)}
为什么不直接重试? 因为correct_sql节点有完整的上下文(表结构、指标定义、原始查询、错误信息),能做有针对性的修正,而不是盲目重试。
七、节点拆解:filter_table的LLM过滤
filter_table是一个典型的"LLM做决策、代码做执行"的节点。
7.1 问题背景
三路召回后可能有10+张表、50+个字段。直接全部塞给generate_sql,不仅Token爆炸,LLM还会被无关字段干扰。
7.2 LLM过滤
# 让LLM判断:回答这个问题,只需要哪些表的哪些字段?
prompt = PromptTemplate(template=load_prompt("filter_table_info"), ...)
result = await chain.ainvoke({"query": query, "table_infos": yaml.dump(table_infos)})# result格式:{"fact_order": ["order_amount", "region_id"], "dim_region": ["region_id", "region_name"]}
LLM输出的是一个精简的表-字段映射,代码再据此裁剪State里的table_infos:
for table_info in table_infos[:]:if table_info["name"] not in result:table_infos.remove(table_info) # 整张表不需要,移除else:selected_columns = result[table_info["name"]]for column_info in table_info["columns"][:]:if column_info["name"] not in selected_columns:table_info["columns"].remove(column_info) # 字段不需要,移除
效果:过滤前可能15张表60个字段,过滤后只剩2-3张表10个字段,generate_sql的Prompt Token减少60%+。
八、上下文注入:add_extra_context
这个节点容易被忽略,但对SQL质量影响很大。它注入两类信息:
8.1 时间信息
today = datetime.today()
date_info = DateInfoState(date=today.strftime("%Y-%m-%d"),weekday=today.strftime("%A"),quarter=f"Q{(today.month - 1) // 3 + 1}"
)
用户说"去年"、"上个季度"、"本月",LLM需要知道"现在是哪年哪月"才能正确推断时间范围。
8.2 数据库环境信息
db_info = await dw_mysql_repository.get_db_info()
# {"dialect": "mysql", "version": "8.0.32"}
不同数据库的SQL语法有差异(MySQL的LIMIT vs SQL Server的TOP vs Oracle的ROWNUM),告知LLM具体的数据库类型和版本,能避免生成不兼容的SQL。
九、SSE流式输出:从工作流到前端
整个工作流的输出是SSE(Server-Sent Events)流:
# query_service.py
async def query(self, query: str):async for chunk in graph.astream(input=state, context=context, stream_mode="custom"):yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"# query_router.py
@query_router.post("/api/query")
async def query(query: QuerySchema, query_service=Depends(get_query_service)):return StreamingResponse(query_service.query(query.query),media_type="text/event-stream")
stream_mode="custom"意味着只有节点主动调用stream_writer的数据才会被推送,State的内部变更不会自动推送——这保证了推送内容是可控的、面向前端的。
十、为什么不用ReAct Agent?
这是架构选型时最常见的疑问。对比一下:
| 维度 | ReAct Agent | 状态图工作流(本方案) |
|---|---|---|
| 流程可控性 | 低,LLM自主决策下一步 | 高,节点拓扑固定 |
| 延迟可预测性 | 差,循环次数不确定 | 好,固定深度(最多12步) |
| 并行能力 | 弱,通常是串行thought-action | 强,扇出扇入原生支持 |
| 生产稳定性 | LLM可能陷入死循环 | 不存在循环(correct_sql→execute_sql是终点) |
| 适用场景 | 开放域工具调用 | 流程确定的数据处理 |
核心结论:当流程是确定的、只是节点内部的"决策"需要LLM时,用状态图工作流比Agent更稳定、更可控。
十一、总结
这套12节点工作流的核心设计思想:
- 状态即数据流:State字段命名体现数据流向,节点只返回增量更新
- 并行降低延迟:三路召回并行、双路过滤并行,总延迟取决于最慢的一路
- 条件路由做异常处理:validate→correct不是"重试",而是"有上下文的修正"
- LLM做决策、代码做执行:LLM负责"哪些表需要"、"SQL怎么写"等判断,代码负责"裁剪State"、"执行SQL"等确定性操作
- SSE实时透明:每个节点的进度实时推送,用户看到的不是"加载中..."而是具体到哪一步了
LangGraph最大的价值不是"让LLM调用工具",而是让复杂的多步骤AI流程变得可工程化、可测试、可监控。这才是生产级AI系统需要的。
