本文档介绍了如何在 Dataform 中执行以下操作:
准备工作
如需使用工作流配置安排运行作业或使用工作流和 Cloud Scheduler 安排运行作业,请确保您执行以下操作:
如需使用 Cloud Composer 安排运行作业,请确保您执行以下操作:
- 选择或创建 Dataform 仓库。
- 向 Dataform 授予对 BigQuery 的访问权限。
- 选择或创建 Dataform 工作区。
- 至少创建一个表。
- 创建 Cloud Composer 2 环境。
所需的角色
如需获得完成本文档中任务所需的权限,请让管理员为您授予以下 IAM 角色:
-
针对代码库的 Dataform Admin (
roles/dataform.admin
) -
代码库上的 Dataform Editor (
roles/dataform.editor
) 以及 Cloud Composer 环境的服务账号 -
Cloud Composer 环境的服务账号上的 Composer Worker (
roles/composer.worker
)
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
如需使用默认 Dataform 服务账号以外的服务账号,请向自定义服务账号授予访问权限。
使用工作流配置安排运行作业
本部分介绍了如何在 Dataform 中创建工作流配置,以便安排和配置工作流运行。您可以使用工作流配置按计划执行 Dataform 工作流。
工作流配置简介
如需在 BigQuery 中安排对所有或所选工作流操作的 Dataform 运行,您可以创建工作流配置。在工作流配置中,您可以选择编译版本配置、选择要执行的工作流操作,以及设置运行时间表。
然后,在按计划运行工作流配置期间,Dataform 会将您从发布配置的最新编译结果中选择的操作部署到 BigQuery。您还可以使用 Dataform API workflowConfigs 手动触发工作流配置的运行。
Dataform 工作流配置包含以下运行设置:
- 工作流配置的 ID。
- 发布配置。
服务账号。
这是与工作流配置关联的服务账号。您可以选择默认的 Dataform 服务账号或与您的 Google Cloud 项目关联的服务账号,也可以手动输入其他服务账号。默认情况下,工作流配置使用与其代码库相同的服务账号。
要执行的工作流操作:
- 所有操作。
- 选择操作。
- 选择代码。
运行时间表和时区。
创建工作流配置
如需创建 Dataform 工作流配置,请按以下步骤操作:
- 在代码库中,前往版本和时间安排。
- 在工作流配置部分,点击创建。
在创建工作流配置窗格的配置 ID 字段中,输入工作流配置的唯一 ID。
ID 只能包含数字、字母、连字符和下划线。
在版本配置菜单中,选择一个编译版本配置。
可选:在频率字段中,输入运行频率(采用 unix-cron 格式)。
为确保 Dataform 在相应版本配置中执行最新的编译结果,请在编译结果创建时间和安排的执行时间之间至少间隔一小时。
在服务账号菜单中,为工作流配置选择一个服务账号。
在该菜单中,您可以选择默认的 Dataform 服务账号,也可以选择与您有权访问的与您的 Google Cloud 项目关联的任何服务账号。如果您未选择服务账号,工作流配置将使用代码库的服务账号。
可选:在时区菜单中,选择运行作业的时区。
默认时区为世界协调时间 (UTC)。
选择要执行的工作流操作:
- 如需执行整个工作流,请点击所有操作。
- 如需在工作流中执行所选操作,请点击选择操作,然后选择操作。
- 如需针对所选代码执行操作,请点击代码选择,然后选择代码。
- 可选:如需执行所选操作或代码及其依赖项,请选择包含依赖项选项。
- 可选:如需执行所选操作或代码及其依赖项,请选择包含依赖项选项。
- 可选:如需从头开始重新构建所有表,请选择运行并完全刷新选项。
如果不使用此选项,Dataform 会更新增量表,而不会从头重建这些表。
点击创建。
例如,以下工作流配置会在 CEST 时区每小时执行带有 hourly
标记的操作:
- 配置 ID:
production-hourly
- 版本配置:-
- 频率:
0 * * * *
- 时区:
Central European Summer Time (CEST)
- 选择工作流操作:选择标记、
hourly
标记
修改工作流配置
如需修改工作流配置,请按以下步骤操作:
- 在代码库中,前往版本和时间安排。
- 在要修改的工作流配置旁边,点击 更多菜单,然后点击修改。
- 在修改工作流配置窗格中,修改发布配置设置,然后点击保存。
删除工作流配置
如需删除工作流配置,请按以下步骤操作:
- 在代码库中,前往版本和时间安排。
- 在要删除的工作流配置旁边,点击 更多菜单,然后点击删除。
- 在删除版本配置对话框中,点击删除。
使用 Workflows 和 Cloud Scheduler 安排运行
本部分介绍了如何使用 Workflows 和 Cloud Scheduler 安排 Dataform 工作流的运行。
关于安排的工作流程运行
您可以通过创建用于触发 Workflows 工作流的 Cloud Scheduler 作业来设置 Dataform 工作流的运行频率。Workflows 会按照您定义的编排工作流执行服务。
工作流会分两个步骤执行 Dataform 工作流。首先,它会从 Git 提供商中拉取您的 Dataform 代码库代码,并将其编译为编译结果。然后,它使用编译结果创建 Dataform 工作流,并按照您设置的频率执行该工作流。
创建定期编排工作流
如需安排 Dataform 工作流的运行,请使用 Workflows 创建一个编排工作流,并将 Cloud Scheduler 作业添加为触发器。
Workflows 使用服务账号向工作流授予对Google Cloud 资源的访问权限。创建一个服务账号,并为其授予 Dataform Editor (
roles/dataform.editor
) Identity and Access Management 角色,以及管理编排工作流所需的最低权限。如需了解详情,请参阅向工作流授予访问 Google Cloud 资源的权限。创建一个编排工作流,并使用以下 YAML 源代码作为工作流定义:
main: steps: - init: assign: - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID - createCompilationResult: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"} auth: type: OAuth2 body: gitCommitish: GIT_COMMITISH result: compilationResult - createWorkflowInvocation: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"} auth: type: OAuth2 body: compilationResult: ${compilationResult.body.name} result: workflowInvocation - complete: return: ${workflowInvocation.body.name}
替换以下内容:
- PROJECT_ID:您的 Google Cloud 项目的 ID。
- REPOSITORY_LOCATION:Dataform 仓库的位置。
- REPOSITORY_ID:您的 Dataform 代码库的名称。
- GIT_COMMITISH:您要从中执行 Dataform 代码的 Git 分支。对于新创建的代码库,请将其替换为
main
。
自定义 Dataform 工作流“创建编译结果”请求
您可以更新现有的编排工作流,并以 YAML 格式定义 Dataform 工作流创建编译结果请求设置。如需详细了解这些设置,请参阅 projects.locations.repositories.compilationResults
REST 资源参考文档。
例如,如需在编译期间向所有操作添加 _dev
schemaSuffix
设置,请将 createCompilationResult
步骤正文替换为以下代码段:
- createCompilationResult:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
auth:
type: OAuth2
body:
gitCommitish: GIT_COMMITISH
codeCompilationConfig:
schemaSuffix: dev
您还可以在工作流运行请求中将其他设置作为运行时参数传递,并使用变量访问这些参数。如需了解详情,请参阅在执行请求中传递运行时参数。
自定义 Dataform 工作流调用请求
您可以更新现有的编排工作流,并以 YAML 格式定义 Dataform 工作流调用请求设置。如需详细了解调用请求设置,请参阅 projects.locations.repositories.workflowInvocations
REST 资源参考文档。
例如,若要仅使用包含所有传递依赖项的 hourly
标记执行操作,请将 createWorkflowInvocation
正文替换为以下代码段:
- createWorkflowInvocation:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
auth:
type: OAuth2
body:
compilationResult: ${compilationResult.body.name}
invocationConfig:
includedTags:
- hourly
transitiveDependenciesIncluded: true
您还可以在工作流运行请求中将其他设置作为运行时参数传递,并使用变量访问这些参数。如需了解详情,请参阅在执行请求中传递运行时参数。
使用 Cloud Composer 安排运行
您可以使用 Cloud Composer 2 安排 Dataform 运行。Dataform 不支持 Cloud Composer 1。
如需使用 Cloud Composer 2 管理 Dataform 运行作业的安排,您可以在 Airflow 有向无环图 (DAG) 中使用 Dataform 运算符。您可以创建用于调度 Dataform 工作流调用的 Airflow DAG。
Dataform 提供各种 Airflow 运算符。这些操作包括用于获取编译结果、获取工作流调用和取消工作流调用的操作。如需查看可用 Dataform Airflow 运算符的完整列表,请参阅 Google Dataform 运算符。
安装 google-cloud-dataform
PyPi 软件包
如果您使用的是 Cloud Composer 2 版本 2.0.25
及更高版本,则此软件包已预安装在您的环境中。您无需安装它。
如果您使用的是较低版本的 Cloud Composer 2,请安装 google-cloud-dataform
PyPi 软件包。
在 PyPI 软件包部分,指定版本 ==0.2.0
。
创建用于调度 Dataform 工作流调用的 Airflow DAG
如需使用 Cloud Composer 2 管理 Dataform 工作流的定期运行作业,请使用 Dataform Airflow 运算符编写 DAG,然后将其上传到环境的存储分区。
以下代码示例展示了一个 Airflow DAG,该 DAG 会创建 Dataform 编译结果并启动 Dataform 工作流调用:
from datetime import datetime
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
替换以下内容:
- PROJECT_ID:您的 Dataform Google Cloud 项目 ID。
- REPOSITORY_ID:您的 Dataform 代码库的名称。
- REGION:Dataform 代码库所在的区域。
- COMPILATION_RESULT:您要用于此工作流调用的编译结果的名称。
- GIT_COMMITISH:您要使用的代码版本的远程 Git 代码库中的 Git 提交 ID,例如分支或 Git SHA。
以下代码示例展示了一个 Airflow DAG,该 DAG 会执行以下操作:
- 创建 Dataform 编译结果。
- 启动异步 Dataform 工作流调用。
- 使用
DataformWorkflowInvocationStateSensor
轮询工作流的状态,直到其进入预期状态。
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
替换以下内容:
- PROJECT_ID:您的 Dataform Google Cloud 项目 ID。
- REPOSITORY_ID:您的 Dataform 代码库的名称。
- REGION:Dataform 代码库所在的区域。
- COMPILATION_RESULT:您要用于此工作流调用的编译结果的名称。
- GIT_COMMITISH:您要使用的代码版本的远程 Git 代码库中的 Git 提交 ID,例如分支或 Git SHA。
- COMPILATION_RESULT:您要用于此工作流调用的编译结果的名称。
添加编译配置参数
您可以向 create_compilation_result
Airflow DAG 对象添加其他编译配置参数。如需详细了解可用参数,请参阅 CodeCompilationConfig
Dataform API 参考文档。
如需将编译配置参数添加到
create_compilation_result
Airflow DAG 对象,请按照以下格式将所选参数添加到code_compilation_config
字段:create_compilation_result = DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"} }, )
替换以下内容:
- PROJECT_ID:您的 Dataform Google Cloud 项目 ID。
- REPOSITORY_ID:您的 Dataform 代码库的名称。
- REGION:Dataform 代码库所在的区域。
- GIT_COMMITISH:您要使用的代码版本的远程 Git 代码库中的 Git 提交 ID,例如分支或 Git SHA。
- PARAMETER:所选的
CodeCompilationConfig
参数。您可以添加多个参数。 - PARAMETER_VALUE:所选参数的值。
以下代码示例展示了添加到 create_compilation_result
Airflow DAG 对象的 defaultDatabase
参数:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
添加工作流调用配置参数
您可以向 create_workflow_invocation
Airflow DAG 对象添加其他工作流调用配置参数。如需详细了解可用参数,请参阅 InvocationConfig
Dataform API 参考文档。
如需将工作流调用配置参数添加到
create_workflow_invocation
Airflow DAG 对象,请将所选参数以以下格式添加到invocation_config
字段:create_workflow_invocation = DataformCreateWorkflowInvocationOperator( task_id='create_workflow_invocation', project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, workflow_invocation={ "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}", "invocation_config": { "PARAMETER": PARAMETER_VALUE } }, )
替换以下内容:
- PROJECT_ID:您的 Dataform Google Cloud 项目 ID。
- REPOSITORY_ID:您的 Dataform 代码库的名称。
- REGION:Dataform 代码库所在的区域。
- PARAMETER:所选的
InvocationConfig
参数。您可以添加多个参数。 - PARAMETER_VALUE:所选参数的值。
以下代码示例展示了添加到 create_workflow_invocation
Airflow DAG 对象的 includedTags[]
和 transitiveDependenciesIncluded
参数:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
},
)
后续步骤
- 如需了解如何配置 Dataform 编译版本配置,请参阅创建版本配置。
- 如需详细了解 Dataform 中的代码生命周期,请参阅 Dataform 中代码生命周期简介。
- 如需详细了解 Dataform API,请参阅 Dataform API。
- 如需详细了解 Cloud Composer 环境,请参阅 Cloud Composer 概览。
- 如需详细了解 Workflows 价格,请参阅 Workflows 价格。