当前位置: 首页 > news >正文

OpenHarness源码研究-4-AgentLoop对话引擎与工具系统

OpenHarness源码研究-4-AgentLoop对话引擎与工具系统

从debug说起

第2篇我们写了oh -p "你是谁"的 debug 配置。断点打在run_print_mode()里,跟着调用栈一路往下走,能看到这样一条链路:

run_print_mode() ui/app.py └─ build_runtime(...) ui/runtime.py └─ _resolve_api_client_from_settings() └─ QueryEngine(...) └─ start_runtime(bundle) └─ handle_line(bundle, "你是谁", ...) ui/runtime.py └─ engine.submit_message("你是谁") engine/query_engine.py └─ run_query(context, messages) engine/query.py

第3篇分析了 API Client 层(_resolve_api_client_from_settings那段),这篇讲剩下的:消息模型、Agent Loop、工具系统、Runtime 组装、System Prompt。

消息模型

不同 LLM 的 API 格式不一样,但引擎内部需要一套统一的消息表示。这就是engine/messages.py的职责。

# engine/messages.py 第14-61行 class TextBlock(BaseModel): type: Literal["text"] = "text" text: str class ImageBlock(BaseModel): type: Literal["image"] = "image" media_type: str data: str # base64编码 class ToolUseBlock(BaseModel): type: Literal["tool_use"] = "tool_use" id: str # 工具调用的唯一ID name: str # 工具名 input: dict[str, Any] # 参数 class ToolResultBlock(BaseModel): type: Literal["tool_result"] = "tool_result" tool_use_id: str # 关联到哪个tool_use content: str # 执行结果文本 is_error: bool = False # 是否执行出错 ContentBlock = TextBlock | ImageBlock | ToolUseBlock | ToolResultBlock class ConversationMessage(BaseModel): role: Literal["user", "assistant"] content: list[ContentBlock]

四个 ContentBlock 覆盖了对话中的所有信息类型。role 只有 user 和 assistant 两种——system prompt 不在这里,它作为独立参数传给 API。

这套模型是 Anthropic Messages API 的原生格式。OpenAICompatibleClient 把它翻译成 OpenAI 格式(第3篇分析过),AnthropicApiClient 直接序列化:

# engine/messages.py 第92-97行 def to_api_param(self) -> dict[str, Any]: return { "role": self.role, "content": [serialize_content_block(block) for block in self.content], }

消息模型决定了整个引擎的设计。如果把 system prompt 也当作一条消息、把 tool 也当作一种 role,那代码复杂度会成倍增加——OpenAI 就是这么做的,而 Anthropic 的设计更简洁。OpenHarness 选择了 Anthropic 格式作为内部规范,翻译工作全部丢给 OpenAICompatibleClient。

AgentLoop:while循环+tool_use检测

engine/query.pyrun_query()是整个项目的核心。它做的事说起来简单:

用户发一句话 → 模型回复(可能带 tool_use) → 有 tool_use?执行工具,把结果喂回去 → 模型再回复 → 还有 tool_use?继续循环 → 没有了,结束

用代码表达:

# engine/query.py 第88-143行(简化) turn_count = 0 while context.max_turns is None or turn_count < context.max_turns: turn_count += 1 # 上下文压缩检查 messages, was_compacted = await auto_compact_if_needed(...) # 调模型 async for event in context.api_client.stream_message(request): if isinstance(event, ApiTextDeltaEvent): yield AssistantTextDelta(text=event.text) # 逐字流式输出 elif isinstance(event, ApiMessageCompleteEvent): final_message = event.message # 模型说完了 messages.append(final_message) # 没有工具调用 → 结束 if not final_message.tool_uses: return # 有工具调用 → 执行 if len(tool_calls) == 1: result = await _execute_tool_call(...) # 单工具:顺序执行 tool_results = [result] else: results = await asyncio.gather(*[_run(tc) for tc in tool_calls]) # 多工具:并发 tool_results = list(results) # 把工具结果作为新的user消息追加 messages.append(ConversationMessage(role="user", content=tool_results)) # → 继续循环

几个关键设计决策:

1. 单工具顺序 vs 多工具并发的选择

如果模型一次返回了多个 tool_use(比如同时读3个文件),用asyncio.gather并发执行。只返回一个时走顺序路径。这个分支不是无谓的优化——并发时需要等所有工具都完成才能通知 UI 更新,顺序时则不需要等。两种场景的 UI 事件发送时序不同。

2. max_turns 的硬截断

# engine/query.py 第42-45行 class MaxTurnsExceeded(RuntimeError): def __init__(self, max_turns: int) -> None: super().__init__(f"Exceeded maximum turn limit ({max_turns})")

默认 200 轮。一次"turn" = 模型回复 + 可能的一次工具调用。200 轮对于大多数任务绰绰有余,但如果不设限,一个死循环的工具调用就能烧掉大量 token。这个截断是防御性的。

