惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

Recorded Future
Recorded Future
Recent Commits to openclaw:main
Recent Commits to openclaw:main
D
DataBreaches.Net
月光博客
月光博客
I
InfoQ
Microsoft Azure Blog
Microsoft Azure Blog
Security Latest
Security Latest
S
SegmentFault 最新的问题
K
Kaspersky official blog
T
Threat Research - Cisco Blogs
V
V2EX
GbyAI
GbyAI
The Last Watchdog
The Last Watchdog
Cyberwarzone
Cyberwarzone
Project Zero
Project Zero
J
Java Code Geeks
Schneier on Security
Schneier on Security
Attack and Defense Labs
Attack and Defense Labs
cs.AI updates on arXiv.org
cs.AI updates on arXiv.org
G
Google Developers Blog
WordPress大学
WordPress大学
B
Blog RSS Feed
Hugging Face - Blog
Hugging Face - Blog
H
Help Net Security
MongoDB | Blog
MongoDB | Blog
B
Blog
www.infosecurity-magazine.com
www.infosecurity-magazine.com
博客园 - Franky
Spread Privacy
Spread Privacy
C
CERT Recently Published Vulnerability Notes
H
Hackread – Cybersecurity News, Data Breaches, AI and More
Know Your Adversary
Know Your Adversary
Recent Announcements
Recent Announcements
The GitHub Blog
The GitHub Blog
C
Check Point Blog
Latest news
Latest news
W
WeLiveSecurity
小众软件
小众软件
H
Hacker News: Front Page
P
Privacy & Cybersecurity Law Blog
H
Heimdal Security Blog
博客园 - 叶小钗
N
News | PayPal Newsroom
IT之家
IT之家
P
Proofpoint News Feed
Apple Machine Learning Research
Apple Machine Learning Research
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
T
Tenable Blog
腾讯CDC
让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com

博客园_首页

