多Agent系统设计模式:从单体Agent到企业级协作架构
单个 Agent 能完成的任务是有限的。当问题足够复杂,多个专业化 Agent 协同工作才是答案。本文系统梳理多 Agent 系统的核心设计模式,以及何时选择何种架构。
单个 Agent 能完成的任务是有限的。当问题足够复杂,多个专业化 Agent 协同工作才是答案。本文系统梳理多 Agent 系统的核心设计模式,以及何时选择何种架构。
pythonfrom langchain_openai import ChatOpenAIfrom langchain_core.messages import HumanMessage, SystemMessageimport asyncioimport jsonllm = ChatOpenAI(model="gpt-4o", temperature=0)# 工作 Agent:代码编写async def coding_agent(task: str) -> dict: messages = [ SystemMessage(content="你是一个专业的Python开发工程师,专注于编写高质量、可测试的代码。"), HumanMessage(content=task) ] response = await llm.ainvoke(messages) return {"agent": "coding", "result": response.content}# 工作 Agent:代码审查async def review_agent(code: str) -> dict: messages = [ SystemMessage(content="你是代码审查专家,专注于发现潜在缺陷、安全问题和性能隐患。"), HumanMessage(content=f"请审查以下代码:\n\n{code}") ] response = await llm.ainvoke(messages) return {"agent": "review", "result": response.content}# 工作 Agent:文档编写async def doc_agent(code: str) -> dict: messages = [ SystemMessage(content="你是技术文档编写专家,专注于生成清晰、准确的API文档和使用说明。"), HumanMessage(content=f"请为以下代码编写文档:\n\n{code}") ] response = await llm.ainvoke(messages) return {"agent": "documentation", "result": response.content}# 主控 Agentasync def orchestrator_agent(user_request: str) -> dict: """主控 Agent:理解需求,协调工作 Agent""" # Step 1:任务分解 decompose_prompt = f""" 用户请求:{user_request} 请将任务分解为以下JSON格式的子任务列表: {{"tasks": [{{"id": 1, "type": "coding", "description": "..."}}, ...]}} 可用任务类型:coding(编写代码), review(审查代码), documentation(编写文档) """ decompose_response = await llm.ainvoke([HumanMessage(content=decompose_prompt)]) task_plan = json.loads(decompose_response.content) results = {} # Step 2:执行子任务(有依赖关系的串行,无依赖的并行) for task in task_plan["tasks"]: if task["type"] == "coding": results["code"] = await coding_agent(task["description"]) elif task["type"] == "review" and "code" in results: results["review"] = await review_agent(results["code"]["result"]) elif task["type"] == "documentation" and "code" in results: results["docs"] = await doc_agent(results["code"]["result"]) # Step 3:整合结果 return { "task_plan": task_plan, "results": results, "summary": "任务完成" }### 模式二:流水线模式(Pipeline)任务从一个 Agent 流向下一个,每个 Agent 加工后传递给下一个。pythonfrom dataclasses import dataclassfrom typing import List, Callable, Any@dataclassclass PipelineStage: name: str agent_fn: Callable input_key: str output_key: strclass AgentPipeline: def __init__(self, stages: List[PipelineStage]): self.stages = stages async def run(self, initial_input: dict) -> dict: state = initial_input.copy() for stage in self.stages: print(f"执行阶段:{stage.name}") input_data = state.get(stage.input_key) if input_data is None: raise ValueError(f"阶段 {stage.name} 缺少输入:{stage.input_key}") output = await stage.agent_fn(input_data) state[stage.output_key] = output print(f"完成阶段:{stage.name}") return state# 使用示例:内容生产流水线async def research_agent_fn(topic: str) -> str: """研究阶段:搜集信息""" response = await llm.ainvoke([ SystemMessage(content="你是研究员,负责搜集和整理指定主题的关键信息"), HumanMessage(content=f"请研究以下主题,整理出5个关键要点:{topic}") ]) return response.contentasync def writing_agent_fn(research_notes: str) -> str: """写作阶段:基于研究撰写文章""" response = await llm.ainvoke([ SystemMessage(content="你是专业技术作家,负责将研究笔记转化为高质量文章"), HumanMessage(content=f"基于以下研究笔记,撰写一篇2000字的技术文章:\n{research_notes}") ]) return response.contentasync def editing_agent_fn(draft: str) -> str: """编辑阶段:润色和优化""" response = await llm.ainvoke([ SystemMessage(content="你是资深编辑,负责改进文章的清晰度、逻辑性和可读性"), HumanMessage(content=f"请对以下文章进行编辑优化:\n{draft}") ]) return response.contentpipeline = AgentPipeline([ PipelineStage("research", research_agent_fn, "topic", "research_notes"), PipelineStage("writing", writing_agent_fn, "research_notes", "draft"), PipelineStage("editing", editing_agent_fn, "draft", "final_article"),])result = await pipeline.run({"topic": "Context Engineering"})final_article = result["final_article"]### 模式三:专家团模式(Expert Panel)多个专业 Agent 并行分析同一问题,最后汇总各方观点形成综合判断。pythonimport asyncioclass ExpertPanel: def __init__(self, experts: dict): """ experts: {"expert_name": "expert_system_prompt", ...} """ self.experts = experts async def consult_expert(self, expert_name: str, system_prompt: str, question: str) -> dict: response = await llm.ainvoke([ SystemMessage(content=system_prompt), HumanMessage(content=question) ]) return { "expert": expert_name, "opinion": response.content } async def get_panel_opinions(self, question: str) -> list: """并行获取所有专家意见""" tasks = [ self.consult_expert(name, prompt, question) for name, prompt in self.experts.items() ] opinions = await asyncio.gather(*tasks) return list(opinions) async def synthesize(self, question: str) -> str: """汇总专家意见""" opinions = await self.get_panel_opinions(question) opinions_text = "\n\n".join([ f"**{op['expert']}**:\n{op['opinion']}" for op in opinions ]) synthesis_response = await llm.ainvoke([ SystemMessage(content="你是综合分析师,负责汇总多位专家的观点,形成平衡、全面的结论"), HumanMessage(content=f"""问题:{question}各专家意见:{opinions_text}请综合以上意见,给出全面、客观的分析结论:""") ]) return synthesis_response.content# 技术选型专家团tech_panel = ExpertPanel({ "性能专家": "你是系统性能专家,从吞吐量、延迟、资源消耗角度评估技术方案", "安全专家": "你是信息安全专家,从安全漏洞、权限控制、数据保护角度评估技术方案", "工程专家": "你是高级工程师,从可维护性、测试难度、学习曲线角度评估技术方案", "业务专家": "你是产品经理,从业务价值、用户体验、实现周期角度评估技术方案",})analysis = await tech_panel.synthesize("是否应该将我们的RAG系统从FAISS迁移到Qdrant?")### 模式四:竞争评估模式(Generator-Critic)一个 Agent 生成方案,另一个 Agent 批判和优化,循环迭代至高质量输出。pythonclass GeneratorCriticLoop: def __init__(self, max_iterations: int = 3): self.max_iterations = max_iterations async def run(self, task: str) -> str: current_solution = None for iteration in range(self.max_iterations): # 生成阶段 if current_solution is None: gen_prompt = f"请解决以下问题:{task}" else: gen_prompt = f"""问题:{task}上一版本方案:{current_solution['solution']}批评意见:{current_solution['critique']}请根据批评意见改进方案:""" gen_response = await llm.ainvoke([ SystemMessage(content="你是创意方案设计专家,专注于生成高质量、创新性的解决方案"), HumanMessage(content=gen_prompt) ]) new_solution = gen_response.content # 评估阶段 critique_response = await llm.ainvoke([ SystemMessage(content="""你是严格的批评者。你的工作是找出方案中所有的问题、 漏洞、低效之处,并给出具体的改进建议。不要客气。"""), HumanMessage(content=f"请批评以下解决方案:\n\n{new_solution}") ]) critique = critique_response.content # 判断是否足够好 quality_check = await llm.ainvoke([ HumanMessage(content=f"""方案:{new_solution}批评:{critique}这个方案是否已经足够好(评分7/10以上)?只回答"是"或"否"。""") ]) current_solution = { "solution": new_solution, "critique": critique, "iteration": iteration + 1 } if "是" in quality_check.content: break return current_solution["solution"]## 多 Agent 系统的通信协议pythonfrom enum import Enumfrom dataclasses import dataclass, fieldfrom typing import Any, Optionalclass MessageType(Enum): TASK_ASSIGNMENT = "task_assignment" # 分配任务 TASK_RESULT = "task_result" # 返回结果 STATUS_UPDATE = "status_update" # 状态更新 ERROR_REPORT = "error_report" # 错误报告 CLARIFICATION_REQUEST = "clarification" # 请求澄清@dataclassclass AgentMessage: message_id: str sender: str recipient: str message_type: MessageType content: Any metadata: dict = field(default_factory=dict) timestamp: float = field(default_factory=time.time) in_reply_to: Optional[str] = Noneclass AgentMessageBus: """简单的 Agent 消息总线""" def __init__(self): self.queues = {} self.handlers = {} def register_agent(self, agent_id: str): self.queues[agent_id] = asyncio.Queue() async def send(self, message: AgentMessage): recipient_queue = self.queues.get(message.recipient) if recipient_queue: await recipient_queue.put(message) else: print(f"⚠️ 未知接收方:{message.recipient}") async def receive(self, agent_id: str, timeout: float = 30) -> AgentMessage: queue = self.queues.get(agent_id) if not queue: raise ValueError(f"Agent {agent_id} 未注册") return await asyncio.wait_for(queue.get(), timeout=timeout)## 何时选择哪种模式| 场景 | 推荐模式 | 理由 ||------|---------|------|| 任务有明确的子任务分解 | 主从模式 | 清晰的责任边界 || 任务是顺序加工流程 | 流水线模式 | 每步专精,前后依赖清晰 || 需要多视角分析 | 专家团模式 | 降低单点偏见 || 需要高质量输出 | 竞争评估模式 | 迭代优化,逼近最优解 || 任务复杂,以上皆需 | 混合架构 | 大任务用主从,子任务内部用其他模式 |## 工程注意事项避免无限循环:Agent 之间互相调用很容易形成死循环。必须设置最大调用深度和超时机制。成本控制:多 Agent 意味着多倍的 LLM 调用次数。在设计阶段就要估算成本,避免生产环境成本爆炸。结果一致性:并行 Agent 的结果合并时可能产生冲突,需要设计明确的冲突解决策略。可观测性:多 Agent 系统的调试难度远高于单 Agent,从一开始就要建立完整的链路追踪。多 Agent 架构不是"越多越好",而是"够用就好"。从单 Agent 开始,在确实遇到瓶颈时才引入多 Agent 架构,这才是工程上的正确路径。