将数据源流与 Dataproc Serverless 搭配使用

本文档介绍了如何在 Dataproc Serverless for Spark 批处理工作负载和 Interactive 会话中启用数据谱系,具体级别包括项目批处理工作负载Interactive 会话

概览

数据沿袭是 Dataplex 的一项功能,可让您跟踪数据在系统中的移动方式:数据来自何处、传递到何处以及对其应用了哪些转换。

Dataproc Serverless for Spark 工作负载和会话会捕获谱系事件,并将其发布到 Dataplex Data Lineage API。Dataproc Serverless for Spark 使用 OpenLineage Spark 插件,通过 OpenLineage 与 Data Lineage API 集成。

您可以使用沿袭图Data Lineage API 通过 Dataplex 访问沿袭信息。如需了解详情,请参阅在 Dataplex 中查看沿袭图

适用范围、功能和限制

数据传承功能支持 BigQuery 和 Cloud Storage 数据源,适用于使用 Dataproc Serverless for Spark 运行时版本 1.11.22.2 运行的工作负载和会话,但存在以下例外情况和限制:

  • 数据沿袭不适用于 SparkR 或 Spark 流式处理工作负载或会话。

准备工作

  1. 在 Google Cloud 控制台中的项目选择器页面上,选择要用于 Dataproc Serverless for Spark 工作负载或会话的项目。

    转到“项目选择器”

  2. 启用 Data Lineage API 和 Dataplex API。

    启用 API

所需的角色

如需获得在 Dataproc Serverless for Spark 中使用数据传承所需的权限,请让您的管理员为您授予 Dataproc 集群 VM 服务账号的以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

在项目级层启用数据沿袭

您可以在项目级启用数据沿袭。在项目级启用后,您在项目中运行的所有后续批处理工作负载和交互式会话都将启用 Spark 谱系。

如何在项目级启用数据沿袭

如需在项目级启用数据传承,请设置以下自定义项目元数据

DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

您可以通过将 DATAPROC_LINEAGE_ENABLED 元数据设置为 false 来在项目级停用数据谱系。

为 Spark 批处理工作负载启用数据沿袭

您可以在提交批处理工作负载时将 spark.dataproc.lineage.enabled 属性设置为 true,以便为批处理工作负载启用数据传承。

批量工作负载示例

此示例会提交启用了 Spark 谱系的批处理 lineage-example.py 工作负载。

gcloud dataproc batches submit pyspark lineage-example.py \
    --region=REGION \
    --properties=spark.dataproc.lineage.enabled=true

lineage-example.py 会从 BigQuery 表中读取数据,然后将输出写入另一个 BigQuery 表。

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = lineage-demo
spark.conf.set('temporaryCloudStorageBucket', bucket)

source = sample.source
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination = sample.destination
word_count.write.format('bigquery') \
  .option('table', destination) \
  .save()

您可以在 Dataplex 界面中查看谱系图。

Spark 沿袭图

为 Spark Interactive 会话启用数据沿袭

您可以在创建会话或会话模板时将 spark.dataproc.lineage.enabled 属性设置为 true,以便在 Spark Interactive 会话中启用数据传承。

交互式会话示例

以下 PySpark 笔记本代码会配置一个启用了 Spark 数据谱系的 Dataproc Serverless Interactive 会话,该会话在 Private Google Access VPC 区域子网上运行。然后,它会创建一个 Spark Connect 会话,对公共 BigQuery Shakespeare 数据集运行字数统计查询,然后将输出写入 BigQuery 表。

from dataproc_spark_session.session.spark.connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session = Session()

# Configure the Dataproc Serverless interactive session. Enable Spark data lineage.
project_id = "sample-project-id"
region = "us-central1"
subnet_name = "sample-private-google-access-subnet"
session.environment_config.execution_config.subnetwork_uri = f"projects/{project_id}/regions/{region}/subnetworks/{subnet_name}"
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
session.runtime_config.version = "2.2"

# Create the Spark Connect session.
spark = (
   DataprocSparkSession.builder
     .appName("LINEAGE_BQ_TO_BQ")
     .dataprocConfig(session)
     .getOrCreate()
)
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
           'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

# Output the results to a BigQuery destination table.
destination = sample.destination
word_count.write.format('bigquery') \
  .option('table', destination) \
  .save()

如需查看数据谱系图,请点击 BigQuery Explorer 页面导航窗格中列出的目标表名称,然后选择表详情窗格中的“谱系”标签页。

Spark 沿袭图

查看 Dataplex 中的沿袭

沿袭图表会显示项目资源与创建这些资源的流程之间的关系。您可以在 Google Cloud 控制台中查看数据谱系信息,也可以将信息从 Data Lineage API 检索为 JSON 数据。

后续步骤