当前位置: 首页 > news >正文

Agent 协作协议设计:从消息传递到共识达成的多智能体架构

Agent 协作协议设计:从消息传递到共识达成的多智能体架构

一、多 Agent 协作为何总是"各干各的,最后拼不上"

多 Agent 系统的设计目标是让多个专业 Agent 协作完成单个 Agent 无法独立处理的复杂任务。但在实际应用中,常见的问题是 Agent 间缺乏协调机制,导致产出难以整合,最终需要人工介入修正。

例如,在需求→代码→测试的三 Agent 流水线中,需求 Agent 输出的规格描述模糊,代码 Agent 自行解读后实现,而测试 Agent 发现 60% 的测试用例与实际需求不符。问题根源在于缺乏统一协议来规范信息传递、理解确认和分歧处理。

二、多 Agent 协作协议的架构与核心机制

多 Agent 协作协议需解决三个关键问题:如何交换结构化信息、如何对齐理解,以及如何在意见分歧时决策。

flowchart TB A[协作协议核心问题] --> B[信息传递] A --> C[共识达成] A --> D[冲突解决] B --> B1[消息格式: 结构化 JSON Schema] B --> B2[通信模式: 请求-响应 / 发布-订阅] B --> B3[消息保证: 至少一次 / 精确一次] C --> C1[两阶段确认: 提议 + 确认] C --> C2[共享上下文: 黑板模式] C --> C3[版本化协议: Schema 演进] D --> D1[投票机制: 多数决] D --> D2[仲裁者: 指定决策者] D --> D3[回退策略: 降级到人工] B1 --> E[协议层] C1 --> E D1 --> E E --> F[Agent A: 需求分析] E --> G[Agent B: 代码实现] E --> H[Agent C: 测试验证] F <-->|协议消息| G G <-->|协议消息| H F <-->|协议消息| H

2.1 消息格式:结构化协议

Agent 之间的消息应采用结构化 JSON Schema 而非自由文本。每条消息包含:消息类型(提议/确认/拒绝/查询)、发送者/接收者、载荷(Schema 约束的结构化数据)、上下文引用(关联之前的消息)。

结构化消息的优势在于接收方能直接解析和验证内容,减少对 LLM 文本理解的依赖。若消息格式不符,系统会直接拒绝并要求重新发送,防止误解累积。

2.2 共识达成:两阶段确认

两阶段确认借鉴分布式事务中的两阶段提交方法:第一阶段,发起方发送提议,接收方回复"确认"或"拒绝"(附理由);第二阶段,发起方根据所有回复决定提交或回滚。

在 Agent 协作中,两阶段确认确保所有参与方对任务理解一致。例如,需求 Agent 发送需求规格,代码 Agent 和测试 Agent 分别确认是否理解清晰、是否有歧义。只有所有参与方确认后,才进入执行阶段。

2.3 共享上下文:黑板模式

黑板模式为所有 Agent 提供一个共享的上下文空间。每个 Agent 可以读取黑板上的信息、写入自己的产出、订阅特定类型的变化。这种模式的核心优势是解耦——Agent 不需要知道其他 Agent 的存在,只需关注黑板上的信息。

三、Agent 协作协议的代码实现

3.1 结构化消息协议

from dataclasses import dataclass, field from typing import Any, Optional from enum import Enum import json import uuid from datetime import datetime class MessageType(Enum): """消息类型""" PROPOSE = "propose" # 提议 CONFIRM = "confirm" # 确认 REJECT = "reject" # 拒绝 QUERY = "query" # 查询 INFORM = "inform" # 通知 ACK = "ack" # 确认收到 @dataclass class AgentMessage: """Agent 间结构化消息""" msg_type: MessageType sender: str receiver: str # "broadcast" 表示广播 payload: dict # 结构化载荷 schema_version: str = "1.0" msg_id: str = field(default_factory=lambda: str(uuid.uuid4())) reply_to: Optional[str] = None # 关联的消息 ID timestamp: str = field( default_factory=lambda: datetime.now().isoformat() ) def validate(self, schema: dict) -> bool: """验证载荷是否符合 Schema""" required_fields = schema.get("required", []) for f in required_fields: if f not in self.payload: return False return True def to_json(self) -> str: return json.dumps({ "msg_type": self.msg_type.value, "sender": self.sender, "receiver": self.receiver, "payload": self.payload, "schema_version": self.schema_version, "msg_id": self.msg_id, "reply_to": self.reply_to, "timestamp": self.timestamp, }, ensure_ascii=False) # 需求规格的 Schema 定义 REQUIREMENT_SCHEMA = { "type": "object", "required": ["requirement_id", "title", "acceptance_criteria"], "properties": { "requirement_id": {"type": "string"}, "title": {"type": "string"}, "description": {"type": "string"}, "acceptance_criteria": { "type": "array", "items": {"type": "string"}, }, "constraints": { "type": "array", "items": {"type": "string"}, }, }, }

3.2 两阶段确认协议

