このドキュメントでは、Dataproc Serverless for Spark のバッチ ワークロードとインタラクティブ セッションで、プロジェクト、バッチ ワークロード、インタラクティブ セッションレベルでデータリネージを有効にする方法について説明します。
概要
データリネージは DataPlex の機能で、システム内でのデータの移動(データの送信元、データの通過先、データに適用される変換)を追跡できます。
Dataproc Serverless for Spark のワークロードとセッションは、リネージ イベントをキャプチャして Dataplex Data Lineage API にパブリッシュします。Spark 向け Dataproc サーバーレスは、OpenLineage Spark プラグインを使用して OpenLineage を介して Data Lineage API と統合されます。
Dataplex でリネージ情報にアクセスするには、 リネージ可視化グラフおよび Data Lineage API をご覧ください。詳細については、Dataplex でリネージグラフを表示するをご覧ください。
提供状況、機能、制限事項
BigQuery と Cloud Storage のデータソースをサポートするデータリネージは、Dataproc Serverless for Spark ランタイム バージョン 1.1
、1.2
、2.2
で実行されるワークロードとセッションで利用できます。ただし、次の例外と制限があります。
- データリネージは、SparkR または Spark ストリーミング ワークロードまたはセッションでは使用できません。
始める前に
Google Cloud コンソールのプロジェクト セレクタのページで、Spark 向け Dataproc サーバーレスのワークロードまたはセッションに使用するプロジェクトを選択します。
Data Lineage API と Data Catalog API を有効にします。
必要なロール
Dataproc Serverless for Spark でデータリネージを使用するために必要な権限を取得するには、Dataproc クラスタの VM サービス アカウントに対して次の IAM ロールを付与するよう、管理者に依頼してください。
- Data Catalog でリネージの可視化を表示するか、Data Lineage API を使用する: データリネージ閲覧者(
roles/datalineage.viewer
) - API を使用してリネージを手動で生成する: データ リネージ イベント プロデューサー(
roles/datalineage.producer
) - API を使用してリネージを編集する: データリネージ編集者(
roles/datalineage.editor
) - リネージですべてのオペレーションを実行する: データリネージ管理者(
roles/datalineage.admin
)
ロールの付与については、プロジェクト、フォルダ、組織へのアクセスを管理するをご覧ください。
必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。
プロジェクト レベルでデータリネージを有効にする
データリネージはプロジェクト レベルで有効にできます。プロジェクト レベルで有効にすると、プロジェクトで実行する後続のバッチ ワークロードとインタラクティブ セッションで Spark リネージが有効になります。
プロジェクト レベルでデータリネージを有効にする方法
プロジェクト レベルでデータリネージを有効にするには、次のカスタム プロジェクト メタデータを設定します。
キー | 値 |
---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_CLUSTER_SCOPES |
https://www.googleapis.com/auth/cloud-platform |
DATAPROC_LINEAGE_ENABLED
メタデータを false
に設定すると、プロジェクト レベルでデータリネージを無効にできます。
Spark バッチ ワークロードのデータリネージを有効にする
バッチ ワークロードでデータリネージを有効にするには、ワークロードを送信するときに spark.dataproc.lineage.enabled
プロパティを true
に設定します。
バッチ ワークロードの例
この例では、Spark リネージを有効にしてバッチ lineage-example.py
ワークロードを送信します。
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --properties=spark.dataproc.lineage.enabled=true
lineage-example.py
は、BigQuery テーブルからデータを読み取り、出力を別の BigQuery テーブルに書き込みます。
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
bucket = lineage-demo
spark.conf.set('temporaryCloudStorageBucket', bucket)
source = sample.source
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination = sample.destination
word_count.write.format('bigquery') \
.option('table', destination) \
.save()
リネージグラフは Dataplex UI で表示できます。
Spark インタラクティブ セッションのデータリネージを有効にする
Spark インタラクティブ セッションでデータリネージを有効にするには、セッションまたはセッション テンプレートを作成するときに spark.dataproc.lineage.enabled
プロパティを true
に設定します。
インタラクティブ セッションの例
次の PySpark ノートブック コードは、限定公開の Google アクセス VPC リージョン サブネットで実行される Spark データリネージが有効になっている Dataproc Serverless インタラクティブ セッションを構成します。次に、一般公開の BigQuery Shakespeare データセットでワードカウント クエリを実行し、出力を BigQuery テーブルに書き込む Spark Connect セッションを作成します。
from dataproc_spark_session.session.spark.connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
session = Session()
# Configure the Dataproc Serverless interactive session. Enable Spark data lineage.
project_id = "sample-project-id"
region = "us-central1"
subnet_name = "sample-private-google-access-subnet"
session.environment_config.execution_config.subnetwork_uri = f"projects/{project_id}/regions/{region}/subnetworks/{subnet_name}"
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
session.runtime_config.version = "2.2"
# Create the Spark Connect session.
spark = (
DataprocSparkSession.builder
.appName("LINEAGE_BQ_TO_BQ")
.dataprocConfig(session)
.getOrCreate()
)
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination = sample.destination
word_count.write.format('bigquery') \
.option('table', destination) \
.save()
データリネージ グラフを表示するには、BigQuery の [エクスプローラ] ページのナビゲーション ペインに表示されている宛先テーブル名をクリックし、テーブルの詳細ペインでリネージタブを選択します。
Dataplex でリネージグラフを表示する
リネージの可視化グラフには、プロジェクト リソースとそれらを作成したプロセスの関係が表示されます。データリネージ情報は、Google Cloud コンソールのグラフの可視化で表示するか、Data Lineage API から JSON データとして取得します。
詳細については、システムでデータリネージを使用する Google Cloud をご覧ください。
次のステップ
- 詳しくはデータリネージをご覧ください。