Usa el linaje de datos con Dataproc Serverless

En este documento, se describe cómo habilitar el linaje de datos para tus cargas de trabajo por lotes de Dataproc Serverless para Spark, ya sea a nivel del proyecto o de la carga de trabajo por lotes.

Descripción general

El linaje de datos es una función de Dataplex que te permite hacer un seguimiento de cómo los datos se mueven a través de tus sistemas: de dónde provienen, a dónde se pasan y qué transformaciones se aplican a ellos.

Las cargas de trabajo de Dataproc Serverless para Spark capturan eventos de linaje y los publican en la API de Data Lineage de Dataplex. Dataproc Serverless para Spark se integra a la API de Data Lineage a través de OpenLineage con el complemento OpenLineage para Spark.

Puedes acceder a la información de linaje a través de Dataplex con los gráficos de visualización de linaje y la API de Data Lineage. Para obtener más información, consulta Cómo ver gráficos de linaje en Dataplex.

Disponibilidad, funciones y limitaciones

El linaje de datos, que admite fuentes de datos de BigQuery y Cloud Storage, está disponible para las cargas de trabajo que se ejecutan con las versiones del entorno de ejecución de Dataproc sin servidor para Spark 1.1.50+, 1.2.29+ y 2.2.29+, con las siguientes excepciones y limitaciones:

  • El linaje de datos no está disponible para las cargas de trabajo de transmisión de SparkR ni de Spark.

Antes de comenzar

  1. En la página del selector de proyectos de la consola de Google Cloud, selecciona el proyecto que usarás para tus cargas de trabajo de Dataproc Serverless para Spark.

    Ir al selector de proyectos

  2. Habilita las APIs de Data Lineage y Data Catalog.

    Habilitar las API

Roles obligatorios

Para obtener los permisos que necesitas para usar el linaje de datos en Dataproc Serverless para Spark, pídele a tu administrador que te otorgue los siguientes roles de IAM en la cuenta de servicio de la VM del clúster de Dataproc:

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.

Habilita el linaje de datos a nivel del proyecto

Puedes habilitar el linaje de datos a nivel del proyecto. Cuando se habilita a nivel del proyecto, todas las cargas de trabajo por lotes posteriores que ejecutes en el proyecto tendrán habilitado el linaje de Spark.

Cómo habilitar el linaje de datos a nivel del proyecto

Para habilitar el linaje de datos a nivel del proyecto, configura los siguientes metadatos personalizados del proyecto.

Clave Valor
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

Puedes inhabilitar el linaje de datos a nivel del proyecto si estableces los metadatos DATAPROC_LINEAGE_ENABLED en false.

Habilita el linaje de datos para una carga de trabajo por lotes de Spark

Para habilitar el linaje de datos en una carga de trabajo por lotes, configura la propiedad spark.dataproc.lineage.enabled como true cuando envíes la carga de trabajo.

Ejemplo de la CLI de gcloud:

gcloud dataproc batches submit pyspark FILENAME.py
    --region=REGION \
    --properties=spark.dataproc.lineage.enabled=true

Visualiza gráficos de linaje en Dataplex

Un gráfico de visualización de linaje muestra las relaciones entre los recursos de tu proyecto y los procesos que los crearon. Puedes ver la información del linaje de datos en una visualización de gráficos en la consola de Google Cloud o recuperar la información de la API de Data Lineage como datos JSON.

Para obtener más información, consulta Usa el linaje de datos con sistemas de Google Cloud .

Ejemplo:

La siguiente carga de trabajo de Spark lee datos de una tabla de BigQuery y, luego, escribe el resultado en una tabla de BigQuery diferente.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = lineage-ol-test
spark.conf.set('temporaryGcsBucket', bucket)

source = sample.source
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination = sample.destination
word_count.write.format('bigquery') \
  .option('table', destination) \
  .save()

Esta carga de trabajo de Spark crea el siguiente gráfico de linaje en la IU de Dataplex:

Gráfico de linaje de muestra

¿Qué sigue?