惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
The Hacker News
The Hacker News
P
Palo Alto Networks Blog
cs.CV updates on arXiv.org
cs.CV updates on arXiv.org
K
KPMG report finds enterprise disconnect between AI and its ROI | CIO
T
Tor Project blog
T
Troy Hunt's Blog
Microsoft Azure Blog
Microsoft Azure Blog
Exploit-DB.com RSS Feed
Exploit-DB.com RSS Feed
Last Week in AI
Last Week in AI
Hacker News - Newest:
Hacker News - Newest: "LLM"
D
Docker
博客园 - 三生石上(FineUI控件)
量子位
腾讯CDC
www.infosecurity-magazine.com
www.infosecurity-magazine.com
Cyberwarzone
Cyberwarzone
博客园 - 【当耐特】
Recent Announcements
Recent Announcements
M
MIT News - Artificial intelligence
Recorded Future
Recorded Future
G
GRAHAM CLULEY
P
Privacy & Cybersecurity Law Blog
T
Threat Research - Cisco Blogs
GbyAI
GbyAI
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
Google DeepMind News
Google DeepMind News
Simon Willison's Weblog
Simon Willison's Weblog
Cloudbric
Cloudbric
Project Zero
Project Zero
SecWiki News
SecWiki News
cs.AI updates on arXiv.org
cs.AI updates on arXiv.org
W
WeLiveSecurity
Cyber Security Advisories - MS-ISAC
Cyber Security Advisories - MS-ISAC
Latest news
Latest news
Schneier on Security
Schneier on Security
小众软件
小众软件
U
Unit 42
Y
Y Combinator Blog
Help Net Security
Help Net Security
Vercel News
Vercel News
月光博客
月光博客
WordPress大学
WordPress大学
C
CERT Recently Published Vulnerability Notes
Google Online Security Blog
Google Online Security Blog
T
Tenable Blog
C
Check Point Blog
MongoDB | Blog
MongoDB | Blog
N
Netflix TechBlog - Medium
Blog — PlanetScale
Blog — PlanetScale

博客园 - 我才是银古

