実行をスケジュールする

このドキュメントでは、Dataform で次の操作を行う方法について説明します。

始める前に

ワークフロー構成で実行をスケジュールするか、ワークフローおよび Cloud Scheduler で実行をスケジュールするには、次の操作を行います。

  1. Google Cloud コンソールの [Dataform] ページに移動します。

    Dataform に移動

  2. リポジトリを作成または選択します。

  3. リリース構成を作成する

Cloud Composer で実行をスケジュールするには、次の操作を行います。

  1. Dataform リポジトリを作成または選択します。
  2. Dataform に BigQuery へのアクセス権を付与します
  3. Dataform ワークスペースを作成または選択します。
  4. 少なくとも 1 つのテーブルを作成します。
  5. Cloud Composer 2 環境を作成する

必要なロール

このドキュメントのタスクを完了するために必要な権限を取得するには、管理者に次の IAM ロールを付与するよう依頼してください。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

デフォルトの Dataform サービス アカウント以外のサービス アカウントを使用するには、カスタム サービス アカウントへのアクセス権を付与します。

ワークフロー構成で実行をスケジュールする

このセクションでは、Dataform でワークフロー構成を作成し、ワークフローの実行をスケジュールして構成する方法について説明します。ワークフロー構成を使用して、Dataform ワークフローをスケジュールで実行できます。

ワークフロー構成について

BigQuery ですべてのワークフローのアクションまたは一部のワークフローのアクションの Dataform 実行をスケジュールするには、ワークフロー構成を作成します。ワークフロー構成では、コンパイル リリース構成を選択し、実行するワークフロー アクションを選択し、実行スケジュールを設定します。

その後、Dataform は、ワークフロー構成のスケジュールされた実行時に、リリース構成の最新のコンパイル結果から選択したアクションを BigQuery にデプロイします。Dataform API workflowConfigs を使用して、ワークフロー構成の実行を手動でトリガーすることもできます。

Dataform ワークフローの構成には、次の実行設定が含まれています。

  • ワークフロー構成の ID。
  • リリース構成。
  • サービス アカウント。

    これは、ワークフロー構成に関連付けられているサービス アカウントです。デフォルトの Dataform サービス アカウントまたは Google Cloud プロジェクトに関連付けられたサービス アカウントを選択することも、別のサービス アカウントを手動で入力することもできます。デフォルトでは、ワークフローの構成では、リポジトリと同じサービス アカウントが使用されます。

  • 実行するワークフロー アクション:

    • すべてのアクション。
    • アクションの選択
    • タグの選択。
  • 実行スケジュールとタイムゾーン。

ワークフロー構成を作成する

Dataform ワークフロー構成を作成するには、次の操作を行います。

  1. リポジトリで、[リリースとスケジュール] に移動します。
  2. [ワークフロー構成] セクションで、[作成] をクリックします。
  3. [ワークフロー構成を作成] ペインの [構成 ID] フィールドに、ワークフロー構成の一意の ID を入力します。

    ID には数字、英字、ハイフン、アンダースコアのみを使用できます。

  4. [リリース構成] メニューで、コンパイル リリース構成を選択します。

  5. (省略可)[頻度] フィールドに、実行の頻度を unix-cron 形式で入力します。

    Dataform が対応するリリース構成で最新のコンパイル結果を確実に実行するには、コンパイル結果の作成時刻とスケジュールされた実行時刻の間に少なくとも 1 時間の間隔を設けます。

  6. [サービス アカウント] メニューで、ワークフローの構成に使用するサービス アカウントを選択します。

    メニューで、デフォルトの Dataform サービス アカウントまたはアクセス可能な Google Cloud プロジェクトに関連付けられた任意のサービス アカウントを選択できます。サービス アカウントを選択しない場合、ワークフローの構成ではリポジトリのサービス アカウントが使用されます。

  7. 省略可: [タイムゾーン] メニューで、実行のタイムゾーンを選択します。

    デフォルトのタイムゾーンは UTC です。

  8. 実行するワークフロー アクションを選択します。

    • ワークフロー全体を実行するには、[すべてのアクション] をクリックします。
    • ワークフローで選択したアクションを実行するには、[アクションの選択] をクリックして、アクションを選択します。
    • 選択したタグでアクションを実行するには、[タグの選択] をクリックしてタグを選択します。
    • 省略可: 選択したアクションまたはタグとそれらの依存関係を実行するには、[依存関係を含める] オプションを選択します。
    • 省略可: 選択したアクションまたはタグとそれらの依存関係を実行するには、[依存者を含める] オプションを選択します。
    • 省略可: すべてのテーブルをゼロから再構築するには、[フル更新で実行] オプションを選択します。

    このオプションを使用しない場合、Dataform は、増分テーブルをゼロから再構築せずに更新します。

  9. [作成] をクリックします。

