Habilitar el linaje de datos de Spark en Dataproc

En este documento se describe cómo habilitar el linaje de datos de tus tareas de Spark de Dataproc a nivel de proyecto o de clúster.

El linaje de los datos es una función de Dataplex Universal Catalog que te permite monitorizar cómo se mueven los datos por tus sistemas: de dónde proceden, a dónde se transfieren y qué transformaciones se les aplican.

El linaje de datos está disponible para todos los trabajos de Spark de Dataproc, excepto para los trabajos de SparkR y Spark Streaming, y es compatible con las fuentes de datos de BigQuery y Cloud Storage. Se incluye en Dataproc en Compute Engine 2.0.74+, 2.1.22+, 2.2.50 y versiones posteriores de la imagen.

Una vez que habilites la función en tu clúster de Dataproc, los trabajos de Spark de Dataproc capturarán eventos de linaje de datos y los publicarán en la API Data Lineage de Dataplex Universal Catalog. Dataproc se integra con la API Data Lineage a través de OpenLineage mediante el plugin OpenLineage Spark.

Puedes acceder a la información del linaje de datos a través de Dataplex Universal Catalog mediante lo siguiente:

Antes de empezar

  1. En la Google Cloud consola, en la página del selector de proyectos, selecciona el proyecto que contiene el clúster de Dataproc del que quieres hacer un seguimiento del linaje.

    Ir al selector de proyectos

  2. Habilita la API Data Lineage.

    Habilitar las APIs

Roles obligatorios

Si creas un clúster de Dataproc con la cuenta de servicio de VM predeterminada, tiene el rol Dataproc Worker, que habilita el linaje de datos. No es necesario que hagas nada más.

Sin embargo, si creas un clúster de Dataproc que usa una cuenta de servicio personalizada, para habilitar el linaje de datos en el clúster, debes asignar un rol obligatorio a la cuenta de servicio personalizada, tal como se explica en el siguiente párrafo.

Para obtener los permisos que necesitas para usar el linaje de datos con Dataproc, pide a tu administrador que te conceda los siguientes roles de gestión de identidades y accesos en la cuenta de servicio personalizada de tu clúster:

Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.

También puedes conseguir los permisos necesarios a través de roles personalizados u otros roles predefinidos.

Habilitar el linaje de datos de Spark a nivel de proyecto

Puedes habilitar el linaje de datos de Spark a nivel de proyecto. Los trabajos de Spark compatibles que se ejecuten en clústeres creados después de que se haya habilitado el linaje de datos en un proyecto tendrán habilitado el linaje de datos. Ten en cuenta que los trabajos que se ejecuten en clústeres que se hayan creado antes de habilitar el linaje de datos a nivel de proyecto no tendrán habilitado el linaje de datos.

Habilitar el linaje de datos de Spark a nivel de proyecto

Para habilitar el linaje de datos de Spark a nivel de proyecto, define los siguientes metadatos de proyecto personalizados:

Clave Valor
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

Puedes inhabilitar el linaje de datos de Spark a nivel de proyecto definiendo los metadatos DATAPROC_LINEAGE_ENABLED en false.

Habilitar el linaje de datos de Spark a nivel de clúster

Puedes habilitar el linaje de datos de Spark al crear un clúster para que todas las tareas de Spark admitidas que se envíen al clúster tengan habilitado el linaje de datos.

Habilitar el linaje de datos de Spark a nivel de clúster

Para habilitar el linaje de datos de Spark en un clúster, crea un clúster de Dataproc con la propiedad de clúster dataproc:dataproc.lineage.enabled definida como true.

Clústeres con la versión de imagen 2.0: se requiere el cloud-platformámbito de acceso a las VMs del clúster de Dataproc para el linaje de datos de Spark. Los clústeres de la versión de imagen de Dataproc creados con la versión de imagen 2.1 y posteriores tienen cloud-platform habilitado. Si especifica la versión de imagen de Dataproc 2.0 al crear un clúster, defina el ámbito en cloud-platform.

Ejemplo de la CLI gcloud:

gcloud dataproc clusters create CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --properties 'dataproc:dataproc.lineage.enabled=true'

Inhabilitar el linaje de datos de Spark en una tarea

Si habilitas el linaje de datos de Spark a nivel de clúster, puedes inhabilitarlo en una tarea específica. Para ello, debes transferir la propiedad spark.extraListeners con un valor vacío ("") al enviar la tarea.

Una vez habilitada, no puede inhabilitar el linaje de datos de Spark en el clúster. Para eliminar el linaje de datos de Spark en todos los trabajos del clúster, puedes volver a crear el clúster sin la propiedad dataproc:dataproc.lineage.enabled.

Enviar una tarea de Spark

Cuando envías una tarea de Spark en un clúster de Dataproc que se ha creado con el linaje de datos de Spark habilitado, Dataproc captura y registra la información del linaje de datos en la API Data Lineage.

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME

Notas:

  • Es opcional añadir las propiedades spark.openlineage.namespace y spark.openlineage.appName, que se usan para identificar de forma única el trabajo. Si no añade estas propiedades, Dataproc usará los siguientes valores predeterminados:
    • Valor predeterminado de spark.openlineage.namespace: PROJECT_ID
    • Valor predeterminado de spark.openlineage.appName: spark.app.name

Ver el linaje en Dataplex Universal Catalog

Un gráfico de linaje muestra las relaciones entre los recursos de tu proyecto y los procesos que los crearon. Puedes ver información sobre el linaje de datos en la Google Cloud consola o recuperarla de la API Data Lineage en forma de datos JSON.

Código de ejemplo de PySpark:

La siguiente tarea de PySpark lee datos de una tabla pública de BigQuery y, a continuación, escribe la salida en una tabla nueva de un conjunto de datos de BigQuery. Utiliza un segmento de Cloud Storage para el almacenamiento temporal.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = 'gs://BUCKET`
spark.conf.set('temporaryCloudStorageBucket', bucket)

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

Haz las siguientes sustituciones:

  • BUCKET: nombre de un segmento de Cloud Storage.

  • PROJECT_ID, DATASET y TABLE: inserta el ID de tu proyecto, el nombre de un conjunto de datos de BigQuery y el nombre de una tabla que quieras crear en el conjunto de datos (la tabla no debe existir).

Puedes ver el gráfico de linaje en la interfaz de usuario de Dataplex Universal Catalog.

Gráfico de linaje de ejemplo

Siguientes pasos