搭配使用資料沿襲和 Serverless for Apache Spark

本文說明如何在Google Cloud 無伺服器 Apache Spark 批次工作負載和互動式工作階段中,啟用資料沿襲功能,啟用層級包括專案批次工作負載互動式工作階段

總覽

資料歷程是 Dataplex Universal Catalog 的功能,可讓您追蹤資料在系統之間的移動情形,包括來源、傳遞目的地和採用的轉換機制。

Google Cloud Serverless for Apache Spark 工作負載和工作階段會擷取沿襲事件,並發布至 Dataplex Universal Catalog Data Lineage API。Serverless for Apache Spark 會透過 OpenLineage 與 Data Lineage API 整合,並使用 OpenLineage Spark 外掛程式

您可以透過 Dataplex Universal Catalog 存取歷程資訊,方法是使用歷程圖Data Lineage API。詳情請參閱「在 Dataplex Universal Catalog 中查看歷程圖」。

適用情形、功能和限制

資料沿襲支援 BigQuery 和 Cloud Storage 資料來源,適用於使用 Serverless for Apache Spark 執行階段版本 1.11.22.2 執行的工作負載和工作階段,但有以下例外狀況和限制:

  • SparkR、Spark 串流工作負載或工作階段不支援資料歷程。

事前準備

  1. 在 Google Cloud 控制台的專案選取器頁面中,選取要用於 Serverless for Apache Spark 工作負載或工作階段的專案。

    前往專案選取器

  2. 啟用 Data Lineage API。

    啟用 API

必要的角色

如果批次工作負載使用預設的 Serverless for Apache Spark 服務帳戶,則該帳戶會具備 Dataproc Worker 角色,可啟用資料沿襲功能。你不需要採取其他動作。

不過,如果批次工作負載使用自訂服務帳戶啟用資料沿襲,您必須為自訂服務帳戶授予必要角色,如下一段所述。

如要取得使用 Dataproc 資料沿襲所需的權限,請要求管理員在批次工作負載自訂服務帳戶中,授予您下列 IAM 角色:

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

您或許還可透過自訂角色或其他預先定義的角色取得必要權限。

在專案層級啟用資料歷程

您可以在專案層級啟用資料歷程。在專案層級啟用後,您在專案中執行的所有後續批次工作負載和互動式工作階段,都會啟用 Spark 沿襲。

如何在專案層級啟用資料歷程

如要在專案層級啟用資料沿襲,請設定下列自訂專案中繼資料

DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

您可以將 DATAPROC_LINEAGE_ENABLED 中繼資料設為 false,在專案層級停用資料沿襲。

為 Spark 批次工作負載啟用資料歷程

提交批次工作負載時,將 spark.dataproc.lineage.enabled 屬性設為 true,即可啟用資料沿襲功能。

批次工作負載範例

這個範例會提交批次 lineage-example.py 工作負載,並啟用 Spark 沿襲。

gcloud dataproc batches submit pyspark lineage-example.py \
    --region=REGION \
    --deps-bucket=gs://BUCKET \
    --properties=spark.dataproc.lineage.enabled=true

lineage-example.py 會從公開 BigQuery 資料表讀取資料,然後將輸出內容寫入現有 BigQuery 資料集中的新資料表。並使用 Cloud Storage bucket 做為暫時儲存空間。

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .option('writeMethod', 'direct') \
  .save()

請將下列項目改為對應的值:

  • REGION:選取要執行工作負載的區域

  • BUCKET:現有 Cloud Storage bucket 的名稱,用於儲存依附元件。

  • PROJECT_IDDATASETTABLE:插入您的專案 ID、現有 BigQuery 資料集的名稱,以及要在資料集中建立的新資料表名稱 (資料表不得存在)。

您可以在 Dataplex Universal Catalog UI 中查看歷程圖。

Spark 歷程圖

為 Spark 互動工作階段啟用資料歷程

如要在 Spark 互動式工作階段中啟用資料沿襲,請在建立工作階段或工作階段範本時,將 spark.dataproc.lineage.enabled 屬性設為 true

互動式工作階段範例

下列 PySpark 筆記本程式碼會設定 Serverless for Apache Spark 互動式工作階段,並啟用 Spark 資料沿襲。接著,系統會建立 Spark Connect 工作階段,對公開的 BigQuery 莎士比亞資料集執行字數統計查詢,然後將輸出內容寫入現有 BigQuery 資料集的新資料表。

# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session

session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"

# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()

# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
           'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

請將下列項目改為對應的值:

  • PROJECT_IDDATASETTABLE:插入您的專案 ID、現有 BigQuery 資料集的名稱,以及要在資料集中建立的新資料表名稱 (資料表不得存在)。

如要查看資料沿襲圖,請在 BigQuery「Explorer」頁面的導覽窗格中,按一下目的地資料表名稱,然後選取資料表詳細資料窗格中的「沿襲」分頁標籤。

Spark 歷程圖

在 Dataplex Universal Catalog 中查看歷程

歷程圖會顯示專案資源之間的關係,以及建立這些資源的程序。您可以在 Google Cloud 控制台中查看資料沿襲資訊,也可以從 Data Lineage API 擷取 JSON 資料格式的資訊。

後續步驟