たとえば、次のワークフロー構成では、CEST タイムゾーンで hourly タグを使用してアクションが 1 時間ごとに実行されます。

  • 構成 IDproduction-hourly
  • リリース構成
  • 頻度0 * * * *
  • タイムゾーンCentral European Summer Time (CEST)
  • ワークフロー アクションの選択: タグの選択、hourly タグ

ワークフロー構成を編集する

ワークフロー構成を編集するには、次の手順を行います。

  1. リポジトリで、[リリースとスケジュール] に移動します。
  2. 編集するワークフロー構成で、 [その他] メニューをクリックし、[編集] をクリックします。
  3. [ワークフロー構成を編集] ペインで、リリース構成の設定を編集し、[保存] をクリックします。

ワークフロー構成を削除する

ワークフロー構成を作成するには、次の手順を行います。

  1. リポジトリで、[リリースとスケジュール] に移動します。
  2. 削除するワークフロー構成で、 [その他] メニューをクリックし、[削除] をクリックします。
  3. [リリース構成の削除] ダイアログで、[削除] をクリックします。

Workflows と Cloud Scheduler を使用して実行をスケジュールする

このセクションでは、Workflows と Cloud Scheduler を使用して Dataform ワークフローの実行をスケジュールする方法について説明します。

スケジュール設定されたワークフローの実行について

Workflows ワークフローをトリガーする Cloud Scheduler ジョブを作成することで、Dataform ワークフローの実行頻度を設定できます。Workflows は、定義したオーケストレーション ワークフローでサービスを実行します。

Workflows は、Dataform ワークフローを 2 段階のプロセスで実行します。まず、Git プロバイダから Dataform リポジトリ コードを pull してコンパイルし、コンパイル結果を導きます。次に、そのコンパイル結果を使用して Dataform ワークフローを作成し、設定した頻度で実行します。

スケジュール設定されたオーケストレーション ワークフローを作成する

Dataform ワークフローの実行をスケジュールするには、Workflows を使用してオーケストレーション ワークフローを作成し、Cloud Scheduler ジョブをトリガーとして追加します。

  1. Workflows はサービス アカウントを使用して、ワークフローがGoogle Cloud リソースにアクセスできるようにします。サービス アカウントを作成し、Dataform 編集者roles/dataform.editor)Identity and Access Management ロールと、オーケストレーション ワークフローの管理に必要な最小限の権限を付与します。詳細については、 Google Cloud リソースにアクセスする権限をワークフローに付与するをご覧ください。

  2. オーケストレーション ワークフローを作成し、次の YAML ソースコードをワークフロー定義として使用します。

    main:
        steps:
        - init:
            assign:
            - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID
        - createCompilationResult:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
                auth:
                    type: OAuth2
                body:
                    gitCommitish: GIT_COMMITISH
            result: compilationResult
        - createWorkflowInvocation:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
                auth:
                    type: OAuth2
                body:
                    compilationResult: ${compilationResult.body.name}
            result: workflowInvocation
        - complete:
            return: ${workflowInvocation.body.name}
    

    次のように置き換えます。

    • PROJECT_ID: Google Cloud プロジェクトの ID。
    • REPOSITORY_LOCATION: Dataform リポジトリのロケーション。
    • REPOSITORY_ID: Dataform リポジトリの名前。
    • GIT_COMMITISH: Dataform コードを実行する Git ブランチ。新しく作成したリポジトリの場合は、main に置き換えます。
  3. Cloud Scheduler を使用してオーケストレーション ワークフローをスケジュールする

