您可以使用 Cloud Composer 2 来安排 Dataform 的执行。 Dataform 不支持 Cloud Composer 1。
如需使用 Cloud Composer 2 管理 Dataform 执行时间表,请执行以下操作: 您可以在 Airflow 有向无环图 (DAG) 中使用 Dataform 运算符。你可以创建一个 Dataform 工作流调用。
Dataform 提供各种 Airflow 运算符。这包括运算符 用于获取编译结果、获取工作流调用和取消 工作流调用查看可用 Dataform 的完整列表 Airflow 运算符,请参阅 Google Dataform 运算符。
准备工作
- 选择或 创建一个 Dataform 代码库。
- 授予 Dataform 对 BigQuery 的访问权限。
- 选择或创建 Dataform 工作区。
- 请至少创建一个表。
- 创建一个 Cloud Composer 2 环境。
安装 google-cloud-dataform
PyPi 软件包
如果您使用的是 Cloud Composer 2 2.0.25
及更高版本,则此软件包
已预安装在您的环境中。您无需安装。
如果您使用的是早期版本的 Cloud Composer 2,
安装 google-cloud-dataform
PyPi 软件包。
在“PyPI 软件包”部分中,指定版本 ==0.2.0
。
创建用于安排 Dataform 工作流调用的 Airflow DAG
使用以下命令管理 Dataform SQL 工作流的预定执行: Cloud Composer 2,编写 DAG 使用 Dataform Airflow 运算符, 将其上传到环境的存储桶。
以下代码示例展示了用于创建 Dataform 的 Airflow DAG 编译结果并启动 Dataform 工作流调用:
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
替换以下内容:
- PROJECT_ID:您的 Dataform Google Cloud 项目 ID
- REPOSITORY_ID:Dataform 代码库的名称
- REGION:Dataform 所在的区域 代码库位于
- COMPILATION_RESULT:编译结果的名称 您要用于此工作流调用
- GIT_COMMITISH:Git Commitish,例如 一个分支或 Git SHA, 输入要使用的代码
以下代码示例展示了一个符合以下条件的 Airflow DAG:
- 创建 Dataform 编译结果。
- 启动异步 Dataform 工作流调用。
- 轮询工作流状态,直到它进入预期状态
使用
DataformWorkflowInvocationStateSensor
。
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
替换以下内容:
- PROJECT_ID:您的 Dataform Google Cloud 项目 ID
- REPOSITORY_ID:Dataform 代码库的名称
- REGION:Dataform 所在的区域 代码库位于
- COMPILATION_RESULT:编译结果的名称 您要用于此工作流调用
- GIT_COMMITISH:Git Commitish,例如 一个分支或 Git SHA, 代码
- COMPILATION_RESULT:编译结果的名称 您要用于此工作流调用
添加编译配置参数
您可以将其他编译配置参数添加到
create_compilation_result
Airflow DAG 对象。如需详细了解
请参阅 CodeCompilationConfig
Dataform API 参考文档。
- 将编译配置参数添加到
create_compilation_result
Airflow DAG 对象,请将您选择的参数添加到code_compilation_config
格式如下:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
"code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
},
)
替换以下内容:
- PROJECT_ID:您的 Dataform Google Cloud 项目 ID
- REPOSITORY_ID:Dataform 代码库的名称
- 将 REGION 替换为 Dataform 代码库所在的区域 位于
- GIT_COMMITISH:Git Commitish,例如 一个分支或 Git SHA, 代码
- PARAMETER:已选择
CodeCompilationConfig
参数。 您可以添加多个参数。 - PARAMETER_VALUE:所选参数的值
以下代码示例显示了将 defaultDatabase
参数添加到
create_compilation_result
Airflow DAG 对象:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
添加工作流调用配置参数
您可以将额外的工作流调用配置参数添加到
create_workflow_invocation
Airflow DAG 对象。如需详细了解
请参阅 InvocationConfig
Dataform API 参考文档。
- 要将工作流调用配置参数添加到
create_workflow_invocation
Airflow DAG 对象,添加您选择的参数 转换为invocation_config
,格式如下:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "PARAMETER": PARAMETER_VALUE }
},
)
替换以下内容:
- PROJECT_ID:您的 Dataform Google Cloud 项目 ID
- REPOSITORY_ID:Dataform 代码库的名称
- REGION:Dataform 代码库所在的区域 位于
- PARAMETER:已选择
InvocationConfig
参数。您可以添加多个参数。 - PARAMETER_VALUE:所选参数的值
以下代码示例显示了 includedTags[]
和
transitiveDependenciesIncluded
参数已添加到
create_workflow_invocation
Airflow DAG 对象:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
},
)
后续步骤
- 如需了解如何为工作流执行配置编译替换,请参阅 配置编译替换。
- 如需详细了解 Dataform API,请参阅 Dataform API。
- 如需详细了解 Cloud Composer 环境,请参阅 Cloud Composer 概览。
- 了解如何使用 Workflows 和 Cloud Scheduler,请参阅 使用 Workflows 和 Cloud Scheduler 安排执行作业。