本文档介绍了如何在项目级别或批处理工作负载级别为 Dataproc Serverless for Spark 批处理工作负载启用数据传承。
概览
数据沿袭是 Dataplex 的一项功能,可让您跟踪数据在系统中的移动方式:数据来自何处、传递到何处以及对其应用了哪些转换。
Dataproc Serverless for Spark 工作负载会捕获谱系事件并将其发布到 Dataplex Data Lineage API。Dataproc Serverless for Spark 使用 OpenLineage Spark 插件,通过 OpenLineage 与 Data Lineage API 集成。
您可以使用沿袭可视化图表和 Data Lineage API 通过 Dataplex 访问沿袭信息。如需了解详情,请参阅在 Dataplex 中查看沿袭图。
适用范围、功能和限制
数据传承功能支持 BigQuery 和 Cloud Storage 数据源,适用于使用 Dataproc Serverless for Spark 运行时版本
1.1.50+
、1.2.29+
和 2.2.29+
运行的工作负载,但存在以下例外情况和限制:
- 数据沿袭不适用于 SparkR 或 Spark 流式工作负载。
准备工作
在 Google Cloud 控制台中的项目选择器页面上,选择要用于 Dataproc Serverless for Spark 工作负载的项目。
启用 Data Lineage API 和 Data Catalog API。
所需的角色
如需获得在 Dataproc Serverless for Spark 中使用数据传承所需的权限,请让您的管理员为您授予 Dataproc 集群 VM 服务账号的以下 IAM 角色:
-
在 Data Catalog 中查看谱系可视化图表或使用 Data Lineage API:
Data Lineage Viewer (
roles/datalineage.viewer
) -
使用 API 手动生成谱系:
Data Lineage Events Producer (
roles/datalineage.producer
) -
使用 API 修改谱系:Data Lineage Editor (
roles/datalineage.editor
) -
对数据沿袭执行所有操作:Data Lineage Administrator (
roles/datalineage.admin
)
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
在项目级启用数据沿袭
您可以在项目级启用数据沿袭。在项目级启用后,您在项目中运行的所有后续批处理工作负载都将启用 Spark 谱系。
如何在项目级启用数据沿袭
如需在项目级启用数据传承,请设置以下自定义项目元数据。
键 | 值 |
---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_CLUSTER_SCOPES |
https://www.googleapis.com/auth/cloud-platform |
您可以通过将 DATAPROC_LINEAGE_ENABLED
元数据设置为 false
来在项目级停用数据传承。
为 Spark 批处理工作负载启用数据沿袭
您可以在提交批处理工作负载时将 spark.dataproc.lineage.enabled
属性设置为 true
,以便为批处理工作负载启用数据传承。
gcloud CLI 示例:
gcloud dataproc batches submit pyspark FILENAME.py --region=REGION \ --properties=spark.dataproc.lineage.enabled=true
在 Dataplex 中查看沿袭图
沿袭可视化图表会显示项目资源与创建这些资源的流程之间的关系。您可以在 Google Cloud 控制台中的图表可视化中查看数据传承信息,也可以将信息从 Data Lineage API 检索为 JSON 数据。
如需了解详情,请参阅在 Google Cloud 系统中使用数据谱系 。
示例:
以下 Spark 工作负载会从 BigQuery 表中读取数据,然后将输出写入另一个 BigQuery 表。
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
bucket = lineage-ol-test
spark.conf.set('temporaryGcsBucket', bucket)
source = sample.source
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination = sample.destination
word_count.write.format('bigquery') \
.option('table', destination) \
.save()
此 Spark 工作负载会在 Dataplex 界面中创建以下谱系图:
后续步骤
- 详细了解数据沿袭。