数据沿袭是 Dataplex 的一项功能,可让您跟踪数据在系统中的移动方式:数据来自何处、传递到何处以及对其应用了哪些转换。
数据沿袭适用于所有 Dataproc Spark 作业 (SparkR 除外)和 Dataproc Compute Engine 2.0.74+ 和 2.1.22+ 图片。谱系适用于 BigQuery 和 Cloud Storage 数据源。
在 Dataproc 集群中启用此功能后,Dataproc Spark 作业会捕获谱系事件并将其发布到 Dataplex Data Lineage API。Dataproc 使用 OpenLineage Spark 插件,通过 OpenLineage 与 Data Lineage API 集成。
您可以使用 以下:
限制
以下内容不支持谱系:
- BigQuery 连接器版本 2(Spark 的数据源 API 版本 2)
- Spark 流式工作负载
准备工作
在 Google Cloud 控制台的项目选择器页面上,选择项目 (包含您要跟踪的 Dataproc 集群) 沿袭。
启用 Data Lineage API 和 Data Catalog API。
所需的角色
如需获得在 Dataproc 中使用数据传承所需的权限,请让您的管理员为 Dataproc 集群虚拟机服务账号授予以下 IAM 角色:
-
在 Data Catalog 中查看沿袭可视化或使用 Data Lineage API:
Data Lineage Viewer (
roles/datalineage.viewer
) -
使用 API 手动生成谱系:
Data Lineage Events Producer (
roles/datalineage.producer
) -
使用 API 修改沿袭:
数据沿袭编辑器 (
roles/datalineage.editor
) -
对沿袭执行所有操作:
数据沿袭管理员 (
roles/datalineage.admin
)
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
在 Dataproc 中启用数据沿袭
在集群级别启用沿袭, 集群将沿袭信息报告给 Data Lineage API。
创建 Dataproc 集群
创建 Dataproc 集群
并将 dataproc:dataproc.lineage.enabled
属性设为 true
。
gcloud dataproc clusters create CLUSTER_NAME \
--region REGION \
--zone ZONE \
--project PROJECT_ID \
--properties 'dataproc:dataproc.lineage.enabled=true' \
--scopes https://www.googleapis.com/auth/cloud-platform
提交 Spark 作业
在您提交 Spark 作业时 在启用了沿袭创建的 Dataproc 集群上, Dataproc 会捕获沿袭信息并将其报告给 Data Lineage API。
gcloud dataproc jobs submit spark \
--project PROJECT_ID \
--cluster=CLUSTER_NAME \
--region REGION \
--class CLASS \
--jars=gs://APPLICATION_BUCKET/spark-application.jar \
--properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME
属性 spark.openlineage.namespace
和 spark.openlineage.appName
是可选属性,用于唯一标识作业。如果您没有通过这些
属性,Dataproc 将使用以下默认值:
spark.openlineage.namespace
的默认值:PROJECT_IDspark.openlineage.appName
的默认值:spark.app.name
在 Dataplex 中查看沿袭图
沿袭可视化图表会显示项目资源与创建这些资源的进程之间的关系。您可以在 Google Cloud 控制台中以图表可视化形式查看数据沿袭信息,也可以从 Data Lineage API 中以 JSON 数据的形式检索数据沿袭信息。
如需了解详情,请参阅在 Dataplex 界面中查看沿袭图。
示例
请考虑以下从 BigQuery 表读取数据并将数据写入另一个 BigQuery 表的 Spark 作业:
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
bucket = lineage-ol-test
spark.conf.set('temporaryGcsBucket', bucket)
source = sample.source
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination = sample.destination
word_count.write.format('bigquery') \
.option('table', destination) \
.save()
此 Spark 作业会在 Dataplex 界面中创建以下沿袭图:
在 Dataproc 中停用数据沿袭
在创建集群时启用分级后,
无法在集群级别停用沿袭。如需在 Dataproc 集群中停用谱系,请在不使用 dataproc:dataproc.lineage.enabled
属性的情况下重新创建集群。
如需为使用以下工具创建的集群上的特定作业停用沿袭,请使用
沿袭,则必须传递为空的 spark.extraListeners
属性
值。
后续步骤
- 详细了解数据沿袭。