Spark データリネージ

概要

データリネージは Dataplex の機能で、システム内でのデータの移動(データの送信元、データの通過先、データに適用される変換)を追跡できます。

データリネージを有効にすると、Spark 向け Dataproc サーバーレスのワークロードはリネージ イベントをキャプチャして、Dataplex Data Lineage API に公開します。Spark 向け Dataproc サーバーレスは、OpenLineage Spark プラグインを使用して OpenLineage を介して Data Lineage API と統合されます。

Dataplex でリネージ情報にアクセスするには、 リネージ可視化グラフおよび Data Lineage API をご覧ください。詳細については、Dataplex でリネージグラフを表示するをご覧ください。

可用性、機能、制限事項

BigQuery および Cloud Storage データソースを含む Spark データリネージは、サポートされている Dataproc Serverless for Spark ランタイム バージョンで実行されているワークロードで利用可能ですが、次の例外と制限があります。

  • Spark データリネージは、以下に対して提供されていません。
    • Dataproc サーバーレス Spark ランタイム バージョン 1.2 ワークロード
    • Spark ストリーミング ワークロード
    • BigQuery コネクタ バージョン 2(Spark データソース API バージョン 2)

始める前に

  1. Google Cloud コンソールのプロジェクト セレクタのページで、Spark 向け Dataproc サーバーレスのワークロードに使用するプロジェクトを選択します。

    プロジェクト セレクタに移動

  2. Data Lineage API と Data Catalog API を有効にします。

    API を有効にする

必要なロール

Dataproc Serverless for Spark でデータリネージを使用するために必要な権限を取得するには、Dataproc クラスタ VM サービス アカウントに対して次の IAM ロールを付与するよう、管理者に依頼してください。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセスを管理するをご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

Spark バッチ ワークロードのデータリネージを有効にする

バッチ ワークロードの Spark データ リネージを有効にするには、ワークロードを送信するときに spark.dataproc.lineage.enabled プロパティを true に設定します。

Google Cloud CLI の例:

gcloud dataproc batches submit pyspark FILENAME.py
    --region=REGION \
    --version=1.1 \
    --properties=spark.dataproc.lineage.enabled=true \
    other args ...

Dataplex でリネージグラフを表示する

リネージの可視化グラフには、プロジェクト リソースとそれらを作成したプロセスの関係が表示されます。データリネージ情報は、Google Cloud コンソールのグラフの可視化で表示するか、Data Lineage API から JSON データとして取得します。

詳細については、Google Cloud システムでデータリネージを使用するをご覧ください。

例:

次の Spark ワークロードは、BigQuery テーブルからデータを読み取り、出力を別の BigQuery テーブルに書き込みます。

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = lineage-ol-test
spark.conf.set('temporaryGcsBucket', bucket)

source = sample.source
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination = sample.destination
word_count.write.format('bigquery') \
  .option('table', destination) \
  .save()

この Spark ワークロードにより、Dataplex UI に次のリネージグラフが作成されます。

リネージグラフの例

次のステップ