3. auto-compact:上下文太长怎么办

在每轮循环开始时检查:

# engine/query.py 第91-97行 messages, was_compacted = await auto_compact_if_needed( messages, api_client=context.api_client, model=context.model, system_prompt=context.system_prompt, state=compact_state, )

上下文压缩分两步:先尝试 microcompact(把旧工具结果的 content 清掉换成占位文本,成本极低),如果还不够,再做 full compact(调 LLM 对旧消息做摘要,成本较高但有意义)。阈值是AUTOCOMPACT_BUFFER_TOKENS = 13000,给模型预留的输出空间。

流式事件-引擎怎么通知UI

引擎在执行过程中产生 6 种事件:

# engine/stream_events.py AssistantTextDelta → 模型正在逐字输出,这是下一个字 AssistantTurnComplete → 模型说完了,附完整消息+token用量 ToolExecutionStarted → 开始执行工具,告诉UI显示了 ToolExecutionCompleted → 工具执行完毕,附输出 ErrorEvent → 出错了 StatusEvent → 临时状态通知(重试中之类)

UI 层只消费这些事件,不关心事件是怎么产生的。run_print_mode()run_repl()处理同一套事件,只是渲染方式不同。这个设计和前端框架里的"状态管理"是一个思路——引擎是 store,事件是 action,UI 是 view。

工具系统-AI操控电脑的接口

tools/base.py定义了工具的契约:

class BaseTool(ABC): name: str description: str input_model: type[BaseModel] # Pydantic模型,自动生成JSON Schema async def execute(self, arguments, context) -> ToolResult: """执行工具""" def is_read_only(self, arguments) -> bool: """是否只读。权限检查用""" def to_api_schema(self) -> dict: """转为API要求的JSON Schema格式"""

每个工具就是实现这 5 个东西。以文件读取为例:

# tools/file_read_tool.py class FileReadTool(BaseTool): name = "read_file" description = "Read a text file from the local repository." input_model = FileReadToolInput # path + offset + limit def is_read_only(self, arguments): return True # 读文件是只读操作 async def execute(self, arguments, context): # 读文件、编号行号、返回

is_read_only是关键——它直接关联权限系统。读操作自动放行,写操作触发权限检查。

Bash 工具更复杂一些。它的is_read_only默认返回 False(没法静态判断一个 shell 命令是不是只读),所以总是需要权限确认,除非用户在 full_auto 模式下:

# tools/bash_tool.py 第30-79行 async def execute(self, arguments, context): process = await create_shell_subprocess(arguments.command, cwd=cwd, ...) stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=...) # 超时则 kill 进程 # 输出截断到 12000 字符 return ToolResult(output=text, is_error=process.returncode != 0, ...)

AgentTool 更有意思——工具里调子 Agent:

# tools/agent_tool.py 第43-94行 async def execute(self, arguments, context): executor = registry.get_executor("subprocess") config = TeammateSpawnConfig( name=arguments.subagent_type, prompt=arguments.prompt, cwd=str(context.cwd), ... ) result = await executor.spawn(config) return ToolResult(output=f"Spawned agent {result.agent_id} ...")

工具不再是简单的文件操作,而是可以递归地启动新的 Agent 进程。这就从"工具调用"升级到了"多 Agent 协作"。

Runtime组装-build_runtime的12步

ui/runtime.pybuild_runtime()把所有零件装到一起:

# ui/runtime.py 第163-301行 async def build_runtime(cwd, model, ...): # 1. 合并 CLI 覆盖到 settings settings = load_settings().merge_cli_overrides(...) # 2. 加载 plugins plugins = load_plugins(settings, cwd, ...) # 3. 创建 API Client resolved_api_client = _resolve_api_client_from_settings(settings) # 4. 连接 MCP servers mcp_manager = McpClientManager(load_mcp_server_configs(...)) await mcp_manager.connect_all() # 5. 创建 ToolRegistry(内置工具 + MCP 工具) tool_registry = create_default_tool_registry(mcp_manager) # 6. 初始化 AppState(UI 状态) app_state = AppStateStore(AppState(...)) # 7. 加载 Hooks hook_executor = HookExecutor(hook_reloader.current_registry(), ...) # 8. 构建 System Prompt system_prompt_text = build_runtime_system_prompt(settings, ...) # 9. 创建 QueryEngine engine = QueryEngine(api_client=..., tool_registry=..., ...) # 10. 恢复历史消息(如果有) if restore_messages: engine.load_messages(restored) # 11. 注册 slash 命令 commands = create_default_command_registry(...) # 12. 打包成 RuntimeBundle 返回 return RuntimeBundle(api_client=..., engine=..., ...)

这 12 步的依赖关系是单向的:settings 在最前面,engine 在最后面。RuntimeBundle只是一个 dataclass,把所有东西打成一个包。后续handle_line()从这个包里取东西用。

handle_line()是交互的核心——它判断用户输入是 slash 命令还是普通对话:

