当前位置: 首页 > news >正文

AI 编程4:LangGraph 实战:动态并行 Worker 编排器模式,让 AI 多任务并行生成报告-test7

作者:WangQiaomei版本:1.0(2026/3/17)

本文实战:基于 LangGraph + 阿里云百炼(通义千问)实现动态并行 Worker编排架构,解决串行执行效率低的问题,适合 AI 报告生成、多任务并行处理场景。

一、前言

在之前的 test6 中,我们实现了 ** 编排器 - 工作者(Orchestrator-Worker)** 模式,但存在一个问题:Worker 是串行处理所有任务的,效率较低。

本文带来升级版 test7:使用 LangGraph Send () API 动态创建多个 Worker,实现真正的并行执行,多个 AI 同时干活,最后统一汇总输出,速度大幅提升。


二、整体架构与流程

2.1 核心工作流

plaintext

用户输入报告主题 ↓ 编排器(Orchestrator) AI 自动拆分 N 个章节 ↓ Send() 动态派发任务 ╱ │ ╲ Worker1 Worker2 Worker3 【并行执行】 ╲ │ ╱ ↓ 合成器(Synthesizer) 合并所有章节 → 最终报告

2.2 核心亮点

  • 动态创建 Worker:根据章节数量自动生成对应 Worker
  • 真正并行执行:多 AI 同时写作,效率翻倍
  • 状态自动合并:无需手动拼接,LangGraph 自动汇总结果
  • 结构化输出:AI 返回固定格式章节,流程更稳定

三、环境依赖安装

bash

运行

python.exe -m pip install --upgrade pip pip install --upgrade langchain langchain-core langchain-openai langgraph

模型使用:阿里云百炼(通义千问),兼容 OpenAI 格式调用。


四、完整代码实现

4.1 全局配置 & LLM 初始化

python

运行

# -*- coding: utf-8 -*- import os import warnings warnings.filterwarnings("ignore", category=UserWarning) from openai import OpenAI from pydantic import BaseModel, Field from typing_extensions import TypedDict from langgraph.graph import StateGraph, START, END from IPython.display import Image, display from langchain_openai import ChatOpenAI from langchain.messages import HumanMessage, SystemMessage from typing import Annotated, List import operator from langgraph.types import Send # 阿里云百炼 LLM 配置 llm = ChatOpenAI( model="qwen-plus", api_key="sk-xxxxxxxxxxxxxxxxxxxxxxxx", # 替换成你的key base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", ) def safe_print(text): """GBK 兼容打印""" print(text.encode('gbk', errors='ignore').decode('gbk'), end="", flush=True)

4.2 步骤 1:定义结构化数据模型

python

运行

class Section(BaseModel): """单个章节结构""" name: str = Field(description="章节名称") description: str = Field(description="章节描述") class Sections(BaseModel): """章节列表""" sections: List[Section] = Field(description="报告所有章节") # AI 输出结构化数据 planner = llm.with_structured_output(Sections)

4.3 步骤 2:定义状态(State)

python

运行

# 主状态:全局共享 class State(TypedDict): topic: str sections: List[Section] completed_sections: Annotated[list, operator.add] # 自动合并列表 final_report: str # Worker 独立状态:每个 worker 只处理一个章节 class WorkerState(TypedDict): section: Section completed_sections: Annotated[list, operator.add]

Annotated[list, operator.add]是 LangGraph 多节点并行输出自动合并的核心。


4.4 步骤 3:定义三大核心节点

3.1 编排器:AI 自动拆分章节

python

运行

def orchestrator(state: State): report_sections = planner.invoke([ SystemMessage(content="Generate a plan for the report."), HumanMessage(content=f"Here is the report topic: {state['topic']}"), ]) return {"sections": report_sections.sections}
3.2 Worker:并行生成章节内容

python

运行

def llm_call(state: WorkerState): section = llm.invoke([ SystemMessage(content="Write a report section. Use markdown. No extra prefix."), HumanMessage(content=f"Name: {state['section'].name}, Desc: {state['section'].description}") ]) return {"completed_sections": [section.content]}
3.3 合成器:汇总最终报告

python

运行

def synthesizer(state: State): parts = state["completed_sections"] final = "\n\n---\n\n".join(parts) return {"final_report": final}

4.5 步骤 4:动态派发任务(核心)

python

运行

def assign_workers(state: State): # 为每个章节创建一个独立 Worker,并行执行 return [Send("llm_call", {"section": s}) for s in state["sections"]]

Send () 是核心:相当于老板给多个员工同时派活,而不是一个人挨个干。


4.6 步骤 5:构建 LangGraph 工作流

python

运行

builder = StateGraph(State) builder.add_node("orchestrator", orchestrator) builder.add_node("llm_call", llm_call) builder.add_node("synthesizer", synthesizer) builder.add_edge(START, "orchestrator") builder.add_conditional_edges("orchestrator", assign_workers, ["llm_call"]) builder.add_edge("llm_call", "synthesizer") builder.add_edge("synthesizer", END) # 编译 workflow = builder.compile() # 展示流程图(可选) try: display(Image(workflow.get_graph().draw_mermaid_png())) except: pass

4.7 步骤 6:运行测试

python

运行