Linux实操--组管理、权限管理和定时任务 - NE_STOP Java + EasyExcel 实现单个接口导出多个Excel - LucaJu Mem0 源码解析系列(二):提示词工程的深度剖析 TaskFlow究竟是什么?和普通Skills技能有什么区别 - Winton-H 图论证明题 - 洛苡hh 嘉立创开源:应该是全网MicroPython教程最多的开发板 - FreakStudio Hermes Agent 集成实践:从协议到生产 - Newbe36524 2026年AI编程工具横评:Cursor、Codex、Claude Code、Zed、Windsurf Java程序员必看的RAG入门教程 - 苏三说技术 2026 AI效率神器:Superpowers + Claude Code 保姆级教程 - 狂师 本地大模型部署全攻略:从 0 到 1 玩转 Ollama 【从0到1构建一个ClaudeAgent】内存管理-上下文压缩 - 程序员Seven .NET 高级开发 | 设计、实现一个事件总线框架 - 痴者工良 电子小白入门之NE555 3. WorkBuddy:隐藏玩法,一键召唤专家,让 AI 以"专家身份"给你干活 - 岳小哥AI 和AI一起搞事情#3:Claude Teammate 游戏开发翻车实录 - 风雨中的小七 【OpenClaw】通过 Nanobot 源码学习架构---(7)Memory - 罗西的思考 C# .NET 周刊|2026年3月3期 - InCerry 我在 Debian 11 上把 K8s 单机搭起来了,过程没你想的那么顺(/opt 目录版) - 陌陌卡上 深度学习进阶(七)Data-efficient Image Transformer - 哥布林学者 CLI+Skill搭建浏览器AI自动化框架,告别一切重复枯燥任务 - 技术爬爬虾 告别Token账单无底洞:OpenClaw本地部署,重塑企业数据主权的唯一解 - 清风915938629 FastAPI+Vue:文件分片上传+秒传+断点续传,这坑我帮你踩平了! - 一名程序媛呀 SBTI 爆火后,我做了个程序员版的 CBTI。。已开源 + 附开发过程 - 程序员鱼皮 多模态检索开始进入工程期:用 Sentence Transformers 搭建可落地的 Multimodal RAG 100多行代码实现一个最简单的Agent(用ReAct) Claude Code 通关手册(八):推荐 5 个 Hooks,代码质量提升 3 倍 - 暮色之狐 老板:“有人截图了!”。安全部门:“收到,马上查暗水印!” - why技术 技术之外,皆是人间 C#/.NET/.NET Core技术前沿周刊 | 第 69 期(2026年4.01-4.12) - 追逐时光者 Snack JSONPath 项目架构分析 - 带刺的坐椅 Claude Code Buddy 小析:一个非核心功能,如何体现产品的细节完成度 - 葡萄城技术团队 AI新时代下的图床管理方案-Cloudflare图床+MCP+Skills方案指南 - bingo彬哥 化繁为简:顺丰速运App如何通过 HarmonyOS SDK实现专业级空间测量 - HarmonyOS_SDK 从零实现富文本编辑器#13-React非编辑节点的内容渲染 - WindRunnerMax AI开发-python-langchain框架(3-23-OpenAI Functions风格Tool Calling智能助手) - 万笑佛 .NET + AI 进阶实战:基于类的技能开发 - 打造可治理的 Agent 能力模块 - NetCoreKevin 【从0到1构建一个ClaudeAgent】规划与协调-技能 - 程序员Seven 上周热点回顾(4.6-4.12) - 博客园团队 电子小白的工具三件套:面包板、杜邦线、万能板 单表五亿数据的查询优化 | Mysql、StarRocks - 痴者工良 WorkBuddy:从“我是谁”到“帮我干活” - 岳小哥AI C# 如何减少代码运行时间:7 个实战技巧 - 码农刚子 基于HelixToolkit.SharpDX 渲染3D模型 - 笺上知微 从零开始的双臂具身VLA起源及现阶段发展综述 - SkyXZ 记对 xonsh shell 的使用, 脚本编写, 迁移及调优 - pluvium27 受够了Vibe Coding的失控?换个起点,让AI事半功倍 从开始配置漏洞环境到漏洞复现流程 - 難しい 关于10年工作经验的程序员对OpenClaw的实战经验分享以及看法 - 虚无境 Any metadata 的内存布局 C# .NET 周刊|2026年3月2期 - InCerry 我帮你测过了,测试圈排名第二的 Skill 依然很牛逼 Skill Discovery | 无监督技能发现的经典工作总结 - MoonOut PbootCMS 网站内容数量多导致访问慢?这些实用优化方案帮你提速! - 家兴网络技术工作室 上下文工程是什么?过时了么?一文讲明白! - 一枫说码 网站漏洞怎么发现并修复?一篇实用指南(附完整流程) - 家兴网络技术工作室 开了 TUN 模式还是直连?90% 的人都踩过这个坑 Github日报|2026年04月12日 - AI一族 AScript扩展多种脚本语言 - rockey627 AI 学习笔记:Agent 的记忆机制 你能被装进一个文件里吗?——7 万人把同事"蒸馏"成了 AI - 我没有三颗心脏 Claude Code 通关手册(七):给 AI 装上技能包——Skills 完全指南 - 暮色之狐 在浏览器中快速编辑代码:VSCode Web 集成实践 - Newbe36524 蒸馏自己 skill?基于 Deepseek 的蒸馏器,丐版蒸馏方式,简单便捷 - To_Carpe_Diem Spring AI Aliababa和AgentScope,哪个更好? - 苏三说技术 Etsy 把 1000 个 MySQL 分片迁进 Vitess:425TB 数据背后的真正问题不是性能,而是运维规模 MicroPython LVGL基础知识和概念:底层渲染与性能优化 - FreakStudio 数据库草图算法 Python 潮流周刊#146:CPython 引入 Rust 的进展 - 豌豆花下猫 最小生成树 - mofei1116 红日靶场七:从外网入口、容器逃逸到 AD 接管的完整利用链复盘 - YouDiscovered1t 分享四款开源且实用的 Kafka 管理工具 - 追逐时光者 vLLM 权重加载机制全解析:从挑战到理想架构 LCT 学习笔记 - ACehomoxue Avalonia UI 12.0.0 正式发布:架构演进和性能飞跃 - 张善友 当 AI Agent 把调用链拉长,延迟开始成为一门生意 conhost.exe 无法显示 U+2717 - 145a 太秀了,我把自己蒸馏成了 Skill!已开源 - 程序员鱼皮 ASP.NET Core 内存缓存实战:一篇搞懂该怎么配、怎么避坑 基于 Ghostty 带有分割标签页和为 Claude 编程设计的通知终端 - BugShare AI 焊死入口:教育的“操作系统级”重塑 - 郝hai 初级Java开发工程师使用sql脚本编写代码的过程是简单而且不糊涂 - CoderOilStation Claude Code通关手册(六):MCP协议完全指南 - 暮色之狐 边框灯光环绕动画特效实现指南 - Newbe36524 开源:子木蒸馏版的 SEO 审计工具 seo-audit-skill v1.0 我所理解的Python元模型 【从0到1构建一个ClaudeAgent】规划与协调-TodoWrite - 程序员Seven Claude 和 Codex 在审计 Skill 上性能差异探究 - ACai_sec AScript如何实现中文脚本引擎 - rockey627 【渗透测试】HTB Season10 Garfield 全过程wp - dynasty_chenzi Android 开发者为什么必须掌握 AI 能力?端侧视角下的技术变革 树状数组正确性证明 - AC-wyr 你的 AI 焦虑,可能比 AI 本身更危险——ATM 机没有消灭银行柜员,但恐慌消灭了你的判断力 - 我没有三颗心脏 一个拉胯的分库分表方案有多绝望?整个部门都在救火! - 冰河团队 动态规划入门必学之走方格问题 - Ofnoname PostgREST 与 PostgreSQL 角色权限配置全解析(生产级实践) - SheepDog1998 使用 UEFI 图形输出协议 GOP 在屏幕上显示图像的方法 - 阿源- Claude Code通关手册(五):组建你的AI专家团队,子代理系统 - 暮色之狐 一个程序员到架构师的催婚路之感悟(整整10年后的催婚相亲感悟) - MisterLip 用 Agent Skill 自动生成工作周报 - 赵康
DeepAgents - Human in the loop
花酒锄作田 · 2026-06-14 · via 博客园_首页

