将数据源流与 Dataproc Serverless 搭配使用

本文档介绍了如何在项目级别或批处理工作负载级别为 Dataproc Serverless for Spark 批处理工作负载启用数据传承

概览

数据沿袭是 Dataplex 的一项功能,可让您跟踪数据在系统中的移动方式:数据来自何处、传递到何处以及对其应用了哪些转换。

Dataproc Serverless for Spark 工作负载会捕获谱系事件并将其发布到 Dataplex Data Lineage API。Dataproc Serverless for Spark 使用 OpenLineage Spark 插件,通过 OpenLineage 与 Data Lineage API 集成。

您可以使用沿袭可视化图表Data Lineage API 通过 Dataplex 访问沿袭信息。如需了解详情,请参阅在 Dataplex 中查看沿袭图

适用范围、功能和限制

数据传承功能支持 BigQuery 和 Cloud Storage 数据源,适用于使用 Dataproc Serverless for Spark 运行时版本 1.1.50+1.2.29+2.2.29+ 运行的工作负载,但存在以下例外情况和限制:

  • 数据沿袭不适用于 SparkR 或 Spark 流式工作负载。

准备工作

  1. 在 Google Cloud 控制台中的项目选择器页面上,选择要用于 Dataproc Serverless for Spark 工作负载的项目。

    转到“项目选择器”

  2. 启用 Data Lineage API 和 Data Catalog API。

    启用 API

所需的角色

如需获得在 Dataproc Serverless for Spark 中使用数据传承所需的权限,请让您的管理员为您授予 Dataproc 集群 VM 服务账号的以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

在项目级启用数据沿袭

您可以在项目级启用数据沿袭。在项目级启用后,您在项目中运行的所有后续批处理工作负载都将启用 Spark 谱系。

如何在项目级启用数据沿袭

如需在项目级启用数据传承,请设置以下自定义项目元数据

DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

您可以通过将 DATAPROC_LINEAGE_ENABLED 元数据设置为 false 来在项目级停用数据传承。

为 Spark 批处理工作负载启用数据沿袭

您可以在提交批处理工作负载时将 spark.dataproc.lineage.enabled 属性设置为 true,以便为批处理工作负载启用数据传承。

gcloud CLI 示例

gcloud dataproc batches submit pyspark FILENAME.py
    --region=REGION \
    --properties=spark.dataproc.lineage.enabled=true

在 Dataplex 中查看沿袭图

沿袭可视化图表会显示项目资源与创建这些资源的流程之间的关系。您可以在 Google Cloud 控制台中的图表可视化中查看数据传承信息,也可以将信息从 Data Lineage API 检索为 JSON 数据。

如需了解详情,请参阅在 Google Cloud 系统中使用数据谱系

示例

以下 Spark 工作负载会从 BigQuery 表中读取数据,然后将输出写入另一个 BigQuery 表。

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = lineage-ol-test
spark.conf.set('temporaryGcsBucket', bucket)

source = sample.source
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination = sample.destination
word_count.write.format('bigquery') \
  .option('table', destination) \
  .save()

此 Spark 工作负载会在 Dataplex 界面中创建以下谱系图:

沿袭图示例

后续步骤