惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

GbyAI
GbyAI
T
Tenable Blog
Webroot Blog
Webroot Blog
L
Lohrmann on Cybersecurity
S
Securelist
S
Schneier on Security
NISL@THU
NISL@THU
Know Your Adversary
Know Your Adversary
C
Cybersecurity and Infrastructure Security Agency CISA
T
The Exploit Database - CXSecurity.com
L
LINUX DO - 热门话题
C
CXSECURITY Database RSS Feed - CXSecurity.com
O
OpenAI News
I
Intezer
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
TaoSecurity Blog
TaoSecurity Blog
S
Secure Thoughts
Application and Cybersecurity Blog
Application and Cybersecurity Blog
P
Privacy International News Feed
H
Hacker News: Front Page
N
Netflix TechBlog - Medium
M
MIT News - Artificial intelligence
博客园 - Franky
PCI Perspectives
PCI Perspectives
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
Microsoft Azure Blog
Microsoft Azure Blog
MongoDB | Blog
MongoDB | Blog
L
LangChain Blog
P
Proofpoint News Feed
S
Security Affairs
WordPress大学
WordPress大学
The Last Watchdog
The Last Watchdog
S
SegmentFault 最新的问题
小众软件
小众软件
F
Full Disclosure
博客园 - 叶小钗
cs.AI updates on arXiv.org
cs.AI updates on arXiv.org
T
The Blog of Author Tim Ferriss
Simon Willison's Weblog
Simon Willison's Weblog
P
Palo Alto Networks Blog
Security Latest
Security Latest
P
Proofpoint News Feed
月光博客
月光博客
T
Tailwind CSS Blog
Scott Helme
Scott Helme
Hacker News - Newest:
Hacker News - Newest: "LLM"
Google Online Security Blog
Google Online Security Blog
T
Threat Research - Cisco Blogs
Help Net Security
Help Net Security
Project Zero
Project Zero

博客园 - 我才是银古