前言

Human in the loop(人机协作)在企业级 Agent 应用中非常重要——AI 在执关键工具时必须经过人类审批,避免误操作影响业务。我之前用 LangGraph 0.3 裸写了一套(旧文),当时需要在 tool 函数里手动调 interrupt(),很啰嗦。如今有了 DeepAgents 和内置的 HumanInTheLoopMiddleware,只需配置一个 interrupt_on 字典,中断逻辑全自动——在执行前暂停图执行,保存状态到 checkpointer,等待人类决策后恢复。

不过官方文档的示例代码比较简单,只演示了基本用法,没有说明如何在真实应用中整合。本文以"允许 AI 执行 shell 命令,但每次执行前需用户确认"为需求,一步步实现完整的人机协作流程。

核心概念

在开始之前,先理清几个关键概念:

概念 说明
interrupt(中断) 当 Agent 准备调用某个被监控的 tool 时,HumanInTheLoopMiddleware 调用 LangGraph 的 interrupt() 暂停图执行,并抛出包含 action_requestsreview_configs 的请求
checkpoint(检查点) 中断时图状态会被持久化。必须配置 checkpointer,否则中断后无法恢复。生产环境建议用 AsyncPostgresSaver,测试用 InMemorySaver
version="v2" LangGraph 1.0 的 v2 模式,ainvoke() 返回 GraphOutput 对象(含 .interrupts 属性),astream()updates 流中会出现 __interrupt__ 事件
Command(resume=) 用户做出决策后,用 Command(resume={"decisions": [...]}) 从断点恢复执行
Decision(决策) 四种类型:approve(批准)、reject(拒绝并反馈)、edit(修改参数后执行)、respond(人类直接回答,跳过 tool 执行)

