使用 Cloud Composer 安排执行

本文档介绍了如何使用 Cloud Composer 2 运行 Dataform SQL 工作流的预定执行作业。

您可以使用 Cloud Composer 2 来安排 Dataform 的执行。Dataform 不支持 Cloud Composer 1

如需使用 Cloud Composer 2 管理 Dataform 执行时间表,您可以在 Airflow 有向无环图 (DAG) 中使用 Dataform 运算符。您可以创建 Airflow DAG 来安排 Dataform 工作流调用。

Dataform 提供各种 Airflow 运算符。其中包括用于获取编译结果、获取工作流调用和取消工作流调用的操作。如需查看可用 Dataform Airflow 运算符的完整列表,请参阅 Google Dataform 运算符

准备工作

  1. 选择或创建 Dataform 代码库
  2. 授予 Dataform 对 BigQuery 的访问权限
  3. 选择或创建 Dataform 工作区
  4. 请至少创建一个表
  5. 创建一个 Cloud Composer 2 环境
    1. 向您的 Cloud Composer 环境的服务帐号授予 roles/composer.workerroles/dataform.editor 角色。

安装 google-cloud-dataform PyPi 软件包

如果您使用的是 Cloud Composer 2 版 2.0.25 及更高版本,则此软件包已预安装在您的环境中。您无需安装。

如果您使用的是早期版本的 Cloud Composer 2,请安装 google-cloud-dataform PyPi 软件包

在“PyPI 软件包”部分中,指定版本 ==0.2.0

创建用于安排 Dataform 工作流调用的 Airflow DAG

如需使用 Cloud Composer 2 管理 Dataform SQL 工作流的计划执行,请使用 Dataform Airflow 运算符编写 DAG,然后将其上传到您环境的存储桶

以下代码示例展示了一个 Airflow DAG,用于创建 Dataform 编译结果并启动 Dataform 工作流调用:

from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

create_compilation_result >> create_workflow_invocation

请替换以下内容:

  • PROJECT_ID:您的 Dataform Google Cloud 项目 ID
  • REPOSITORY_ID:Dataform 代码库的名称
  • REGION:Dataform 代码库所在的区域
  • COMPILATION_RESULT:您要用于此工作流调用的编译结果的名称
  • GIT_COMMITISH:您要使用的代码版本的远程 Git 代码库中的 Git 提交密钥,例如分支或 Git SHA

以下代码示例展示了一个符合以下条件的 Airflow DAG:

  1. 创建 Dataform 编译结果。
  2. 启动异步 Dataform 工作流调用。
  3. 使用 DataformWorkflowInvocationStateSensor 轮询工作流的状态,直到它进入预期状态。
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

请替换以下内容:

  • PROJECT_ID:您的 Dataform Google Cloud 项目 ID
  • REPOSITORY_ID:Dataform 代码库的名称
  • REGION:Dataform 代码库所在的区域
  • COMPILATION_RESULT:您要用于此工作流调用的编译结果的名称
  • GIT_COMMITISH:您要使用的代码版本的远程 Git 代码库中的 Git Commitish,例如分支或 Git SHA。
  • COMPILATION_RESULT:您要用于此工作流调用的编译结果的名称

添加编译配置参数

您可以向 create_compilation_result Airflow DAG 对象添加其他编译配置参数。如需详细了解可用参数,请参阅 CodeCompilationConfig Dataform API 参考文档

  • 如需将编译配置参数添加到 create_compilation_result Airflow DAG 对象,请按以下格式将所选参数添加到 code_compilation_config
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
        },
    )

请替换以下内容:

  • PROJECT_ID:您的 Dataform Google Cloud 项目 ID
  • REPOSITORY_ID:Dataform 代码库的名称
  • REGION 替换为 Dataform 代码库所在的区域
  • GIT_COMMITISH:您要使用的代码版本的远程 Git 代码库中的 Git Commitish,例如分支或 Git SHA。
  • PARAMETER:选定的 CodeCompilationConfig 参数。 您可以添加多个参数。
  • PARAMETER_VALUE:所选参数的值

以下代码示例展示了添加到 create_compilation_result Airflow DAG 对象的 defaultDatabase 参数:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

添加工作流调用配置参数

您可以向 create_workflow_invocation Airflow DAG 对象添加其他工作流调用配置参数。如需详细了解可用参数,请参阅 InvocationConfig Dataform API 参考文档

  • 如需向 create_workflow_invocation Airflow DAG 对象添加工作流调用配置参数,请按以下格式将所选参数添加到 invocation_config 中:
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            
            "invocation_config": { "PARAMETER": PARAMETER_VALUE }
        },

    )

请替换以下内容:

  • PROJECT_ID:您的 Dataform Google Cloud 项目 ID
  • REPOSITORY_ID:Dataform 代码库的名称
  • REGION:Dataform 代码库所在的区域
  • PARAMETER:已选择 InvocationConfig 参数。您可以添加多个参数。
  • PARAMETER_VALUE:所选参数的值

以下代码示例展示了添加到 create_workflow_invocation Airflow DAG 对象的 includedTags[]transitiveDependenciesIncluded 参数:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

后续步骤