En este documento, se describe cómo habilitar el linaje de datos en las cargas de trabajo por lotes y las sesiones interactivas deGoogle Cloud Serverless para Apache Spark a nivel del proyecto, la carga de trabajo por lotes o la sesión interactiva.
Descripción general
El linaje de datos es una función de Dataplex Universal Catalog que te permite hacer un seguimiento de cómo los datos se mueven a través de tus sistemas: de dónde provienen, a dónde se pasan y qué transformaciones se aplican a ellos.
Google Cloud Serverless para cargas de trabajo y sesiones de Apache Spark captura eventos de linaje y los publica en la API de Data Lineage de Dataplex Universal Catalog. Serverless for Apache Spark se integra con la API de Data Lineage a través de OpenLineage, con el complemento de OpenLineage Spark.
Puedes acceder a la información del linaje a través de Dataplex Universal Catalog, con gráficos de linaje y la API de Data Lineage. Para obtener más información, consulta Cómo ver gráficos de linaje en Dataplex Universal Catalog.
Disponibilidad, capacidades y limitaciones
El linaje de datos, que admite fuentes de datos de BigQuery y Cloud Storage, está disponible para las cargas de trabajo y las sesiones que se ejecutan con las versiones de tiempo de ejecución de Serverless for Apache Spark 1.1
, 1.2
y 2.2
, con las siguientes excepciones y limitaciones:
- El linaje de datos no está disponible para las cargas de trabajo ni las sesiones de SparkR o Spark Streaming.
Antes de comenzar
En la página del selector de proyectos de la consola de Google Cloud , selecciona el proyecto que deseas usar para tus cargas de trabajo o sesiones de Serverless para Apache Spark.
Habilita la API de Data Lineage.
Roles requeridos
Si tu carga de trabajo por lotes usa la cuenta de servicio predeterminada de Serverless para Apache Spark, tiene el rol de Dataproc Worker
, que habilita el linaje de datos. No es necesario realizar ninguna otra acción.
Sin embargo, si tu carga de trabajo por lotes usa una cuenta de servicio personalizada para habilitar el linaje de datos, debes otorgar un rol obligatorio a la cuenta de servicio personalizada, como se explica en el siguiente párrafo.
Para obtener los permisos que necesitas para usar el linaje de datos con Dataproc, pídele a tu administrador que te otorgue los siguientes roles de IAM en la cuenta de servicio personalizada de tu carga de trabajo por lotes:
-
Otorga uno de los siguientes roles:
-
Trabajador de Dataproc (
roles/dataproc.worker
) -
Editor de linaje de datos (
roles/datalineage.editor
) -
Productor de linaje de datos (
roles/datalineage.producer
) -
Administrador de linaje de datos (
roles/datalineage.admin
)
-
Trabajador de Dataproc (
Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.
También puedes obtener los permisos necesarios a través de roles personalizados o cualquier otro rol predefinido.
Habilita el linaje de datos a nivel del proyecto
Puedes habilitar el linaje de datos a nivel del proyecto. Cuando se habilita a nivel del proyecto, todas las cargas de trabajo por lotes y las sesiones interactivas posteriores que ejecutes en el proyecto tendrán habilitado el linaje de Spark.
Cómo habilitar el linaje de datos a nivel del proyecto
Para habilitar el linaje de datos a nivel del proyecto, establece los siguientes metadatos personalizados del proyecto.
Clave | Valor |
---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_CLUSTER_SCOPES |
https://www.googleapis.com/auth/cloud-platform |
Puedes inhabilitar el linaje de datos a nivel del proyecto si estableces los metadatos DATAPROC_LINEAGE_ENABLED
en false
.
Habilita el linaje de datos para una carga de trabajo por lotes de Spark
Puedes habilitar el linaje de datos en una carga de trabajo por lotes configurando la propiedad spark.dataproc.lineage.enabled
como true
cuando envíes la carga de trabajo.
Ejemplo de carga de trabajo por lotes
En este ejemplo, se envía una carga de trabajo de lineage-example.py
por lotes con el linaje de Spark habilitado.
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --deps-bucket=gs://BUCKET \ --properties=spark.dataproc.lineage.enabled=true
lineage-example.py
lee datos de una tabla pública de BigQuery y, luego, escribe el resultado en una tabla nueva de un conjunto de datos existente de BigQuery. Usa un bucket de Cloud Storage para el almacenamiento temporal.
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.option('writeMethod', 'direct') \
.save()
Realiza los siguientes reemplazos:
REGION: Selecciona una región para ejecutar tu carga de trabajo.
BUCKET: Es el nombre de un bucket de Cloud Storage existente en el que se almacenarán las dependencias.
PROJECT_ID, DATASET y TABLE: Inserta tu ID del proyecto, el nombre de un conjunto de datos de BigQuery existente y el nombre de una tabla nueva que se creará en el conjunto de datos (la tabla no debe existir).
Puedes ver el gráfico de linaje en la IU de Dataplex Universal Catalog.
Habilita el linaje de datos para una sesión interactiva de Spark
Puedes habilitar el linaje de datos en una sesión interactiva de Spark si configuras la propiedad spark.dataproc.lineage.enabled
en true
cuando crees la sesión o la plantilla de sesión.
Ejemplo de sesión interactiva
El siguiente código de notebook de PySpark configura una sesión interactiva de Serverless for Apache Spark con el linaje de datos de Spark habilitado. Luego, crea una sesión de Spark Connect que ejecuta una consulta de recuento de palabras en un conjunto de datos públicos de Shakespeare de BigQuery y, luego, escribe el resultado en una tabla nueva en un conjunto de datos existente de BigQuery.
# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session
session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
Realiza los siguientes reemplazos:
- PROJECT_ID, DATASET y TABLE: Inserta tu ID del proyecto, el nombre de un conjunto de datos de BigQuery existente y el nombre de una tabla nueva que se creará en el conjunto de datos (la tabla no debe existir).
Para ver el gráfico de linaje de datos, haz clic en el nombre de la tabla de destino que aparece en el panel de navegación de la página Explorador de BigQuery y, luego, selecciona la pestaña Linaje en el panel de detalles de la tabla.
Consulta el linaje en Dataplex Universal Catalog
Un gráfico de linaje muestra las relaciones entre los recursos de tu proyecto y los procesos que los crearon. Puedes ver la información del linaje de datos en la consola de Google Cloud o recuperar la información de la API de Data Lineage como datos JSON.
¿Qué sigue?
- Obtén más información sobre el linaje de datos.