if __name__ == "__main__": result = workflow.invoke({ "topic": "Create a report on LLM scaling laws" }) safe_print(result["final_report"])

五、test6(串行) vs test7(并行)对比

表格

对比项test6 串行test7 动态并行
执行方式单个 Worker 依次处理多 Worker 同时处理
核心 API普通 for 循环Send () 动态创建
效率高(N 倍提升)
适用场景简单小任务报告生成、多模块处理
代码复杂度中等(工程化更强)

六、核心知识点总结

  1. Send():LangGraph 动态创建并行节点的核心 API
  2. Annotated[list, operator.add]:多节点输出自动合并列表
  3. Orchestrator:负责任务拆分与规划
  4. Worker:负责具体执行(可并行 N 个)
  5. Synthesizer:负责结果汇总

七、适用场景

  • AI 多章节报告自动生成
  • 多维度数据分析并行处理
  • 多语言翻译 / 内容生成
  • 企业级自动化 AI 工作流

八、注意事项

  1. 阿里云百炼 API Key 建议使用环境变量,不要硬编码
  2. Windows 控制台输出需处理 GBK 编码
  3. 章节数量不宜过多,避免 Token 超限
  4. 可扩展:增加重试、异常捕获、持久化存储

九、运行结果


```result
## Introduction

Scaling laws in large language models (LLMs) describe empirically observed

---

## Foundations and Key Concepts

**Compute (FLOPs):** Total floating-point operations required to train a model

**Model Size (Parameters):** The total number of trainable parameters

**Dataset Size (Tokens):** The total number of tokens processed during training—i.e.

**Loss (e.g., Cross-Entropy Loss):** A scalar objective function quantifying the discrepancy


**Interrelationships in Empirical Scaling Laws:**
These four quantities are linked through power-law relationships observed across diverse model families and datasets.

---

## Empirical Scaling Laws: Formulations and Findings

Empirical scaling laws describe the predictable, power-law relationship between a large language model’s test loss $L$ and key resource variables

- **Kaplan et al. (2020)** established the foundational

- **Chinchilla (Hoffmann et al., 2022)** introduced the *joint*) \propto C^{-\gamma}$ with $\gamma = \alpha\beta/(\alpha+\beta) \approx 0.15$.

## Conclusion

Scaling laws offer a powerful empirical framework for understanding how language model performance evolves with increases in compute, dataset size, and parameter count.
```

十、总结

本文实现了LangGraph 动态并行 Worker 编排模式,是企业级 AI 工作流的经典架构:规划 → 并行执行 → 汇总

相比串行模式,并行执行效率提升非常明显,非常适合报告生成、多任务处理等场景。

http://www.jsqmd.com/news/495206/

相关文章:

  • 〘 9-1 〙软考高项 | 第16章:项目采购管理(上)
  • 基于分解的多目标优化算法(MOEA/D) —— Matlab实现 测试函数包括:ZDT、DTL...
  • 电动压铆螺柱:高效安装,稳固可靠新选择
  • 大模型联网难题破解!数眼智能(DataEyes)全解析,5分钟解锁实时数据能力
  • 一键生成论文的软件推荐!2026年精选6款AI论文生成神器指南,为你打造高质量论文 - 掌桥科研-AI论文写作
  • 【嵌入式】外部中断的学习小坑记录
  • Git误删急救:30秒拯救你的代码
  • 深度解析贪心算法
  • 【程序员转型】开发者转型成为 AI 工程师指南,大模型入门到精通,收藏这篇就足够了!
  • 分析鲨鱼速装性价比好不好,和同行比价格贵不贵 - 工业设备
  • 8.4通过延迟补偿来提高实时性
  • 选 PyQt6 还是 PySide6?这可能是 Python GUI 开发中最“纠结”的问题
  • 3分钟搞定!OpenClaw 龙虾 + Kimi 联网搜索,小白也能上手
  • PHP搭建开发环境(Windows系统)
  • 2026年无锡碳纤维废气焚烧炉选购指南,源头厂家宜业环保分析 - mypinpai
  • 工厂生产 PLC ip 的都是一样的怎么才能避免冲突进行组网呢?
  • 全栈vue/react+node.js,云服务器windows部署全流程
  • 2026年实力强的聚氨酯瓦壳源头厂家排名,哪家更靠谱 - 工业推荐榜
  • 赛博朋克2077弹窗vcruntime140_1.dll丢失怎么办?安全修复步骤详解
  • 校园外卖软件
  • 代差级突破|2026 女性经期新选择:专属特殊膳食饮品深度评测
  • 收藏!2026春招AI风口爆发:岗位暴涨12倍、月薪超6万,程序员/小白必看学习指南
  • Java 网络爬虫笔记
  • 2026盘点重庆家具采购优质公司,源点宜联购优势突出 - 工业品网
  • Axure RP 9的初使用
  • 互联网大厂Java面试实战:以智慧物流场景为例深入探讨Spring Boot、微服务与Redis缓存
  • 2026年环氧防火涂料价格多少,怎么选靠谱品牌 - 工业品牌热点
  • BG3启动报错dll缺失终极修复指南:从平台验证到运行库安装
  • 用Web Components原生技术构建可复用的UI组件
  • 拿下36K的AI产品经理offer,他是如何实现职业转型的?