

























Human in the loop(人机协作)在企业级 Agent 应用中非常重要——AI 在执关键工具时必须经过人类审批,避免误操作影响业务。我之前用 LangGraph 0.3 裸写了一套(旧文),当时需要在 tool 函数里手动调 interrupt(),很啰嗦。如今有了 DeepAgents 和内置的 HumanInTheLoopMiddleware,只需配置一个 interrupt_on 字典,中断逻辑全自动——在执行前暂停图执行,保存状态到 checkpointer,等待人类决策后恢复。
不过官方文档的示例代码比较简单,只演示了基本用法,没有说明如何在真实应用中整合。本文以"允许 AI 执行 shell 命令,但每次执行前需用户确认"为需求,一步步实现完整的人机协作流程。
在开始之前,先理清几个关键概念:
| 概念 | 说明 |
|---|---|
| interrupt(中断) | 当 Agent 准备调用某个被监控的 tool 时,HumanInTheLoopMiddleware 调用 LangGraph 的 interrupt() 暂停图执行,并抛出包含 action_requests 和 review_configs 的请求 |
| checkpoint(检查点) | 中断时图状态会被持久化。必须配置 checkpointer,否则中断后无法恢复。生产环境建议用 AsyncPostgresSaver,测试用 InMemorySaver |
version="v2" |
LangGraph 1.0 的 v2 模式,ainvoke() 返回 GraphOutput 对象(含 .interrupts 属性),astream() 的 updates 流中会出现 __interrupt__ 事件 |
| Command(resume=) | 用户做出决策后,用 Command(resume={"decisions": [...]}) 从断点恢复执行 |
| Decision(决策) | 四种类型:approve(批准)、reject(拒绝并反馈)、edit(修改参数后执行)、respond(人类直接回答,跳过 tool 执行) |
用户提问 → Agent 调用 LLM 生成回复
→ LLM 决定调用 tool(如 execute_shell_command)
→ after_model 钩子:检查 tool 是否在 interrupt_on 中
→ 是:构建 HITLRequest → interrupt() → 暂停 ⌛
→ 否:继续执行
→ 人类做出决策(approve / reject / edit / respond)
→ 恢复执行 → 执行/拒绝 tool → LLM 生成最终回复 → 返回
以 Chainlit 聊天应用为交互载体,消息处理流程如下:
execute_shell_commandHumanInTheLoopMiddleware 检测到该 tool 在 interrupt_on 列表中,触发中断批准 / 拒绝,应用用 Command(resume=) 恢复执行首先需要在创建 Agent 时配置 HumanInTheLoopMiddleware:
from deepagents import create_deep_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
agent = create_deep_agent(
model=llm,
tools=[execute_shell_command],
checkpointer=checkpointer, # 必须配置!
system_prompt="你是一位智能助手...",
middleware=[
HumanInTheLoopMiddleware(
interrupt_on={
# 对 execute_shell_command 进行审批
"execute_shell_command": {
"allowed_decisions": ["approve", "reject"]
}
}
),
],
)
interrupt_on 是一个字典,key 为 tool 名称,value 的可配置项:
True — 允许所有四种决策(approve / edit / reject / respond)False — 不拦截该 tool(等同于不写){"allowed_decisions": [...]} — 只允许指定决策类型when 谓词按参数条件判断是否拦截、description 自定义中断提示文本v2 模式下的 ainvoke() 返回 GraphOutput 对象,可通过 .interrupts 属性直接获取中断数据,不需要去查 state。
resp = await agent.ainvoke(
input={"messages": [HumanMessage(content=query)]},
config=config,
version="v2",
)
if resp.interrupts:
# 存在中断,resp.interrupts 是 Interrupt 对象的元组
interrupt = resp.interrupts[0]
# interrupt.value 是 HITLRequest,包含 action_requests 和 review_configs
print(interrupt.value["action_requests"])
用户做出决策后,用 Command(resume=) 恢复:
from langgraph.types import Command
await agent.ainvoke(
Command(resume={
"decisions": [{"type": "approve"}] # 或 {"type": "reject", "message": "..."}
}),
config=config, # 必须用同一个 thread_id
version="v2",
)
在聊天应用中,用户发来的每条消息都走同一个 @cl.on_message 处理函数。用户说"检查负载"和回复"批准"都只是文本。解决方法是——调用前先检查是否有待处理的中断:
# 检查当前会话是否有待处理的中断
state = await agent.aget_state(config)
if state.next:
# 有待处理中断 → 本次消息是审批回复,构建 resume 命令
cmd = Command(resume={"decisions": [{"type": "approve"}]})
await agent.ainvoke(cmd, config=config, version="v2")
else:
# 无中断 → 正常对话
resp = await agent.ainvoke(
{"messages": [HumanMessage(content=query)]}, config=config, version="v2"
)
state.next 不为空表示图执行被暂停了(有中断等待处理)。
流式模式需要用 stream_mode=["messages", "updates"](官方推荐同时开启两种流):
messages 流:获取 LLM 的 token 级输出updates 流:检测中断事件 __interrupt__async for chunk in agent.astream(
input=input_data,
stream_mode=["messages", "updates"],
version="v2",
config=config,
):
if chunk["type"] == "messages":
msg, _meta = chunk["data"]
# msg 是 AIMessageChunk,包含 content 和 tool_calls
if isinstance(msg, AIMessageChunk) and msg.content:
yield extract_text(msg) # 流式输出文本
elif chunk["type"] == "updates":
if "__interrupt__" in chunk["data"]:
interrupt = chunk["data"]["__interrupt__"][0]
yield format_question(interrupt) # 输出审批问题
stream 模式的恢复与 invoke 类似——在调用 astream() 之前同样要先检查 state.next 来判断是正常对话还是中断恢复。
下面是核心代码。
checkpointer和llm的配置函数、日志模块等非核心代码省略。
internal/agent/agent.py 核心部分)# --- 审批关键词匹配 ---
_APPROVE_KEYWORDS = frozenset(
{"yes", "accept", "approve", "ok", "是", "允许", "同意", "批准"}
)
def _parse_decision(query: str) -> str:
return "approve" if query.strip().lower() in _APPROVE_KEYWORDS else "reject"
def _build_resume_command(decision_type: str, actions_count: int) -> Command:
item = {"type": decision_type}
if decision_type == "reject":
item["message"] = "user rejected this action"
return Command(resume={"decisions": [item for _ in range(actions_count)]})
def _extract_text(message) -> str:
"""从消息中提取纯文本(兼容 str 和 list[dict] 两种 content 格式)。"""
if not message or not hasattr(message, "content"):
return ""
content = message.content
if isinstance(content, str):
return content
if isinstance(content, list):
return "".join(
b.get("text", "")
for b in content
if isinstance(b, dict) and b.get("type") == "text"
)
return ""
def _format_interrupt_question(interrupt) -> str:
"""将中断数据格式化为用户的审批问题。"""
action_requests = interrupt.value.get("action_requests", [])
review_configs = interrupt.value.get("review_configs", [])
allowed = (
review_configs[0].get("allowed_decisions", ["approve", "reject"])
if review_configs
else ["approve", "reject"]
)
lines = []
for req in action_requests:
lines.append(
"Do you approve me to execute this action?\n\n"
f"- name: {req['name']}\n"
f"- args: `{req['args']}`\n"
)
lines.append(f"Input your decision: {', '.join(allowed)}\n")
return "\n".join(lines)
class AIAgent:
# ... __init__, _init_deep_agent, _init_tools 省略 ...
async def _has_pending_interrupt(self, config: RunnableConfig) -> bool:
state = await self._agent.aget_state(config)
return bool(state.next)
# --- invoke 模式 ---
async def ainvoke(self, query: str, config: RunnableConfig) -> str:
if not self._agent:
await self._init_deep_agent()
# 优先处理中断恢复
if await self._has_pending_interrupt(config):
state = await self._agent.aget_state(config)
actions_count = len(
state.interrupts[0].value["action_requests"]
)
decision = _parse_decision(query)
cmd = _build_resume_command(decision, actions_count)
await self._agent.ainvoke(cmd, config=config, version="v2")
# 恢复后取最新消息
state = await self._agent.aget_state(config)
if state.values and "messages" in state.values:
return _extract_text(state.values["messages"][-1])
return "Oops, something went wrong."
# 正常对话
resp = await self._agent.ainvoke(
input={"messages": [HumanMessage(content=query)]},
config=config,
version="v2",
)
if resp.interrupts:
return _format_interrupt_question(resp.interrupts[0])
return _extract_text(resp.value["messages"][-1])
# --- stream 模式 ---
async def astream(self, query: str, config: RunnableConfig):
if not self._agent:
await self._init_deep_agent()
# 判断是中断恢复还是正常对话
state = await self._agent.aget_state(config)
if state.next:
actions_count = len(
state.interrupts[0].value["action_requests"]
)
decision = _parse_decision(query)
input_data = _build_resume_command(decision, actions_count)
else:
input_data = {"messages": [HumanMessage(content=query)]}
async for chunk in self._agent.astream(
input=input_data,
stream_mode=["messages", "updates"],
version="v2",
config=config,
):
if chunk["type"] == "messages":
msg, _meta = chunk["data"]
if isinstance(msg, AIMessageChunk) and msg.content:
yield _extract_text(msg)
elif chunk["type"] == "updates" and "__interrupt__" in chunk["data"]:
yield _format_interrupt_question(
chunk["data"]["__interrupt__"][0]
)
chainlit_app.py 核心部分)@cl.on_message
async def main(msg: cl.Message):
config = RunnableConfig(
configurable={"thread_id": cl.context.session.id},
)
# stream 模式(推荐)
final_answer = cl.Message(content="")
async for chunk in ai_agent.astream(msg.content, config=config):
await final_answer.stream_token(chunk)
await final_answer.send()
# 或者 invoke 模式
# resp = await ai_agent.ainvoke(msg.content, config)
# await cl.Message(resp).send()
Chainlit 应用层的代码非常简洁——因为中断检测和恢复逻辑全部封装在 AIAgent 内部了。Chainlit 只需要流式输出 astream() / ainvoke() 的返回结果即可。
[用户]: 检查下系统负载
[AI]: 🔧 正在调用工具: execute_shell_command...
[AI]: Do you approve me to execute this action?
- name: execute_shell_command
- args: `{"command": "cat /proc/loadavg && free -h", "timeout": 10}`
Input your decision: approve, reject
[用户]: 批准
[AI]: 当前系统负载: 0.52 0.38 0.25,
内存总容量 62Gi,已用 10Gi,剩余 46Gi,系统运行正常。
如果不想拦截所有 shell 命令,只想拦截危险操作(如 rm、dd、写入系统目录等),可以用 when 谓词按参数条件判断:
from langgraph.prebuilt.tool_node import ToolCallRequest
def is_dangerous_command(request: ToolCallRequest) -> bool:
"""只拦截包含危险操作的命令。"""
command = request.tool_call["args"].get("command", "")
dangerous = {"rm ", "dd ", "mkfs", "shutdown", "reboot"}
return any(d in command for d in dangerous)
HumanInTheLoopMiddleware(
interrupt_on={
"execute_shell_command": {
"allowed_decisions": ["approve", "reject"],
"when": is_dangerous_command, # 只在危险命令时拦截
}
}
)
when 谓词返回 True 才触发中断,返回 False 则自动批准。注意 when 需要 langchain >= 1.3.3。
AskActionMessage 做成按钮交互(不过受制于 Chainlit Action 的 payload 类型限制,需要额外处理)action_requests 列表中会有多项,本文为简化只取了第一个,生产环境应遍历处理DeepAgents 的 HumanInTheLoopMiddleware 把之前需要手写的中断逻辑全部封装好了。集成到真实应用的关键只有三步:
interrupt_on 字典 + 确保有 checkpointerstate.next 判断是正常对话还是中断恢复Command(resume={"decisions": [...]}) 传入用户决策ainvoke 和 astream 两种模式的核心逻辑一致,只是检测中断的方式不同(.interrupts 属性 vs updates 流中的 __interrupt__)。
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。