系列概覽
本系列部落格文章是針對希望探索 Dataform 核心功能之外的 Dataform 用戶。除了其他事情之外,我們將深入研究 Dataform 的強大 API,在 GitHub Actions 中建立自動化的 CI/CD 流程,建立用於監控成本的管線,並透過程式碼修改 config {} 區塊。
Dataform 的 API 是探索 Dataform 更強大且文件記錄較少的功能的自然起點。對 API 的理解允許使用者將 Dataform 作為自動化現代數據平台的所有功能中心.
Dataform API 的關鍵概念
該 API 有兩個物件,它們是任何工作流程的基本組成部分:
-
CompilationResult- 在特定時間點的 Dataform 工作區編譯狀態( 建立流程 的結果)。 -
WorkflowInvocation- 單次執行的CompilationResult( 執行流程 的結果)。
CompilationResult
from google.cloud import dataform_v1
client = dataform_v1.DataformClient()
project_id = "my-project"
region = "europe-west2"
repository_id = "analytics"
workspace_id = "dev"
workspace = client.workspace_path(
project_id,
region,
repository_id,
workspace_id,
)
repository = client.repository_path(
project_id,
region,
repository_id,
)
compilation_result = dataform_v1.CompilationResult(
workspace=workspace
)
response = client.create_compilation_result(
parent=repository,
compilation_result=compilation_result,
)
print(response.name)
回應採取以下形式projects/<project_id>/locations/<region>/repositories/<repository_id>/compilationResults/<compilation_result_id> (其中 <compilation_result_id> 是一個 UUID v4).
每當偵測到程式碼變更時,Dataform UI 會產生一個 CompilationResult 物件,儘管 UI 會隱藏其 ID。在執行後,可以在 UI 的 Executions 頁面看到這個 ID。
CompilationResult 包含有關 Dataform 工作空間中每個工作的資訊,允許使用者透過程式碼瀏覽 DAG,並提取每個檔案的 config {} 區塊等資訊。接下來,每個工作都可以透過以下方式進行迴圈:
request = dataform_v1.QueryCompilationResultActionsRequest(
name=COMPILATION_RESULT
)
response = client.query_compilation_result_actions(
request=request
)
for action in response.compilation_result_actions:
...
WorkflowInvocation
from google.cloud import dataform_v1
client = dataform_v1.DataformClient()
project_id = "my-project"
region = "europe-west2"
repository_id = "analytics"
repository = client.repository_path(
project_id , region, repository_id
)
# compilation_result comes from the previous snippet
invocation = dataform_v1.WorkflowInvocation(
compilation_result=compilation_result.name
)
response = client.create_workflow_invocation(
parent=repository,
workflow_invocation=invocation,
)
print(f'{response.name}: {response.state}')
AWorkflowInvocation 每次執行執行時都在 Dataform UI 中產生:
WorkflowInvocation 包含已執行每個任務的資訊,允許用戶將任務綁定到它們的 BigQuery 任務 ID,並查看所創建的物件(例如表格或視圖)。每個已執行的任務都可以透過以下方式進行迭代:
request = dataform_v1.QueryWorkflowInvocationActionsRequest
name=WORKFLOW_INVOCATION
)
response = client.query_workflow_invocation_actions(
request=request
)
for action in response.workflow_invocation_actions:
...
將它們綁定在一起
當這兩個物件結合時,Dataform的API才真正展現其威力。一個典型的流程如下:
- 工作空間被更新(手動或透過Git),
- 從工作空間產生一個
CompilationResult, - 從那次編譯建立一個
WorkflowInvocation, - BigQuery執行產生的DAG。
僅僅這兩個物件就足以自動化在 Dataform UI 之外建立和執行 Dataform DAG。這使得 CI/CD 管道變得容易,我們將在稍後的文章中探討。















