データリネージの統合を有効にする

Cloud Composer 1 | Cloud Composer 2

データリネージ統合について

データリネージDataplex の機能で、システム内でのデータの移動(データの送信元、データの通過先、データに適用される変換)を追跡できます。 データリネージは、次の環境で利用できます。

Cloud Composer 環境でこの機能が有効になり、サポートされているオペレーターを利用する DAG を実行すると、Cloud Composer が Data Lineage API にリネージ情報を報告します。

この情報には以下の対象を使用してアクセスできます。

サポートされている演算子

次のオペレータは、Cloud Composer でのリネージの自動レポートをサポートしています。

  • airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator
  • airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
  • airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator
  • airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
  • airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryToGCSOperator
  • airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator
  • airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
  • airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator

たとえば、次のタスクを実行します。

task = BigQueryInsertJobOperator(
    task_id='snapshot_task',
    dag=dag,
    location='<dataset-location>',
    configuration={
        'query': {
            'query': 'SELECT * FROM dataset.tableA',
            'useLegacySql': False,
            'destinationTable': {
                'project_id': GCP_PROJECT,
                'dataset_id': 'dataset',
                'table_id': 'tableB',
            },
        }
    },
)

その結果、Dataplex UI で次のリネージグラフが作成されます。

Dataplex UI のリネージグラフの例。
図 1: Dataplex UI での BigQuery テーブルのリネージグラフの例。

Cloud Composer の機能に関する考慮事項

Airflow タスクの実行によりデータリネージが報告されるたびに、次の処理が行われます。

  • リネージ プロセス用の 1 つの作成 / 更新 RPC リクエスト
  • リネージ実行用の 1 つの作成 / 更新 RPC リクエスト
  • リネージ イベントを作成するための 1 つ以上の RPC リクエスト(ほとんどの場合 0 または 1)

これらのエンティティの詳細については、Dataplex のドキュメントのリネージ情報モデルリネージ API リファレンスをご覧ください。

出力されたリネージ トラフィックは、Data Lineage API の割り当ての対象になります。Cloud Composer は書き込み割り当てを消費します。

リネージデータの処理に関連する料金は、リネージの料金の対象となります。データリネージに関する考慮事項をご覧ください。

パフォーマンスへの影響

データリネージは、Airflow タスクの実行の終了時に報告されます。データリネージ レポートの平均的な所要時間は、約 1~2 秒です。

これは、タスク自体のパフォーマンスには影響しません。リネージが Lineage API に正常に報告されない場合でも、Airflow タスクは失敗しません。メインのオペレータ ロジックへの影響はありませんが、リネージデータの報告を考慮して、タスク インスタンス全体の実行時間が少し長くなります。

データリネージを報告する環境では、データリネージを報告するのに余分な時間が必要になるため、関連する費用が若干増加します。

コンプライアンス

データリネージは、VPC Service Controls などの機能向けにさまざまなサポートレベルを提供します。データリネージに関する考慮事項を参照して、サポートレベルが環境要件を満たしていることを確認してください。

データリネージ統合を操作する

Cloud Composer のデータリネージ統合は、環境ごとに管理されます。つまり、この機能を有効にするには、次の 2 つのステップを行う必要があります。

  1. プロジェクトで Data Lineage API を有効にする。
  2. 特定の Cloud Composer 環境でデータリネージ統合を有効にする。

準備

環境を作成するときに、次の条件が満たされると、データリネージ統合が自動的に有効になります。

  • プロジェクトで Data Lineage API が有効になっている。詳細については、Dataplex のドキュメントの Data Lineage API の有効化をご覧ください。

  • カスタム リネージ バックエンドが Airflow で構成されていません。

既存の環境では、データリネージ統合をいつでも有効または無効にできます。

必要なロール

データリネージと統合するには、Cloud Composer 環境のサービス アカウントに次の権限を追加する必要があります。

  • デフォルトのサービス アカウントの場合: 変更は必要ありません。デフォルトのサービス アカウントには、必要な権限が含まれています。
  • ユーザー管理のサービス アカウントの場合: Composer ワーカー(roles/composer.worker)ロールをサービス アカウントに付与します。このロールには、データリネージに関して必要な権限がすべて含まれています。

詳細については、Dataplex ドキュメントのリネージのロールと権限をご覧ください。

Cloud Composer でデータリネージを有効にする

