将数据沿袭与 Serverless for Apache Spark 搭配使用

本文档介绍了如何在项目批处理工作负载交互式会话级层为Google Cloud Serverless for Apache Spark 批处理工作负载和交互式会话启用数据沿袭

概览

数据沿袭是 Dataplex Universal Catalog 的一项功能,可让您跟踪数据在系统中的移动方式:数据来自何处、传递到何处以及对其应用了哪些转换。

Google Cloud Serverless for Apache Spark 工作负载和会话会捕获沿袭事件并将其发布到 Dataplex Universal Catalog Data Lineage API。Serverless for Apache Spark 使用 OpenLineage Spark 插件通过 OpenLineage 与 Data Lineage API 集成。

您可以通过 Dataplex Universal Catalog 使用沿袭图Data Lineage API 访问沿袭信息。如需了解详情,请参阅在 Dataplex Universal Catalog 中查看沿袭图

适用范围、功能和限制

数据沿袭支持 BigQuery 和 Cloud Storage 数据源,适用于使用 Serverless for Apache Spark 运行时版本 1.11.22.2 运行的工作负载和会话,但存在以下例外情况和限制:

  • 数据沿袭不适用于 SparkR 或 Spark 流式工作负载或会话。

准备工作

  1. 在 Google Cloud 控制台的项目选择器页面上,选择要用于 Apache Spark 无服务器工作负载或会话的项目。

    转到“项目选择器”

  2. 启用 Data Lineage API。

    启用 API

所需的角色

如果您的批处理工作负载使用默认的 Serverless for Apache Spark 服务账号,该账号将具有 Dataproc Worker 角色,从而启用数据沿袭。除此之外没有其他要求。

不过,如果您的批处理工作负载使用自定义服务账号来启用数据沿袭,则必须按照下段所述向该自定义服务账号授予所需角色。

如需获得将数据沿袭与 Dataproc 搭配使用所需的权限,请让您的管理员为您授予批处理工作负载自定义服务账号的以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

在项目级层启用数据沿袭

您可以在项目级层启用数据沿袭。在项目级启用后,您在该项目中运行的所有后续批处理工作负载和交互式会话都将启用 Spark 沿袭。

如何在项目级层启用数据沿袭

如需在项目级层启用数据沿袭,请设置以下自定义项目元数据

DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

您可以通过将 DATAPROC_LINEAGE_ENABLED 元数据设置为 false 在项目级层停用数据沿袭。

为 Spark 批处理工作负载启用数据沿袭

您可以在提交批处理工作负载时将 spark.dataproc.lineage.enabled 属性设置为 true,以在该工作负载上启用数据沿袭。

批量工作负载示例

此示例提交了一个启用了 Spark 沿袭的批处理 lineage-example.py 工作负载。

gcloud dataproc batches submit pyspark lineage-example.py \
    --region=REGION \
    --deps-bucket=gs://BUCKET \
    --properties=spark.dataproc.lineage.enabled=true

lineage-example.py 从公共 BigQuery 表中读取数据,然后将输出写入现有 BigQuery 数据集中的新表。它使用 Cloud Storage 存储桶进行临时存储。

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .option('writeMethod', 'direct') \
  .save()

进行以下替换:

  • REGION:选择运行工作负载的区域

  • BUCKET:用于存储依赖项的现有 Cloud Storage 存储桶的名称。

  • PROJECT_IDDATASETTABLE:插入您的项目 ID、现有 BigQuery 数据集的名称,以及要在该数据集中创建的新表的名称(该表不得存在)。

您可以在 Dataplex Universal Catalog 界面中查看沿袭图。

Spark 沿袭图

为 Spark 交互式会话启用数据沿袭

您可以在创建会话或会话模板时将 spark.dataproc.lineage.enabled 属性设置为 true,以在 Spark 交互式会话中启用数据沿袭。

交互式会话示例

以下 PySpark 笔记本代码配置了一个启用了 Spark 数据沿袭的 Serverless for Apache Spark 交互式会话。然后,它会创建一个 Spark Connect 会话,该会话对公共 BigQuery Shakespeare 数据集运行字数统计查询,然后将输出写入现有 BigQuery 数据集中的新表。

# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session

session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"

# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()

# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
           'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

进行以下替换:

  • PROJECT_IDDATASETTABLE:插入您的项目 ID、现有 BigQuery 数据集的名称,以及要在该数据集中创建的新表的名称(该表不得存在)。

如需查看数据沿袭图,请在 BigQuery 探索器页面的导航窗格中点击列出的目标表名称,然后选择表详情窗格中的“沿袭”标签页。

Spark 沿袭图

在 Dataplex Universal Catalog 中查看沿袭

沿袭图显示了项目资源与创建这些资源的进程之间的关系。您可以在 Google Cloud 控制台中查看数据沿袭信息,也可以从 Data Lineage API 中以 JSON 数据的形式检索该信息。

后续步骤