前言
Human in the loop(人机协作)在企业级 Agent 应用中非常重要——AI 在执关键工具时必须经过人类审批,避免误操作影响业务。我之前用 LangGraph 0.3 裸写了一套(旧文),当时需要在 tool 函数里手动调 interrupt(),很啰嗦。如今有了 DeepAgents 和内置的 HumanInTheLoopMiddleware,只需配置一个 interrupt_on 字典,中断逻辑全自动——在执行前暂停图执行,保存状态到 checkpointer,等待人类决策后恢复。
不过官方文档的示例代码比较简单,只演示了基本用法,没有说明如何在真实应用中整合。本文以"允许 AI 执行 shell 命令,但每次执行前需用户确认"为需求,一步步实现完整的人机协作流程。
核心概念
在开始之前,先理清几个关键概念:
| 概念 | 说明 |
|---|---|
| interrupt(中断) | 当 Agent 准备调用某个被监控的 tool 时,HumanInTheLoopMiddleware 调用 LangGraph 的 interrupt() 暂停图执行,并抛出包含 action_requests 和 review_configs 的请求 |
| checkpoint(检查点) | 中断时图状态会被持久化。必须配置 checkpointer,否则中断后无法恢复。生产环境建议用 AsyncPostgresSaver,测试用 InMemorySaver |
version="v2" |
LangGraph 1.0 的 v2 模式,ainvoke() 返回 GraphOutput 对象(含 .interrupts 属性),astream() 的 updates 流中会出现 __interrupt__ 事件 |
| Command(resume=) | 用户做出决策后,用 Command(resume={"decisions": [...]}) 从断点恢复执行 |
| Decision(决策) | 四种类型:approve(批准)、reject(拒绝并反馈)、edit(修改参数后执行)、respond(人类直接回答,跳过 tool 执行) |
执行生命周期
用户提问 → Agent 调用 LLM 生成回复→ LLM 决定调用 tool(如 execute_shell_command)→ after_model 钩子:检查 tool 是否在 interrupt_on 中→ 是:构建 HITLRequest → interrupt() → 暂停 ⌛→ 否:继续执行→ 人类做出决策(approve / reject / edit / respond)→ 恢复执行 → 执行/拒绝 tool → LLM 生成最终回复 → 返回
流程逻辑
以 Chainlit 聊天应用为交互载体,消息处理流程如下:
- 用户在聊天页面发送消息(如"检查下系统负载")
- Agent 调用 LLM 生成回复,LLM 决定调用
execute_shell_command HumanInTheLoopMiddleware检测到该 tool 在interrupt_on列表中,触发中断- Chainlit 应用检测到中断,向用户展示审批提示
- 用户回复
批准/拒绝,应用用Command(resume=)恢复执行 - Agent 根据决策执行或拒绝 tool,最终返回结果给用户
配置中断
首先需要在创建 Agent 时配置 HumanInTheLoopMiddleware:
from deepagents import create_deep_agent
from langchain.agents.middleware import HumanInTheLoopMiddlewareagent = create_deep_agent(model=llm,tools=[execute_shell_command],checkpointer=checkpointer, # 必须配置!system_prompt="你是一位智能助手...",middleware=[HumanInTheLoopMiddleware(interrupt_on={# 对 execute_shell_command 进行审批"execute_shell_command": {"allowed_decisions": ["approve", "reject"]}}),],
)
interrupt_on 是一个字典,key 为 tool 名称,value 的可配置项:
True— 允许所有四种决策(approve / edit / reject / respond)False— 不拦截该 tool(等同于不写){"allowed_decisions": [...]}— 只允许指定决策类型- 还可以配置
when谓词按参数条件判断是否拦截、description自定义中断提示文本
invoke 模式中的实现
v2 模式下的 ainvoke() 返回 GraphOutput 对象,可通过 .interrupts 属性直接获取中断数据,不需要去查 state。
检测中断
resp = await agent.ainvoke(input={"messages": [HumanMessage(content=query)]},config=config,version="v2",
)if resp.interrupts:# 存在中断,resp.interrupts 是 Interrupt 对象的元组interrupt = resp.interrupts[0]# interrupt.value 是 HITLRequest,包含 action_requests 和 review_configsprint(interrupt.value["action_requests"])
恢复中断
用户做出决策后,用 Command(resume=) 恢复:
from langgraph.types import Commandawait agent.ainvoke(Command(resume={"decisions": [{"type": "approve"}] # 或 {"type": "reject", "message": "..."}}),config=config, # 必须用同一个 thread_idversion="v2",
)
关键:如何分辨"新消息"还是"中断恢复"
在聊天应用中,用户发来的每条消息都走同一个 @cl.on_message 处理函数。用户说"检查负载"和回复"批准"都只是文本。解决方法是——调用前先检查是否有待处理的中断:
# 检查当前会话是否有待处理的中断
state = await agent.aget_state(config)
if state.next:# 有待处理中断 → 本次消息是审批回复,构建 resume 命令cmd = Command(resume={"decisions": [{"type": "approve"}]})await agent.ainvoke(cmd, config=config, version="v2")
else:# 无中断 → 正常对话resp = await agent.ainvoke({"messages": [HumanMessage(content=query)]}, config=config, version="v2")
state.next 不为空表示图执行被暂停了(有中断等待处理)。
stream 模式中的实现
流式模式需要用 stream_mode=["messages", "updates"](官方推荐同时开启两种流):
messages流:获取 LLM 的 token 级输出updates流:检测中断事件__interrupt__
async for chunk in agent.astream(input=input_data,stream_mode=["messages", "updates"],version="v2",config=config,
):if chunk["type"] == "messages":msg, _meta = chunk["data"]# msg 是 AIMessageChunk,包含 content 和 tool_callsif isinstance(msg, AIMessageChunk) and msg.content:yield extract_text(msg) # 流式输出文本elif chunk["type"] == "updates":if "__interrupt__" in chunk["data"]:interrupt = chunk["data"]["__interrupt__"][0]yield format_question(interrupt) # 输出审批问题
stream 模式的恢复与 invoke 类似——在调用 astream() 之前同样要先检查 state.next 来判断是正常对话还是中断恢复。
完整示例
下面是核心代码。
checkpointer和llm的配置函数、日志模块等非核心代码省略。
Agent 封装(internal/agent/agent.py 核心部分)
# --- 审批关键词匹配 ---
_APPROVE_KEYWORDS = frozenset({"yes", "accept", "approve", "ok", "是", "允许", "同意", "批准"}
)def _parse_decision(query: str) -> str:return "approve" if query.strip().lower() in _APPROVE_KEYWORDS else "reject"def _build_resume_command(decision_type: str, actions_count: int) -> Command:item = {"type": decision_type}if decision_type == "reject":item["message"] = "user rejected this action"return Command(resume={"decisions": [item for _ in range(actions_count)]})def _extract_text(message) -> str:"""从消息中提取纯文本(兼容 str 和 list[dict] 两种 content 格式)。"""if not message or not hasattr(message, "content"):return ""content = message.contentif isinstance(content, str):return contentif isinstance(content, list):return "".join(b.get("text", "")for b in contentif isinstance(b, dict) and b.get("type") == "text")return ""def _format_interrupt_question(interrupt) -> str:"""将中断数据格式化为用户的审批问题。"""action_requests = interrupt.value.get("action_requests", [])review_configs = interrupt.value.get("review_configs", [])allowed = (review_configs[0].get("allowed_decisions", ["approve", "reject"])if review_configselse ["approve", "reject"])lines = []for req in action_requests:lines.append("Do you approve me to execute this action?\n\n"f"- name: {req['name']}\n"f"- args: `{req['args']}`\n")lines.append(f"Input your decision: {', '.join(allowed)}\n")return "\n".join(lines)class AIAgent:# ... __init__, _init_deep_agent, _init_tools 省略 ...async def _has_pending_interrupt(self, config: RunnableConfig) -> bool:state = await self._agent.aget_state(config)return bool(state.next)# --- invoke 模式 ---async def ainvoke(self, query: str, config: RunnableConfig) -> str:if not self._agent:await self._init_deep_agent()# 优先处理中断恢复if await self._has_pending_interrupt(config):state = await self._agent.aget_state(config)actions_count = len(state.interrupts[0].value["action_requests"])decision = _parse_decision(query)cmd = _build_resume_command(decision, actions_count)await self._agent.ainvoke(cmd, config=config, version="v2")# 恢复后取最新消息state = await self._agent.aget_state(config)if state.values and "messages" in state.values:return _extract_text(state.values["messages"][-1])return "Oops, something went wrong."# 正常对话resp = await self._agent.ainvoke(input={"messages": [HumanMessage(content=query)]},config=config,version="v2",)if resp.interrupts:return _format_interrupt_question(resp.interrupts[0])return _extract_text(resp.value["messages"][-1])# --- stream 模式 ---async def astream(self, query: str, config: RunnableConfig):if not self._agent:await self._init_deep_agent()# 判断是中断恢复还是正常对话state = await self._agent.aget_state(config)if state.next:actions_count = len(state.interrupts[0].value["action_requests"])decision = _parse_decision(query)input_data = _build_resume_command(decision, actions_count)else:input_data = {"messages": [HumanMessage(content=query)]}async for chunk in self._agent.astream(input=input_data,stream_mode=["messages", "updates"],version="v2",config=config,):if chunk["type"] == "messages":msg, _meta = chunk["data"]if isinstance(msg, AIMessageChunk) and msg.content:yield _extract_text(msg)elif chunk["type"] == "updates" and "__interrupt__" in chunk["data"]:yield _format_interrupt_question(chunk["data"]["__interrupt__"][0])
Chainlit 应用层(chainlit_app.py 核心部分)
@cl.on_message
async def main(msg: cl.Message):config = RunnableConfig(configurable={"thread_id": cl.context.session.id},)# stream 模式(推荐)final_answer = cl.Message(content="")async for chunk in ai_agent.astream(msg.content, config=config):await final_answer.stream_token(chunk)await final_answer.send()# 或者 invoke 模式# resp = await ai_agent.ainvoke(msg.content, config)# await cl.Message(resp).send()
Chainlit 应用层的代码非常简洁——因为中断检测和恢复逻辑全部封装在 AIAgent 内部了。Chainlit 只需要流式输出 astream() / ainvoke() 的返回结果即可。
交互效果
[用户]: 检查下系统负载[AI]: 🔧 正在调用工具: execute_shell_command...[AI]: Do you approve me to execute this action?- name: execute_shell_command- args: `{"command": "cat /proc/loadavg && free -h", "timeout": 10}`Input your decision: approve, reject[用户]: 批准[AI]: 当前系统负载: 0.52 0.38 0.25,内存总容量 62Gi,已用 10Gi,剩余 46Gi,系统运行正常。
进阶:使用 interrupt_on 的 when 谓词
如果不想拦截所有 shell 命令,只想拦截危险操作(如 rm、dd、写入系统目录等),可以用 when 谓词按参数条件判断:
from langgraph.prebuilt.tool_node import ToolCallRequestdef is_dangerous_command(request: ToolCallRequest) -> bool:"""只拦截包含危险操作的命令。"""command = request.tool_call["args"].get("command", "")dangerous = {"rm ", "dd ", "mkfs", "shutdown", "reboot"}return any(d in command for d in dangerous)HumanInTheLoopMiddleware(interrupt_on={"execute_shell_command": {"allowed_decisions": ["approve", "reject"],"when": is_dangerous_command, # 只在危险命令时拦截}}
)
when 谓词返回 True 才触发中断,返回 False 则自动批准。注意 when 需要 langchain >= 1.3.3。
改进点
- 目前 reject 时用的是固定消息,实际产品中可以让用户输入拒绝原因,方便 LLM 调整后续行为
- 审批提示目前是纯文本,可以用 Chainlit 的
AskActionMessage做成按钮交互(不过受制于 Chainlit Action 的payload类型限制,需要额外处理) - 如果有多个 tool 同时被拦截,
action_requests列表中会有多项,本文为简化只取了第一个,生产环境应遍历处理
补充
DeepAgents 的 HumanInTheLoopMiddleware 把之前需要手写的中断逻辑全部封装好了。集成到真实应用的关键只有三步:
- 创建 Agent 时:配置
interrupt_on字典 + 确保有 checkpointer - 每次调用前:通过
state.next判断是正常对话还是中断恢复 - 恢复时:用
Command(resume={"decisions": [...]})传入用户决策
ainvoke 和 astream 两种模式的核心逻辑一致,只是检测中断的方式不同(.interrupts 属性 vs updates 流中的 __interrupt__)。
