




























GeoPipeAgent 的流水线引擎由 engine/ 目录下 5 个模块组成,本章深入解析每个模块的内部机制,帮助开发者理解框架工作原理、调试问题和进行二次开发。
执行一个 YAML 流水线涉及四个阶段:
YAML 文件
↓ parser.py
PipelineDefinition(内存模型)
↓ validator.py
校验通过(或抛出 PipelineValidationError)
↓ executor.py + context.py
执行所有步骤,收集结果
↓ reporter.py
JSON 执行报告
parser.py:YAML 解析职责:将 YAML 文件(或字符串)解析为 PipelineDefinition 数据模型。
def parse_yaml(source: str | Path) -> PipelineDefinition:
raw = _load_yaml(source) # 读取文件或字符串 → dict
return _build_pipeline(raw) # 构建 PipelineDefinition
pipeline: 键必须存在:若不存在,抛出 PipelineParseError("Missing 'pipeline' key at the top level.")steps 必须是非空列表:pipeline.steps 为空时报错id 和 use:缺少时报错并指明步骤索引on_error 默认值为 "fail":解析时自动填充@dataclass
class PipelineDefinition:
name: str
steps: list[StepDefinition]
description: str = ""
crs: str | None = None
variables: dict = {}
outputs: dict = {}
@dataclass
class StepDefinition:
id: str
use: str
params: dict = {}
when: str | None = None
on_error: str = "fail" # "fail" | "skip" | "retry"
backend: str | None = None
validator.py:流水线校验职责:在执行之前检查流水线的语义正确性,尽早发现问题。
[a-z0-9_-],点号(.)不允许use 指定的步骤必须已注册(通过 registry.has())$step-id 引用的步骤必须在当前步骤之前定义;${var} 引用的变量必须在 variables 中定义on_error 合法值:必须是 fail/skip/retry 之一outputs 中的引用:引用的步骤必须存在| 错误类型 | 触发时机 | 异常类型 |
|---|---|---|
顶层 pipeline: 缺失 |
解析时 | PipelineParseError |
| 步骤 ID 不合法 | 校验时 | PipelineValidationError |
| 引用未定义步骤 | 校验时 | PipelineValidationError |
| 步骤执行异常 | 执行时 | StepExecutionError |
| 变量解析失败 | 执行时 | VariableResolutionError |
geopipe-agent validate 命令只运行解析和校验两个阶段,不执行步骤。
context.py:上下文与引用解析职责:维护流水线执行状态,提供变量替换和步骤引用解析。
PipelineContext 类class PipelineContext:
variables: dict # 流水线变量(含 --var 覆盖)
_step_outputs: dict[str, StepResult] # 已完成步骤的输出
def set_output(step_id, result) # 步骤完成后存储结果
def get_output(step_id) # 获取步骤结果
def resolve(value) # 解析值(变量替换/步骤引用)
def resolve_params(params) # 解析整个 params 字典
resolve(value) 的处理逻辑:
def resolve(self, value):
if not isinstance(value, str):
return value # 非字符串直接返回
if value.startswith("$") and not value.startswith("${"):
return self._resolve_step_ref(value) # 步骤引用
if "${" in value:
return self._substitute_variables(value) # 变量替换
return value # 普通字符串
def _resolve_step_ref(self, ref):
ref_body = ref[1:]
if "." not in ref_body:
step_id, attr = ref_body, "output" # $step → $step.output
else:
step_id, attr = ref_body.split(".", 1)
result = self._step_outputs[step_id]
return getattr(result, attr) # 访问 StepResult 属性
getattr(result, attr) 的工作原理:StepResult 实现了 __getattr__,查找顺序为:
output、stats、metadata、issues)stats 字典键(如 feature_count、issues_count)metadata 字典键(如 issues_gdf、transform)AttributeError,转为 VariableResolutionErrorStepContext 类每个步骤执行时接收 StepContext,提供参数访问:
class StepContext:
def param(self, name, default=None) # 获取指定参数值
def input(self, name="input") # input("input") 的快捷方式
@property params # 完整参数字典
backend # 后端对象(可能为 None)
executor.py:步骤执行调度职责:按顺序执行所有步骤,处理条件跳过、错误策略、重试逻辑。
def execute_pipeline(pipeline) -> dict:
context = PipelineContext(variables=pipeline.variables)
backend_manager = BackendManager.default()
for step_def in pipeline.steps:
# 1. 条件判断
if step_def.when and not _evaluate_condition(step_def.when, context):
context.set_output(step_def.id, StepResult())
# → 记录 status=skipped,继续
# 2. 执行步骤(含重试)
result = _execute_step(step_def, context, backend_manager)
context.set_output(step_def.id, result)
# 3. 解析 outputs
resolved_outputs = {k: context.resolve(v) for k, v in pipeline.outputs.items()}
# 4. 生成 JSON 报告
return build_report(...)
when 条件安全求值_evaluate_condition 函数使用 AST 白名单确保安全:
${var} 和 $step.attr 占位符为 Python 字面值ast.parse(mode="eval"))validate_condition_ast 检查 AST 节点类型(白名单)__builtins__ 求值:eval(tree, {"__builtins__": {}}, {})若求值失败,返回 False(步骤被跳过),不中断流水线。
def _with_retry(fn, max_attempts, step_id):
for attempt in range(1, max_attempts + 1):
try:
return fn()
except StepExecutionError:
raise # 已封装的执行错误不重试
except Exception as e:
if attempt < max_attempts:
time.sleep(0.5 * attempt) # 递增等待:0.5s, 1s, 1.5s
raise last_error
当步骤失败时,_suggest_fix 函数分析错误信息并提供建议:
patterns = [
("crs" in msg and "degree" in msg → 建议添加 reproject 步骤),
("file not found" → 检查文件路径),
("invalid geometry" → 添加 qc.geometry_validity),
("keyerror" → 检查字段名是否存在),
...
]
这些建议会出现在 JSON 报告的 suggestion 字段中,帮助 AI 或用户快速修复。
reporter.py:JSON 报告生成职责:将执行结果汇总为标准 JSON 报告结构。
{
"pipeline": "流水线名称",
"status": "success", // "success" | "error"
"duration": 1.234, // 总耗时(秒)
"steps": [
{
"id": "load-roads",
"step": "io.read_vector",
"status": "success", // "success" | "skipped" | "error"
"duration": 0.089,
"output_summary": { // StepResult.summary() 的输出
"feature_count": 100,
"crs": "EPSG:4326",
"geometry_types": ["LineString"]
},
// QC 步骤还会有:
"issues_count": 5,
"issues": [{"rule_id": "...", "severity": "error", "message": "..."}]
},
{
"id": "optional-step",
"step": "vector.simplify",
"status": "skipped",
"skip_reason": "condition not met: ${enable_simplify} == true"
},
{
"id": "failed-step",
"step": "vector.clip",
"status": "error",
"error": "CRS mismatch: ...",
"suggestion": "Add a vector.reproject step..."
}
],
"outputs": {
"result": "output/buffer.geojson",
"feature_count": 100
}
}
除 CLI 外,也可直接在 Python 代码中调用引擎 API:
import geopipe_agent # 触发内置步骤的自动注册
from geopipe_agent.engine.parser import parse_yaml
from geopipe_agent.engine.validator import validate_pipeline
from geopipe_agent.engine.executor import execute_pipeline
from geopipe_agent.errors import GeopipeAgentError
# 方式一:从文件加载
pipeline = parse_yaml("my_pipeline.yaml")
# 方式二:从字符串加载
yaml_str = """
pipeline:
name: "inline pipeline"
steps:
- id: load
use: io.read_vector
params: { path: "data.shp" }
"""
pipeline = parse_yaml(yaml_str)
# 运行时覆盖变量
pipeline.variables["input_path"] = "data/roads.shp"
pipeline.variables["buffer_dist"] = 500
# 校验(返回警告列表,或抛出异常)
warnings = validate_pipeline(pipeline)
# 执行(返回 JSON 报告字典)
try:
report = execute_pipeline(pipeline)
print(f"Status: {report['status']}")
print(f"Duration: {report['duration']}s")
except GeopipeAgentError as e:
print(f"Error: {e}")
本章深入解析了流水线引擎的 5 个模块:
parser.py:YAML → PipelineDefinition,强制要求顶层 pipeline: 键validator.py:执行前语义校验,检查 ID 格式、步骤存在性、引用合法性context.py:维护执行上下文,解析 $step.attr 和 ${var} 引用executor.py:按序执行步骤,处理条件跳过、重试、错误策略,生成建议reporter.py:汇总生成标准化 JSON 报告此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。