在 Dataproc 中启用数据沿袭

本文档介绍了如何在项目集群级别为 Dataproc Spark 作业启用数据谱系

概览

数据沿袭Dataplex 的一项功能,可让您跟踪数据在系统中的移动方式:数据来自何处、传递到何处以及对其应用了哪些转换。

数据传承适用于所有 Dataproc Spark 作业(SparkR 除外),且需要使用 Dataproc Compute Engine 2.0.74 及更高版本和 2.1.22 及更高版本映像,并且支持 BigQuery 和 Cloud Storage 数据源。

在 Dataproc 集群中启用此功能后,Dataproc Spark 作业会捕获数据源流事件并将其发布到 Dataplex Data Lineage API。Dataproc 使用 OpenLineage Spark 插件,通过 OpenLineage 与 Data Lineage API 集成。

您可以通过以下方式通过 Dataplex 访问数据沿袭信息:

限制

数据沿袭不适用于 SparkR 或 Spark 流式作业。

准备工作

  1. 在 Google Cloud 控制台的项目选择器页面上,选择包含您要跟踪其沿袭的 Dataproc 集群的项目。

    转到“项目选择器”

  2. 启用 Data Lineage API 和 Data Catalog API。

    启用 API

所需的角色

如需获得在 Dataproc 中使用数据源流水线所需的权限,请让您的管理员为 Dataproc 集群 VM 服务账号授予以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

在项目级启用数据沿袭

您可以在项目级启用数据沿袭。在项目上启用数据沿袭后,在创建的集群上运行的受支持 Spark 作业将启用数据沿袭。请注意,在项目级启用数据传承之前创建的现有集群上运行的作业不会启用数据传承。

如何在项目级启用数据沿袭

如需在项目级启用数据传承,请设置以下自定义项目元数据

DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

您可以通过将 DATAPROC_LINEAGE_ENABLED 元数据设置为 false 来在项目级停用数据传承。

在集群级启用数据沿袭

您可以在创建集群时启用数据传承,以便提交到集群的所有受支持的 Spark 作业都启用数据传承。

如何在集群级启用数据沿袭

如需在集群上启用数据传承,请创建 Dataproc 集群,并将 dataproc:dataproc.lineage.enabled 集群属性设置为 true

2.0 映像版本集群:数据传承需要 Dataproc 集群虚拟机访问 cloud-platform 范围。使用映像版本 2.1 及更高版本创建的 Dataproc 映像版本集群已启用 cloud-platform。如果您在创建集群时指定了 Dataproc 映像版本 2.0,请将范围设置为 cloud-platform

gcloud CLI 示例:

gcloud dataproc clusters create CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --properties 'dataproc:dataproc.lineage.enabled=true'

停用作业的数据沿袭

如果您在集群一级启用了数据源流,则可以在提交作业时传递值为空(“”)的 spark.extraListeners 属性,以停用特定作业的数据源流。

启用后,您便无法在集群上停用数据传承。如需消除所有集群作业的数据谱系,您可以重新创建集群,而不使用 dataproc:dataproc.lineage.enabled 属性。

提交 Spark 作业

当您在启用了数据传承功能创建的 Dataproc 集群上提交 Spark 作业时,Dataproc 会捕获数据传承信息并将其报告给 Data Lineage API。

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME

注意:

  • 添加用于唯一标识作业的 spark.openlineage.namespacespark.openlineage.appName 属性是可选的。如果您不添加这些属性,Dataproc 会使用以下默认值:
    • spark.openlineage.namespace 的默认值:PROJECT_ID
    • spark.openlineage.appName 的默认值:spark.app.name

在 Dataplex 中查看沿袭图

沿袭可视化图表会显示项目资源与创建这些资源的流程之间的关系。您可以在 Google Cloud 控制台中以图表可视化形式查看数据传承信息,也可以从 Data Lineage API 中以 JSON 数据的形式检索数据传承信息。

如需了解详情,请参阅在 Dataplex 界面中查看沿袭图

示例

以下 Spark 作业会从 BigQuery 表中读取数据,并将其写入另一个 BigQuery 表。

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = lineage-ol-test
spark.conf.set('temporaryGcsBucket', bucket)

source = sample.source
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination = sample.destination
word_count.write.format('bigquery') \
  .option('table', destination) \
  .save()

Spark 作业会在 Dataplex 界面中创建以下谱系图:

沿袭图示例

后续步骤