제1부에서는 Dataform CompilationResult 및 WorkflowInvocation 객체를 살펴보았습니다. 유용하다고 생각하는 다른 몇 가지 API 엔드포인트를 간략히 살펴보겠습니다.
워크스페이스 관리 - delete_workspace 및create_workspace
delete_workspace는 대규모 팀에서 사용하는 Dataform 인스턴스에 필수적입니다. 이는 마ージ 후 워크스페이스를 자동으로 삭제할 수 있게 하여, 반기마다 "모두가 완료된 워크스페이스를 알려주세요"라는 메시지를 절약합니다. 이는 GitHub Actions와 같은 CI/CD 도구에 통합될 수 있습니다.
from google.cloud import dataform_v1
client = dataform_v1.DataformClient()
workspace_path = client.workspace_path(
PROJECT_ID,
REGION,
REPOSITORY_ID,
WORKSPACE_ID,
)
try:
operation = client.delete_workspace(name=workspace_path)
또한create_workspace - 새로운 브랜치에서 Dataform UI 외부에 코드 변경이 발생했을 때 자동으로 워크스페이스를 생성하는 데 매우 유용합니다.
Git 작업 - push_git_changes와 pull_git_changes
Dataform UI 외부에서 코드 변경을 자동으로 적용할 수 있는 유용한 엔드포인트 세트입니다.
from google.cloud import dataform_v1
client = dataform_v1.DataformClient()
workspace_path = client.workspace_path(
project_id,
region,
repository_id,
workspace_id,
)
# ------------------------------------------------------------
# 1. Pull latest changes from Git into the workspace
# ------------------------------------------------------------
print("Pulling latest Git changes into workspace...")
pull_request = dataform_v1.PullGitChangesRequest(
name=workspace_path
)
pull_operation = client.pull_git_changes(request=pull_request)
print("Pull initiated:", pull_operation.operation.name)
# Optional: wait for completion (simplified polling)
pull_result = pull_operation.result()
print("Pull completed successfully")
# ------------------------------------------------------------
# (Optional) 2. Make programmatic changes here
# ------------------------------------------------------------
# e.g. update config blocks, inject variables, generate models, format SQL, etc.
print("Applying automated workspace changes...")
# ------------------------------------------------------------
# 3. Push workspace changes back to Git
# ------------------------------------------------------------
print("Pushing workspace changes to Git...")
push_request = dataform_v1.PushGitChangesRequest(
name=workspace_path
)
push_operation = client.push_git_changes(request=push_request)
push_result = push_operation.result()
print("Push completed successfully")
print("Workspace successfully synced with Git")












