使用 Cloud Composer 安排执行

本文档介绍了如何使用 Cloud Composer 2 运行 Dataform SQL 工作流的计划执行作业。

关于使用 Cloud Composer 2 执行计划的工作流

如需使用 Cloud Composer 2 管理 Dataform 的执行计划,您可以在 Airflow DAG 中使用 Dataform 运算符。您可以创建一个 Airflow DAG,以安排 Dataform 工作流调用。

Dataform 提供各种 Airflow 运算符,例如,用于获取编译结果、获取工作流调用或取消工作流调用的运算符。如需查看可用 Dataform Airflow 运算符的完整列表,请参阅 Google Dataform 运算符

准备工作

  1. 选择或创建 Dataform 代码库
  2. 向 Dataform 授予对 BigQuery 的访问权限
  3. 选择或创建 Dataform 工作区
  4. 请至少创建一个表
  5. 创建一个 Cloud Composer 2 环境
    1. roles/composer.workerroles/dataform.editor 角色授予您的 Cloud Composer 环境的服务帐号

安装 google-cloud-dataform PyPi 软件包

如果您使用 Cloud Composer 2 及更高版本 2.0.25,则此软件包已预安装在您的环境中。您无需安装。

如果您使用的是早期版本的 Cloud Composer 2,请安装 google-cloud-dataform PyPi 软件包

在 PyPI 软件包部分中,指定版本 ==0.2.0

创建用于安排 Dataform 工作流调用的 Airflow DAG

如需使用 Cloud Composer 2 管理 Dataform SQL 工作流的计划执行,请使用 Dataform Airflow 运算符写入 DAG,然后将其上传到环境的存储桶

以下代码示例展示了 Airflow DAG,后者创建了 Dataform 编译结果并启动了 Dataform 工作流调用:

from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

create_compilation_result >> create_workflow_invocation

请替换以下内容:

  • PROJECT_ID 替换为您的 Dataform Google Cloud 项目 ID。
  • REPOSITORY_ID 替换为 Dataform 代码库的名称。
  • REGION 替换为 Dataform 代码库所在的区域。
  • COMPILATION_RESULT 替换为要用于此工作流调用的编译结果的名称。
  • GIT_COMMITISH 与 Git 提交者(例如,分支)或 Git SHA(在您要使用的代码版本的远程 Git 代码库中)中。

以下代码示例展示了一个 Airflow DAG:

  1. 创建 Dataform 编译结果。
  2. 启动异步 Dataform 工作流调用。
  3. 使用 DataformWorkflowInvocationStateSensor 轮询工作流的状态,直到其进入预期状态。
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

请替换以下内容:

  • PROJECT_ID 替换为您的 Dataform Google Cloud 项目 ID。
  • REPOSITORY_ID 替换为 Dataform 代码库的名称。
  • REGION 替换为 Dataform 代码库所在的区域。
  • COMPILATION_RESULT 替换为要用于此工作流调用的编译结果的名称。
  • 将 Git 与要使用的代码版本的远程 Git 代码库中的 GIT_COMMITISH(带有 Git 提交分支)关联。
  • COMPILATION_RESULT 替换为要用于此工作流调用的编译结果的名称。

添加编译配置参数

您可以向 create_compilation_result Airflow DAG 对象中添加其他编译配置参数。如需详细了解可用的参数,请参阅 CodeCompilationConfig Dataform API 参考文档

  • 如需将编译配置参数添加到 create_compilation_result Airflow DAG 对象,请按以下格式将所选参数添加到 code_compilation_config
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
        },
    )

请替换以下内容:

  • PROJECT_ID 替换为您的 Dataform Google Cloud 项目 ID。
  • REPOSITORY_ID 替换为 Dataform 代码库的名称。
  • REGION 替换为 Dataform 代码库所在的区域。
  • 将 Git 与要使用的代码版本的远程 Git 代码库中的 GIT_COMMITISH(带有 Git 提交分支)关联。
  • PARAMETER 与选定的 CodeCompilationConfig 参数。您可以添加多个参数。
  • PARAMETER_VALUE,其中包含所选参数的值。

以下代码示例显示了添加到 create_compilation_result Airflow DAG 对象中的 defaultDatabase 参数:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

添加工作流调用配置参数

您可以将其他工作流调用配置参数添加到 create_workflow_invocation Airflow DAG 对象中。如需详细了解可用的参数,请参阅 InvocationConfig Dataform API 参考文档

  • 如需将工作流调用配置参数添加到 create_workflow_invocation Airflow DAG 对象,请按照以下格式将所选参数添加到 invocation_config
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            
            "invocation_config": { "PARAMETER": PARAMETER_VALUE }
        },

    )

请替换以下内容:

  • PROJECT_ID 替换为您的 Dataform Google Cloud 项目 ID。
  • REPOSITORY_ID 替换为 Dataform 代码库的名称。
  • REGION 替换为 Dataform 代码库所在的区域。
  • 具有所选 InvocationConfig 参数的 PARAMETER。您可以添加多个参数。
  • PARAMETER_VALUE,其中包含所选参数的值。

以下代码示例显示了添加到 create_workflow_invocation Airflow DAG 对象中的 includedTags[]transitiveDependenciesIncluded 参数:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

后续步骤