第16章:常见问题、排错与最佳实践 第15章:扩展生态、MCAD 与外部集成 第12章:实战案例:机械结构与 3D 打印零件 第14章:构建、测试、调试与贡献流程 第13章:OpenSCAD 源码架构与核心执行流程 第11章:预览、渲染、网格精度与性能优化 第09章:列表推导、递归与算法建模 第08章:参数化零件库与复用设计 第10章:导入导出、命令行与自动化 第06章:CSG 布尔建模方法 第07章:二维图形、拉伸、旋转与投影 第05章:基础几何、坐标系与变换 第04章:参数、变量、函数、模块与作用域 OpenSCAD 教程目录 第03章:OpenSCAD 语言基础 第02章:安装、环境配置与开发工作流 第01章:OpenSCAD 项目全景与学习路线 第02章:源码获取、编译与开发环境配置 第01章:OCCT项目全景与学习路线 第18章:二次开发实战与综合案例 第18章:综合实战案例 第17章:数据交换与协同 第16章:源码架构与二次开发 第15章:插件与自定义工作台开发 第14章:Python脚本宏与自动化 第13章:FEM仿真分析 第12章:CAM数控加工 第11章:SurfaceMesh与逆向工程 第10章:Draft二维绘图与BIM建筑 第09章:工程图TechDraw 第07章:参数化表达式与Spreadsheet 第08章:装配设计Assembly 第06章:Part工作台与几何内核 第05章:PartDesign实体特征建模 第04章:草图Sketcher约束建模 第02章:安装版本与工作环境配置 第03章:界面工作台与基础操作 第01章:项目全景与学习路线 第十二章:插件开发、研究功能与最佳实践 第十章:定时任务与自动化(Cron) 第七章:技能、记忆与自学习闭环 第八章:MCP 集成与上下文文件 第六章:工具系统与终端后端 第五章:模型供应商与配置体系 Hermes Agent 教程目录 第十一章:语音、视觉、浏览器与子代理协作 第四章:CLI/TUI 与会话管理 第十二章:学习路线、实战方案与最佳实践 第十一章:源码结构、开发调试与插件开发 第十章:自动化、远程访问、日志与排障 第九章:Control UI、节点、Canvas 与语音能力 第七章:工具、技能、插件与能力扩展 第八章:安全模型、访问控制与沙箱实践 第六章:Agent 工作区、会话与多智能体路由 第五章:多通道消息接入与聊天平台配置 第四章:配置体系、模型接入与认证管理 第三章:Gateway 架构、协议与运行机制 第二章:安装、环境准备与快速上手 第一章:OpenClaw 项目概览与核心定位 oh-my-openagent 教程目录 09-命令模型回退与配置参考 10-实战案例最佳实践与故障排除 05-工作模式-Ultrawork-Prometheus-Atlas 08-Hooks与MCP系统 06-Category与Skill系统 07-核心工具链 04-智能体全景详解 03-安装与环境配置 02-整体架构与多模型编排机制 01-项目简介与核心理念 01-项目概览与学习路线 02-安装部署与工具适配 03-Skill机制与using-superpowers 05-TDD系统化调试与完成前验证 04-需求澄清方案设计与计划编写 07-并行智能体子智能体与Git-Worktree 第六章:代码审查、反馈处理与分支收尾 08-中国特色Skills与本土团队落地 09-MCP构建工作流执行与自定义Skill 第23章:FreeCAD-Python-API Clipper2 C# 源码解读教程 第19章:PolyTree 多边形树结构 第20章:实际应用与最佳实践 第18章:Minkowski 和与差 第17章:RectClip 矩形裁剪优化 第16章:ClipperOffset 偏移类详解 第15章:填充规则详解 第14章:布尔运算执行流程 第13章:ClipperD 浮点裁剪类 第11章:OutRec 与 OutPt 输出结构 第9章:Active 活动边结构 第10章:Vertex 顶点与 LocalMinima 局部极小值 第12章:Clipper64 裁剪类详解 第7章:高精度运算与128位整数 第8章:ClipperBase 基类详解 第5章:枚举类型与常量定义 第6章:InternalClipper 内部工具类 第2章:核心数据结构 - Point64、PointD 第3章:路径与多边形表示 - Path64、PathD、Paths64、PathsD 第4章:矩形边界 - Rect64、RectD
第十三章:流水线引擎深度解析
我才是银古 · 2026-06-22 · via 博客园 - 我才是银古

第十三章:流水线引擎深度解析

GeoPipeAgent 的流水线引擎由 engine/ 目录下 5 个模块组成,本章深入解析每个模块的内部机制,帮助开发者理解框架工作原理、调试问题和进行二次开发。


13.1 引擎整体流程

执行一个 YAML 流水线涉及四个阶段:

YAML 文件
    ↓ parser.py
PipelineDefinition(内存模型)
    ↓ validator.py
校验通过(或抛出 PipelineValidationError)
    ↓ executor.py + context.py
执行所有步骤,收集结果
    ↓ reporter.py
JSON 执行报告

13.2 parser.py:YAML 解析

职责:将 YAML 文件(或字符串)解析为 PipelineDefinition 数据模型。

核心函数

def parse_yaml(source: str | Path) -> PipelineDefinition:
    raw = _load_yaml(source)       # 读取文件或字符串 → dict
    return _build_pipeline(raw)    # 构建 PipelineDefinition

关键解析规则

  1. 顶层 pipeline: 键必须存在:若不存在,抛出 PipelineParseError("Missing 'pipeline' key at the top level.")
  2. steps 必须是非空列表pipeline.steps 为空时报错
  3. 每个步骤必须有 iduse:缺少时报错并指明步骤索引
  4. on_error 默认值为 "fail":解析时自动填充

解析的数据结构

@dataclass
class PipelineDefinition:
    name: str
    steps: list[StepDefinition]
    description: str = ""
    crs: str | None = None
    variables: dict = {}
    outputs: dict = {}

@dataclass
class StepDefinition:
    id: str
    use: str
    params: dict = {}
    when: str | None = None
    on_error: str = "fail"  # "fail" | "skip" | "retry"
    backend: str | None = None