执行生命周期

用户提问 → Agent 调用 LLM 生成回复
  → LLM 决定调用 tool(如 execute_shell_command)
    → after_model 钩子:检查 tool 是否在 interrupt_on 中
      → 是:构建 HITLRequest → interrupt() → 暂停 ⌛
      → 否:继续执行
  → 人类做出决策(approve / reject / edit / respond)
    → 恢复执行 → 执行/拒绝 tool → LLM 生成最终回复 → 返回

流程逻辑

以 Chainlit 聊天应用为交互载体,消息处理流程如下:

  1. 用户在聊天页面发送消息(如"检查下系统负载")
  2. Agent 调用 LLM 生成回复,LLM 决定调用 execute_shell_command
  3. HumanInTheLoopMiddleware 检测到该 tool 在 interrupt_on 列表中,触发中断
  4. Chainlit 应用检测到中断,向用户展示审批提示
  5. 用户回复 批准 / 拒绝,应用用 Command(resume=) 恢复执行
  6. Agent 根据决策执行或拒绝 tool,最终返回结果给用户

配置中断

首先需要在创建 Agent 时配置 HumanInTheLoopMiddleware

from deepagents import create_deep_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware

agent = create_deep_agent(
    model=llm,
    tools=[execute_shell_command],
    checkpointer=checkpointer,  # 必须配置!
    system_prompt="你是一位智能助手...",
    middleware=[
        HumanInTheLoopMiddleware(
            interrupt_on={
                # 对 execute_shell_command 进行审批
                "execute_shell_command": {
                    "allowed_decisions": ["approve", "reject"]
                }
            }
        ),
    ],
)

interrupt_on 是一个字典,key 为 tool 名称,value 的可配置项:

  • True — 允许所有四种决策(approve / edit / reject / respond)
  • False — 不拦截该 tool(等同于不写)
  • {"allowed_decisions": [...]} — 只允许指定决策类型
  • 还可以配置 when 谓词按参数条件判断是否拦截、description 自定义中断提示文本

invoke 模式中的实现

v2 模式下的 ainvoke() 返回 GraphOutput 对象,可通过 .interrupts 属性直接获取中断数据,不需要去查 state。

检测中断

resp = await agent.ainvoke(
    input={"messages": [HumanMessage(content=query)]},
    config=config,
    version="v2",
)

if resp.interrupts:
    # 存在中断,resp.interrupts 是 Interrupt 对象的元组
    interrupt = resp.interrupts[0]
    # interrupt.value 是 HITLRequest,包含 action_requests 和 review_configs
    print(interrupt.value["action_requests"])

恢复中断

用户做出决策后,用 Command(resume=) 恢复:

from langgraph.types import Command

await agent.ainvoke(
    Command(resume={
        "decisions": [{"type": "approve"}]  # 或 {"type": "reject", "message": "..."}
    }),
    config=config,  # 必须用同一个 thread_id
    version="v2",
)

关键:如何分辨"新消息"还是"中断恢复"

在聊天应用中,用户发来的每条消息都走同一个 @cl.on_message 处理函数。用户说"检查负载"和回复"批准"都只是文本。解决方法是——调用前先检查是否有待处理的中断

# 检查当前会话是否有待处理的中断
state = await agent.aget_state(config)
if state.next:
    # 有待处理中断 → 本次消息是审批回复,构建 resume 命令
    cmd = Command(resume={"decisions": [{"type": "approve"}]})
    await agent.ainvoke(cmd, config=config, version="v2")
else:
    # 无中断 → 正常对话
    resp = await agent.ainvoke(
        {"messages": [HumanMessage(content=query)]}, config=config, version="v2"
    )

state.next 不为空表示图执行被暂停了(有中断等待处理)。

stream 模式中的实现

流式模式需要用 stream_mode=["messages", "updates"](官方推荐同时开启两种流):

  • messages 流:获取 LLM 的 token 级输出
  • updates 流:检测中断事件 __interrupt__
