このドキュメントでは、Dataform で次の操作を行う方法について説明します。
始める前に
ワークフロー構成で実行をスケジュールするか、ワークフローおよび Cloud Scheduler で実行をスケジュールするには、次の操作を行います。
Cloud Composer で実行をスケジュールするには、次の操作を行います。
- Dataform リポジトリを作成または選択します。
- Dataform に BigQuery へのアクセス権を付与します。
- Dataform ワークスペースを作成または選択します。
- 少なくとも 1 つのテーブルを作成します。
- Cloud Composer 2 環境を作成する
必要なロール
このドキュメントのタスクを完了するために必要な権限を取得するには、管理者に次の IAM ロールを付与するよう依頼してください。
-
リポジトリの Dataform 管理者(
roles/dataform.admin
) -
リポジトリと Cloud Composer 環境のサービス アカウントに対する Dataform 編集者 (
roles/dataform.editor
) -
Cloud Composer 環境のサービス アカウントの Composer ワーカー (
roles/composer.worker
)
ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。
必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。
デフォルトの Dataform サービス アカウント以外のサービス アカウントを使用するには、カスタム サービス アカウントへのアクセス権を付与します。
ワークフロー構成で実行をスケジュールする
このセクションでは、Dataform でワークフロー構成を作成し、ワークフローの実行をスケジュールして構成する方法について説明します。ワークフロー構成を使用して、Dataform ワークフローをスケジュールで実行できます。
ワークフロー構成について
BigQuery ですべてのワークフローのアクションまたは一部のワークフローのアクションの Dataform 実行をスケジュールするには、ワークフロー構成を作成します。ワークフロー構成では、コンパイル リリース構成を選択し、実行するワークフロー アクションを選択し、実行スケジュールを設定します。
その後、Dataform は、ワークフロー構成のスケジュールされた実行時に、リリース構成の最新のコンパイル結果から選択したアクションを BigQuery にデプロイします。Dataform API workflowConfigs を使用して、ワークフロー構成の実行を手動でトリガーすることもできます。
Dataform ワークフローの構成には、次の実行設定が含まれています。
- ワークフロー構成の ID。
- リリース構成。
サービス アカウント。
これは、ワークフロー構成に関連付けられているサービス アカウントです。デフォルトの Dataform サービス アカウントまたは Google Cloud プロジェクトに関連付けられたサービス アカウントを選択することも、別のサービス アカウントを手動で入力することもできます。デフォルトでは、ワークフローの構成では、リポジトリと同じサービス アカウントが使用されます。
実行するワークフロー アクション:
- すべてのアクション。
- アクションの選択
- タグの選択。
実行スケジュールとタイムゾーン。
ワークフロー構成を作成する
Dataform ワークフロー構成を作成するには、次の操作を行います。
- リポジトリで、[リリースとスケジュール] に移動します。
- [ワークフロー構成] セクションで、[作成] をクリックします。
[ワークフロー構成を作成] ペインの [構成 ID] フィールドに、ワークフロー構成の一意の ID を入力します。
ID には数字、英字、ハイフン、アンダースコアのみを使用できます。
[リリース構成] メニューで、コンパイル リリース構成を選択します。
(省略可)[頻度] フィールドに、実行の頻度を unix-cron 形式で入力します。
Dataform が対応するリリース構成で最新のコンパイル結果を確実に実行するには、コンパイル結果の作成時刻とスケジュールされた実行時刻の間に少なくとも 1 時間の間隔を設けます。
[サービス アカウント] メニューで、ワークフローの構成に使用するサービス アカウントを選択します。
メニューで、デフォルトの Dataform サービス アカウントまたはアクセス可能な Google Cloud プロジェクトに関連付けられた任意のサービス アカウントを選択できます。サービス アカウントを選択しない場合、ワークフローの構成ではリポジトリのサービス アカウントが使用されます。
省略可: [タイムゾーン] メニューで、実行のタイムゾーンを選択します。
デフォルトのタイムゾーンは UTC です。
実行するワークフロー アクションを選択します。
- ワークフロー全体を実行するには、[すべてのアクション] をクリックします。
- ワークフローで選択したアクションを実行するには、[アクションの選択] をクリックして、アクションを選択します。
- 選択したタグでアクションを実行するには、[タグの選択] をクリックしてタグを選択します。
- 省略可: 選択したアクションまたはタグとそれらの依存関係を実行するには、[依存関係を含める] オプションを選択します。
- 省略可: 選択したアクションまたはタグとそれらの依存関係を実行するには、[依存者を含める] オプションを選択します。
- 省略可: すべてのテーブルをゼロから再構築するには、[フル更新で実行] オプションを選択します。
このオプションを使用しない場合、Dataform は、増分テーブルをゼロから再構築せずに更新します。
[作成] をクリックします。
たとえば、次のワークフロー構成では、CEST タイムゾーンで hourly
タグを使用してアクションが 1 時間ごとに実行されます。
- 構成 ID:
production-hourly
- リリース構成:
- 頻度:
0 * * * *
- タイムゾーン:
Central European Summer Time (CEST)
- ワークフロー アクションの選択: タグの選択、
hourly
タグ
ワークフロー構成を編集する
ワークフロー構成を編集するには、次の手順を行います。
- リポジトリで、[リリースとスケジュール] に移動します。
- 編集するワークフロー構成で、 [その他] メニューをクリックし、[編集] をクリックします。
- [ワークフロー構成を編集] ペインで、リリース構成の設定を編集し、[保存] をクリックします。
ワークフロー構成を削除する
ワークフロー構成を作成するには、次の手順を行います。
- リポジトリで、[リリースとスケジュール] に移動します。
- 削除するワークフロー構成で、 [その他] メニューをクリックし、[削除] をクリックします。
- [リリース構成の削除] ダイアログで、[削除] をクリックします。
Workflows と Cloud Scheduler を使用して実行をスケジュールする
このセクションでは、Workflows と Cloud Scheduler を使用して Dataform ワークフローの実行をスケジュールする方法について説明します。
スケジュール設定されたワークフローの実行について
Workflows ワークフローをトリガーする Cloud Scheduler ジョブを作成することで、Dataform ワークフローの実行頻度を設定できます。Workflows は、定義したオーケストレーション ワークフローでサービスを実行します。
Workflows は、Dataform ワークフローを 2 段階のプロセスで実行します。まず、Git プロバイダから Dataform リポジトリ コードを pull してコンパイルし、コンパイル結果を導きます。次に、そのコンパイル結果を使用して Dataform ワークフローを作成し、設定した頻度で実行します。
スケジュール設定されたオーケストレーション ワークフローを作成する
Dataform ワークフローの実行をスケジュールするには、Workflows を使用してオーケストレーション ワークフローを作成し、Cloud Scheduler ジョブをトリガーとして追加します。
Workflows はサービス アカウントを使用して、ワークフローがGoogle Cloud リソースにアクセスできるようにします。サービス アカウントを作成し、Dataform 編集者(
roles/dataform.editor
)Identity and Access Management ロールと、オーケストレーション ワークフローの管理に必要な最小限の権限を付与します。詳細については、 Google Cloud リソースにアクセスする権限をワークフローに付与するをご覧ください。オーケストレーション ワークフローを作成し、次の YAML ソースコードをワークフロー定義として使用します。
main: steps: - init: assign: - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID - createCompilationResult: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"} auth: type: OAuth2 body: gitCommitish: GIT_COMMITISH result: compilationResult - createWorkflowInvocation: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"} auth: type: OAuth2 body: compilationResult: ${compilationResult.body.name} result: workflowInvocation - complete: return: ${workflowInvocation.body.name}
次のように置き換えます。
- PROJECT_ID: Google Cloud プロジェクトの ID。
- REPOSITORY_LOCATION: Dataform リポジトリのロケーション。
- REPOSITORY_ID: Dataform リポジトリの名前。
- GIT_COMMITISH: Dataform コードを実行する Git ブランチ。新しく作成したリポジトリの場合は、
main
に置き換えます。
Dataform ワークフローの作成コンパイル結果リクエストをカスタマイズする
既存のオケストレーション ワークフローを更新し、Dataform ワークフローの作成コンパイル結果リクエスト設定を YAML 形式で定義できます。設定の詳細については、projects.locations.repositories.compilationResults
REST リソース リファレンスをご覧ください。
たとえば、コンパイル時にすべてのアクションに _dev
schemaSuffix
設定を追加するには、createCompilationResult
ステップの本文を次のコード スニペットに置き換えます。
- createCompilationResult:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
auth:
type: OAuth2
body:
gitCommitish: GIT_COMMITISH
codeCompilationConfig:
schemaSuffix: dev
追加の設定を Workflows 実行リクエストでランタイム引数として渡し、変数を使用してこれらの引数にアクセスすることもできます。詳細については、実行リクエストでランタイム引数を渡すをご覧ください。
Dataform ワークフロー呼び出しリクエストをカスタマイズする
既存のオケストレーション ワークフローを更新し、YAML 形式で Dataform ワークフローの呼び出しリクエスト設定を定義できます。呼び出しリクエストの設定の詳細については、projects.locations.repositories.workflowInvocations
REST リソース リファレンスをご覧ください。
たとえば、すべての伝播依存関係を含む hourly
タグでアクションのみを実行するには、createWorkflowInvocation
の本文を次のコード スニペットに置き換えます。
- createWorkflowInvocation:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
auth:
type: OAuth2
body:
compilationResult: ${compilationResult.body.name}
invocationConfig:
includedTags:
- hourly
transitiveDependenciesIncluded: true
追加の設定を Workflows 実行リクエストでランタイム引数として渡し、変数を使用してこれらの引数にアクセスすることもできます。詳細については、実行リクエストでランタイム引数を渡すをご覧ください。
Cloud Composer で実行をスケジュールする
Cloud Composer 2 を使用して、Dataform の実行をスケジュールできます。Dataform は Cloud Composer 1 をサポートしていません。
Cloud Composer 2 で Dataform の実行スケジュールを管理するには、Airflow 有向非巡回グラフ(DAG)で Dataform 演算子を使用します。Dataform ワークフローの呼び出しをスケジュールする Airflow DAG を作成できます。
Dataform は、さまざまな Airflow 演算子を提供します。これには、コンパイル結果の取得、ワークフロー呼び出しの取得、ワークフロー呼び出しのキャンセルを行う演算子が含まれます。使用可能な Dataform Airflow オペレーターの完全なリストについては、Google Dataform 演算子をご覧ください。
google-cloud-dataform
PyPi パッケージをインストールする
Cloud Composer 2 バージョン 2.0.25
以降を使用している場合、このパッケージは環境にプリインストールされており、インストールは不要です。
以前のバージョンの Cloud Composer 2 を使用している場合は、google-cloud-dataform
PyPi パッケージをインストールします。
[PyPI パッケージ] セクションで、バージョン ==0.2.0
を指定します。
Dataform ワークフローの呼び出しをスケジュールする Airflow DAG を作成する
Cloud Composer 2 で Dataform ワークフローのスケジュール実行を管理するには、Dataform Airflow 演算子を使用して DAG を記述し、環境のバケットにアップロードします。
次のコードサンプルは、Dataform コンパイル結果を作成し、Dataform ワークフローの呼び出しを開始する Airflow DAG を示しています。
from datetime import datetime
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
次のように置き換えます。
- PROJECT_ID: Dataform の Google Cloud プロジェクト ID。
- REPOSITORY_ID: Dataform リポジトリの名前。
- REGION: Dataform リポジトリが配置されているリージョン。
- COMPILATION_RESULT: このワークフローの呼び出しに使用するコンパイル結果の名前。
- GIT_COMMITISH: 使用するコードのバージョンのリモート Git リポジトリ内の Git Commitish(ブランチ、Git SHA など)。
次のコードサンプルは、次のことを実行する Airflow DAG を示しています。
- Dataform コンパイル結果を作成します。
- 非同期の Dataform ワークフローの呼び出しを開始します。
DataformWorkflowInvocationStateSensor
を使用して、ワークフローが期待される状態になるまでステータスをポーリングします。
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
次のように置き換えます。
- PROJECT_ID: Dataform の Google Cloud プロジェクト ID。
- REPOSITORY_ID: Dataform リポジトリの名前。
- REGION: Dataform リポジトリが配置されているリージョン。
- COMPILATION_RESULT: このワークフローの呼び出しに使用するコンパイル結果の名前。
- GIT_COMMITISH: 使用するコードのバージョンのリモート Git リポジトリ内の Git Commitish(ブランチ、Git SHA など)。
- COMPILATION_RESULT: このワークフローの呼び出しに使用するコンパイル結果の名前。
コンパイル構成パラメータを追加する
create_compilation_result
Airflow DAG オブジェクトに、コンパイル構成パラメータを追加できます。使用可能なパラメータの詳細については、CodeCompilationConfig
Dataform API リファレンスをご覧ください。
コンパイル構成パラメータを
create_compilation_result
Airflow DAG オブジェクトに追加するには、選択したパラメータを次の形式でcode_compilation_config
フィールドに追加します。create_compilation_result = DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"} }, )
次のように置き換えます。
- PROJECT_ID: Dataform の Google Cloud プロジェクト ID。
- REPOSITORY_ID: Dataform リポジトリの名前。
- REGION: Dataform リポジトリが配置されているリージョン。
- GIT_COMMITISH: 使用するコードのバージョンのリモート Git リポジトリ内の Git Commitish(ブランチ、Git SHA など)。
- PARAMETER: 選択した
CodeCompilationConfig
パラメータ。複数のパラメータを追加できます。 - PARAMETER_VALUE: 選択したパラメータの値。
次のコードサンプルは、create_compilation_result
Airflow DAG オブジェクトに追加された defaultDatabase
パラメータを示しています。
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
ワークフローの呼び出し構成パラメータを追加する
ワークフローの呼び出し構成パラメータを create_workflow_invocation
Airflow DAG オブジェクトに追加できます。使用可能なパラメータの詳細については、InvocationConfig
Dataform API リファレンスをご覧ください。
ワークフローの呼び出し構成パラメータを
create_workflow_invocation
Airflow DAG オブジェクトに追加するには、選択したパラメータを次の形式でinvocation_config
フィールドに追加します。create_workflow_invocation = DataformCreateWorkflowInvocationOperator( task_id='create_workflow_invocation', project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, workflow_invocation={ "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}", "invocation_config": { "PARAMETER": PARAMETER_VALUE } }, )
次のように置き換えます。
- PROJECT_ID: Dataform の Google Cloud プロジェクト ID。
- REPOSITORY_ID: Dataform リポジトリの名前。
- REGION: Dataform リポジトリが配置されているリージョン。
- PARAMETER: 選択した
InvocationConfig
パラメータ。複数のパラメータを追加できます。 - PARAMETER_VALUE: 選択したパラメータの値。
次のコードサンプルは、create_workflow_invocation
Airflow DAG オブジェクトに追加された includedTags[]
パラメータと transitiveDependenciesIncluded
パラメータを示しています。
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
},
)
次のステップ
- Dataform コンパイル リリース構成を構成する方法については、リリース構成を作成するをご覧ください。
- Dataform のコード ライフサイクルの詳細については、Dataform のコード ライフサイクルの概要をご覧ください。
- Dataform API の詳細については、Dataform API をご覧ください。
- Cloud Composer 環境の詳細については、Cloud Composer の概要をご覧ください。
- Workflows の料金の詳細については、Workflows の料金をご覧ください。