mini 的 agent 循环是它最值钱的设计。它把"模型推理→执行动作→观察结果→继续推理"这个核心闭环压缩到了极致:核心循环不到 20 行,但通过异常驱动的控制流和线性的消息历史,覆盖了正常执行、任务完成、格式错误、超限退出、用户中断、未捕获异常等所有路径。这份笔记只讲这一条控制流是怎么工作的。
1. 核心循环
1.1 run() — 顶层入口
DefaultAgent.run() 是 agent 生命周期的顶层入口:
def run(self, task: str = "", **kwargs) -> dict:self.extra_template_vars |= {"task": task, **kwargs}self.messages = []self.add_messages(self.model.format_message(role="system", content=self._render_template(self.config.system_template)),self.model.format_message(role="user", content=self._render_template(self.config.instance_template)),)while True:try:self.step()except InterruptAgentFlow as e:self.add_messages(*e.messages)except Exception as e:self.handle_uncaught_exception(e)raisefinally:self.save(self.config.output_path)if self.messages[-1].get("role") == "exit":breakreturn self.messages[-1].get("extra", {})
这段代码虽然短,但包含了 agent 循环的全部状态管理逻辑。值得拆开细看。
1.2 消息初始化
前两条消息固定是 system 和 user:
self.add_messages({"role": "system", "content": "<渲染后的 system_template>"},{"role": "user", "content": "<渲染后的 instance_template>"},
)
system 消息告诉模型它是什么、能做什么、有什么规则;instance 消息告诉模型当前的具体任务。两条消息都用 Jinja2 渲染,渲染变量来自 agent 配置、环境信息(如 uname)、模型配置和外部注入的 task。
此后的所有消息都是这对初始消息基础上的追加。每次模型调用和工具执行都会在上一条消息后面继续添加。所以 self.messages 永远是一个线性的、从 system 开始的列表。
1.3 主循环的判断
每条消息都有一个 role 字段。角色有几类:
system:系统指令user:用户输入或工具执行结果assistant:模型回复tool:工具结果(tool call 模式)exit:退出标记
主循环只检查最后一条消息的 role 是不是 exit。exit 不是标准 API 角色,而是 mini 自己定义的状态标记。一条消息能被标记为 exit 的方式只有几种:
- 环境检测到
COMPLETE_TASK_AND_SUBMIT_FINAL_OUTPUT→Submitted("exit", ...)异常 - 步数或成本超限 →
LimitsExceeded("exit", ...)异常 - 未捕获的异常 →
handle_uncaught_exception("exit", ...)
正常执行过程中(model 回答 → 工具执行 → 观察 → 继续),不会有消息的 role 是 exit,所以循环一直继续。
1.4 step() — 单步执行
def step(self) -> list[dict]:return self.execute_actions(self.query())
step() 只有一行,但它定义了 agent 一个步骤的完整语义:先问模型要动作,再执行动作。
反过来想:如果你要给 mini 加一个"在每次模型调用前检查某些条件"的 hook,你应该覆写 query();如果你要给 mini 换一种"执行完动作后怎么把结果返回给模型"的方式,你应该覆写 execute_actions()。这就是这一行代码表达的架构语义。
1.5 query() — 模型调用
def query(self) -> dict:if 0 < self.config.step_limit <= self.n_calls or 0 < self.config.cost_limit <= self.cost:raise LimitsExceeded({"role": "exit", "content": "LimitsExceeded","extra": {"exit_status": "LimitsExceeded", "submission": ""},})self.n_calls += 1message = self.model.query(self.messages)self.cost += message.get("extra", {}).get("cost", 0.0)self.add_messages(message)return message
这里有三层含义:
-
前置检查:步数和成本限制在模型调用之前检查。这意味着限制是"不能超过"而不是"超过后多跑了一步再停"。设置
step_limit=0或cost_limit=0表示无限制。 -
调用与追加:
self.model.query(self.messages)把当前完整消息历史发给模型,模型返回的 assistant 消息立刻通过add_messages()追加到历史末尾。这意味着模型永远看到截至上一轮为止的完整历史。 -
返回消息给 step:
query()返回的是一条 assistant 消息,它的extra.actions字段里包含了从模型响应中解析出来的动作列表。step()把这个消息传给execute_actions()。
1.6 execute_actions() — 动作执行与结果回灌
def execute_actions(self, message: dict) -> list[dict]:outputs = [self.env.execute(action) for action in message.get("extra", {}).get("actions", [])]return self.add_messages(*self.model.format_observation_messages(message, outputs, self.get_template_vars()))
这里的每一步都值得注意:
- 从 assistant 消息的
extra.actions里提取动作列表,这里每一个 action 都是{"command": "...", "tool_call_id": "..."}格式。 - 逐个执行:
self.env.execute(action)。注意这是一个列表推导式,如果模型在一次回复中调用了多个工具,三个动作会依次执行。execute_actions不捕获异常——如果某个命令执行过程中环境抛出了Submitted,它会直接冒泡到run()的InterruptAgentFlow分支。 - 结果格式化:
model.format_observation_messages()把每个 action 和对应的 output 配对,用配置好的observation_template渲染成模型可读的工具结果消息。 - 结果回灌:格式化的结果消息通过
add_messages追加到历史末尾。下一轮query()时模型就能看到这些结果。
2. 异常驱动的控制流
mini 用异常而不是返回值来控制 agent 状态的改变。这个设计让正常路径(模型回答→执行→继续)保持极简,而不需要每一步都检查"是否该退出/是否格式错误/是否超限"。
2.1 异常体系
所有控制异常都继承自 InterruptAgentFlow:
class InterruptAgentFlow(Exception):def __init__(self, *messages: dict):self.messages = messagesclass Submitted(InterruptAgentFlow): ...
class LimitsExceeded(InterruptAgentFlow): ...
class UserInterruption(InterruptAgentFlow): ...
class FormatError(InterruptAgentFlow): ...
InterruptAgentFlow 携带的不是错误消息,而是要追加到消息历史的消息列表。当异常被 run() 的 except InterruptAgentFlow 捕获后,异常携带的消息通过 add_messages(*e.messages) 追加到历史中。
这意味着一件事:异常不是"出错",而是一种"从正常路径跳转到消息追加然后继续检查循环条件"的控制机制。每种异常携带的消息定义了该事件在消息历史里留下的痕迹。
2.2 Submitted — 环境检测到任务完成
触发:任何 Environment 实现的 _check_finished 方法,检查命令输出的第一行是否是 COMPLETE_TASK_AND_SUBMIT_FINAL_OUTPUT,且 returncode 为 0。
# LocalEnvironment._check_finished
lines = output.get("output", "").lstrip().splitlines(keepends=True)
if lines and lines[0].strip() == "COMPLETE_TASK_AND_SUBMIT_FINAL_OUTPUT" and output["returncode"] == 0:submission = "".join(lines[1:])raise Submitted({"role": "exit","content": submission,"extra": {"exit_status": "Submitted", "submission": submission},})
冒泡路径:env.execute() → execute_actions() 列表推导式 → step() → run() 的 InterruptAgentFlow 分支
结果:携带的 exit 消息被追加到历史。下一轮 while 循环检查到 messages[-1].role == "exit",退出循环,run() 返回最后一条消息的 extra 字典(包含 exit_status 和 submission)。
COMPLETE_TASK_AND_SUBMIT_FINAL_OUTPUT 是一个约定俗成的信号字符串。模型通过 prompt 里的指令学会在完成任务后输出这个命令。它像网络协议里的 FIN 报文——不是系统强加的控制命令,而是一种模型和环境之间约定的通信信号。环境不把它当作"需要执行的内容",而是当作"停止信号"。
2.3 LimitsExceeded — 超限
触发:query() 的前置检查,步数或成本超限。
if 0 < self.config.step_limit <= self.n_calls or 0 < self.config.cost_limit <= self.cost:raise LimitsExceeded({"role": "exit","content": "LimitsExceeded","extra": {"exit_status": "LimitsExceeded", "submission": ""},})
冒泡路径:query() → step() → run()
结果:与 Submitted 类似,但 exit_status 是 "LimitsExceeded",submission 为空。InteractiveAgent 会覆写 query(),在超限时提示用户输入新的步数和成本限制,然后重新调用 super().query(),不会直接退出。
2.4 FormatError — 模型输出格式不对
触发:parse_toolcall_actions() 在解析模型的 tool_call 时发现任何格式问题。
可能的原因:
- 没有 tool_calls(每条回复必须至少有一个)
- 工具名不是
bash(不支持其他工具) - 没有
command参数 - 参数 JSON 解析失败
if error_msg:raise FormatError({"role": "user","content": Template(format_error_template).render(actions=[], error=error_msg),"extra": {"interrupt_type": "FormatError"},})
注意:这条消息的 role 是 "user",不是 "exit"。也就是说 FormatError 不会导致循环退出,它只是把一个"格式更正提示"作为 user 消息插入历史,然后让模型重新回答。
路径:model.query() → _parse_actions() 内部 → step() → run() 的 InterruptAgentFlow 分支 → 错误消息被追加 → 循环继续(因为 role 不是 exit)→ 下一轮 query() 时模型看到错误提示,尝试纠正。
这是一种自纠正机制。系统不因为模型一次格式错误就失败,而是把正确格式再次告诉模型,让它在下一轮纠正。只有一种情况格式错误会导致退出:模型反复输出错误格式,直到超出步数限制。
2.5 UserInterruption — 用户中断
这是 InteractiveAgent 特有的异常,DefaultAgent 不包含这个逻辑。
触发场景:
- 用户在 confirm 模式下拒绝执行命令并附上了反馈
- 用户按 Ctrl+C 中断并输入了新指令
- 用户通过
/u切换到 human 模式
def _interrupt(self, content: str, *, itype: str = "UserInterruption") -> NoReturn:raise UserInterruption({"role": "user", "content": content,"extra": {"interrupt_type": itype},})
消息的 role 是 "user",所以和 FormatError 一样,不会退出循环。模型下一轮会看到用户的反馈并调整行为。
itype 字段区分不同类型:
"UserInterruption":用户中途打断了模型"UserNewTask":用户在 agent 完成后给了新任务"UserRejection":用户拒绝了模型提出的命令
2.6 未捕获异常
except Exception as e:self.handle_uncaught_exception(e)raise
handle_uncaught_exception 追加一个 role=exit 的消息,包含异常信息、traceback 和空的 submission。然后重新抛出。所以未捕获异常会同时做两件事:在轨迹里留下记录,以及让调用方知道发生了什么。
3. InteractiveAgent 的扩展
3.1 三种模式
InteractiveAgent 覆写了 query()、step() 和 execute_actions() 来加入人机交互。三种模式通过 config.mode 控制:
- human:
query()里直接让用户输入命令,不调模型 - confirm:
execute_actions()里对每条命令做白名单匹配,不匹配的要用户按回车确认 - yolo:不掉确认逻辑,全部直接执行
3.2 query() 覆写 — human 模式
def query(self) -> dict:if self.config.mode == "human":match command := self._prompt_and_handle_slash_commands("> "):case "/y" | "/c":pass # 只切换模式,命令不执行case _:msg = {"role": "user","content": f"User command: \n```bash\n{command}\n```","extra": {"actions": [{"command": command}]},}self.add_messages(msg)return msgreturn super().query()
human 模式下用户输入的命令直接包装成 user 消息,附带 action。execute_actions 会执行它,就像它是模型的输出一样。这样的统一处理意味着"用户手工操作"和"模型推理出命令"在消息历史里遵循完全相同的格式。
3.3 execute_actions() 覆写 — confirm 模式
def execute_actions(self, message: dict) -> list[dict]:actions = message.get("extra", {}).get("actions", [])commands = [action["command"] for action in actions]outputs = []try:self._ask_confirmation_or_interrupt(commands)for action in actions:outputs.append(self.env.execute(action))except Submitted as e:self._check_for_new_task_or_submit(e)finally:result = self.add_messages(*self.model.format_observation_messages(message, outputs, self.get_template_vars()))return result
关键设计点在 finally:即使 Submitted 被抛出、用户被询问新任务、最终决定退出,add_messages 仍然会执行。这保证了当前步骤已执行的操作结果(outputs 列表里的内容)一定会被写入消息历史,不会因为退出而丢失。
_ask_confirmation_or_interrupt 就是这一步的人机安全边界。它在命令真正执行前停下来问用户:
def _ask_confirmation_or_interrupt(self, commands: list[str]) -> None:if not any(self._should_ask_confirmation(c) for c in commands):return # 全部命令都不需要确认,直接通过prompt = (f"Execute {len(commands)} action(s)? Enter to confirm, ""type comment to reject, or /h to show available commands\n> ")match user_input := self._prompt_and_handle_slash_commands(prompt).strip():case "" | "/y": # 回车或 /y → 确认执行passcase "/u": # /u → 不执行,切到 human 模式让用户自己接管self._interrupt("Commands not executed. Switching to human mode")case _: # 输入了其他文字 → 不执行,用户反馈作为下一轮上下文self._interrupt(f"Commands not executed. The user rejected with: {user_input}")
三种结果:
- 确认(回车/
/y):什么也不做,函数返回,命令继续往下执行 - 拒绝并反馈(输入文字):抛
UserInterruption,用户的反馈被塞进下一轮模型上下文。模型下一轮会看到"刚才的命令被拒绝了,理由是:xxx",然后调整行为 - 切换到 human 模式(
/u):抛UserInterruption,告知模型"命令没有执行,已切换到 human 模式"
而 _should_ask_confirmation 的判断逻辑是:当前模式是 confirm 且命令不匹配白名单里的正则。比如你可以配置 whitelist_actions: ["ls", "cat .*"],那 ls 和 cat 开头的命令就不会弹出确认。
3.4 confirm_exit + Submitted 拦截
当环境抛出 Submitted 后,InteractiveAgent 会拦截它:
except Submitted as e:self._check_for_new_task_or_submit(e)
_check_for_new_task_or_submit 询问用户:"agent 想结束了,输入新任务或回车退出"。如果用户输入了新任务,它抛出一个 UserInterruption(role=user,带新任务内容),循环继续。如果用户直接回车,Submitted 被重新抛出,run() 的 InterruptAgentFlow 分支收到后正常退出。
3.5 斜杠命令
三个模式切换命令:
/y→ yolo 模式/c→ confirm 模式/u→ human 模式/m→ 多行输入/h→ 帮助
这些命令不进入消息历史,只影响当前 agent 的状态。_prompt_and_handle_slash_commands 是一个自循环——输入 /h 后显示帮助再重新 prompt。
4. 消息管理的精要
4.1 add_messages 是唯一的消息写入点
def add_messages(self, *messages: dict) -> list[dict]:self.messages.extend(messages)return list(messages)
整个 agent 只有这一个方法往 self.messages 里加东西。这带来了几个好处:
- 所有消息变更都能在这一个地方打断点
- InteractiveAgent 只需覆写这一个方法就能在每条消息被添加时附加 UI 输出
- Debug 日志只需在
add_messages里加一句logger.debug
4.2 消息格式
内部消息格式比 API 消息多了一个 extra 字段:
# assistant 消息
{"role": "assistant","content": "...","extra": {"actions": [{"command": "...", "tool_call_id": "..."}],"response": {<完整的 litellm 响应对象>},"cost": 0.01,"timestamp": 1716000000.0}
}# 工具结果消息
{"role": "tool","tool_call_id": "...","content": "<渲染后的 observation_template>","extra": {"raw_output": "<原始输出>","returncode": 0,"timestamp": 1716000000.0}
}
extra 字段在发给模型 API 前通过 _prepare_messages_for_api 去掉(只保留 API 需要的字段),但它在轨迹保存时被完整保留。这样轨迹文件里既有模型看到的 content,又有调试和分析需要的 metadata。
4.3 线性历史的威力
因为消息历史是完全线性的,agent 的当前状态可以通过 self.messages 这个列表完全表达。没有隐藏状态,没有被压缩掉的旧消息(除非你通过覆写 query() 来实现),没有分支。这意味着:
- 轨迹文件可以完整复现一次运行
- 微调数据不需要任何"上下文还原"步骤
- 某轮模型看到什么,完全由
messages[:n]决定
这和 onyx 的消息树设计、cline 的界面消息/模型消息分离形成了鲜明对比。mini 选择"什么都没有"这个最简单的状态管理方案。
