Dataproc でデータリネージを使用する

データリネージは Dataplex の機能で、システム内でのデータの移動(データの送信元、データの通過先、データに適用される変換)を追跡できます。

データリネージは、Dataproc Compute Engine 2.0.74 以降と 2.1.22 以降のイメージで、SparkR を除くすべての Dataproc Spark ジョブで使用できます。リネージは、BigQuery と Cloud Storage のデータソースで使用できます。

Dataproc クラスタでこの機能を有効にすると、Dataproc Spark ジョブはリネージ イベントをキャプチャし、Dataplex Data Lineage API にパブリッシュします。Dataproc は、OpenLineage Spark プラグインを使用して OpenLineage を介して Data Lineage API と統合されます。

リネージの情報には、以下を使用して Dataplex を介してアクセスできます。

制限事項

リネージは以下ではサポートされていません。

  • BigQuery Connector バージョン 2(Spark のデータソース API バージョン 2)
  • Spark ストリーミング ワークロード

準備

  1. Google Cloud コンソールの [プロジェクト セレクタ] ページで、リネージを追跡する Dataproc クラスタを含むプロジェクトを選択します。

    プロジェクト セレクタに移動

  2. Data Lineage API と Data Catalog API を有効にします。

    API を有効にする

必要なロール

Dataproc でデータリネージを使用するために必要な権限を取得するには、管理者に Dataproc クラスタ VM サービス アカウントに対して次の IAM ロールを付与するよう依頼してください。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセスを管理するをご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

Dataproc でデータリネージを有効にする

クラスタ内の送信されたすべての Spark ジョブで、リネージ情報が Data Lineage API に報告されるように、クラスタレベルでリネージを有効にします。

Dataproc クラスタを作成する

プロパティ dataproc:dataproc.lineage.enabledtrue に設定して Dataproc クラスタを作成します。

gcloud dataproc clusters create CLUSTER_NAME \
--region REGION \
--zone ZONE \
--project PROJECT_ID \
--properties 'dataproc:dataproc.lineage.enabled=true' \
--scopes https://www.googleapis.com/auth/cloud-platform

Spark ジョブの送信

リネージを有効にして作成された Dataproc クラスタで Spark ジョブを送信すると、Dataproc はリネージ情報をキャプチャして Data Lineage API に報告します。

gcloud dataproc jobs submit spark \
--project PROJECT_ID \
--cluster=CLUSTER_NAME \
--region REGION \
--class CLASS \
--jars=gs://APPLICATION_BUCKET/spark-application.jar \
--properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME

プロパティ spark.openlineage.namespacespark.openlineage.appName は省略可能で、ジョブを一意に識別するために使用されます。これらのプロパティを渡さない場合、Dataproc は次のデフォルト値を使用します。

  • spark.openlineage.namespace のデフォルト値: PROJECT_ID
  • spark.openlineage.appName のデフォルト値: spark.app.name

Dataplex でリネージグラフを表示する

リネージの可視化グラフには、プロジェクト リソースとそれらを作成したプロセスの関係が表示されます。データリネージ情報は、Google Cloud コンソールでグラフの可視化の形式で、または JSON データの形式で Data Lineage API から取得できます。

詳細については、Dataplex UI でリネージグラフを表示するをご覧ください。

BigQuery テーブルからデータを読み取り、別の BigQuery テーブルに書き込む、次の Spark ジョブを考えてみます。

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = lineage-ol-test
spark.conf.set('temporaryGcsBucket', bucket)

source = sample.source
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination = sample.destination
word_count.write.format('bigquery') \
  .option('table', destination) \
  .save()

この Spark ジョブは、Dataplex UI に次のリネージグラフを作成します。

リネージグラフの例

Dataproc でデータリネージを無効にする

クラスタを作成するときにリネージを有効にした後は、クラスタレベルでリネージを無効にすることはできません。Dataproc クラスタでリネージを無効にするには、dataproc:dataproc.lineage.enabled プロパティなしでクラスタを再作成します。

リネージを有効にして作成されたクラスタで特定のジョブのリネージを無効にするには、ジョブの送信時に空の値で spark.extraListeners プロパティを渡す必要があります。

次のステップ