当前位置: 首页 > news >正文

DeepAgents - Human in the loop

前言

Human in the loop(人机协作)在企业级 Agent 应用中非常重要——AI 在执关键工具时必须经过人类审批,避免误操作影响业务。我之前用 LangGraph 0.3 裸写了一套(旧文),当时需要在 tool 函数里手动调 interrupt(),很啰嗦。如今有了 DeepAgents 和内置的 HumanInTheLoopMiddleware,只需配置一个 interrupt_on 字典,中断逻辑全自动——在执行前暂停图执行,保存状态到 checkpointer,等待人类决策后恢复。

不过官方文档的示例代码比较简单,只演示了基本用法,没有说明如何在真实应用中整合。本文以"允许 AI 执行 shell 命令,但每次执行前需用户确认"为需求,一步步实现完整的人机协作流程。

核心概念

在开始之前,先理清几个关键概念:

概念 说明
interrupt(中断) 当 Agent 准备调用某个被监控的 tool 时,HumanInTheLoopMiddleware 调用 LangGraph 的 interrupt() 暂停图执行,并抛出包含 action_requestsreview_configs 的请求
checkpoint(检查点) 中断时图状态会被持久化。必须配置 checkpointer,否则中断后无法恢复。生产环境建议用 AsyncPostgresSaver,测试用 InMemorySaver
version="v2" LangGraph 1.0 的 v2 模式,ainvoke() 返回 GraphOutput 对象(含 .interrupts 属性),astream()updates 流中会出现 __interrupt__ 事件
Command(resume=) 用户做出决策后,用 Command(resume={"decisions": [...]}) 从断点恢复执行
Decision(决策) 四种类型:approve(批准)、reject(拒绝并反馈)、edit(修改参数后执行)、respond(人类直接回答,跳过 tool 执行)

执行生命周期

用户提问 → Agent 调用 LLM 生成回复→ LLM 决定调用 tool(如 execute_shell_command)→ after_model 钩子:检查 tool 是否在 interrupt_on 中→ 是:构建 HITLRequest → interrupt() → 暂停 ⌛→ 否:继续执行→ 人类做出决策(approve / reject / edit / respond)→ 恢复执行 → 执行/拒绝 tool → LLM 生成最终回复 → 返回

流程逻辑

以 Chainlit 聊天应用为交互载体,消息处理流程如下:

  1. 用户在聊天页面发送消息(如"检查下系统负载")
  2. Agent 调用 LLM 生成回复,LLM 决定调用 execute_shell_command
  3. HumanInTheLoopMiddleware 检测到该 tool 在 interrupt_on 列表中,触发中断
  4. Chainlit 应用检测到中断,向用户展示审批提示
  5. 用户回复 批准 / 拒绝,应用用 Command(resume=) 恢复执行
  6. Agent 根据决策执行或拒绝 tool,最终返回结果给用户

配置中断

首先需要在创建 Agent 时配置 HumanInTheLoopMiddleware

from deepagents import create_deep_agent
from langchain.agents.middleware import HumanInTheLoopMiddlewareagent = create_deep_agent(model=llm,tools=[execute_shell_command],checkpointer=checkpointer,  # 必须配置!system_prompt="你是一位智能助手...",middleware=[HumanInTheLoopMiddleware(interrupt_on={# 对 execute_shell_command 进行审批"execute_shell_command": {"allowed_decisions": ["approve", "reject"]}}),],
)

interrupt_on 是一个字典,key 为 tool 名称,value 的可配置项:

  • True — 允许所有四种决策(approve / edit / reject / respond)
  • False — 不拦截该 tool(等同于不写)
  • {"allowed_decisions": [...]} — 只允许指定决策类型
  • 还可以配置 when 谓词按参数条件判断是否拦截、description 自定义中断提示文本

invoke 模式中的实现

v2 模式下的 ainvoke() 返回 GraphOutput 对象,可通过 .interrupts 属性直接获取中断数据,不需要去查 state。

检测中断