from typing import Callable class TwoPhaseCommit: """两阶段确认协议:确保所有参与方对任务理解一致""" def __init__(self, coordinator: str, participants: list[str]): self.coordinator = coordinator self.participants = participants self.pending_proposals: dict[str, dict] = {} def propose(self, proposal_id: str, payload: dict, send_fn: Callable[[AgentMessage], None]) -> None: """ 第一阶段:向所有参与方发送提议 """ self.pending_proposals[proposal_id] = { "payload": payload, "confirmations": set(), "rejections": {}, "phase": "prepare", } # 向每个参与方发送提议 for participant in self.participants: msg = AgentMessage( msg_type=MessageType.PROPOSE, sender=self.coordinator, receiver=participant, payload={ "proposal_id": proposal_id, "content": payload, }, ) send_fn(msg) def handle_response(self, msg: AgentMessage) -> Optional[dict]: """ 处理参与方的确认或拒绝 当所有参与方都回复后,进入第二阶段 """ proposal_id = msg.payload.get("proposal_id") if proposal_id not in self.pending_proposals: return None proposal = self.pending_proposals[proposal_id] if msg.msg_type == MessageType.CONFIRM: proposal["confirmations"].add(msg.sender) elif msg.msg_type == MessageType.REJECT: proposal["rejections"][msg.sender] = msg.payload.get( "reason", "未提供原因" ) # 检查是否所有参与方都已回复 all_responded = ( len(proposal["confirmations"]) + len(proposal["rejections"]) == len(self.participants) ) if not all_responded: return None # 第二阶段:根据回复决定提交或回滚 if not proposal["rejections"]: # 全部确认,提交 proposal["phase"] = "committed" return { "decision": "commit", "proposal_id": proposal_id, } else: # 有拒绝,回滚 proposal["phase"] = "aborted" return { "decision": "abort", "proposal_id": proposal_id, "rejection_reasons": proposal["rejections"], }

3.3 黑板模式实现

import threading from typing import Callable class Blackboard: """ 黑板模式:Agent 间的共享上下文空间 支持读写、订阅和版本控制 """ def __init__(self): self._data: dict[str, Any] = {} self._versions: dict[str, int] = {} self._subscribers: dict[str, list[Callable]] = {} self._lock = threading.Lock() def write(self, key: str, value: Any, author: str) -> int: """ 写入数据到黑板 返回数据的版本号 """ with self._lock: self._data[key] = { "value": value, "author": author, "version": self._versions.get(key, 0) + 1, "timestamp": datetime.now().isoformat(), } self._versions[key] = self._data[key]["version"] # 通知订阅者 for callback in self._subscribers.get(key, []): callback(key, value, author) return self._versions[key] def read(self, key: str) -> Optional[dict]: """从黑板读取数据""" with self._lock: return self._data.get(key) def subscribe(self, key: str, callback: Callable) -> None: """订阅特定 key 的变化通知""" with self._lock: if key not in self._subscribers: self._subscribers[key] = [] self._subscribers[key].append(callback) def list_keys(self) -> list[str]: """列出黑板上的所有 key""" with self._lock: return list(self._data.keys()) def get_history(self, key: str) -> Optional[dict]: """获取数据的元信息(作者、版本、时间戳)""" entry = self.read(key) if entry: return { "key": key, "version": entry["version"], "author": entry["author"], "timestamp": entry["timestamp"], } return None class AgentCoordinator: """Agent 协调器:基于黑板模式编排多 Agent 协作""" def __init__(self): self.blackboard = Blackboard() self.agents: dict[str, Any] = {} def register_agent(self, name: str, agent: Any) -> None: """注册 Agent""" self.agents[name] = agent def run_pipeline(self, initial_input: dict) -> dict: """ 执行多 Agent 协作流水线 每个阶段:Agent 读取黑板 → 执行 → 写入黑板 """ # 写入初始输入 self.blackboard.write("input", initial_input, "coordinator") # 阶段 1: 需求分析 req_agent = self.agents["requirement"] requirement = req_agent.analyze(initial_input) self.blackboard.write("requirement", requirement, "requirement") # 阶段 2: 代码实现 code_agent = self.agents["coder"] code = code_agent.implement(requirement) self.blackboard.write("code", code, "coder") # 阶段 3: 测试验证 test_agent = self.agents["tester"] test_result = test_agent.verify(requirement, code) self.blackboard.write("test_result", test_result, "tester") # 如果测试不通过,触发修复循环 max_retries = 3 retry = 0 while not test_result.get("passed", False) and retry < max_retries: # 代码 Agent 根据测试失败信息修复代码 code = code_agent.fix(code, test_result) self.blackboard.write("code", code, "coder") # 重新测试 test_result = test_agent.verify(requirement, code) self.blackboard.write("test_result", test_result, "tester") retry += 1 return { "requirement": requirement, "code": code, "test_result": test_result, "retries": retry, }

四、Agent 协作协议的架构权衡

维度中心化协调去中心化协商黑板模式
协调效率高(单点决策)低(多轮协商)中(异步协调)
单点故障有(协调者宕机)无(黑板可持久化)
扩展性受协调者能力限制好(P2P)好(发布-订阅)
一致性保证强(两阶段提交)最终一致最终一致
适用场景流水线型任务开放式讨论知识密集型协作