# ui/runtime.py 第428-567行 async def handle_line(bundle, line, ...): parsed = bundle.commands.lookup(line) if parsed is not None: # slash命令 → 走 CommandHandler result = await command.handler(args, context) # 可能触发 prompt 提交或 pending continuation else: # 普通对话 → 直接送 engine async for event in bundle.engine.submit_message(line): await render_event(event) # UI层渲染每个事件 bundle.session_backend.save_snapshot(...) # 自动存档

SystemPrompt-AI看到的第一段话

System Prompt 不是一段写死的文本,而是在build_runtime_system_prompt()里动态拼装的:

# prompts/context.py 第46-120行 def build_runtime_system_prompt(settings, cwd, ...): sections = [ build_system_prompt(), # 基础prompt(BASE_SYSTEM_PROMPT + 环境信息) f"Effort: {settings.effort}", # 推理设置 ] # Skills列表 skills = load_skill_registry(cwd, ...) if skills: sections.append(skills_section) # CLAUDE.md项目指令 claude_md = load_claude_md_prompt(cwd) if claude_md: sections.append(claude_md) # Issue/PR上下文(如果有) if issue_file.exists(): sections.append(issue_content) # 记忆系统 if settings.memory.enabled: memory_section = load_memory_prompt(cwd) sections.append(memory_section) relevant = find_relevant_memories(query, cwd) sections.append(relevant_section) return "\n\n".join(sections)

基础 prompt 本身就包含了环境信息——OS、Shell、日期、Git 分支等,由get_environment_info()自动探测。这样 AI 不用问"你是什么系统"就能直接给出正确的命令。

CLAUDE.md是放在项目根目录的一个文件,用户在里面写项目约定和偏好。它会被原样注入 System Prompt。这和 GitHub Copilot 的.github/copilot-instructions.md是一个思路。

总结

  • Agent Loop 的核心就是一个 while 循环:模型说一句 → 有 tool_use 就执行 → 结果喂回去 → 继续
  • 单工具顺序执行,多工具asyncio.gather并发。分支的原因不是性能,是 UI 事件时序不同
  • max_turns=200 是防御性的硬截断,auto-compact 在每轮循环前检查是否需要压缩上下文
  • BaseTool 的 5 个契约方法构成了工具系统的基础,is_read_only直接关联权限
  • RuntimeBundle 是会话的"零件包",12 步装配顺序是单向依赖
  • System Prompt 是动态拼装的:base + 环境 + skills + CLAUDE.md + 记忆

写到最后

http://www.jsqmd.com/news/1099841/

相关文章:

  • 如何深度掌控AMD Ryzen处理器:专业硬件调试工具完全指南
  • ros2 humble安装anaconda
  • 机器人-混合关节架构
  • Certbot:免费自动化 HTTPS 证书管理工具
  • 2026年桌面风扇推荐:三款不同功能定位机型,按需选择不踩坑
  • 【毕设级】SpringBoot + MySQL + Thymeleaf 实现高校教材征订管理系统(班级统订+个人补订)
  • Linux生产环境硬盘挂载:告别盘符漂移,使用UUID实现稳定自动挂载
  • 手把手教你用SM2259XT2开卡工具修复固态硬盘(附B0KB颗粒实测)
  • 小学期记录
  • Awesome LLM Skills:给 AI 编程助手装上各种技能包
  • 3分钟掌握深度学习漫画翻译神器:BallonsTranslator完全指南
  • 机器人-从“性能参数领先”转向“工业化能力领先”
  • 效率够高吗?8款AI论文网站排行榜,毕业季救星!
  • Docker部署-非root用户openEuler 20.03部署
  • How To Secure A Linux Server:一份持续更新的服务器安全加固手册
  • 2026年6月个人工作生活总结
  • Linux Page Cache 导致视频解码第一次慢、第二次快的原因分析与缓存清理方法
  • PYTHON+AI LLM DAY NINTY-TWO
  • vmware安装win10教程(保姆级图文):VMware16虚拟机部署Windows10,附win10镜像iso文件下载
  • OpenHarness源码研究-3-codex配置到输出对话
  • PDF转Excel免费工具推荐,批量转换不收费绿色版
  • Windows 11本地部署GLM-5.2大模型:集成Claw工具调用与Agent知识库实战
  • 零基础自学C++逆向学习日记 Day.5
  • 【题解-信息学奥赛一本通】1224:最大子矩阵
  • 【数仓避坑04】金额换算精度踩坑:先除后乘导致大额资金隐性资损,先乘后除精度最优详解
  • 当企业应用AI销冠系统时,如何利用数字员工提升工作效率?
  • 数据库工程:生产级查询优化全案例拆解‌
  • 企业级离线翻译架构重构:LibreTranslate 1.9.6如何实现数据主权与性能突破
  • 2026年AI企业服务系统五大评测:乔掌门AI与同类品牌深度对比排名推荐
  • 基于微积分思维的数学分析教学