第16章:常见问题、排错与最佳实践 第15章:扩展生态、MCAD 与外部集成 第12章:实战案例:机械结构与 3D 打印零件 第14章:构建、测试、调试与贡献流程 第13章:OpenSCAD 源码架构与核心执行流程 第11章:预览、渲染、网格精度与性能优化 第09章:列表推导、递归与算法建模 第08章:参数化零件库与复用设计 第10章:导入导出、命令行与自动化 第06章:CSG 布尔建模方法 第07章:二维图形、拉伸、旋转与投影 第05章:基础几何、坐标系与变换 第04章:参数、变量、函数、模块与作用域 OpenSCAD 教程目录 第03章:OpenSCAD 语言基础 第02章:安装、环境配置与开发工作流 第01章:OpenSCAD 项目全景与学习路线 第02章:源码获取、编译与开发环境配置 第01章:OCCT项目全景与学习路线 第18章:二次开发实战与综合案例 第18章:综合实战案例 第17章:数据交换与协同 第16章:源码架构与二次开发 第15章:插件与自定义工作台开发 第14章:Python脚本宏与自动化 第13章:FEM仿真分析 第12章:CAM数控加工 第11章:SurfaceMesh与逆向工程 第10章:Draft二维绘图与BIM建筑 第09章:工程图TechDraw 第07章:参数化表达式与Spreadsheet 第08章:装配设计Assembly 第06章:Part工作台与几何内核 第05章:PartDesign实体特征建模 第04章:草图Sketcher约束建模 第02章:安装版本与工作环境配置 第03章:界面工作台与基础操作 第01章:项目全景与学习路线 第十二章:插件开发、研究功能与最佳实践 第十章:定时任务与自动化(Cron) 第七章:技能、记忆与自学习闭环 第八章:MCP 集成与上下文文件 第六章:工具系统与终端后端 第五章:模型供应商与配置体系 Hermes Agent 教程目录 第十一章:语音、视觉、浏览器与子代理协作 第四章:CLI/TUI 与会话管理 第十二章:学习路线、实战方案与最佳实践 第十一章:源码结构、开发调试与插件开发 第十章:自动化、远程访问、日志与排障 第九章:Control UI、节点、Canvas 与语音能力 第七章:工具、技能、插件与能力扩展 第八章:安全模型、访问控制与沙箱实践 第六章:Agent 工作区、会话与多智能体路由 第五章:多通道消息接入与聊天平台配置 第四章:配置体系、模型接入与认证管理 第三章:Gateway 架构、协议与运行机制 第二章:安装、环境准备与快速上手 第一章:OpenClaw 项目概览与核心定位 oh-my-openagent 教程目录 09-命令模型回退与配置参考 10-实战案例最佳实践与故障排除 05-工作模式-Ultrawork-Prometheus-Atlas 08-Hooks与MCP系统 06-Category与Skill系统 07-核心工具链 04-智能体全景详解 03-安装与环境配置 02-整体架构与多模型编排机制 01-项目简介与核心理念 01-项目概览与学习路线 02-安装部署与工具适配 03-Skill机制与using-superpowers 05-TDD系统化调试与完成前验证 04-需求澄清方案设计与计划编写 07-并行智能体子智能体与Git-Worktree 第六章:代码审查、反馈处理与分支收尾 08-中国特色Skills与本土团队落地 09-MCP构建工作流执行与自定义Skill 第23章:FreeCAD-Python-API Clipper2 C# 源码解读教程 第19章:PolyTree 多边形树结构 第20章:实际应用与最佳实践 第18章:Minkowski 和与差 第17章:RectClip 矩形裁剪优化 第16章:ClipperOffset 偏移类详解 第15章:填充规则详解 第14章:布尔运算执行流程 第13章:ClipperD 浮点裁剪类 第11章:OutRec 与 OutPt 输出结构 第9章:Active 活动边结构 第10章:Vertex 顶点与 LocalMinima 局部极小值 第12章:Clipper64 裁剪类详解 第7章:高精度运算与128位整数 第8章:ClipperBase 基类详解 第5章:枚举类型与常量定义 第6章:InternalClipper 内部工具类 第2章:核心数据结构 - Point64、PointD 第3章:路径与多边形表示 - Path64、PathD、Paths64、PathsD 第4章:矩形边界 - Rect64、RectD
第十三章:流水线引擎深度解析
我才是银古 · 2026-06-22 · via 博客园 - 我才是银古

第十三章:流水线引擎深度解析

GeoPipeAgent 的流水线引擎由 engine/ 目录下 5 个模块组成,本章深入解析每个模块的内部机制,帮助开发者理解框架工作原理、调试问题和进行二次开发。


13.1 引擎整体流程

执行一个 YAML 流水线涉及四个阶段:

YAML 文件
    ↓ parser.py
PipelineDefinition(内存模型)
    ↓ validator.py
校验通过(或抛出 PipelineValidationError)
    ↓ executor.py + context.py
执行所有步骤,收集结果
    ↓ reporter.py
JSON 执行报告

13.2 parser.py:YAML 解析

职责:将 YAML 文件(或字符串)解析为 PipelineDefinition 数据模型。

核心函数

def parse_yaml(source: str | Path) -> PipelineDefinition:
    raw = _load_yaml(source)       # 读取文件或字符串 → dict
    return _build_pipeline(raw)    # 构建 PipelineDefinition

关键解析规则

  1. 顶层 pipeline: 键必须存在:若不存在,抛出 PipelineParseError("Missing 'pipeline' key at the top level.")
  2. steps 必须是非空列表pipeline.steps 为空时报错
  3. 每个步骤必须有 iduse:缺少时报错并指明步骤索引
  4. on_error 默认值为 "fail":解析时自动填充

解析的数据结构

@dataclass
class PipelineDefinition:
    name: str
    steps: list[StepDefinition]
    description: str = ""
    crs: str | None = None
    variables: dict = {}
    outputs: dict = {}

@dataclass
class StepDefinition:
    id: str
    use: str
    params: dict = {}
    when: str | None = None
    on_error: str = "fail"  # "fail" | "skip" | "retry"
    backend: str | None = None