async for chunk in agent.astream(
    input=input_data,
    stream_mode=["messages", "updates"],
    version="v2",
    config=config,
):
    if chunk["type"] == "messages":
        msg, _meta = chunk["data"]
        # msg 是 AIMessageChunk,包含 content 和 tool_calls
        if isinstance(msg, AIMessageChunk) and msg.content:
            yield extract_text(msg)     # 流式输出文本
    elif chunk["type"] == "updates":
        if "__interrupt__" in chunk["data"]:
            interrupt = chunk["data"]["__interrupt__"][0]
            yield format_question(interrupt)  # 输出审批问题

stream 模式的恢复与 invoke 类似——在调用 astream() 之前同样要先检查 state.next 来判断是正常对话还是中断恢复。

完整示例

下面是核心代码。checkpointerllm 的配置函数、日志模块等非核心代码省略。

Agent 封装(internal/agent/agent.py 核心部分)

# --- 审批关键词匹配 ---
_APPROVE_KEYWORDS = frozenset(
    {"yes", "accept", "approve", "ok", "是", "允许", "同意", "批准"}
)

def _parse_decision(query: str) -> str:
    return "approve" if query.strip().lower() in _APPROVE_KEYWORDS else "reject"

def _build_resume_command(decision_type: str, actions_count: int) -> Command:
    item = {"type": decision_type}
    if decision_type == "reject":
        item["message"] = "user rejected this action"
    return Command(resume={"decisions": [item for _ in range(actions_count)]})

def _extract_text(message) -> str:
    """从消息中提取纯文本(兼容 str 和 list[dict] 两种 content 格式)。"""
    if not message or not hasattr(message, "content"):
        return ""
    content = message.content
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        return "".join(
            b.get("text", "")
            for b in content
            if isinstance(b, dict) and b.get("type") == "text"
        )
    return ""

def _format_interrupt_question(interrupt) -> str:
    """将中断数据格式化为用户的审批问题。"""
    action_requests = interrupt.value.get("action_requests", [])
    review_configs = interrupt.value.get("review_configs", [])
    allowed = (
        review_configs[0].get("allowed_decisions", ["approve", "reject"])
        if review_configs
        else ["approve", "reject"]
    )

    lines = []
    for req in action_requests:
        lines.append(
            "Do you approve me to execute this action?\n\n"
            f"- name: {req['name']}\n"
            f"- args: `{req['args']}`\n"
        )
    lines.append(f"Input your decision: {', '.join(allowed)}\n")
    return "\n".join(lines)


class AIAgent:
    # ... __init__, _init_deep_agent, _init_tools 省略 ...

    async def _has_pending_interrupt(self, config: RunnableConfig) -> bool:
        state = await self._agent.aget_state(config)
        return bool(state.next)

    # --- invoke 模式 ---
    async def ainvoke(self, query: str, config: RunnableConfig) -> str:
        if not self._agent:
            await self._init_deep_agent()

        # 优先处理中断恢复
        if await self._has_pending_interrupt(config):
            state = await self._agent.aget_state(config)
            actions_count = len(
                state.interrupts[0].value["action_requests"]
            )
            decision = _parse_decision(query)
            cmd = _build_resume_command(decision, actions_count)
            await self._agent.ainvoke(cmd, config=config, version="v2")

            # 恢复后取最新消息
            state = await self._agent.aget_state(config)
            if state.values and "messages" in state.values:
                return _extract_text(state.values["messages"][-1])
            return "Oops, something went wrong."

        # 正常对话
        resp = await self._agent.ainvoke(
            input={"messages": [HumanMessage(content=query)]},
            config=config,
            version="v2",
        )
        if resp.interrupts:
            return _format_interrupt_question(resp.interrupts[0])
        return _extract_text(resp.value["messages"][-1])

    # --- stream 模式 ---
    async def astream(self, query: str, config: RunnableConfig):
        if not self._agent:
            await self._init_deep_agent()

        # 判断是中断恢复还是正常对话
        state = await self._agent.aget_state(config)
        if state.next:
            actions_count = len(
                state.interrupts[0].value["action_requests"]
            )
            decision = _parse_decision(query)
            input_data = _build_resume_command(decision, actions_count)
        else:
            input_data = {"messages": [HumanMessage(content=query)]}

        async for chunk in self._agent.astream(
            input=input_data,
            stream_mode=["messages", "updates"],
            version="v2",
            config=config,
        ):
            if chunk["type"] == "messages":
                msg, _meta = chunk["data"]
                if isinstance(msg, AIMessageChunk) and msg.content:
                    yield _extract_text(msg)
            elif chunk["type"] == "updates" and "__interrupt__" in chunk["data"]:
                yield _format_interrupt_question(
                    chunk["data"]["__interrupt__"][0]
                )

