Cloud Composer 2 を使用して、Dataform の実行をスケジュールできます。Dataform は Cloud Composer 1 をサポートしていません。
Cloud Composer 2 で Dataform の実行スケジュールを管理するには、Airflow 有向非巡回グラフ(DAG)で Dataform 演算子を使用します。Dataform ワークフローの呼び出しをスケジュールする Airflow DAG を作成できます。
Dataform は、さまざまな Airflow 演算子を提供します。これには、コンパイル結果の取得、ワークフロー呼び出しの取得、ワークフロー呼び出しのキャンセルを行う演算子が含まれます。使用可能な Dataform Airflow オペレーターの完全なリストについては、Google Dataform 演算子をご覧ください。
始める前に
- Dataform リポジトリを作成または選択します。
- Dataform に BigQuery へのアクセス権を付与します。
- Dataform ワークスペースを作成または選択します。
- 少なくとも 1 つのテーブルを作成します。
- Cloud Composer 2 環境を作成する
- roles/composer.worker ロールと roles/dataform.editor ロールを Cloud Composer 環境のサービス アカウントに付与します。
google-cloud-dataform
PyPi パッケージをインストールする
Cloud Composer 2 バージョン 2.0.25
以降を使用している場合、このパッケージは環境にプリインストールされており、インストールする必要はありません。
以前のバージョンの Cloud Composer 2 を使用している場合は、google-cloud-dataform
PyPi パッケージをインストールします。
[PyPI パッケージ] セクションで、バージョン ==0.2.0
を指定します。
Dataform ワークフロー呼び出しをスケジュールする Airflow DAG を作成します
Cloud Composer 2 で Dataform SQL ワークフローのスケジュールされた実行を管理するには:DAG を書き込む使用Dataform Airflow の演算子、 環境のバケットにアップロードする。
次のコードサンプルは、Dataform コンパイル結果を作成し、Dataform ワークフローの呼び出しを開始する Airflow DAG を示しています。
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
次のように置き換えます。
- PROJECT_ID: Dataform Google Cloud プロジェクト ID
- REPOSITORY_ID: Dataform リポジトリの名前
- REGION: Dataform リポジトリが配置されているリージョン
- COMPILATION_RESULT: このワークフロー呼び出しに使用するコンパイル結果の名前
- GIT_COMMITISH: 使用するコードのバージョンのリモート Git リポジトリ内の Git Commitish(ブランチ、Git SHA など)
次のコードサンプルは、次のような Airflow DAG を示しています。
- Dataform のコンパイル結果を作成します。
- 非同期 Dataform ワークフローの呼び出しを開始します。
DataformWorkflowInvocationStateSensor
を使用して、想定される状態になるまでワークフローのステータスをポーリングします。
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
次のように置き換えます。
- PROJECT_ID: Dataform Google Cloud プロジェクト ID
- REPOSITORY_ID: Dataform リポジトリの名前
- REGION: Dataform リポジトリが配置されているリージョン
- COMPILATION_RESULT: このワークフロー呼び出しに使用するコンパイル結果の名前
- GIT_COMMITISH: 使用するコードのバージョンのリモート Git リポジトリ内の Git Commitish(ブランチ、Git SHA など)
- COMPILATION_RESULT: このワークフロー呼び出しに使用するコンパイル結果の名前
コンパイル構成パラメータを追加する
コンパイル構成パラメータを create_compilation_result
Airflow DAG オブジェクトに追加できます。使用可能なパラメータの詳細については、CodeCompilationConfig
Dataform API リファレンスをご覧ください。
- コンパイル構成パラメータを
create_compilation_result
Airflow DAG オブジェクトに追加するには、選択したパラメータを次の形式でcode_compilation_config
に追加します。
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
"code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
},
)
次のように置き換えます。
- PROJECT_ID: Dataform Google Cloud プロジェクト ID
- REPOSITORY_ID: Dataform リポジトリの名前
- REGION は、Dataform リポジトリが配置されているリージョンに置き換えます
- GIT_COMMITISH: 使用するコードのバージョンのリモート Git リポジトリ内の Git Commitish(ブランチ、Git SHA など)
- PARAMETER: 選択された
CodeCompilationConfig
パラメータ。 複数のパラメータを追加できます。 - PARAMETER_VALUE: 選択したパラメータの値
次のコードサンプルは、create_compilation_result
Airflow DAG オブジェクトに追加された defaultDatabase
パラメータを示しています。
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
ワークフロー呼び出し構成パラメータを追加する
create_workflow_invocation
Airflow DAG オブジェクトにワークフロー呼び出し構成パラメータを追加できます。使用可能なパラメータの詳細については、InvocationConfig
Dataform API リファレンスをご覧ください。
- ワークフロー呼び出し構成パラメータを
create_workflow_invocation
Airflow DAG オブジェクトに追加するには、選択したパラメータを次の形式でinvocation_config
に追加します。
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "PARAMETER": PARAMETER_VALUE }
},
)
次のように置き換えます。
- PROJECT_ID: Dataform Google Cloud プロジェクト ID
- REPOSITORY_ID: Dataform リポジトリの名前
- REGION: Dataform リポジトリが配置されているリージョン
- PARAMETER: 選択された
InvocationConfig
パラメータ。複数のパラメータを追加できます。 - PARAMETER_VALUE: 選択したパラメータの値
次のサンプルコードは、create_workflow_invocation
Airflow DAG オブジェクトに追加された includedTags[]
パラメータと transitiveDependenciesIncluded
パラメータを示しています。
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
},
)
次のステップ
- ワークフローの実行用にコンパイルのオーバーライドを構成する方法については、コンパイルのオーバーライドを構成するをご覧ください。
- Dataform API の詳細については、Dataform API をご覧ください。
- Cloud Composer 環境の詳細については、Cloud Composer の概要をご覧ください。
- Workflows と Cloud Scheduler を使用して実行をスケジュールする方法については、Workflows と Cloud Scheduler を使用して実行をスケジュールするをご覧ください。