Dataform ワークフローの作成コンパイル結果リクエストをカスタマイズする

既存のオケストレーション ワークフローを更新し、Dataform ワークフローの作成コンパイル結果リクエスト設定を YAML 形式で定義できます。設定の詳細については、projects.locations.repositories.compilationResults REST リソース リファレンスをご覧ください。

たとえば、コンパイル時にすべてのアクションに _dev schemaSuffix 設定を追加するには、createCompilationResult ステップの本文を次のコード スニペットに置き換えます。

    - createCompilationResult:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
            auth:
                type: OAuth2
            body:
                gitCommitish: GIT_COMMITISH
                codeCompilationConfig:
                    schemaSuffix: dev

追加の設定を Workflows 実行リクエストでランタイム引数として渡し、変数を使用してこれらの引数にアクセスすることもできます。詳細については、実行リクエストでランタイム引数を渡すをご覧ください。

Dataform ワークフロー呼び出しリクエストをカスタマイズする

既存のオケストレーション ワークフローを更新し、YAML 形式で Dataform ワークフローの呼び出しリクエスト設定を定義できます。呼び出しリクエストの設定の詳細については、projects.locations.repositories.workflowInvocations REST リソース リファレンスをご覧ください。

たとえば、すべての伝播依存関係を含む hourly タグでアクションのみを実行するには、createWorkflowInvocation の本文を次のコード スニペットに置き換えます。

    - createWorkflowInvocation:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
            auth:
                type: OAuth2
            body:
                compilationResult: ${compilationResult.body.name}
                invocationConfig:
                    includedTags:
                    - hourly
                    transitiveDependenciesIncluded: true
                

追加の設定を Workflows 実行リクエストでランタイム引数として渡し、変数を使用してこれらの引数にアクセスすることもできます。詳細については、実行リクエストでランタイム引数を渡すをご覧ください。

Cloud Composer で実行をスケジュールする

Cloud Composer 2 を使用して、Dataform の実行をスケジュールできます。Dataform は Cloud Composer 1 をサポートしていません。

Cloud Composer 2 で Dataform の実行スケジュールを管理するには、Airflow 有向非巡回グラフ(DAG)で Dataform 演算子を使用します。Dataform ワークフローの呼び出しをスケジュールする Airflow DAG を作成できます。

Dataform は、さまざまな Airflow 演算子を提供します。これには、コンパイル結果の取得、ワークフロー呼び出しの取得、ワークフロー呼び出しのキャンセルを行う演算子が含まれます。使用可能な Dataform Airflow オペレーターの完全なリストについては、Google Dataform 演算子をご覧ください。

google-cloud-dataform PyPi パッケージをインストールする

Cloud Composer 2 バージョン 2.0.25 以降を使用している場合、このパッケージは環境にプリインストールされており、インストールは不要です。

以前のバージョンの Cloud Composer 2 を使用している場合は、google-cloud-dataform PyPi パッケージをインストールします。

[PyPI パッケージ] セクションで、バージョン ==0.2.0 を指定します。

Dataform ワークフローの呼び出しをスケジュールする Airflow DAG を作成する

Cloud Composer 2 で Dataform ワークフローのスケジュール実行を管理するには、Dataform Airflow 演算子を使用して DAG を記述し、環境のバケットにアップロードします。

次のコードサンプルは、Dataform コンパイル結果を作成し、Dataform ワークフローの呼び出しを開始する Airflow DAG を示しています。

from datetime import datetime

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )


create_compilation_result >> create_workflow_invocation

