在 Dataproc 中使用数据沿袭

数据沿袭是 Dataplex 的一项功能,可让您跟踪数据在系统中的移动方式:数据来自何处、传递到何处以及对其应用了哪些转换。

数据沿袭适用于所有 Dataproc Spark 作业 (SparkR 除外)和 Dataproc Compute Engine 2.0.74+ 和 2.1.22+ 图片。谱系适用于 BigQuery 和 Cloud Storage 数据源。

在 Dataproc 集群中启用此功能后,Dataproc Spark 作业会捕获谱系事件并将其发布到 Dataplex Data Lineage API。Dataproc 使用 OpenLineage Spark 插件,通过 OpenLineage 与 Data Lineage API 集成。

您可以使用 以下:

限制

以下内容不支持谱系:

  • BigQuery 连接器版本 2(Spark 的数据源 API 版本 2)
  • Spark 流式工作负载

准备工作

  1. 在 Google Cloud 控制台的项目选择器页面上,选择项目 (包含您要跟踪的 Dataproc 集群) 沿袭。

    转到“项目选择器”

  2. 启用 Data Lineage API 和 Data Catalog API。

    启用 API

所需的角色

如需获得在 Dataproc 中使用数据传承所需的权限,请让您的管理员为 Dataproc 集群虚拟机服务账号授予以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

在 Dataproc 中启用数据沿袭

在集群级别启用沿袭, 集群将沿袭信息报告给 Data Lineage API。

创建 Dataproc 集群

创建 Dataproc 集群 并将 dataproc:dataproc.lineage.enabled 属性设为 true

gcloud dataproc clusters create CLUSTER_NAME \
--region REGION \
--zone ZONE \
--project PROJECT_ID \
--properties 'dataproc:dataproc.lineage.enabled=true' \
--scopes https://www.googleapis.com/auth/cloud-platform

提交 Spark 作业

在您提交 Spark 作业时 在启用了沿袭创建的 Dataproc 集群上, Dataproc 会捕获沿袭信息并将其报告给 Data Lineage API。

gcloud dataproc jobs submit spark \
--project PROJECT_ID \
--cluster=CLUSTER_NAME \
--region REGION \
--class CLASS \
--jars=gs://APPLICATION_BUCKET/spark-application.jar \
--properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME

属性 spark.openlineage.namespacespark.openlineage.appName 是可选属性,用于唯一标识作业。如果您没有通过这些 属性,Dataproc 将使用以下默认值:

  • spark.openlineage.namespace 的默认值:PROJECT_ID
  • spark.openlineage.appName 的默认值:spark.app.name

在 Dataplex 中查看沿袭图

沿袭可视化图表会显示项目资源与创建这些资源的进程之间的关系。您可以在 Google Cloud 控制台中以图表可视化形式查看数据沿袭信息,也可以从 Data Lineage API 中以 JSON 数据的形式检索数据沿袭信息。

如需了解详情,请参阅在 Dataplex 界面中查看沿袭图

示例

请考虑以下从 BigQuery 表读取数据并将数据写入另一个 BigQuery 表的 Spark 作业:

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = lineage-ol-test
spark.conf.set('temporaryGcsBucket', bucket)

source = sample.source
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination = sample.destination
word_count.write.format('bigquery') \
  .option('table', destination) \
  .save()

此 Spark 作业会在 Dataplex 界面中创建以下沿袭图:

示例沿袭图

在 Dataproc 中停用数据沿袭

创建集群时启用分级后, 无法在集群级别停用沿袭。如需在 Dataproc 集群中停用谱系,请在不使用 dataproc:dataproc.lineage.enabled 属性的情况下重新创建集群。

如需为使用以下工具创建的集群上的特定作业停用沿袭,请使用 沿袭,则必须传递为空的 spark.extraListeners 属性 值。

后续步骤