本文档介绍了如何在项目或集群级别为 Dataproc Spark 作业启用数据谱系。
概览
数据沿袭是 Dataplex 的一项功能,可让您跟踪数据在系统中的移动方式:数据来自何处、传递到何处以及对其应用了哪些转换。
数据传承适用于所有 Dataproc Spark 作业(SparkR 除外),且需要使用 Dataproc Compute Engine 2.0.74 及更高版本和 2.1.22 及更高版本映像,并且支持 BigQuery 和 Cloud Storage 数据源。
在 Dataproc 集群中启用此功能后,Dataproc Spark 作业会捕获数据源流事件并将其发布到 Dataplex Data Lineage API。Dataproc 使用 OpenLineage Spark 插件,通过 OpenLineage 与 Data Lineage API 集成。
您可以通过以下方式通过 Dataplex 访问数据沿袭信息:
限制
数据沿袭不适用于 SparkR 或 Spark 流式作业。
准备工作
在 Google Cloud 控制台的项目选择器页面上,选择包含您要跟踪其沿袭的 Dataproc 集群的项目。
启用 Data Lineage API 和 Data Catalog API。
所需的角色
如需获得在 Dataproc 中使用数据源流水线所需的权限,请让您的管理员为 Dataproc 集群 VM 服务账号授予以下 IAM 角色:
-
在 Data Catalog 中查看数据源流水图,或使用 Data Lineage API:
Data Lineage Viewer (
roles/datalineage.viewer
) -
使用 API 手动生成数据源流:
Data Lineage Events Producer (
roles/datalineage.producer
) -
使用 API 修改谱系:Data Lineage Editor (
roles/datalineage.editor
) -
对数据沿袭执行所有操作:Data Lineage Administrator (
roles/datalineage.admin
)
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
在项目级启用数据沿袭
您可以在项目级启用数据沿袭。在项目上启用数据沿袭后,在创建的集群上运行的受支持 Spark 作业将启用数据沿袭。请注意,在项目级启用数据传承之前创建的现有集群上运行的作业不会启用数据传承。
如何在项目级启用数据沿袭
如需在项目级启用数据传承,请设置以下自定义项目元数据:
键 | 值 |
---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_CLUSTER_SCOPES |
https://www.googleapis.com/auth/cloud-platform |
您可以通过将 DATAPROC_LINEAGE_ENABLED
元数据设置为 false
来在项目级停用数据传承。
在集群级启用数据沿袭
您可以在创建集群时启用数据传承,以便提交到集群的所有受支持的 Spark 作业都启用数据传承。
如何在集群级启用数据沿袭
如需在集群上启用数据传承,请创建 Dataproc 集群,并将 dataproc:dataproc.lineage.enabled
集群属性设置为 true
。
2.0 映像版本集群:数据传承需要 Dataproc 集群虚拟机访问 cloud-platform
范围。使用映像版本 2.1 及更高版本创建的 Dataproc 映像版本集群已启用 cloud-platform
。如果您在创建集群时指定了 Dataproc 映像版本 2.0
,请将范围设置为 cloud-platform
。
gcloud CLI 示例:
gcloud dataproc clusters create CLUSTER_NAME \
--project PROJECT_ID \
--region REGION \
--properties 'dataproc:dataproc.lineage.enabled=true'
停用作业的数据沿袭
如果您在集群一级启用了数据源流,则可以在提交作业时传递值为空(“”)的 spark.extraListeners
属性,以停用特定作业的数据源流。
启用后,您便无法在集群上停用数据传承。如需消除所有集群作业的数据谱系,您可以重新创建集群,而不使用 dataproc:dataproc.lineage.enabled
属性。
提交 Spark 作业
当您在启用了数据传承功能创建的 Dataproc 集群上提交 Spark 作业时,Dataproc 会捕获数据传承信息并将其报告给 Data Lineage API。
gcloud dataproc jobs submit spark \
--cluster=CLUSTER_NAME \
--project PROJECT_ID \
--region REGION \
--class CLASS \
--jars=gs://APPLICATION_BUCKET/spark-application.jar \
--properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME
注意:
- 添加用于唯一标识作业的
spark.openlineage.namespace
和spark.openlineage.appName
属性是可选的。如果您不添加这些属性,Dataproc 会使用以下默认值:spark.openlineage.namespace
的默认值:PROJECT_IDspark.openlineage.appName
的默认值:spark.app.name
在 Dataplex 中查看沿袭图
沿袭可视化图表会显示项目资源与创建这些资源的流程之间的关系。您可以在 Google Cloud 控制台中以图表可视化形式查看数据传承信息,也可以从 Data Lineage API 中以 JSON 数据的形式检索数据传承信息。
如需了解详情,请参阅在 Dataplex 界面中查看沿袭图。
示例:
以下 Spark 作业会从 BigQuery 表中读取数据,并将其写入另一个 BigQuery 表。
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
bucket = lineage-ol-test
spark.conf.set('temporaryGcsBucket', bucket)
source = sample.source
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination = sample.destination
word_count.write.format('bigquery') \
.option('table', destination) \
.save()
Spark 作业会在 Dataplex 界面中创建以下谱系图:
后续步骤
- 详细了解数据沿袭。