在 Dataproc 中使用数据沿袭

数据沿袭是一项 Dataplex 功能,可让您跟踪数据在系统中的移动方式:数据来自何处、被传递到何处以及已对这些数据应用了哪些转换。

数据沿袭适用于除 SparkR 之外的所有 Dataproc Spark 作业,以及 Dataproc Compute Engine 2.0.74+ 映像和 2.1.22+ 映像。沿袭适用于 BigQuery 和 Cloud Storage 数据源。

在 Dataproc 集群中启用该功能后,Dataproc Spark 作业会捕获沿袭事件并将其发布到 Dataplex Data Lineage API。Dataproc 使用 OpenLineage Spark 插件,通过 OpenLineage 与 Data Lineage API 集成。

您可以通过以下方式通过 Dataplex 访问沿袭信息:

限制

以下资源不支持沿袭:

  • BigQuery 连接器版本 2(数据源 API 版本 2,Spark)
  • Spark 流式工作负载

准备工作

  1. 在 Google Cloud 控制台的项目选择器页面上,选择包含要跟踪其沿袭的 Dataproc 集群的项目。

    转到“项目选择器”

  2. 启用 Data Lineage API 和 Data Catalog API。

    启用 API

所需的角色

如需获取在 Dataproc 中使用数据沿袭所需的权限,请让管理员向您授予对 Dataproc 集群虚拟机服务帐号的以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

在 Dataproc 中启用数据沿袭

在集群级别启用沿袭,以便集群中所有提交的 Spark 作业向 Data Lineage API 报告沿袭信息。

创建 Dataproc 集群

创建 Dataproc 集群,并将属性 dataproc:dataproc.lineage.enabled 设置为 true

gcloud dataproc clusters create CLUSTER_NAME \
--region REGION \
--zone ZONE \
--project PROJECT_ID \
--properties 'dataproc:dataproc.lineage.enabled=true' \
--scopes https://www.googleapis.com/auth/cloud-platform

提交 Spark 作业

如果您在启用了沿袭的 Dataproc 集群上提交 Spark 作业,Dataproc 会捕获沿袭信息并将其报告给 Data Lineage API。

gcloud dataproc jobs submit spark \
--project PROJECT_ID \
--cluster=CLUSTER_NAME \
--region REGION \
--class CLASS \
--jars=gs://APPLICATION_BUCKET/spark-application.jar \
--properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME

spark.openlineage.namespacespark.openlineage.appName 属性是可选的,用于对作业进行唯一标识。如果您未传递这些属性,Dataproc 将使用以下默认值:

  • spark.openlineage.namespace 的默认值:PROJECT_ID
  • spark.openlineage.appName 的默认值:spark.app.name

在 Dataplex 中查看沿袭图

沿袭可视化图可显示项目资源与创建这些资源的流程之间的关系。您可以在 Google Cloud 控制台中以图表可视化的形式查看数据沿袭信息,也可以以 JSON 数据的形式从 Data Lineage API 检索此信息。

如需了解详情,请参阅在 Dataplex 界面中查看沿袭图

示例

请考虑以下 Spark 作业,该作业从 BigQuery 表中读取数据,并向另一个 BigQuery 表写入数据:

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = lineage-ol-test
spark.conf.set('temporaryGcsBucket', bucket)

source = sample.source
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination = sample.destination
word_count.write.format('bigquery') \
  .option('table', destination) \
  .save()

此 Spark 作业会在 Dataplex 界面中创建以下沿袭图:

沿袭图示例

在 Dataproc 中停用数据沿袭

创建集群时启用沿袭后,便无法在集群级层停用沿袭。如需在 Dataproc 集群中停用沿袭,请重新创建不带 dataproc:dataproc.lineage.enabled 属性的集群。

如需为在启用沿袭的情况下创建的集群上的特定作业停用沿袭,您必须在提交作业时传递 spark.extraListeners 属性为空值。

后续步骤