【从0到1构建一个ClaudeAgent】协作-团队协议
这个在智能体团队系统的基础上,增加了两种协议:
- 关机协议(Shutdown Protocol):
- 领导智能体请求团队成员关机
- 团队成员可以批准或拒绝关机请求
- 基于 request_id 的请求-响应模式
- 计划审批协议(Plan Approval Protocol):
- 团队成员提交计划给领导审批
- 领导批准或拒绝计划
- 同样基于 request_id 的请求-响应模式
关键洞察:相同的 request_id 关联模式,应用于两个不同的协作领域。
Java代码
java
public class TeamProtocolsSystem { // --- 配置 --- private static final Path WORKDIR = Paths.get(System.getProperty("user.dir")); private static final Path TEAM_DIR = WORKDIR.resolve(".team"); private static final Path INBOX_DIR = TEAM_DIR.resolve("inbox"); private static final Gson gson = new GsonBuilder().setPrettyPrinting().create(); // 有效消息类型 private static final Set<String> VALID_MSG_TYPES = Set.of( "message", "broadcast", "shutdown_request", "shutdown_response", "plan_approval_response" ); // --- 请求追踪器 --- private static final Map<String, ShutdownRequest> shutdownRequests = new ConcurrentHashMap<>(); private static final Map<String, PlanRequest> planRequests = new ConcurrentHashMap<>(); private static final Object trackerLock = new Object(); static class ShutdownRequest { String requestId; String target; // 目标智能体 String status; // pending, approved, rejected long timestamp; } static class PlanRequest { String requestId; String from; // 提交者 String plan; // 计划内容 String status; // pending, approved, rejected long timestamp; } // --- 消息系统(MessageBus)--- static class MessageBus { private final Path inboxDir; private final AtomicInteger requestIdCounter = new AtomicInteger(1); public MessageBus(Path inboxDir) { this.inboxDir = inboxDir; try { Files.createDirectories(inboxDir); } catch (IOException e) { throw new RuntimeException("Failed to create inbox directory", e); } } /** * 生成唯一的请求ID */ public String generateRequestId() { return "req_" + requestIdCounter.getAndIncrement() + "_" + System.currentTimeMillis() % 10000; } /** * 发送消息到指定智能体 */ public String send(String sender, String to, String content, String msgType, Map<String, Object> extra) { if (!VALID_MSG_TYPES.contains(msgType)) { return String.format("Error: Invalid type '%s'. Valid: %s", msgType, String.join(", ", VALID_MSG_TYPES)); } Map<String, Object> message = new LinkedHashMap<>(); message.put("type", msgType); message.put("from", sender); message.put("content", content); message.put("timestamp", System.currentTimeMillis() / 1000.0); if (extra != null) { message.putAll(extra); } // ... 省略相同的发送逻辑 } } // 初始化消息总线 private static final MessageBus BUS = new MessageBus(INBOX_DIR); // --- 智能体管理器(TeammateManager)--- static class TeammateManager { // ... 省略相同的配置加载、保存逻辑 private void teammateLoop(String name, String role, String prompt, AtomicBoolean stopFlag) { String systemPrompt = String.format( "You are '%s', role: %s, at %s. " + "Submit plans via plan_approval before major work. " + "Respond to shutdown_request with shutdown_response.", name, role, WORKDIR ); // 增强系统提示:明确协议要求 // 计划审批:重要工作前需要提交计划 // 关机响应:需要响应关机请求 List<Map<String, Object>> messages = new ArrayList<>(); messages.add(Map.of("role", "user", "content", prompt)); boolean shouldExit = false; // 最大迭代次数限制 for (int i = 0; i < 50 && !stopFlag.get(); i++) { try { // 检查邮箱 List<Map<String, Object>> inbox = BUS.readInbox(name); for (Map<String, Object> msg : inbox) { messages.add(Map.of("role", "user", "content", gson.toJson(msg))); } if (shouldExit) { break; } // ... 省略相同的LLM调用和执行逻辑 for (Map<String, Object> block : content) { if ("tool_use".equals(block.get("type"))) { String toolName = (String) block.get("name"); String toolId = (String) block.get("id"); @SuppressWarnings("unchecked") Map<String, Object> args = (Map<String, Object>) block.get("input"); String output = executeTeammateTool(name, toolName, args); // 如果批准了关机,设置退出标志 if ("shutdown_response".equals(toolName) && Boolean.TRUE.equals(args.get("approve"))) { shouldExit = true; } } } } } // 更新状态 Map<String, Object> member = findMember(name); if (member != null) { member.put("status", shouldExit ? "shutdown" : "idle"); saveConfig(); } // 状态更新:根据退出原因设置不同状态 } private String executeTeammateTool(String sender, String toolName, Map<String, Object> args) { try { switch (toolName) { // ... 省略基础工具 case "shutdown_response": String reqId = (String) args.get("request_id"); Boolean approve = (Boolean) args.get("approve"); if (approve == null) approve = false; synchronized (trackerLock) { ShutdownRequest req = shutdownRequests.get(reqId); if (req != null) { req.status = approve ? "approved" : "rejected"; } } // 状态更新:在追踪器中更新请求状态 String reason = (String) args.get("reason"); if (reason == null) reason = ""; BUS.send( sender, "lead", reason, "shutdown_response", Map.of("request_id", reqId, "approve", approve) ); // 回复领导:通知领导审批结果 return String.format("Shutdown %s", approve ? "approved" : "rejected"); case "plan_approval": String planText = (String) args.get("plan"); String planReqId = BUS.generateRequestId(); synchronized (trackerLock) { planRequests.put(planReqId, new PlanRequest(planReqId, sender, planText)); } // 计划提交:创建新的计划请求 BUS.send( sender, "lead", planText, "plan_approval_response", Map.of("request_id", planReqId, "plan", planText) ); // 通知领导:发送计划审批请求 return String.format("Plan submitted (request_id=%s). Waiting for lead approval.", planReqId); } } } } // --- 领导特定的协议处理器 --- /** * 处理关机请求 */ private static String handleShutdownRequest(String teammate) { String reqId = BUS.generateRequestId(); synchronized (trackerLock) { shutdownRequests.put(reqId, new ShutdownRequest(reqId, teammate)); } // 创建请求:在追踪器中记录关机请求 BUS.send( "lead", teammate, "Please shut down gracefully.", "shutdown_request", Map.of("request_id", reqId) ); // 发送请求:向目标智能体发送关机请求 return String.format("Shutdown request %s sent to '%s' (status: pending)", reqId, teammate); } /** * 处理计划审批 */ private static String handlePlanReview(String requestId, boolean approve, String feedback) { PlanRequest req; synchronized (trackerLock) { req = planRequests.get(requestId); } if (req == null) { return String.format("Error: Unknown plan request_id '%s'", requestId); } synchronized (trackerLock) { req.status = approve ? "approved" : "rejected"; } // 状态更新:更新计划审批状态 BUS.send( "lead", req.from, feedback, "plan_approval_response", Map.of("request_id", requestId, "approve", approve, "feedback", feedback) ); // 回复提交者:发送审批结果和反馈 return String.format("Plan %s for '%s'", approve ? "approved" : "rejected", req.from); } /** * 检查关机请求状态 */ private static String checkShutdownStatus(String requestId) { synchronized (trackerLock) { ShutdownRequest req = shutdownRequests.get(requestId); if (req == null) { return gson.toJson(Map.of("error", "not found")); } return gson.toJson(Map.of( "request_id", req.requestId, "target", req.target, "status", req.status, "timestamp", req.timestamp )); } // 状态查询:返回请求的详细信息 } // --- 工具枚举 --- public enum ToolType { BASH("bash", "Run a shell command."), READ_FILE("read_file", "Read file contents."), WRITE_FILE("write_file", "Write content to file."), EDIT_FILE("edit_file", "Replace exact text in file."), SPAWN_TEAMMATE("spawn_teammate", "Spawn a persistent teammate that runs in its own thread."), LIST_TEAMMATES("list_teammates", "List all teammates with name, role, status."), SEND_MESSAGE("send_message", "Send a message to a teammate's inbox."), READ_INBOX("read_inbox", "Read and drain the lead's inbox."), BROADCAST("broadcast", "Send a message to all teammates."), SHUTDOWN_REQUEST("shutdown_request", "Request a teammate to shut down gracefully. Returns a request_id for tracking."), // 新增 SHUTDOWN_RESPONSE("shutdown_response", "Check the status of a shutdown request by request_id."), // 新增 PLAN_APPROVAL("plan_approval", "Approve or reject a teammate's plan. Provide request_id + approve + optional feedback."); // 新增 public final String name; public final String description; ToolType(String name, String description) { this.name = name; this.description = description; } } // --- 工具处理器映射 --- private static final Map<String, ToolExecutor> TOOL_HANDLERS = new HashMap<>(); static { // ... 省略基础工具和团队管理工具注册 // 协议工具 TOOL_HANDLERS.put(ToolType.SHUTDOWN_REQUEST.name, args -> { String teammate = (String) args.get("teammate"); return handleShutdownRequest(teammate); }); TOOL_HANDLERS.put(ToolType.SHUTDOWN_RESPONSE.name, args -> { String requestId = (String) args.get("request_id"); return checkShutdownStatus(requestId); }); TOOL_HANDLERS.put(ToolType.PLAN_APPROVAL.name, args -> { String requestId = (String) args.get("request_id"); Boolean approve = (Boolean) args.get("approve"); String feedback = (String) args.get("feedback"); if (feedback == null) feedback = ""; return handlePlanReview(requestId, approve, feedback); }); } // --- Agent 主循环(领导智能体)--- public static void agentLoop(List<Map<String, Object>> messages) { while (true) { try { // ... 省略邮箱检查逻辑 // 显示待处理的请求 synchronized (trackerLock) { long pendingShutdowns = shutdownRequests.values().stream() .filter(r -> "pending".equals(r.status)) .count(); long pendingPlans = planRequests.values().stream() .filter(r -> "pending".equals(r.status)) .count(); if (pendingShutdowns > 0 || pendingPlans > 0) { System.out.printf("[Pending requests: %d shutdowns, %d plans]%n", pendingShutdowns, pendingPlans); } } // 状态监控:实时显示待处理的协议请求 // ... 省略相同的 LLM 调用和执行逻辑 } } } }协议化团队协作系统
核心思想:从松散的智能体协作升级为结构化、协议化、有状态的团队协作系统,引入正式的请求-响应协议、状态追踪、审批流程,实现企业级团队管理的标准化和可控性。
java
// 请求追踪器 - 协议状态管理 private static final Map<String, ShutdownRequest> shutdownRequests = new ConcurrentHashMap<>(); private static final Map<String, PlanRequest> planRequests = new ConcurrentHashMap<>(); private static final Object trackerLock = new Object(); // 全局状态:追踪所有协议请求的状态 // 并发安全:支持多线程安全访问 // 生命周期:从创建、处理到完成的全周期追踪
- 状态驱动:所有协议请求都有明确的状态生命周期
- 集中管理:全局追踪所有请求状态
- 并发安全:支持多智能体并发访问
- 可追溯性:所有请求都有时间戳和唯一标识
正式的协议类型定义
java
// 关机请求协议 static class ShutdownRequest { String requestId; // 唯一请求ID String target; // 目标智能体 String status; // 状态:pending, approved, rejected long timestamp; // 创建时间戳 // 结构化请求:明确的目标和状态 // 唯一标识:支持多个并发请求 // 时间追踪:支持超时和时序分析 } // 计划审批协议 static class PlanRequest { String requestId; // 唯一请求ID String from; // 提交者 String plan; // 计划内容 String status; // 状态:pending, approved, rejected long timestamp; // 创建时间戳 // 计划提交流程:从提交到审批的完整生命周期 // 内容关联:保存计划的具体内容 // 来源追踪:明确计划提交者 }- 协议实体:每个协议类型都有明确的数据结构
- 状态机:明确的pending → approved/rejected状态转换
- 审计跟踪:时间戳支持审计和时序分析
- 关系管理:明确的请求-响应关系
协议工具生态系统
java
// 协议工具集 public enum ToolType { SHUTDOWN_REQUEST("shutdown_request", "Request a teammate to shut down gracefully. Returns a request_id for tracking."), SHUTDOWN_RESPONSE("shutdown_response", "Check the status of a shutdown request by request_id."), PLAN_APPROVAL("plan_approval", "Approve or reject a teammate's plan. Provide request_id + approve + optional feedback."); // 正式协议:明确的工具命名和描述 // 请求-响应模式:领导发起请求,成员响应 // 状态查询:支持请求状态检查 // 反馈机制:支持审批反馈 }- 标准化接口:所有协议都有标准化的工具接口
- 完整工作流:创建、查询、处理的完整工作流
- 可追踪性:通过request_id追踪所有请求
- 反馈机制:支持审批反馈,促进改进