13.3 validator.py:流水线校验

职责:在执行之前检查流水线的语义正确性,尽早发现问题。

校验规则

  1. 步骤 ID 格式:匹配 [a-z0-9_-],点号(.)不允许
  2. 步骤 ID 唯一性:同一流水线不能有重复 ID
  3. 步骤类型存在use 指定的步骤必须已注册(通过 registry.has()
  4. 参数引用合法性$step-id 引用的步骤必须在当前步骤之前定义;${var} 引用的变量必须在 variables 中定义
  5. on_error 合法值:必须是 fail/skip/retry 之一
  6. outputs 中的引用:引用的步骤必须存在

区分校验与执行时错误

错误类型 触发时机 异常类型
顶层 pipeline: 缺失 解析时 PipelineParseError
步骤 ID 不合法 校验时 PipelineValidationError
引用未定义步骤 校验时 PipelineValidationError
步骤执行异常 执行时 StepExecutionError
变量解析失败 执行时 VariableResolutionError

geopipe-agent validate 命令只运行解析和校验两个阶段,不执行步骤。


13.4 context.py:上下文与引用解析

职责:维护流水线执行状态,提供变量替换和步骤引用解析。

PipelineContext

class PipelineContext:
    variables: dict              # 流水线变量(含 --var 覆盖)
    _step_outputs: dict[str, StepResult]  # 已完成步骤的输出

    def set_output(step_id, result)  # 步骤完成后存储结果
    def get_output(step_id)          # 获取步骤结果
    def resolve(value)               # 解析值(变量替换/步骤引用)
    def resolve_params(params)        # 解析整个 params 字典

引用解析算法

resolve(value) 的处理逻辑:

def resolve(self, value):
    if not isinstance(value, str):
        return value           # 非字符串直接返回

    if value.startswith("$") and not value.startswith("${"):
        return self._resolve_step_ref(value)  # 步骤引用

    if "${" in value:
        return self._substitute_variables(value)  # 变量替换

    return value               # 普通字符串

def _resolve_step_ref(self, ref):
    ref_body = ref[1:]
    if "." not in ref_body:
        step_id, attr = ref_body, "output"   # $step → $step.output
    else:
        step_id, attr = ref_body.split(".", 1)
    result = self._step_outputs[step_id]
    return getattr(result, attr)             # 访问 StepResult 属性

getattr(result, attr) 的工作原理StepResult 实现了 __getattr__,查找顺序为:

  1. 内置属性(outputstatsmetadataissues
  2. stats 字典键(如 feature_countissues_count
  3. metadata 字典键(如 issues_gdftransform
  4. 以上都找不到 → 抛出 AttributeError,转为 VariableResolutionError

StepContext

每个步骤执行时接收 StepContext,提供参数访问:

class StepContext:
    def param(self, name, default=None)  # 获取指定参数值
    def input(self, name="input")        # input("input") 的快捷方式
    @property params                      # 完整参数字典
    backend                              # 后端对象(可能为 None)

13.5 executor.py:步骤执行调度

职责:按顺序执行所有步骤,处理条件跳过、错误策略、重试逻辑。

主执行流程

def execute_pipeline(pipeline) -> dict:
    context = PipelineContext(variables=pipeline.variables)
    backend_manager = BackendManager.default()

    for step_def in pipeline.steps:
        # 1. 条件判断
        if step_def.when and not _evaluate_condition(step_def.when, context):
            context.set_output(step_def.id, StepResult())
            # → 记录 status=skipped,继续

        # 2. 执行步骤(含重试)
        result = _execute_step(step_def, context, backend_manager)
        context.set_output(step_def.id, result)

    # 3. 解析 outputs
    resolved_outputs = {k: context.resolve(v) for k, v in pipeline.outputs.items()}

    # 4. 生成 JSON 报告
    return build_report(...)

when 条件安全求值

_evaluate_condition 函数使用 AST 白名单确保安全:

  1. 用正则替换 ${var}$step.attr 占位符为 Python 字面值
  2. 解析为 AST(ast.parse(mode="eval")
  3. 通过 validate_condition_ast 检查 AST 节点类型(白名单)
  4. 使用空 __builtins__ 求值:eval(tree, {"__builtins__": {}}, {})

若求值失败,返回 False(步骤被跳过),不中断流水线。

重试机制

def _with_retry(fn, max_attempts, step_id):
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except StepExecutionError:
            raise    # 已封装的执行错误不重试
        except Exception as e:
            if attempt < max_attempts:
                time.sleep(0.5 * attempt)  # 递增等待:0.5s, 1s, 1.5s
    raise last_error

智能错误建议

当步骤失败时,_suggest_fix 函数分析错误信息并提供建议:

patterns = [
    ("crs" in msg and "degree" in msg → 建议添加 reproject 步骤),
    ("file not found" → 检查文件路径),
    ("invalid geometry" → 添加 qc.geometry_validity),
    ("keyerror" → 检查字段名是否存在),
    ...
]

这些建议会出现在 JSON 报告的 suggestion 字段中,帮助 AI 或用户快速修复。


13.6 reporter.py:JSON 报告生成

职责:将执行结果汇总为标准 JSON 报告结构。

报告格式

{
  "pipeline": "流水线名称",
  "status": "success",              // "success" | "error"
  "duration": 1.234,                // 总耗时(秒)
  "steps": [
    {
      "id": "load-roads",
      "step": "io.read_vector",
      "status": "success",          // "success" | "skipped" | "error"
      "duration": 0.089,
      "output_summary": {           // StepResult.summary() 的输出
        "feature_count": 100,
        "crs": "EPSG:4326",
        "geometry_types": ["LineString"]
      },
      // QC 步骤还会有:
      "issues_count": 5,
      "issues": [{"rule_id": "...", "severity": "error", "message": "..."}]
    },
    {
      "id": "optional-step",
      "step": "vector.simplify",
      "status": "skipped",
      "skip_reason": "condition not met: ${enable_simplify} == true"
    },
    {
      "id": "failed-step",
      "step": "vector.clip",
      "status": "error",
      "error": "CRS mismatch: ...",
      "suggestion": "Add a vector.reproject step..."
    }
  ],
  "outputs": {
    "result": "output/buffer.geojson",
    "feature_count": 100
  }
}

13.7 Python API:在代码中使用引擎

除 CLI 外,也可直接在 Python 代码中调用引擎 API:

import geopipe_agent  # 触发内置步骤的自动注册

from geopipe_agent.engine.parser import parse_yaml
from geopipe_agent.engine.validator import validate_pipeline
from geopipe_agent.engine.executor import execute_pipeline
from geopipe_agent.errors import GeopipeAgentError

# 方式一:从文件加载
pipeline = parse_yaml("my_pipeline.yaml")

# 方式二:从字符串加载
yaml_str = """
pipeline:
  name: "inline pipeline"
  steps:
    - id: load
      use: io.read_vector
      params: { path: "data.shp" }
"""
pipeline = parse_yaml(yaml_str)

# 运行时覆盖变量
pipeline.variables["input_path"] = "data/roads.shp"
pipeline.variables["buffer_dist"] = 500

# 校验(返回警告列表,或抛出异常)
warnings = validate_pipeline(pipeline)

# 执行(返回 JSON 报告字典)
try:
    report = execute_pipeline(pipeline)
    print(f"Status: {report['status']}")
    print(f"Duration: {report['duration']}s")
except GeopipeAgentError as e:
    print(f"Error: {e}")

13.8 本章小结

本章深入解析了流水线引擎的 5 个模块:

  1. parser.py:YAML → PipelineDefinition,强制要求顶层 pipeline:
  2. validator.py:执行前语义校验,检查 ID 格式、步骤存在性、引用合法性
  3. context.py:维护执行上下文,解析 $step.attr${var} 引用
  4. executor.py:按序执行步骤,处理条件跳过、重试、错误策略,生成建议
  5. reporter.py:汇总生成标准化 JSON 报告

导航← 第十二章:数据质检步骤第十四章:多后端系统 →