Apache Spark 用サーバーレスでデータ リネージを使用する

このドキュメントでは、Google Cloud Serverless for Apache Spark バッチ ワークロードとインタラクティブ セッションでデータリネージプロジェクトバッチ ワークロード、またはインタラクティブ セッション レベルで有効にする方法について説明します。

概要

データリネージは Dataplex Universal Catalog の機能で、システム内でのデータの移動(データの送信元、データの通過先、データに適用される変換)を追跡できます。

Google Cloud Apache Spark 用サーバーレスのワークロードとセッションは、リネージ イベントをキャプチャして、Dataplex Universal Catalog Data Lineage API にパブリッシュします。Apache Spark 用サーバーレスは、OpenLineage Spark プラグインを使用して OpenLineage を介して Data Lineage API と統合されます。

Dataplex Universal Catalog でリネージ情報にアクセスするには、リネージグラフData Lineage API を使用します。詳細については、Dataplex Universal Catalog でリネージグラフを表示するをご覧ください。

対象範囲、機能、制限事項

BigQuery と Cloud Storage のデータソースをサポートするデータリネージは、Apache Spark ランタイム バージョン用のサーバーレス 1.11.22.2 で実行されるワークロードとセッションで使用できますが、次の例外と制限があります。

  • データリネージは、SparkR または Spark ストリーミング ワークロードやセッションでは使用できません。

始める前に

  1. Google Cloud コンソールのプロジェクト セレクタのページで、Apache Spark 向けサーバーレスのワークロードまたはセッションに使用するプロジェクトを選択します。

    プロジェクト セレクタに移動

  2. データリネージ API を有効にします。

    API を有効にする

必要なロール

バッチ ワークロードでデフォルトの Apache Spark サービス アカウントのサーバーレスを使用している場合、データリネージを有効にする Dataproc Worker ロールが付与されます。追加のアクションは不要です。

ただし、バッチ ワークロードでカスタム サービス アカウントを使用してデータリネージを有効にする場合は、次の段落で説明するように、カスタム サービス アカウントに必要なロールを付与する必要があります。

Dataproc でデータリネージを使用するために必要な権限を取得するには、バッチ ワークロードのカスタム サービス アカウントに対して次の IAM ロールを付与するよう管理者に依頼してください。

ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

プロジェクト レベルでデータリネージを有効にする

データリネージはプロジェクト レベルで有効にできます。プロジェクト レベルで有効にすると、プロジェクトで実行する後続のすべてのバッチ ワークロードとインタラクティブ セッションで Spark リネージが有効になります。

プロジェクト レベルでデータリネージを有効にする方法

プロジェクト レベルでデータリネージを有効にするには、次のカスタム プロジェクト メタデータを設定します。

キー
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

DATAPROC_LINEAGE_ENABLED メタデータを false に設定すると、プロジェクト レベルでデータリネージを無効にできます。

Spark バッチ ワークロードのデータリネージを有効にする

バッチ ワークロードのデータ リネージを有効にするには、ワークロードを送信するときに spark.dataproc.lineage.enabled プロパティを true に設定します。

バッチ ワークロードの例

この例では、Spark リネージが有効になっているバッチ lineage-example.py ワークロードを送信します。

gcloud dataproc batches submit pyspark lineage-example.py \
    --region=REGION \
    --deps-bucket=gs://BUCKET \
    --properties=spark.dataproc.lineage.enabled=true

lineage-example.py は、一般公開の BigQuery テーブルからデータを読み取り、出力を既存の BigQuery データセットの新しいテーブルに書き込みます。一時ストレージには Cloud Storage バケットを使用します。

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .option('writeMethod', 'direct') \
  .save()

次のように置き換えます。

  • REGION: ワークロードを実行するリージョンを選択します。

  • BUCKET: 依存関係を保存する既存の Cloud Storage バケットの名前。

  • PROJECT_IDDATASETTABLE: プロジェクト ID、既存の BigQuery データセットの名前、データセットに作成する新しいテーブルの名前(テーブルが存在しない場合)を挿入します。

リネージグラフは、Dataplex Universal Catalog UI で表示できます。

Spark リネージグラフ

Spark インタラクティブ セッションのデータリネージを有効にする

Spark インタラクティブ セッションでデータ リネージを有効にするには、セッションまたはセッション テンプレートを作成するときに spark.dataproc.lineage.enabled プロパティを true に設定します。

インタラクティブ セッションの例

次の PySpark ノートブック コードは、Spark データ リネージが有効になっている Apache Spark 用サーバーレスのインタラクティブ セッションを構成します。次に、一般公開の BigQuery Shakespeare データセットでワードカウント クエリを実行し、出力を既存の BigQuery データセットの新しいテーブルに書き込む Spark Connect セッションを作成します。

# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session

session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"

# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()

# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
           'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

次のように置き換えます。

  • PROJECT_IDDATASETTABLE: プロジェクト ID、既存の BigQuery データセットの名前、データセットに作成する新しいテーブルの名前(テーブルが存在しない場合)を挿入します。

データ リネージ グラフを表示するには、BigQuery の [エクスプローラ] ページのナビゲーション パネルに表示されている宛先テーブル名をクリックし、テーブルの詳細パネルで [リネージ] タブを選択します。

Spark リネージグラフ

Dataplex Universal Catalog でリネージを表示する

リネージグラフには、プロジェクト リソースとそれらを作成したプロセスの関係が表示されます。 Google Cloud コンソールでデータリネージ情報を表示するか、Data Lineage API から JSON データとして取得できます。

次のステップ