【从0到1构建一个ClaudeAgent】并
有些操作很慢,Agent 不能干等着。例如长时间编译/构建:make,mvn compile,gradle build或大数据处理:hadoop,spark-submit等的一些工作
Java实现代码
java
public class BackgroundTasksSystem { // --- 配置 --- private static final Path WORKDIR = Paths.get(System.getProperty("user.dir")); private static final Gson gson = new GsonBuilder().setPrettyPrinting().create(); // --- 后台任务管理器 --- static class BackgroundManager { // 任务存储 private final Map<String, TaskInfo> tasks = new ConcurrentHashMap<>(); // 通知队列 private final Queue<TaskNotification> notificationQueue = new ConcurrentLinkedQueue<>(); // 任务 ID 生成器 private final AtomicInteger taskIdCounter = new AtomicInteger(1); // 锁 private final Object lock = new Object(); static class TaskInfo { String taskId; String status; // running, completed, timeout, error String result; String command; long startTime; Thread thread; // 关联的执行线程 } static class TaskNotification { String taskId; String status; String command; String result; } /** * 启动后台任务 * 立即返回任务 ID,不等待命令完成 */ public String run(String command) { String taskId = "task_" + taskIdCounter.getAndIncrement(); TaskInfo task = new TaskInfo(taskId, command); tasks.put(taskId, task); // 创建并启动后台线程 Thread thread = new Thread(() -> executeTask(task), "BackgroundTask-" + taskId); thread.setDaemon(true); task.thread = thread; thread.start(); // 立即返回,不阻塞 return String.format("Background task %s started: %s", taskId, command.substring(0, Math.min(command.length(), 80))); } /** * 线程目标:执行子进程,捕获输出,推送结果到队列 */ private void executeTask(TaskInfo task) { String output; String status; try { ProcessBuilder pb = new ProcessBuilder("bash", "-c", task.command); pb.directory(WORKDIR.toFile()); pb.redirectErrorStream(true); Process process = pb.start(); boolean finished = process.waitFor(300, TimeUnit.SECONDS); // 5分钟超时 if (!finished) { process.destroy(); output = "Error: Timeout (300s)"; status = "timeout"; } else { output = new String(process.getInputStream().readAllBytes()).trim(); status = "completed"; } } catch (Exception e) { output = "Error: " + e.getMessage(); status = "error"; } // 更新任务状态 task.status = status; task.result = output.isEmpty() ? "(no output)" : output.substring(0, Math.min(output.length(), 50000)); // 添加通知到队列 synchronized (lock) { notificationQueue.offer(new TaskNotification( task.taskId, status, task.command.substring(0, Math.min(task.command.length(), 80)), task.result.substring(0, Math.min(task.result.length(), 500)) )); } } /** * 检查任务状态 * 如果指定 taskId,检查单个任务;否则列出所有任务 */ public String check(String taskId) { if (taskId != null && !taskId.isEmpty()) { TaskInfo task = tasks.get(taskId); if (task == null) { return "Error: Unknown task " + taskId; } return String.format("[%s] %s\n%s", task.status, task.command.substring(0, Math.min(task.command.length(), 60)), task.result != null ? task.result : "(running)"); } else { StringBuilder sb = new StringBuilder(); for (Map.Entry<String, TaskInfo> entry : tasks.entrySet()) { TaskInfo task = entry.getValue(); sb.append(String.format("%s: [%s] %s\n", task.taskId, task.status, task.command.substring(0, Math.min(task.command.length(), 60)))); } return sb.length() > 0 ? sb.toString().trim() : "No background tasks."; } } /** * 清空通知队列并返回所有待处理的通知 */ public List<TaskNotification> drainNotifications() { synchronized (lock) { List<TaskNotification> notifications = new ArrayList<>(); while (!notificationQueue.isEmpty()) { notifications.add(notificationQueue.poll()); } return notifications; } } /** * 获取所有任务 */ public Map<String, TaskInfo> getAllTasks() { return new HashMap<>(tasks); } } // 初始化后台管理器 private static final BackgroundManager BG_MANAGER = new BackgroundManager(); // --- 工具枚举 --- public enum ToolType { BASH("bash", "Run a shell command (blocking)."), READ_FILE("read_file", "Read file contents."), WRITE_FILE("write_file", "Write content to file."), EDIT_FILE("edit_file", "Replace exact text in file."), BACKGROUND_RUN("background_run", "Run command in background thread. Returns task_id immediately."), // 新增 CHECK_BACKGROUND("check_background", "Check background task status. Omit task_id to list all."); // 新增 public final String name; public final String description; ToolType(String name, String description) { this.name = name; this.description = description; } } // --- 工具处理器映射 --- private static final Map<String, ToolExecutor> TOOL_HANDLERS = new HashMap<>(); static { // ... 省略基础工具注册 // 后台任务工具 TOOL_HANDLERS.put(ToolType.BACKGROUND_RUN.name, args -> { String command = (String) args.get("command"); return BG_MANAGER.run(command); }); TOOL_HANDLERS.put(ToolType.CHECK_BACKGROUND.name, args -> { String taskId = (String) args.get("task_id"); return BG_MANAGER.check(taskId); }); } // ... 省略相同的工具实现 // --- Agent 主循环(集成后台任务通知)--- public static void agentLoop(List<Map<String, Object>> messages) { while (true) { try { // 在 LLM 调用前检查后台通知 List<BackgroundManager.TaskNotification> notifications = BG_MANAGER.drainNotifications(); if (!notifications.isEmpty() && !messages.isEmpty()) { StringBuilder notifText = new StringBuilder(); notifText.append("<background-results>\n"); for (BackgroundManager.TaskNotification notif : notifications) { notifText.append(String.format("[bg:%s] %s: %s\n", notif.taskId, notif.status, notif.result)); } notifText.append("</background-results>"); messages.add(Map.of( "role", "user", "content", notifText.toString() )); messages.add(Map.of( "role", "assistant", "content", "Noted background results." )); // 异步结果注入:将后台任务结果插入到对话中 // 结构化格式:用XML标签包裹,便于LLM解析 } // 显示当前活动任务 Map<String, BackgroundManager.TaskInfo> activeTasks = BG_MANAGER.getAllTasks(); int runningTasks = (int) activeTasks.values().stream() .filter(t -> "running".equals(t.status)) .count(); if (runningTasks > 0) { System.out.printf("[Active background tasks: %d]\n", runningTasks); } // ... 省略相同的 LLM 调用和工具执行逻辑 } catch (Exception e) { System.err.println("Error in agent loop: " + e.getMessage()); e.printStackTrace(); return; } } } }这段代码引入了后台任务系统,解决了 Agent 在执行长时间任务时的阻塞问题
关键洞察:Agent 可以在命令执行时继续工作,而不是被阻塞。
异步任务处理架构
核心思想:从同步阻塞的任务执行升级为异步非阻塞的并发处理,让Agent能够同时处理多个耗时任务,实现"并行计算"能力,大幅提升效率和响应性。
java
// 后台任务管理器 - 异步执行引擎 static class BackgroundManager { // 任务存储 private final Map<String, TaskInfo> tasks = new ConcurrentHashMap<>(); // 通知队列 private final Queue<TaskNotification> notificationQueue = new ConcurrentLinkedQueue<>(); // 任务 ID 生成器 private final AtomicInteger taskIdCounter = new AtomicInteger(1); // 并发安全:使用线程安全集合 // 异步通信:通过队列传递任务结果 // 唯一标识:自动生成任务ID }- 解耦执行:任务提交和执行分离,立即返回控制权
- 并发管理:多个后台任务可以同时运行
- 结果异步收集:通过队列机制收集完成的任务结果
- 线程安全:使用并发集合确保多线程安全
任务信息结构设计
java
// 任务信息实体 static class TaskInfo { String taskId; // 唯一标识 String status; // 状态:running, completed, timeout, error String result; // 执行结果 String command; // 执行的命令 long startTime; // 开始时间 Thread thread; // 关联的执行线程 // 完整状态跟踪:从启动到完成的全生命周期 // 线程关联:可以控制或监控执行线程 // 时间戳:支持超时和性能分析 } // 任务通知实体 static class TaskNotification { String taskId; String status; String command; String result; // 轻量传输:只包含必要信息 // 结构化:易于解析和处理 // 结果截断:避免过大的通知消息 }- 状态驱动:明确的任务状态生命周期
- 结果持久:任务结果可以多次查询
- 线程管理:可以跟踪和控制执行线程
- 事件驱动:通过通知机制传递完成事件
异步任务启动机制
java
/** * 启动后台任务 * 立即返回任务 ID,不等待命令完成 */ public String run(String command) { String taskId = "task_" + taskIdCounter.getAndIncrement(); TaskInfo task = new TaskInfo(taskId, command); tasks.put(taskId, task); // 创建并启动后台线程 Thread thread = new Thread(() -> executeTask(task), "BackgroundTask-" + taskId); thread.setDaemon(true); // 守护线程,不会阻止JVM退出 task.thread = thread; thread.start(); // 立即返回,不阻塞调用者 return String.format("Background task %s started: %s", taskId, command.substring(0, Math.min(command.length(), 80))); // 异步启动:立即返回任务ID,不等待命令完成 // 守护线程:不会阻止程序正常退出 // 线程命名:便于调试和监控 }- 立即返回:不阻塞主线程,立即返回控制权
- 守护线程:后台任务不会阻止JVM退出
- 资源管理:线程自动清理,避免内存泄漏
- 友好反馈:返回任务ID和简化的命令描述
任务执行与结果收集
java
/** * 线程目标:执行子进程,捕获输出,推送结果到队列 */ private void executeTask(TaskInfo task) { String output; String status; try { ProcessBuilder pb = new ProcessBuilder("bash", "-c", task.command); pb.directory(WORKDIR.toFile()); pb.redirectErrorStream(true); Process process = pb.start(); boolean finished = process.waitFor(300, TimeUnit.SECONDS); // 5分钟超时 if (!finished) { process.destroy(); output = "Error: Timeout (300s)"; status = "timeout"; } else { output = new String(process.getInputStream().readAllBytes()).trim(); status = "completed"; } } catch (Exception e) { output = "Error: " + e.getMessage(); status = "error"; } // 更新任务状态 task.status = status; task.result = output.isEmpty() ? "(no output)" : output.substring(0, Math.min(output.length(), 50000)); // 添加通知到队列 synchronized (lock) { notificationQueue.offer(new TaskNotification( task.taskId, status, task.command.substring(0, Math.min(task.command.length(), 80)), task.result.substring(0, Math.min(task.result.length(), 500)) )); } }- 超时保护:防止长时间运行的任务阻塞
- 异常安全:全面捕获执行异常
- 内存管理:截断大结果,避免内存溢出
- 事件驱动:完成后立即通知主线程
智能通知注入机制
java
// 在 LLM 调用前检查后台通知 List<BackgroundManager.TaskNotification> notifications = BG_MANAGER.drainNotifications(); if (!notifications.isEmpty() && !messages.isEmpty()) { StringBuilder notifText = new StringBuilder(); notifText.append("<background-results>\n"); for (BackgroundManager.TaskNotification notif : notifications) { notifText.append(String.format("[bg:%s] %s: %s\n", notif.taskId, notif.status, notif.result)); } notifText.append("</background-results>"); messages.add(Map.of( "role", "user", "content", notifText.toString() )); messages.add(Map.of( "role", "assistant", "content", "Noted background results." )); // 自动注入:自动将后台结果插入到对话中 // 结构化格式:XML标签明确标识内容类型 // 对话完整:添加assistant确认,保持对话结构 // 时机智能:在LLM调用前插入,确保LLM能看到最新结果 }- 自动同步:后台结果自动同步到主对话
- 结构化格式:便于LLM识别和解析
- 对话集成:无缝集成到现有对话流
- 时机优化:在决策前注入,确保信息及时性
工具集成架构
java
// 后台任务工具集 public enum ToolType { BACKGROUND_RUN("background_run", "Run command in background thread. Returns task_id immediately."), CHECK_BACKGROUND("check_background", "Check background task status. Omit task_id to list all."); // 异步执行:立即返回,不阻塞 // 状态查询:支持单个和批量查询 // 语义清晰:工具名明确表示异步特性 } // 工具处理器映射 TOOL_HANDLERS.put(ToolType.BACKGROUND_RUN.name, args -> { String command = (String) args.get("command"); return BG_MANAGER.run(command); // 委托执行:将命令转交给后台管理器 // 立即返回:不等待任务完成 }); TOOL_HANDLERS.put(ToolType.CHECK_BACKGROUND.name, args -> { String taskId = (String) args.get("task_id"); return BG_MANAGER.check(taskId); // 灵活查询:支持单任务详查和列表概览 });- 接口统一:与同步工具相同的调用方式
- 异步语义:工具名明确区分同步/异步
- 灵活查询:支持多种查询方式
- 无缝集成:与现有工具系统完全兼容
架构演进与价值
从 ContextCompactSystem 到 BackgroundTasksSystem 的升级:
| 维度 | ContextCompactSystem | BackgroundTasksSystem |
|---|---|---|
| 执行模式 | 同步串行 | 异步并行 |
| 吞吐量 | 一次一个任务 | 并发多个任务 |
| 响应性 | 阻塞等待 | 立即响应 |
| 资源利用 | 单线程 | 多线程并发 |
| 任务类型 | 短任务为主 | 长短任务混合 |