Chainlit 应用层(chainlit_app.py 核心部分)

@cl.on_message
async def main(msg: cl.Message):
    config = RunnableConfig(
        configurable={"thread_id": cl.context.session.id},
    )

    # stream 模式(推荐)
    final_answer = cl.Message(content="")
    async for chunk in ai_agent.astream(msg.content, config=config):
        await final_answer.stream_token(chunk)
    await final_answer.send()

    # 或者 invoke 模式
    # resp = await ai_agent.ainvoke(msg.content, config)
    # await cl.Message(resp).send()

Chainlit 应用层的代码非常简洁——因为中断检测和恢复逻辑全部封装在 AIAgent 内部了。Chainlit 只需要流式输出 astream() / ainvoke() 的返回结果即可。

交互效果

[用户]: 检查下系统负载

[AI]: 🔧 正在调用工具: execute_shell_command...

[AI]: Do you approve me to execute this action?

       - name: execute_shell_command
       - args: `{"command": "cat /proc/loadavg && free -h", "timeout": 10}`

       Input your decision: approve, reject

[用户]: 批准

[AI]: 当前系统负载: 0.52 0.38 0.25,
       内存总容量 62Gi,已用 10Gi,剩余 46Gi,系统运行正常。

进阶:使用 interrupt_on 的 when 谓词

如果不想拦截所有 shell 命令,只想拦截危险操作(如 rmdd、写入系统目录等),可以用 when 谓词按参数条件判断:

from langgraph.prebuilt.tool_node import ToolCallRequest

def is_dangerous_command(request: ToolCallRequest) -> bool:
    """只拦截包含危险操作的命令。"""
    command = request.tool_call["args"].get("command", "")
    dangerous = {"rm ", "dd ", "mkfs", "shutdown", "reboot"}
    return any(d in command for d in dangerous)

HumanInTheLoopMiddleware(
    interrupt_on={
        "execute_shell_command": {
            "allowed_decisions": ["approve", "reject"],
            "when": is_dangerous_command,   # 只在危险命令时拦截
        }
    }
)

when 谓词返回 True 才触发中断,返回 False 则自动批准。注意 when 需要 langchain >= 1.3.3

改进点

  • 目前 reject 时用的是固定消息,实际产品中可以让用户输入拒绝原因,方便 LLM 调整后续行为
  • 审批提示目前是纯文本,可以用 Chainlit 的 AskActionMessage 做成按钮交互(不过受制于 Chainlit Action 的 payload 类型限制,需要额外处理)
  • 如果有多个 tool 同时被拦截,action_requests 列表中会有多项,本文为简化只取了第一个,生产环境应遍历处理

补充

DeepAgents 的 HumanInTheLoopMiddleware 把之前需要手写的中断逻辑全部封装好了。集成到真实应用的关键只有三步:

  1. 创建 Agent 时:配置 interrupt_on 字典 + 确保有 checkpointer
  2. 每次调用前:通过 state.next 判断是正常对话还是中断恢复
  3. 恢复时:用 Command(resume={"decisions": [...]}) 传入用户决策

ainvokeastream 两种模式的核心逻辑一致,只是检测中断的方式不同(.interrupts 属性 vs updates 流中的 __interrupt__)。