コンソール

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

  3. [環境の設定] タブを選択します。

  4. [Dataplex データリネージ統合] セクションで、[編集] をクリックします。

  5. [Dataplex データリネージ統合] パネルで、[Dataplex データリネージとの統合を有効にする] をオンにして、[保存] をクリックします。

gcloud

--enable-cloud-data-lineage-integration 引数を使用します。

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --enable-cloud-data-lineage-integration

以下のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前にする。

    名前は先頭を小文字にして、その後に 62 文字以下の小文字、数字、ハイフンで構成します。末尾をハイフンにすることはできません。環境名は環境のサブコンポーネントの作成に使用されるため、Cloud Storage バケット名としても有効な名前を指定する必要があります。制限事項の一覧については、バケットの命名ガイドラインをご覧ください。

  • LOCATION は、環境のリージョンに置き換えます。

    ロケーションは、環境の GKE クラスタが配置されるリージョンです。

例:

gcloud composer environments update example-environment \
    --location us-central1 \
    --enable-cloud-data-lineage-integration

カスタム リネージ イベントを送信する

自動リネージ レポートでサポートされていないオペレータのリネージを報告する場合は、カスタム リネージ イベントを送信できます。

たとえば、カスタム イベントの送信に使用するオペレータと、その際の手順は、次のとおりです。

  • BashOperator、タスク定義の inlets パラメータまたは outlets パラメータを変更します。
  • PythonOperator。タスク定義の task.inlets パラメータまたは task.outlets パラメータを変更します。inlets パラメータに AUTO を使用すると、値はアップストリーム タスクの outlets と同じ値に設定されます。

たとえば、次のタスクを実行します。


from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO

…

bash_task = BashOperator(
   task_id='bash_task',
   dag=dag,
   bash_command='sleep 0',
   inlets=[BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table1',
   )],
   outlets=[BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table2',
   )]
)


def _python_task(task):
   task.inlets.append(BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table3',
   ))

   task.outlets.append(BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table4',
   ))


python_task = PythonOperator(
   task_id='python_task',
   dag=dag,
   python_callable=_python_task,
   inlets=[AUTO],
)

bash_task >> python_task

その結果、Dataplex UI で次のリネージグラフが作成されます。

Dataplex UI でのカスタム イベントのリネージグラフの例。
図 2: Dataplex UI で複数の BigQuery テーブルのリネージグラフを表示したサンプル。

Cloud Composer でデータリネージを無効にする

Cloud Composer 環境でリネージ統合を無効にしても、データリネージ API は無効になりません。プロジェクトのリネージ レポートを完全に無効にする必要がある場合は、Data Lineage API も無効にします。サービスの無効化をご覧ください。

コンソール

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

  3. [環境の設定] タブを選択します。

  4. [Dataplex データリネージ統合] セクションで、[編集] をクリックします。

  5. [Dataplex データリネージ統合] パネルで、[Dataplex データリネージとの統合を無効にする] を選択して [保存] をクリックします。

gcloud

--disable-cloud-data-lineage-integration 引数を使用します。

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --disable-cloud-data-lineage-integration

以下のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前にする。

    名前は先頭を小文字にして、その後に 62 文字以下の小文字、数字、ハイフンで構成します。末尾をハイフンにすることはできません。環境名は環境のサブコンポーネントの作成に使用されるため、Cloud Storage バケット名としても有効な名前を指定する必要があります。制限事項の一覧については、バケットの命名ガイドラインをご覧ください。

  • LOCATION は、環境のリージョンに置き換えます。

    ロケーションは、環境の GKE クラスタが配置されるリージョンです。

例:

gcloud composer environments update example-environment \
    --location us-central1 \
    --disable-cloud-data-lineage-integration

Cloud Composer でリネージログを表示する

[Dataplex データリネージ統合] セクションの [環境の構成] ページのリンクを使用して、データリネージに関連するログを調べることができます。

トラブルシューティング

リネージデータが Lineage API に報告されない場合や、Dataplex で表示されない場合は、次のトラブルシューティング手順を試してください。

  • Cloud Composer 環境のプロジェクトでデータリネージ API が有効になっていることを確認します。
  • Cloud Composer 環境でデータリネージ統合が有効になっているかどうかを確認します。
  • 使用する演算子が、自動リネージ レポートのサポートに含まれているかどうかを確認します。サポートされる Airflow オペレーターをご覧ください。
  • Cloud Composer のリネージログで、発生する可能性のある問題を確認します。