当前位置: 首页 > news >正文

【从0到1构建一个ClaudeAgent】并

有些操作很慢,Agent 不能干等着。例如长时间编译/构建make,mvn compile,gradle build大数据处理hadoop,spark-submit等的一些工作

Java实现代码

java

public class BackgroundTasksSystem { // --- 配置 --- private static final Path WORKDIR = Paths.get(System.getProperty("user.dir")); private static final Gson gson = new GsonBuilder().setPrettyPrinting().create(); // --- 后台任务管理器 --- static class BackgroundManager { // 任务存储 private final Map<String, TaskInfo> tasks = new ConcurrentHashMap<>(); // 通知队列 private final Queue<TaskNotification> notificationQueue = new ConcurrentLinkedQueue<>(); // 任务 ID 生成器 private final AtomicInteger taskIdCounter = new AtomicInteger(1); // 锁 private final Object lock = new Object(); static class TaskInfo { String taskId; String status; // running, completed, timeout, error String result; String command; long startTime; Thread thread; // 关联的执行线程 } static class TaskNotification { String taskId; String status; String command; String result; } /** * 启动后台任务 * 立即返回任务 ID,不等待命令完成 */ public String run(String command) { String taskId = "task_" + taskIdCounter.getAndIncrement(); TaskInfo task = new TaskInfo(taskId, command); tasks.put(taskId, task); // 创建并启动后台线程 Thread thread = new Thread(() -> executeTask(task), "BackgroundTask-" + taskId); thread.setDaemon(true); task.thread = thread; thread.start(); // 立即返回,不阻塞 return String.format("Background task %s started: %s", taskId, command.substring(0, Math.min(command.length(), 80))); } /** * 线程目标:执行子进程,捕获输出,推送结果到队列 */ private void executeTask(TaskInfo task) { String output; String status; try { ProcessBuilder pb = new ProcessBuilder("bash", "-c", task.command); pb.directory(WORKDIR.toFile()); pb.redirectErrorStream(true); Process process = pb.start(); boolean finished = process.waitFor(300, TimeUnit.SECONDS); // 5分钟超时 if (!finished) { process.destroy(); output = "Error: Timeout (300s)"; status = "timeout"; } else { output = new String(process.getInputStream().readAllBytes()).trim(); status = "completed"; } } catch (Exception e) { output = "Error: " + e.getMessage(); status = "error"; } // 更新任务状态 task.status = status; task.result = output.isEmpty() ? "(no output)" : output.substring(0, Math.min(output.length(), 50000)); // 添加通知到队列 synchronized (lock) { notificationQueue.offer(new TaskNotification( task.taskId, status, task.command.substring(0, Math.min(task.command.length(), 80)), task.result.substring(0, Math.min(task.result.length(), 500)) )); } } /** * 检查任务状态 * 如果指定 taskId,检查单个任务;否则列出所有任务 */ public String check(String taskId) { if (taskId != null && !taskId.isEmpty()) { TaskInfo task = tasks.get(taskId); if (task == null) { return "Error: Unknown task " + taskId; } return String.format("[%s] %s\n%s", task.status, task.command.substring(0, Math.min(task.command.length(), 60)), task.result != null ? task.result : "(running)"); } else { StringBuilder sb = new StringBuilder(); for (Map.Entry<String, TaskInfo> entry : tasks.entrySet()) { TaskInfo task = entry.getValue(); sb.append(String.format("%s: [%s] %s\n", task.taskId, task.status, task.command.substring(0, Math.min(task.command.length(), 60)))); } return sb.length() > 0 ? sb.toString().trim() : "No background tasks."; } } /** * 清空通知队列并返回所有待处理的通知 */ public List<TaskNotification> drainNotifications() { synchronized (lock) { List<TaskNotification> notifications = new ArrayList<>(); while (!notificationQueue.isEmpty()) { notifications.add(notificationQueue.poll()); } return notifications; } } /** * 获取所有任务 */ public Map<String, TaskInfo> getAllTasks() { return new HashMap<>(tasks); } } // 初始化后台管理器 private static final BackgroundManager BG_MANAGER = new BackgroundManager(); // --- 工具枚举 --- public enum ToolType { BASH("bash", "Run a shell command (blocking)."), READ_FILE("read_file", "Read file contents."), WRITE_FILE("write_file", "Write content to file."), EDIT_FILE("edit_file", "Replace exact text in file."), BACKGROUND_RUN("background_run", "Run command in background thread. Returns task_id immediately."), // 新增 CHECK_BACKGROUND("check_background", "Check background task status. Omit task_id to list all."); // 新增 public final String name; public final String description; ToolType(String name, String description) { this.name = name; this.description = description; } } // --- 工具处理器映射 --- private static final Map<String, ToolExecutor> TOOL_HANDLERS = new HashMap<>(); static { // ... 省略基础工具注册 // 后台任务工具 TOOL_HANDLERS.put(ToolType.BACKGROUND_RUN.name, args -> { String command = (String) args.get("command"); return BG_MANAGER.run(command); }); TOOL_HANDLERS.put(ToolType.CHECK_BACKGROUND.name, args -> { String taskId = (String) args.get("task_id"); return BG_MANAGER.check(taskId); }); } // ... 省略相同的工具实现 // --- Agent 主循环(集成后台任务通知)--- public static void agentLoop(List<Map<String, Object>> messages) { while (true) { try { // 在 LLM 调用前检查后台通知 List<BackgroundManager.TaskNotification> notifications = BG_MANAGER.drainNotifications(); if (!notifications.isEmpty() && !messages.isEmpty()) { StringBuilder notifText = new StringBuilder(); notifText.append("<background-results>\n"); for (BackgroundManager.TaskNotification notif : notifications) { notifText.append(String.format("[bg:%s] %s: %s\n", notif.taskId, notif.status, notif.result)); } notifText.append("</background-results>"); messages.add(Map.of( "role", "user", "content", notifText.toString() )); messages.add(Map.of( "role", "assistant", "content", "Noted background results." )); // 异步结果注入:将后台任务结果插入到对话中 // 结构化格式:用XML标签包裹,便于LLM解析 } // 显示当前活动任务 Map<String, BackgroundManager.TaskInfo> activeTasks = BG_MANAGER.getAllTasks(); int runningTasks = (int) activeTasks.values().stream() .filter(t -> "running".equals(t.status)) .count(); if (runningTasks > 0) { System.out.printf("[Active background tasks: %d]\n", runningTasks); } // ... 省略相同的 LLM 调用和工具执行逻辑 } catch (Exception e) { System.err.println("Error in agent loop: " + e.getMessage()); e.printStackTrace(); return; } } } }

这段代码引入了后台任务系统,解决了 Agent 在执行长时间任务时的阻塞问题

关键洞察:Agent 可以在命令执行时继续工作,而不是被阻塞。

异步任务处理架构

核心思想:从同步阻塞的任务执行升级为异步非阻塞的并发处理,让Agent能够同时处理多个耗时任务,实现"并行计算"能力,大幅提升效率和响应性。

java

// 后台任务管理器 - 异步执行引擎 static class BackgroundManager { // 任务存储 private final Map<String, TaskInfo> tasks = new ConcurrentHashMap<>(); // 通知队列 private final Queue<TaskNotification> notificationQueue = new ConcurrentLinkedQueue<>(); // 任务 ID 生成器 private final AtomicInteger taskIdCounter = new AtomicInteger(1); // 并发安全:使用线程安全集合 // 异步通信:通过队列传递任务结果 // 唯一标识:自动生成任务ID }
  • 解耦执行:任务提交和执行分离,立即返回控制权
  • 并发管理:多个后台任务可以同时运行
  • 结果异步收集:通过队列机制收集完成的任务结果
  • 线程安全:使用并发集合确保多线程安全

任务信息结构设计

java

// 任务信息实体 static class TaskInfo { String taskId; // 唯一标识 String status; // 状态:running, completed, timeout, error String result; // 执行结果 String command; // 执行的命令 long startTime; // 开始时间 Thread thread; // 关联的执行线程 // 完整状态跟踪:从启动到完成的全生命周期 // 线程关联:可以控制或监控执行线程 // 时间戳:支持超时和性能分析 } // 任务通知实体 static class TaskNotification { String taskId; String status; String command; String result; // 轻量传输:只包含必要信息 // 结构化:易于解析和处理 // 结果截断:避免过大的通知消息 }
  • 状态驱动:明确的任务状态生命周期
  • 结果持久:任务结果可以多次查询
  • 线程管理:可以跟踪和控制执行线程
  • 事件驱动:通过通知机制传递完成事件

异步任务启动机制

java

/** * 启动后台任务 * 立即返回任务 ID,不等待命令完成 */ public String run(String command) { String taskId = "task_" + taskIdCounter.getAndIncrement(); TaskInfo task = new TaskInfo(taskId, command); tasks.put(taskId, task); // 创建并启动后台线程 Thread thread = new Thread(() -> executeTask(task), "BackgroundTask-" + taskId); thread.setDaemon(true); // 守护线程,不会阻止JVM退出 task.thread = thread; thread.start(); // 立即返回,不阻塞调用者 return String.format("Background task %s started: %s", taskId, command.substring(0, Math.min(command.length(), 80))); // 异步启动:立即返回任务ID,不等待命令完成 // 守护线程:不会阻止程序正常退出 // 线程命名:便于调试和监控 }
  • 立即返回:不阻塞主线程,立即返回控制权
  • 守护线程:后台任务不会阻止JVM退出
  • 资源管理:线程自动清理,避免内存泄漏
  • 友好反馈:返回任务ID和简化的命令描述

任务执行与结果收集

java

/** * 线程目标:执行子进程,捕获输出,推送结果到队列 */ private void executeTask(TaskInfo task) { String output; String status; try { ProcessBuilder pb = new ProcessBuilder("bash", "-c", task.command); pb.directory(WORKDIR.toFile()); pb.redirectErrorStream(true); Process process = pb.start(); boolean finished = process.waitFor(300, TimeUnit.SECONDS); // 5分钟超时 if (!finished) { process.destroy(); output = "Error: Timeout (300s)"; status = "timeout"; } else { output = new String(process.getInputStream().readAllBytes()).trim(); status = "completed"; } } catch (Exception e) { output = "Error: " + e.getMessage(); status = "error"; } // 更新任务状态 task.status = status; task.result = output.isEmpty() ? "(no output)" : output.substring(0, Math.min(output.length(), 50000)); // 添加通知到队列 synchronized (lock) { notificationQueue.offer(new TaskNotification( task.taskId, status, task.command.substring(0, Math.min(task.command.length(), 80)), task.result.substring(0, Math.min(task.result.length(), 500)) )); } }
  • 超时保护:防止长时间运行的任务阻塞
  • 异常安全:全面捕获执行异常
  • 内存管理:截断大结果,避免内存溢出
  • 事件驱动:完成后立即通知主线程

智能通知注入机制

java

// 在 LLM 调用前检查后台通知 List<BackgroundManager.TaskNotification> notifications = BG_MANAGER.drainNotifications(); if (!notifications.isEmpty() && !messages.isEmpty()) { StringBuilder notifText = new StringBuilder(); notifText.append("<background-results>\n"); for (BackgroundManager.TaskNotification notif : notifications) { notifText.append(String.format("[bg:%s] %s: %s\n", notif.taskId, notif.status, notif.result)); } notifText.append("</background-results>"); messages.add(Map.of( "role", "user", "content", notifText.toString() )); messages.add(Map.of( "role", "assistant", "content", "Noted background results." )); // 自动注入:自动将后台结果插入到对话中 // 结构化格式:XML标签明确标识内容类型 // 对话完整:添加assistant确认,保持对话结构 // 时机智能:在LLM调用前插入,确保LLM能看到最新结果 }
  • 自动同步:后台结果自动同步到主对话
  • 结构化格式:便于LLM识别和解析
  • 对话集成:无缝集成到现有对话流
  • 时机优化:在决策前注入,确保信息及时性

工具集成架构

java

// 后台任务工具集 public enum ToolType { BACKGROUND_RUN("background_run", "Run command in background thread. Returns task_id immediately."), CHECK_BACKGROUND("check_background", "Check background task status. Omit task_id to list all."); // 异步执行:立即返回,不阻塞 // 状态查询:支持单个和批量查询 // 语义清晰:工具名明确表示异步特性 } // 工具处理器映射 TOOL_HANDLERS.put(ToolType.BACKGROUND_RUN.name, args -> { String command = (String) args.get("command"); return BG_MANAGER.run(command); // 委托执行:将命令转交给后台管理器 // 立即返回:不等待任务完成 }); TOOL_HANDLERS.put(ToolType.CHECK_BACKGROUND.name, args -> { String taskId = (String) args.get("task_id"); return BG_MANAGER.check(taskId); // 灵活查询:支持单任务详查和列表概览 });
  • 接口统一:与同步工具相同的调用方式
  • 异步语义:工具名明确区分同步/异步
  • 灵活查询:支持多种查询方式
  • 无缝集成:与现有工具系统完全兼容

架构演进与价值

从 ContextCompactSystem 到 BackgroundTasksSystem 的升级

维度ContextCompactSystemBackgroundTasksSystem
执行模式同步串行异步并行
吞吐量一次一个任务并发多个任务
响应性阻塞等待立即响应
资源利用单线程多线程并发
任务类型短任务为主长短任务混合
http://www.jsqmd.com/news/1070563/

相关文章:

  • 2026年版牙科修复材料行业投资分析及前景趋势预测报告
  • LangChain框架在高炉炼铁智能化领域的应用~系列文章15:性能优化与部署 — 把AI模型“搬进“炼铁车间
  • 互联网大厂 Java 求职面试中的技术探讨
  • GEO 服务商横向测评:森辰 GEO、剪流 GEO、增长超人怎么选|中小企避坑选型指南
  • Xbox成就解锁终极指南:3分钟掌握免费开源工具的完整教程 [特殊字符]
  • 从大鼠到猫和犬,从基础研究到转化应用——云克隆推出骨骼肌细胞全系列
  • 为什么电流传感器检测信号会出现高频波动?
  • 传统变压器会SST被淘汰吗?
  • 如何在一台电脑上轻松实现多人分屏游戏:Nucleus Coop 实战指南
  • 杰理之固定通话音量【篇】
  • 计算机毕业设计之高校社团招新管理系统
  • 当游戏成就变成可编程的艺术:Xbox成就解锁器的逆向工程之旅
  • AlwaysOnTop窗口置顶工具:5分钟实现多任务效率翻倍的终极指南
  • 别再用旧犀牛!Rhino8.30最新版本 完整版安装教程
  • NoSleep防休眠助手:5分钟掌握Windows屏幕永不停歇的智能解决方案
  • 如何快速掌握微信小程序逆向分析:wxappUnpacker完整指南与5个实用技巧
  • 分类与回归的概念分析
  • 轻智能时代开启,谁在夯实智慧家庭的“地基”?
  • 分布式数据管理:跨设备数据库同步原理(61)
  • 《进程的 “虚拟内存王国”:一文吃透进程地址空间的布局与本质》
  • 深圳华智信创|华为IdeaHub会议协作平板金牌代理商
  • BetterNCM安装器完整指南:告别繁琐手动操作,一键安装网易云插件
  • 如何在5分钟内免费搭建Windows本地实时语音字幕系统
  • 渐进式交付:用白名单机制把 Agent 的风险降到最低
  • 【基础电子元件】电感
  • OBS多路RTMP推流插件:一站式高效直播多平台同步方案
  • 数字刊物系统用户操作手册
  • 3个简单步骤:让你的Switch手柄在电脑上完美运行
  • 工业传动升级,盖茨皮带技术亮点盘点|六大自研核心技术,赋能智造产线柔性传动提质
  • ripgrep:比 grep 快几十倍的命令行搜索工具