使用 Cloud Composer 安排执行

本文档介绍了如何运行 Dataform 的预定执行作业 使用 Cloud Composer 2 的 SQL 工作流。

您可以使用 Cloud Composer 2 来安排 Dataform 的执行。 Dataform 不支持 Cloud Composer 1

如需使用 Cloud Composer 2 管理 Dataform 执行时间表,请执行以下操作: 您可以在 Airflow 有向无环图 (DAG) 中使用 Dataform 运算符。你可以创建一个 Dataform 工作流调用。

Dataform 提供各种 Airflow 运算符。这包括运算符 用于获取编译结果、获取工作流调用和取消 工作流调用查看可用 Dataform 的完整列表 Airflow 运算符,请参阅 Google Dataform 运算符

准备工作

  1. 选择或 创建一个 Dataform 代码库
  2. 授予 Dataform 对 BigQuery 的访问权限
  3. 选择或创建 Dataform 工作区
  4. 请至少创建一个表
  5. 创建一个 Cloud Composer 2 环境
    1. 授予 roles/composer.workerroles/dataform.editor 您的 Cloud Composer 环境的服务账号

安装 google-cloud-dataform PyPi 软件包

如果您使用的是 Cloud Composer 2 2.0.25 及更高版本,则此软件包 已预安装在您的环境中。您无需安装。

如果您使用的是早期版本的 Cloud Composer 2, 安装 google-cloud-dataform PyPi 软件包

在“PyPI 软件包”部分中,指定版本 ==0.2.0

创建用于安排 Dataform 工作流调用的 Airflow DAG

使用以下命令管理 Dataform SQL 工作流的预定执行: Cloud Composer 2,编写 DAG 使用 Dataform Airflow 运算符将其上传到环境的存储桶

以下代码示例展示了用于创建 Dataform 的 Airflow DAG 编译结果并启动 Dataform 工作流调用:

from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

create_compilation_result >> create_workflow_invocation

替换以下内容:

  • PROJECT_ID:您的 Dataform Google Cloud 项目 ID
  • REPOSITORY_ID:Dataform 代码库的名称
  • REGION:Dataform 所在的区域 代码库位于
  • COMPILATION_RESULT:编译结果的名称 您要用于此工作流调用
  • GIT_COMMITISH:Git Commitish,例如 一个分支或 Git SHA, 输入要使用的代码

以下代码示例展示了一个符合以下条件的 Airflow DAG:

  1. 创建 Dataform 编译结果。
  2. 启动异步 Dataform 工作流调用。
  3. 轮询工作流状态,直到它进入预期状态 使用 DataformWorkflowInvocationStateSensor
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

替换以下内容:

  • PROJECT_ID:您的 Dataform Google Cloud 项目 ID
  • REPOSITORY_ID:Dataform 代码库的名称
  • REGION:Dataform 所在的区域 代码库位于
  • COMPILATION_RESULT:编译结果的名称 您要用于此工作流调用
  • GIT_COMMITISH:Git Commitish,例如 一个分支或 Git SHA, 代码
  • COMPILATION_RESULT:编译结果的名称 您要用于此工作流调用

添加编译配置参数

您可以将其他编译配置参数添加到 create_compilation_result Airflow DAG 对象。如需详细了解 请参阅 CodeCompilationConfig Dataform API 参考文档

  • 将编译配置参数添加到 create_compilation_result Airflow DAG 对象,请将您选择的参数添加到 code_compilation_config 格式如下:
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
        },
    )

替换以下内容:

  • PROJECT_ID:您的 Dataform Google Cloud 项目 ID
  • REPOSITORY_ID:Dataform 代码库的名称
  • REGION 替换为 Dataform 代码库所在的区域 位于
  • GIT_COMMITISH:Git Commitish,例如 一个分支或 Git SHA, 代码
  • PARAMETER:已选择 CodeCompilationConfig 参数。 您可以添加多个参数。
  • PARAMETER_VALUE:所选参数的值

以下代码示例显示了将 defaultDatabase 参数添加到 create_compilation_result Airflow DAG 对象:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

添加工作流调用配置参数

您可以将额外的工作流调用配置参数添加到 create_workflow_invocation Airflow DAG 对象。如需详细了解 请参阅 InvocationConfig Dataform API 参考文档

  • 要将工作流调用配置参数添加到 create_workflow_invocation Airflow DAG 对象,添加您选择的参数 转换为 invocation_config,格式如下:
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            
            "invocation_config": { "PARAMETER": PARAMETER_VALUE }
        },

    )

替换以下内容:

  • PROJECT_ID:您的 Dataform Google Cloud 项目 ID
  • REPOSITORY_ID:Dataform 代码库的名称
  • REGION:Dataform 代码库所在的区域 位于
  • PARAMETER:已选择 InvocationConfig 参数。您可以添加多个参数。
  • PARAMETER_VALUE:所选参数的值

以下代码示例显示了 includedTags[]transitiveDependenciesIncluded 参数已添加到 create_workflow_invocation Airflow DAG 对象:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

后续步骤