Usa el linaje de datos con Serverless para Apache Spark

En este documento, se describe cómo habilitar el linaje de datos en las cargas de trabajo por lotes y las sesiones interactivas deGoogle Cloud Serverless para Apache Spark a nivel del proyecto, la carga de trabajo por lotes o la sesión interactiva.

Descripción general

El linaje de datos es una función de Dataplex Universal Catalog que te permite hacer un seguimiento de cómo los datos se mueven a través de tus sistemas: de dónde provienen, a dónde se pasan y qué transformaciones se aplican a ellos.

Google Cloud Serverless para cargas de trabajo y sesiones de Apache Spark captura eventos de linaje y los publica en la API de Data Lineage de Dataplex Universal Catalog. Serverless for Apache Spark se integra con la API de Data Lineage a través de OpenLineage, con el complemento de OpenLineage Spark.

Puedes acceder a la información del linaje a través de Dataplex Universal Catalog, con gráficos de linaje y la API de Data Lineage. Para obtener más información, consulta Cómo ver gráficos de linaje en Dataplex Universal Catalog.

Disponibilidad, capacidades y limitaciones

El linaje de datos, que admite fuentes de datos de BigQuery y Cloud Storage, está disponible para las cargas de trabajo y las sesiones que se ejecutan con las versiones de tiempo de ejecución de Serverless for Apache Spark 1.1, 1.2 y 2.2, con las siguientes excepciones y limitaciones:

  • El linaje de datos no está disponible para las cargas de trabajo ni las sesiones de SparkR o Spark Streaming.

Antes de comenzar

  1. En la página del selector de proyectos de la consola de Google Cloud , selecciona el proyecto que deseas usar para tus cargas de trabajo o sesiones de Serverless para Apache Spark.

    Ir al selector de proyectos

  2. Habilita la API de Data Lineage.

    Habilitar las API

Roles requeridos

Si tu carga de trabajo por lotes usa la cuenta de servicio predeterminada de Serverless para Apache Spark, tiene el rol de Dataproc Worker, que habilita el linaje de datos. No es necesario realizar ninguna otra acción.

Sin embargo, si tu carga de trabajo por lotes usa una cuenta de servicio personalizada para habilitar el linaje de datos, debes otorgar un rol obligatorio a la cuenta de servicio personalizada, como se explica en el siguiente párrafo.

Para obtener los permisos que necesitas para usar el linaje de datos con Dataproc, pídele a tu administrador que te otorgue los siguientes roles de IAM en la cuenta de servicio personalizada de tu carga de trabajo por lotes:

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

También puedes obtener los permisos necesarios a través de roles personalizados o cualquier otro rol predefinido.

Habilita el linaje de datos a nivel del proyecto

Puedes habilitar el linaje de datos a nivel del proyecto. Cuando se habilita a nivel del proyecto, todas las cargas de trabajo por lotes y las sesiones interactivas posteriores que ejecutes en el proyecto tendrán habilitado el linaje de Spark.

Cómo habilitar el linaje de datos a nivel del proyecto

Para habilitar el linaje de datos a nivel del proyecto, establece los siguientes metadatos personalizados del proyecto.

Clave Valor
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

Puedes inhabilitar el linaje de datos a nivel del proyecto si estableces los metadatos DATAPROC_LINEAGE_ENABLED en false.

Habilita el linaje de datos para una carga de trabajo por lotes de Spark

Puedes habilitar el linaje de datos en una carga de trabajo por lotes configurando la propiedad spark.dataproc.lineage.enabled como true cuando envíes la carga de trabajo.

Ejemplo de carga de trabajo por lotes

En este ejemplo, se envía una carga de trabajo de lineage-example.py por lotes con el linaje de Spark habilitado.

gcloud dataproc batches submit pyspark lineage-example.py \
    --region=REGION \
    --deps-bucket=gs://BUCKET \
    --properties=spark.dataproc.lineage.enabled=true

lineage-example.py lee datos de una tabla pública de BigQuery y, luego, escribe el resultado en una tabla nueva de un conjunto de datos existente de BigQuery. Usa un bucket de Cloud Storage para el almacenamiento temporal.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .option('writeMethod', 'direct') \
  .save()

Realiza los siguientes reemplazos:

  • REGION: Selecciona una región para ejecutar tu carga de trabajo.

  • BUCKET: Es el nombre de un bucket de Cloud Storage existente en el que se almacenarán las dependencias.

  • PROJECT_ID, DATASET y TABLE: Inserta tu ID del proyecto, el nombre de un conjunto de datos de BigQuery existente y el nombre de una tabla nueva que se creará en el conjunto de datos (la tabla no debe existir).

Puedes ver el gráfico de linaje en la IU de Dataplex Universal Catalog.

Gráfico de linaje de Spark

Habilita el linaje de datos para una sesión interactiva de Spark

Puedes habilitar el linaje de datos en una sesión interactiva de Spark si configuras la propiedad spark.dataproc.lineage.enabled en true cuando crees la sesión o la plantilla de sesión.

Ejemplo de sesión interactiva

El siguiente código de notebook de PySpark configura una sesión interactiva de Serverless for Apache Spark con el linaje de datos de Spark habilitado. Luego, crea una sesión de Spark Connect que ejecuta una consulta de recuento de palabras en un conjunto de datos públicos de Shakespeare de BigQuery y, luego, escribe el resultado en una tabla nueva en un conjunto de datos existente de BigQuery.

# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session

session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"

# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()

# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
           'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

Realiza los siguientes reemplazos:

  • PROJECT_ID, DATASET y TABLE: Inserta tu ID del proyecto, el nombre de un conjunto de datos de BigQuery existente y el nombre de una tabla nueva que se creará en el conjunto de datos (la tabla no debe existir).

Para ver el gráfico de linaje de datos, haz clic en el nombre de la tabla de destino que aparece en el panel de navegación de la página Explorador de BigQuery y, luego, selecciona la pestaña Linaje en el panel de detalles de la tabla.

Gráfico de linaje de Spark

Consulta el linaje en Dataplex Universal Catalog

Un gráfico de linaje muestra las relaciones entre los recursos de tu proyecto y los procesos que los crearon. Puedes ver la información del linaje de datos en la consola de Google Cloud o recuperar la información de la API de Data Lineage como datos JSON.

¿Qué sigue?