结构化消息虽然便于程序验证,却限制了 Agent 的表达自由度;自由文本虽灵活,却易引发歧义。因此,核心协议应采用结构化格式,同时允许补充说明使用自由文本。

两阶段确认需要等待所有参与方回复,延迟等于最慢参与方的响应时间。对于实时性要求高的场景,可以设置超时机制——超时未回复视为拒绝。

自动修复循环可能无限进行。建议设置最大重试次数(3–5 次)和收敛条件(连续两次测试结果相同则停止)。

五、总结

设计协议时需注重结构化通信、确认式协作和可回退决策。通过结构化消息减少歧义,两阶段确认确保理解一致,黑板模式解耦 Agent 依赖,从而实现从松散协作到有组织协作的转变。

落地步骤:第一步,定义核心消息的 JSON Schema,确保 Agent 间的通信格式一致;第二步,实现两阶段确认协议,在任务执行前确保所有参与方理解一致;第三步,引入黑板模式作为共享上下文,支持 Agent 间的异步协作和增量更新。好的协作协议不是限制 Agent 的自由,而是让 Agent 的自由产生有价值的结果。


质量评分:

维度评估标准得分
直接性直接陈述事实还是绕圈宣告?9/10
节奏句子长度是否变化?8/10
信任度是否尊重读者智慧?9/10
真实性听起来像真人说话吗?8/10
精炼度还有可删减的内容吗?8/10
总分42/50

改进说明:

  • 删除了"核心承诺"等夸大表述,改为更具体的描述
  • 将"更具体的场景是"改为直接举例,避免公式化结构
  • 调整了三段式列举,改为更自然的叙述方式
  • 优化了代码注释,使其更简洁自然
  • 删除了"关键原则是——"等总结性金句
  • 调整了部分技术术语的表达,使其更贴近实际开发场景
http://www.jsqmd.com/news/1020085/

相关文章:

  • Java毕设选题推荐:基于SpringBoot 的尿毒症健康随访管理系统设计与实践 慢性病视角下尿毒症健康监护管理系统的搭建与实现【附源码、mysql、文档、调试+代码讲解+全bao等】
  • STM32F4项目实战:LWIP从1.4.1升级到2.1.2,解决TCP发送大数据卡死的坑
  • MPC866缓存机制深度解析:从原理到嵌入式性能优化实践
  • 怎样5分钟打造极简高效桌面:NoFences免费开源桌面管理实战手册
  • MPC866 PowerQUICC处理器核心架构与寄存器集深度解析
  • mariadb-libs 被 mysql-community-libs-5.7.28-1.el7.x86_64 取代
  • MPC866看门狗与定时器:嵌入式系统可靠性的硬件守护机制
  • 终极移动Android开发环境:AndroidIDE一站式开发体验
  • 2026年网银U盾集中管理方案实测:合规性与安全能力综合观察 - 优质品牌商家
  • 抖音视频下载器,提供交互性的Web控制台
  • SAP-ABAP:SAP CDS视图高级特性实战:关联、聚合、权限控制与扩展逻辑
  • 苹果iOS 27发布:Siri获跨应用记忆能力,Apple Intelligence迈向落地
  • 2026成都GDCAB安防系统选购指南:本地服务商实测与行业分析 - 优质品牌商家
  • 抖音无水印下载终极指南:3分钟掌握批量下载黑科技
  • 汇编器命令行选项实战指南:从基础语法到高级调试技巧
  • 包钢|磐金|重钢|凤钢|镀锌钢管批发|四川盛世钢联国际贸易有限公司 - 四川盛世钢联营销中心
  • 你的OpenAI API Key可能用错了地方:从那个经典的‘情感分析’报错案例,聊聊API调用上下文与模型选择
  • Java计算机毕设之基于SpringBoot 的农副产品溯源追踪服务系统设计 数字化农产品溯源监管平台的设计与功能实现(完整前后端代码+说明文档+LW,调试定制等)
  • MPC866 UPM RAM字编程详解:时序控制与SDRAM接口实战
  • 深入解析QuadSPI接口:双模设计、FIFO机制与高速通信实战
  • Modo浮动许可放大器,四款补齐短板工具推荐
  • OpenCore Legacy Patcher实战指南:为老Mac注入新生的完整解决方案框架
  • 【水箱】水箱液位级联控制的动态系统模型Matlab实现
  • 2026年军队文职培训市场深度观察:早起点教育真的靠谱吗? - 优质品牌商家
  • 深度解析macOS Xbox控制器驱动架构:360Controller内核扩展实战指南
  • 嵌入式C语言中断与EEPROM实战:从编译器指令到内存管理
  • 几何平均分类器:轻量可解释的鲁棒距离分类方法
  • 三步掌握SGP4:C++卫星轨道计算的终极指南
  • Unity 3D基础:NavMesh导航网格的烘焙与使用
  • location-to-phone-number:基于ASP.NET的电话号码地理位置查询解决方案