次のように置き換えます。

  • PROJECT_ID: Dataform の Google Cloud プロジェクト ID。
  • REPOSITORY_ID: Dataform リポジトリの名前。
  • REGION: Dataform リポジトリが配置されているリージョン。
  • COMPILATION_RESULT: このワークフローの呼び出しに使用するコンパイル結果の名前。
  • GIT_COMMITISH: 使用するコードのバージョンのリモート Git リポジトリ内の Git Commitish(ブランチ、Git SHA など)。

次のコードサンプルは、次のことを実行する Airflow DAG を示しています。

  1. Dataform コンパイル結果を作成します。
  2. 非同期の Dataform ワークフローの呼び出しを開始します。
  3. DataformWorkflowInvocationStateSensor を使用して、ワークフローが期待される状態になるまでステータスをポーリングします。
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

次のように置き換えます。

  • PROJECT_ID: Dataform の Google Cloud プロジェクト ID。
  • REPOSITORY_ID: Dataform リポジトリの名前。
  • REGION: Dataform リポジトリが配置されているリージョン。
  • COMPILATION_RESULT: このワークフローの呼び出しに使用するコンパイル結果の名前。
  • GIT_COMMITISH: 使用するコードのバージョンのリモート Git リポジトリ内の Git Commitish(ブランチ、Git SHA など)。
  • COMPILATION_RESULT: このワークフローの呼び出しに使用するコンパイル結果の名前。

コンパイル構成パラメータを追加する

create_compilation_result Airflow DAG オブジェクトに、コンパイル構成パラメータを追加できます。使用可能なパラメータの詳細については、CodeCompilationConfig Dataform API リファレンスをご覧ください。

  • コンパイル構成パラメータを create_compilation_result Airflow DAG オブジェクトに追加するには、選択したパラメータを次の形式で code_compilation_config フィールドに追加します。

        create_compilation_result = DataformCreateCompilationResultOperator(
            task_id="create_compilation_result",
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            compilation_result={
                "git_commitish": GIT_COMMITISH,
                "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
            },
        )
    

    次のように置き換えます。

    • PROJECT_ID: Dataform の Google Cloud プロジェクト ID。
    • REPOSITORY_ID: Dataform リポジトリの名前。
    • REGION: Dataform リポジトリが配置されているリージョン。
    • GIT_COMMITISH: 使用するコードのバージョンのリモート Git リポジトリ内の Git Commitish(ブランチ、Git SHA など)。
    • PARAMETER: 選択した CodeCompilationConfig パラメータ。複数のパラメータを追加できます。
    • PARAMETER_VALUE: 選択したパラメータの値。

次のコードサンプルは、create_compilation_result Airflow DAG オブジェクトに追加された defaultDatabase パラメータを示しています。

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

ワークフローの呼び出し構成パラメータを追加する

ワークフローの呼び出し構成パラメータを create_workflow_invocation Airflow DAG オブジェクトに追加できます。使用可能なパラメータの詳細については、InvocationConfig Dataform API リファレンスをご覧ください。

  • ワークフローの呼び出し構成パラメータを create_workflow_invocation Airflow DAG オブジェクトに追加するには、選択したパラメータを次の形式で invocation_config フィールドに追加します。

        create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
            task_id='create_workflow_invocation',
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            workflow_invocation={
                "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
                "invocation_config": { "PARAMETER": PARAMETER_VALUE }
            },
        )
    
    

    次のように置き換えます。

    • PROJECT_ID: Dataform の Google Cloud プロジェクト ID。
    • REPOSITORY_ID: Dataform リポジトリの名前。
    • REGION: Dataform リポジトリが配置されているリージョン。
    • PARAMETER: 選択した InvocationConfig パラメータ。複数のパラメータを追加できます。
    • PARAMETER_VALUE: 選択したパラメータの値。

次のコードサンプルは、create_workflow_invocation Airflow DAG オブジェクトに追加された includedTags[] パラメータと transitiveDependenciesIncluded パラメータを示しています。

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

次のステップ