13.3 validator.py:流水线校验

职责:在执行之前检查流水线的语义正确性,尽早发现问题。

校验规则

  1. 步骤 ID 格式:匹配 [a-z0-9_-],点号(.)不允许
  2. 步骤 ID 唯一性:同一流水线不能有重复 ID
  3. 步骤类型存在use 指定的步骤必须已注册(通过 registry.has()
  4. 参数引用合法性$step-id 引用的步骤必须在当前步骤之前定义;${var} 引用的变量必须在 variables 中定义
  5. on_error 合法值:必须是 fail/skip/retry 之一
  6. outputs 中的引用:引用的步骤必须存在

区分校验与执行时错误

错误类型 触发时机 异常类型
顶层 pipeline: 缺失 解析时 PipelineParseError
步骤 ID 不合法 校验时 PipelineValidationError
引用未定义步骤 校验时 PipelineValidationError
步骤执行异常 执行时 StepExecutionError
变量解析失败 执行时 VariableResolutionError

geopipe-agent validate 命令只运行解析和校验两个阶段,不执行步骤。


13.4 context.py:上下文与引用解析

职责:维护流水线执行状态,提供变量替换和步骤引用解析。

PipelineContext

class PipelineContext:
    variables: dict              # 流水线变量(含 --var 覆盖)
    _step_outputs: dict[str, StepResult]  # 已完成步骤的输出

    def set_output(step_id, result)  # 步骤完成后存储结果
    def get_output(step_id)          # 获取步骤结果
    def resolve(value)               # 解析值(变量替换/步骤引用)
    def resolve_params(params)        # 解析整个 params 字典

引用解析算法

resolve(value) 的处理逻辑:

def resolve(self, value):
    if not isinstance(value, str):
        return value           # 非字符串直接返回

    if value.startswith("$") and not value.startswith("${"):
        return self._resolve_step_ref(value)  # 步骤引用

    if "${" in value:
        return self._substitute_variables(value)  # 变量替换

    return value               # 普通字符串

def _resolve_step_ref(self, ref):
    ref_body = ref[1:]
    if "." not in ref_body:
        step_id, attr = ref_body, "output"   # $step → $step.output
    else:
        step_id, attr = ref_body.split(".", 1)
    result = self._step_outputs[step_id]
    return getattr(result, attr)             # 访问 StepResult 属性

getattr(result, attr) 的工作原理StepResult 实现了 __getattr__,查找顺序为:

  1. 内置属性(outputstatsmetadataissues
  2. stats 字典键(如 feature_countissues_count
  3. metadata 字典键(如 issues_gdftransform
  4. 以上都找不到 → 抛出 AttributeError,转为 VariableResolutionError

StepContext

每个步骤执行时接收 StepContext,提供参数访问:

class StepContext:
    def param(self, name, default=None)  # 获取指定参数值
    def input(self, name="input")        # input("input") 的快捷方式
    @property params                      # 完整参数字典
    backend                              # 后端对象(可能为 None)

13.5 executor.py:步骤执行调度

职责:按顺序执行所有步骤,处理条件跳过、错误策略、重试逻辑。

主执行流程

def execute_pipeline(pipeline) -> dict:
    context = PipelineContext(variables=pipeline.variables)
    backend_manager = BackendManager.default()

    for step_def in pipeline.steps:
        # 1. 条件判断
        if step_def.when and not _evaluate_condition(step_def.when, context):
            context.set_output(step_def.id, StepResult())
            # → 记录 status=skipped,继续

        # 2. 执行步骤(含重试)
        result = _execute_step(step_def, context, backend_manager)
        context.set_output(step_def.id, result)

    # 3. 解析 outputs
    resolved_outputs = {k: context.resolve(v) for k, v in pipeline.outputs.items()}

    # 4. 生成 JSON 报告
    return build_report(...)

when 条件安全求值

_evaluate_condition 函数使用 AST 白名单确保安全:

  1. 用正则替换 ${var}$step.attr 占位符为 Python 字面值
  2. 解析为 AST(ast.parse(mode="eval")
  3. 通过 validate_condition_ast 检查 AST 节点类型(白名单)
  4. 使用空 __builtins__ 求值:eval(tree, {"__builtins__": {}}, {})

若求值失败,返回 False(步骤被跳过),不中断流水线。

重试机制

def _with_retry(fn, max_attempts, step_id):
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except StepExecutionError:
            raise    # 已封装的执行错误不重试
        except Exception as e:
            if attempt < max_attempts:
                time.sleep(0.5 * attempt)  # 递增等待:0.5s, 1s, 1.5s
    raise last_error

智能错误建议

当步骤失败时,_suggest_fix 函数分析错误信息并提供建议:

patterns = [
    ("crs" in msg and "degree" in msg → 建议添加 reproject 步骤),
    ("file not found" → 检查文件路径),
    ("invalid geometry" → 添加 qc.geometry_validity),
    ("keyerror" → 检查字段名是否存在),
    ...
]

这些建议会出现在 JSON 报告的 suggestion 字段中,帮助 AI 或用户快速修复。


13.6 reporter.py:JSON 报告生成

职责:将执行结果汇总为标准 JSON 报告结构。

报告格式

{
  "pipeline": "流水线名称",
  "status": "success",              // "success" | "error"
  "duration": 1.234,                // 总耗时(秒)
  "steps": [
    {
      "id": "load-roads",
      "step": "io.read_vector",
      "status": "success",          // "success" | "skipped" | "error"
      "duration": 0.089,
      "output_summary": {           // StepResult.summary() 的输出
        "feature_count": 100,
        "crs": "EPSG:4326",
        "geometry_types": ["LineString"]
      },
      // QC 步骤还会有:
      "issues_count": 5,
      "issues": [{"rule_id": "...", "severity": "error", "message": "..."}]
    },
    {
      "id": "optional-step",
      "step": "vector.simplify",
      "status": "skipped",
      "skip_reason": "condition not met: ${enable_simplify} == true"
    },
    {
      "id": "failed-step",
      "step": "vector.clip",
      "status": "error",
      "error": "CRS mismatch: ...",
      "suggestion": "Add a vector.reproject step..."
    }
  ],
  "outputs": {
    "result": "output/buffer.geojson",
    "feature_count": 100
  }
}

13.7 Python API:在代码中使用引擎

除 CLI 外,也可直接在 Python 代码中调用引擎 API:

import geopipe_agent  # 触发内置步骤的自动注册

from geopipe_agent.engine.parser import parse_yaml
from geopipe_agent.engine.validator import validate_pipeline
from geopipe_agent.engine.executor import execute_pipeline
from geopipe_agent.errors import GeopipeAgentError

# 方式一:从文件加载
pipeline = parse_yaml("my_pipeline.yaml")

# 方式二:从字符串加载
yaml_str = """
pipeline:
  name: "inline pipeline"
  steps:
    - id: load
      use: io.read_vector
      params: { path: "data.shp" }
"""
pipeline = parse_yaml(yaml_str)

# 运行时覆盖变量
pipeline.variables["input_path"] = "data/roads.shp"
pipeline.variables["buffer_dist"] = 500

# 校验(返回警告列表,或抛出异常)
warnings = validate_pipeline(pipeline)

# 执行(返回 JSON 报告字典)
try:
    report = execute_pipeline(pipeline)
    print(f"Status: {report['status']}")
    print(f"Duration: {report['duration']}s")
except GeopipeAgentError as e:
    print(f"Error: {e}")

13.8 本章小结

本章深入解析了流水线引擎的 5 个模块:

  1. parser.py:YAML → PipelineDefinition,强制要求顶层 pipeline:
  2. validator.py:执行前语义校验,检查 ID 格式、步骤存在性、引用合法性
  3. context.py:维护执行上下文,解析 $step.attr${var} 引用
  4. executor.py:按序执行步骤,处理条件跳过、重试、错误策略,生成建议
  5. reporter.py:汇总生成标准化 JSON 报告

导航← 第十二章:数据质检步骤第十四章:多后端系统 →