resp = await agent.ainvoke(input={"messages": [HumanMessage(content=query)]},config=config,version="v2",
)if resp.interrupts:# 存在中断,resp.interrupts 是 Interrupt 对象的元组interrupt = resp.interrupts[0]# interrupt.value 是 HITLRequest,包含 action_requests 和 review_configsprint(interrupt.value["action_requests"])

恢复中断

用户做出决策后,用 Command(resume=) 恢复:

from langgraph.types import Commandawait agent.ainvoke(Command(resume={"decisions": [{"type": "approve"}]  # 或 {"type": "reject", "message": "..."}}),config=config,  # 必须用同一个 thread_idversion="v2",
)

关键:如何分辨"新消息"还是"中断恢复"

在聊天应用中,用户发来的每条消息都走同一个 @cl.on_message 处理函数。用户说"检查负载"和回复"批准"都只是文本。解决方法是——调用前先检查是否有待处理的中断

# 检查当前会话是否有待处理的中断
state = await agent.aget_state(config)
if state.next:# 有待处理中断 → 本次消息是审批回复,构建 resume 命令cmd = Command(resume={"decisions": [{"type": "approve"}]})await agent.ainvoke(cmd, config=config, version="v2")
else:# 无中断 → 正常对话resp = await agent.ainvoke({"messages": [HumanMessage(content=query)]}, config=config, version="v2")

state.next 不为空表示图执行被暂停了(有中断等待处理)。

stream 模式中的实现

流式模式需要用 stream_mode=["messages", "updates"](官方推荐同时开启两种流):

  • messages 流:获取 LLM 的 token 级输出
  • updates 流:检测中断事件 __interrupt__
async for chunk in agent.astream(input=input_data,stream_mode=["messages", "updates"],version="v2",config=config,
):if chunk["type"] == "messages":msg, _meta = chunk["data"]# msg 是 AIMessageChunk,包含 content 和 tool_callsif isinstance(msg, AIMessageChunk) and msg.content:yield extract_text(msg)     # 流式输出文本elif chunk["type"] == "updates":if "__interrupt__" in chunk["data"]:interrupt = chunk["data"]["__interrupt__"][0]yield format_question(interrupt)  # 输出审批问题

stream 模式的恢复与 invoke 类似——在调用 astream() 之前同样要先检查 state.next 来判断是正常对话还是中断恢复。

完整示例

下面是核心代码。checkpointerllm 的配置函数、日志模块等非核心代码省略。

Agent 封装(internal/agent/agent.py 核心部分)

# --- 审批关键词匹配 ---
_APPROVE_KEYWORDS = frozenset({"yes", "accept", "approve", "ok", "是", "允许", "同意", "批准"}
)def _parse_decision(query: str) -> str:return "approve" if query.strip().lower() in _APPROVE_KEYWORDS else "reject"def _build_resume_command(decision_type: str, actions_count: int) -> Command:item = {"type": decision_type}if decision_type == "reject":item["message"] = "user rejected this action"return Command(resume={"decisions": [item for _ in range(actions_count)]})def _extract_text(message) -> str:"""从消息中提取纯文本(兼容 str 和 list[dict] 两种 content 格式)。"""if not message or not hasattr(message, "content"):return ""content = message.contentif isinstance(content, str):return contentif isinstance(content, list):return "".join(b.get("text", "")for b in contentif isinstance(b, dict) and b.get("type") == "text")return ""def _format_interrupt_question(interrupt) -> str:"""将中断数据格式化为用户的审批问题。"""action_requests = interrupt.value.get("action_requests", [])review_configs = interrupt.value.get("review_configs", [])allowed = (review_configs[0].get("allowed_decisions", ["approve", "reject"])if review_configselse ["approve", "reject"])lines = []for req in action_requests:lines.append("Do you approve me to execute this action?\n\n"f"- name: {req['name']}\n"f"- args: `{req['args']}`\n")lines.append(f"Input your decision: {', '.join(allowed)}\n")return "\n".join(lines)class AIAgent:# ... __init__, _init_deep_agent, _init_tools 省略 ...async def _has_pending_interrupt(self, config: RunnableConfig) -> bool:state = await self._agent.aget_state(config)return bool(state.next)# --- invoke 模式 ---async def ainvoke(self, query: str, config: RunnableConfig) -> str:if not self._agent:await self._init_deep_agent()# 优先处理中断恢复if await self._has_pending_interrupt(config):state = await self._agent.aget_state(config)actions_count = len(state.interrupts[0].value["action_requests"])decision = _parse_decision(query)cmd = _build_resume_command(decision, actions_count)await self._agent.ainvoke(cmd, config=config, version="v2")# 恢复后取最新消息state = await self._agent.aget_state(config)if state.values and "messages" in state.values:return _extract_text(state.values["messages"][-1])return "Oops, something went wrong."# 正常对话resp = await self._agent.ainvoke(input={"messages": [HumanMessage(content=query)]},config=config,version="v2",)if resp.interrupts:return _format_interrupt_question(resp.interrupts[0])return _extract_text(resp.value["messages"][-1])# --- stream 模式 ---async def astream(self, query: str, config: RunnableConfig):if not self._agent:await self._init_deep_agent()# 判断是中断恢复还是正常对话state = await self._agent.aget_state(config)if state.next:actions_count = len(state.interrupts[0].value["action_requests"])decision = _parse_decision(query)input_data = _build_resume_command(decision, actions_count)else:input_data = {"messages": [HumanMessage(content=query)]}async for chunk in self._agent.astream(input=input_data,stream_mode=["messages", "updates"],version="v2",config=config,):if chunk["type"] == "messages":msg, _meta = chunk["data"]if isinstance(msg, AIMessageChunk) and msg.content:yield _extract_text(msg)elif chunk["type"] == "updates" and "__interrupt__" in chunk["data"]:yield _format_interrupt_question(chunk["data"]["__interrupt__"][0])

Chainlit 应用层(chainlit_app.py 核心部分)

@cl.on_message
async def main(msg: cl.Message):config = RunnableConfig(configurable={"thread_id": cl.context.session.id},)# stream 模式(推荐)final_answer = cl.Message(content="")async for chunk in ai_agent.astream(msg.content, config=config):await final_answer.stream_token(chunk)await final_answer.send()# 或者 invoke 模式# resp = await ai_agent.ainvoke(msg.content, config)# await cl.Message(resp).send()

Chainlit 应用层的代码非常简洁——因为中断检测和恢复逻辑全部封装在 AIAgent 内部了。Chainlit 只需要流式输出 astream() / ainvoke() 的返回结果即可。

交互效果

[用户]: 检查下系统负载[AI]: 🔧 正在调用工具: execute_shell_command...[AI]: Do you approve me to execute this action?- name: execute_shell_command- args: `{"command": "cat /proc/loadavg && free -h", "timeout": 10}`Input your decision: approve, reject[用户]: 批准[AI]: 当前系统负载: 0.52 0.38 0.25,内存总容量 62Gi,已用 10Gi,剩余 46Gi,系统运行正常。

进阶:使用 interrupt_on 的 when 谓词

如果不想拦截所有 shell 命令,只想拦截危险操作(如 rmdd、写入系统目录等),可以用 when 谓词按参数条件判断:

from langgraph.prebuilt.tool_node import ToolCallRequestdef is_dangerous_command(request: ToolCallRequest) -> bool:"""只拦截包含危险操作的命令。"""command = request.tool_call["args"].get("command", "")dangerous = {"rm ", "dd ", "mkfs", "shutdown", "reboot"}return any(d in command for d in dangerous)HumanInTheLoopMiddleware(interrupt_on={"execute_shell_command": {"allowed_decisions": ["approve", "reject"],"when": is_dangerous_command,   # 只在危险命令时拦截}}
)

when 谓词返回 True 才触发中断,返回 False 则自动批准。注意 when 需要 langchain >= 1.3.3

改进点

  • 目前 reject 时用的是固定消息,实际产品中可以让用户输入拒绝原因,方便 LLM 调整后续行为
  • 审批提示目前是纯文本,可以用 Chainlit 的 AskActionMessage 做成按钮交互(不过受制于 Chainlit Action 的 payload 类型限制,需要额外处理)
  • 如果有多个 tool 同时被拦截,action_requests 列表中会有多项,本文为简化只取了第一个,生产环境应遍历处理

补充

DeepAgents 的 HumanInTheLoopMiddleware 把之前需要手写的中断逻辑全部封装好了。集成到真实应用的关键只有三步:

  1. 创建 Agent 时:配置 interrupt_on 字典 + 确保有 checkpointer
  2. 每次调用前:通过 state.next 判断是正常对话还是中断恢复
  3. 恢复时:用 Command(resume={"decisions": [...]}) 传入用户决策

ainvokeastream 两种模式的核心逻辑一致,只是检测中断的方式不同(.interrupts 属性 vs updates 流中的 __interrupt__)。

http://www.jsqmd.com/news/1014519/

相关文章:

  • AI应用开发:基于知识图谱(Graphify)与 MCP 协议的低成本自主自动化测试方案
  • 【CANdelaStudio-从入门到深入到实战】12 安全访问(Security Access)——种子-密钥机制的工程实现
  • Python 高手编程系列三百三十六 :命名和使用
  • 终极指南:免费让老款Mac焕发新生,体验最新macOS系统
  • 2026广州变压器回收油浸vs干式差价与铜铁分离算价 - 广东再生资源回收
  • OpenGL透视投影实战:用glFrustum和gluLookAt在头歌平台搞定立方体三点透视
  • 构建可扩展的后端系统:架构设计的核心考量
  • MPC8280 SIU与中断控制器配置实战:从原理到稳定系统构建
  • 【CANdelaStudio-从入门到深入到实战】13 诊断会话控制:为什么ECU需要“多重人格”?
  • 2026年6月国内做得好的X-Ray智能点料机品牌推荐,AI自动插件机/波峰焊机,X-Ray智能点料机厂家口碑推荐 - 品牌推荐师
  • 2026免费音频转FLAC在线保姆级教程!无限制工具手把手教学,免费获得无损音乐格式 - 时时资讯
  • 影刀RPA新手教程_从手工到自动的思维转变RPA落地的五种心法
  • Matplotlib的AnnotationBbox太难用?手把手教你实现PyQt图表悬停提示与光标线(避坑指南)
  • 影刀RPA新手教程_魔法指令入门用自然语言生成自动化流程
  • 手机高效使用技巧实战指南
  • ISODATA vs K-Means:在ENVI CLASSIC里实战对比,到底该选哪个算法?
  • 087、GitHub Actions 集成:Pull Request 自动审查、Issue 自动分类与标签管理
  • 气象科研绘图进阶:用Cartopy和MetPy美化你的大气温度垂直廓线图
  • 2026免费音频变速在线保姆级教程!无限制工具手把手教学,0.5x慢速~2x快速随心调 - 时时资讯
  • 2026佛山中央空调回收拆机能卖多少5种机型残值对比 - 广东再生资源回收
  • 飞书接入智能体
  • Joy-Con Toolkit:开源手柄调试与个性化定制解决方案
  • SpringBoot项目从fastjson1.x升级到fastjson2.x,Redis序列化配置怎么改?(附完整代码)
  • 从内存困境到流畅体验:PCL2启动器的智能资源管理革命
  • 电脑新手必备:从装机到日常维护的实用指南
  • Java 8老系统SQL Agent实战:AI生成候选SQL,安全引擎拦截后再执行
  • 如何让2008年以后的旧款Mac安装最新macOS?OCLP-Mod终极指南
  • 【AI Daily】AI日报 2026-06-14
  • 惊了!原来论文可以这样省时间?2026降AIGC网站推荐合集
  • 心电图特征点检测系统Matlab程序含GUI2(设计源文件+万字报告+讲解)(支持资料、图片参考_降重降ai)