Cloud Composer での実行をスケジュールする

このドキュメントでは、Cloud Composer 2 を使用して Dataform SQL ワークフローをスケジュールに従って実行する方法について説明します。

Cloud Composer 2 を使用して、Dataform の実行をスケジュールできます。Dataform は Cloud Composer 1 をサポートしていません。

Cloud Composer 2 で Dataform の実行スケジュールを管理するには、Airflow 有向非巡回グラフ(DAG)で Dataform 演算子を使用します。Dataform ワークフローの呼び出しをスケジュールする Airflow DAG を作成できます。

Dataform は、さまざまな Airflow 演算子を提供します。これには、コンパイル結果の取得、ワークフロー呼び出しの取得、ワークフロー呼び出しのキャンセルを行う演算子が含まれます。使用可能な Dataform Airflow オペレーターの完全なリストについては、Google Dataform 演算子をご覧ください。

始める前に

  1. Dataform リポジトリを作成または選択します。
  2. Dataform に BigQuery へのアクセス権を付与します。
  3. Dataform ワークスペースを作成または選択します。
  4. 少なくとも 1 つのテーブルを作成します
  5. Cloud Composer 2 環境を作成する
    1. roles/composer.worker ロールと roles/dataform.editor ロールを Cloud Composer 環境のサービス アカウントに付与します。

google-cloud-dataform PyPi パッケージをインストールする

Cloud Composer 2 バージョン 2.0.25 以降を使用している場合、このパッケージは環境にプリインストールされており、インストールする必要はありません。

以前のバージョンの Cloud Composer 2 を使用している場合は、google-cloud-dataform PyPi パッケージをインストールします。

[PyPI パッケージ] セクションで、バージョン ==0.2.0 を指定します。

Dataform ワークフロー呼び出しをスケジュールする Airflow DAG を作成します

Cloud Composer 2 で Dataform SQL ワークフローのスケジュールされた実行を管理するには:DAG を書き込む使用Dataform Airflow の演算子環境のバケットにアップロードする

次のコードサンプルは、Dataform コンパイル結果を作成し、Dataform ワークフローの呼び出しを開始する Airflow DAG を示しています。

from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

create_compilation_result >> create_workflow_invocation

次のように置き換えます。

  • PROJECT_ID: Dataform Google Cloud プロジェクト ID
  • REPOSITORY_ID: Dataform リポジトリの名前
  • REGION: Dataform リポジトリが配置されているリージョン
  • COMPILATION_RESULT: このワークフロー呼び出しに使用するコンパイル結果の名前
  • GIT_COMMITISH: 使用するコードのバージョンのリモート Git リポジトリ内の Git Commitish(ブランチ、Git SHA など)

次のコードサンプルは、次のような Airflow DAG を示しています。

  1. Dataform のコンパイル結果を作成します。
  2. 非同期 Dataform ワークフローの呼び出しを開始します。
  3. DataformWorkflowInvocationStateSensor を使用して、想定される状態になるまでワークフローのステータスをポーリングします。
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

次のように置き換えます。

  • PROJECT_ID: Dataform Google Cloud プロジェクト ID
  • REPOSITORY_ID: Dataform リポジトリの名前
  • REGION: Dataform リポジトリが配置されているリージョン
  • COMPILATION_RESULT: このワークフロー呼び出しに使用するコンパイル結果の名前
  • GIT_COMMITISH: 使用するコードのバージョンのリモート Git リポジトリ内の Git Commitish(ブランチ、Git SHA など)
  • COMPILATION_RESULT: このワークフロー呼び出しに使用するコンパイル結果の名前

コンパイル構成パラメータを追加する

コンパイル構成パラメータを create_compilation_result Airflow DAG オブジェクトに追加できます。使用可能なパラメータの詳細については、CodeCompilationConfig Dataform API リファレンスをご覧ください。

  • コンパイル構成パラメータを create_compilation_result Airflow DAG オブジェクトに追加するには、選択したパラメータを次の形式で code_compilation_config に追加します。
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
        },
    )

次のように置き換えます。

  • PROJECT_ID: Dataform Google Cloud プロジェクト ID
  • REPOSITORY_ID: Dataform リポジトリの名前
  • REGION は、Dataform リポジトリが配置されているリージョンに置き換えます
  • GIT_COMMITISH: 使用するコードのバージョンのリモート Git リポジトリ内の Git Commitish(ブランチ、Git SHA など)
  • PARAMETER: 選択された CodeCompilationConfig パラメータ。 複数のパラメータを追加できます。
  • PARAMETER_VALUE: 選択したパラメータの値

次のコードサンプルは、create_compilation_result Airflow DAG オブジェクトに追加された defaultDatabase パラメータを示しています。

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

ワークフロー呼び出し構成パラメータを追加する

create_workflow_invocation Airflow DAG オブジェクトにワークフロー呼び出し構成パラメータを追加できます。使用可能なパラメータの詳細については、InvocationConfig Dataform API リファレンスをご覧ください。

  • ワークフロー呼び出し構成パラメータを create_workflow_invocation Airflow DAG オブジェクトに追加するには、選択したパラメータを次の形式で invocation_config に追加します。
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            
            "invocation_config": { "PARAMETER": PARAMETER_VALUE }
        },

    )

次のように置き換えます。

  • PROJECT_ID: Dataform Google Cloud プロジェクト ID
  • REPOSITORY_ID: Dataform リポジトリの名前
  • REGION: Dataform リポジトリが配置されているリージョン
  • PARAMETER: 選択された InvocationConfig パラメータ。複数のパラメータを追加できます。
  • PARAMETER_VALUE: 選択したパラメータの値

次のサンプルコードは、create_workflow_invocation Airflow DAG オブジェクトに追加された includedTags[] パラメータと transitiveDependenciesIncluded パラメータを示しています。

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

次のステップ