概要
データリネージは Dataplex の機能で、システム内でのデータの移動(データの送信元、データの通過先、データに適用される変換)を追跡できます。
データリネージを有効にすると、Spark 向け Dataproc サーバーレスのワークロードはリネージ イベントをキャプチャして、Dataplex Data Lineage API に公開します。Spark 向け Dataproc サーバーレスは、OpenLineage Spark プラグインを使用して OpenLineage を介して Data Lineage API と統合されます。
Dataplex でリネージ情報にアクセスするには、 リネージ可視化グラフおよび Data Lineage API をご覧ください。詳細については、Dataplex でリネージグラフを表示するをご覧ください。
可用性、機能、制限事項
BigQuery および Cloud Storage データソースを含む Spark データリネージは、サポートされている Dataproc Serverless for Spark ランタイム バージョンで実行されているワークロードで利用可能ですが、次の例外と制限があります。
- Spark データリネージは、以下に対して提供されていません。
- Dataproc サーバーレス Spark ランタイム バージョン 1.2 ワークロード
- Spark ストリーミング ワークロード
- BigQuery コネクタ バージョン 2(Spark データソース API バージョン 2)
始める前に
Google Cloud コンソールのプロジェクト セレクタのページで、Spark 向け Dataproc サーバーレスのワークロードに使用するプロジェクトを選択します。
Data Lineage API と Data Catalog API を有効にします。
必要なロール
Dataproc Serverless for Spark でデータリネージを使用するために必要な権限を取得するには、Dataproc クラスタ VM サービス アカウントに対して次の IAM ロールを付与するよう、管理者に依頼してください。
- Data Catalog でリネージの可視化を表示するか、Data Lineage API を使用する: データリネージ閲覧者(
roles/datalineage.viewer
) - API を使用してリネージを手動で生成する: データ リネージ イベント プロデューサー(
roles/datalineage.producer
) - API を使用してリネージを編集する: データリネージ編集者(
roles/datalineage.editor
) - リネージですべてのオペレーションを実行する: データリネージ管理者(
roles/datalineage.admin
)
ロールの付与については、プロジェクト、フォルダ、組織へのアクセスを管理するをご覧ください。
必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。
Spark バッチ ワークロードのデータリネージを有効にする
バッチ ワークロードの Spark データ リネージを有効にするには、ワークロードを送信するときに spark.dataproc.lineage.enabled
プロパティを true
に設定します。
Google Cloud CLI の例:
gcloud dataproc batches submit pyspark FILENAME.py --region=REGION \ --version=1.1 \ --properties=spark.dataproc.lineage.enabled=true \ other args ...
Dataplex でリネージグラフを表示する
リネージの可視化グラフには、プロジェクト リソースとそれらを作成したプロセスの関係が表示されます。データリネージ情報は、Google Cloud コンソールのグラフの可視化で表示するか、Data Lineage API から JSON データとして取得します。
詳細については、Google Cloud システムでデータリネージを使用するをご覧ください。
例:
次の Spark ワークロードは、BigQuery テーブルからデータを読み取り、出力を別の BigQuery テーブルに書き込みます。
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
bucket = lineage-ol-test
spark.conf.set('temporaryGcsBucket', bucket)
source = sample.source
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination = sample.destination
word_count.write.format('bigquery') \
.option('table', destination) \
.save()
この Spark ワークロードにより、Dataplex UI に次のリネージグラフが作成されます。
次のステップ
- 詳しくはデータリネージをご覧ください。