Dataproc Serverless でデータリネージを使用する

このドキュメントでは、Dataproc Serverless for Spark のバッチ ワークロードとインタラクティブ セッションで、プロジェクトバッチ ワークロードインタラクティブ セッションレベルでデータリネージを有効にする方法について説明します。

概要

データリネージは DataPlex の機能で、システム内でのデータの移動(データの送信元、データの通過先、データに適用される変換)を追跡できます。

Dataproc Serverless for Spark のワークロードとセッションは、リネージ イベントをキャプチャして Dataplex Data Lineage API にパブリッシュします。Spark 向け Dataproc サーバーレスは、OpenLineage Spark プラグインを使用して OpenLineage を介して Data Lineage API と統合されます。

Dataplex でリネージ情報にアクセスするには、 リネージ可視化グラフおよび Data Lineage API をご覧ください。詳細については、Dataplex でリネージグラフを表示するをご覧ください。

提供状況、機能、制限事項

BigQuery と Cloud Storage のデータソースをサポートするデータリネージは、Dataproc Serverless for Spark ランタイム バージョン 1.11.22.2 で実行されるワークロードとセッションで利用できます。ただし、次の例外と制限があります。

  • データリネージは、SparkR または Spark ストリーミング ワークロードまたはセッションでは使用できません。

始める前に

  1. Google Cloud コンソールのプロジェクト セレクタのページで、Spark 向け Dataproc サーバーレスのワークロードまたはセッションに使用するプロジェクトを選択します。

    プロジェクト セレクタに移動

  2. Data Lineage API と Data Catalog API を有効にします。

    API を有効にする

必要なロール

Dataproc Serverless for Spark でデータリネージを使用するために必要な権限を取得するには、Dataproc クラスタの VM サービス アカウントに対して次の IAM ロールを付与するよう、管理者に依頼してください。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセスを管理するをご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

プロジェクト レベルでデータリネージを有効にする

データリネージはプロジェクト レベルで有効にできます。プロジェクト レベルで有効にすると、プロジェクトで実行する後続のバッチ ワークロードとインタラクティブ セッションで Spark リネージが有効になります。

プロジェクト レベルでデータリネージを有効にする方法

プロジェクト レベルでデータリネージを有効にするには、次のカスタム プロジェクト メタデータを設定します。

キー
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

DATAPROC_LINEAGE_ENABLED メタデータを false に設定すると、プロジェクト レベルでデータリネージを無効にできます。

Spark バッチ ワークロードのデータリネージを有効にする

バッチ ワークロードでデータリネージを有効にするには、ワークロードを送信するときに spark.dataproc.lineage.enabled プロパティを true に設定します。

バッチ ワークロードの例

この例では、Spark リネージを有効にしてバッチ lineage-example.py ワークロードを送信します。

gcloud dataproc batches submit pyspark lineage-example.py \
    --region=REGION \
    --properties=spark.dataproc.lineage.enabled=true

lineage-example.py は、BigQuery テーブルからデータを読み取り、出力を別の BigQuery テーブルに書き込みます。

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = lineage-demo
spark.conf.set('temporaryCloudStorageBucket', bucket)

source = sample.source
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination = sample.destination
word_count.write.format('bigquery') \
  .option('table', destination) \
  .save()

リネージグラフは Dataplex UI で表示できます。

Spark リネージグラフ

Spark インタラクティブ セッションのデータリネージを有効にする

Spark インタラクティブ セッションでデータリネージを有効にするには、セッションまたはセッション テンプレートを作成するときに spark.dataproc.lineage.enabled プロパティを true に設定します。

インタラクティブ セッションの例

次の PySpark ノートブック コードは、限定公開の Google アクセス VPC リージョン サブネットで実行される Spark データリネージが有効になっている Dataproc Serverless インタラクティブ セッションを構成します。次に、一般公開の BigQuery Shakespeare データセットでワードカウント クエリを実行し、出力を BigQuery テーブルに書き込む Spark Connect セッションを作成します。

from dataproc_spark_session.session.spark.connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session = Session()

# Configure the Dataproc Serverless interactive session. Enable Spark data lineage.
project_id = "sample-project-id"
region = "us-central1"
subnet_name = "sample-private-google-access-subnet"
session.environment_config.execution_config.subnetwork_uri = f"projects/{project_id}/regions/{region}/subnetworks/{subnet_name}"
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
session.runtime_config.version = "2.2"

# Create the Spark Connect session.
spark = (
   DataprocSparkSession.builder
     .appName("LINEAGE_BQ_TO_BQ")
     .dataprocConfig(session)
     .getOrCreate()
)
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
           'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

# Output the results to a BigQuery destination table.
destination = sample.destination
word_count.write.format('bigquery') \
  .option('table', destination) \
  .save()

データリネージ グラフを表示するには、BigQuery の [エクスプローラ] ページのナビゲーション ペインに表示されている宛先テーブル名をクリックし、テーブルの詳細ペインでリネージタブを選択します。

Spark リネージグラフ

Dataplex でリネージグラフを表示する

リネージの可視化グラフには、プロジェクト リソースとそれらを作成したプロセスの関係が表示されます。データリネージ情報は、Google Cloud コンソールのグラフの可視化で表示するか、Data Lineage API から JSON データとして取得します。

詳細については、システムでデータリネージを使用する Google Cloud をご覧ください。

次のステップ