En este documento, se describe cómo habilitar el linaje de datos en las cargas de trabajo por lotes y las sesiones interactivas de Dataproc Serverless para Spark en el nivel del proyecto, la carga de trabajo por lotes o la sesión interactiva.
Descripción general
El linaje de datos es una función de Dataplex que te permite hacer un seguimiento de cómo los datos se mueven a través de tus sistemas: de dónde provienen, a dónde se pasan y qué transformaciones se aplican a ellos.
Las cargas de trabajo y las sesiones de Dataproc Serverless para Spark capturan eventos de linaje y los publican en la API de Data Lineage de Dataplex. Dataproc Serverless para Spark se integra en la API de Data Lineage a través de OpenLineage, con el complemento OpenLineage Spark.
Puedes acceder a la información de linaje a través de Dataplex con los grafos de linaje y la API de Data Lineage. Para obtener más información, consulta Cómo ver gráficos de linaje en Dataplex.
Disponibilidad, funciones y limitaciones
El linaje de datos, que admite fuentes de datos de BigQuery y Cloud Storage, está disponible para las cargas de trabajo y las sesiones que se ejecutan con las versiones del entorno de ejecución de Spark de Dataproc sin servidor 1.1
, 1.2
y 2.2
, con las siguientes excepciones y limitaciones:
- El linaje de datos no está disponible para las cargas de trabajo o sesiones de transmisión de SparkR o Spark.
Antes de comenzar
En la página del selector de proyectos de la consola de Google Cloud, selecciona el proyecto que usarás para tus cargas de trabajo o sesiones de Dataproc sin servidor para Spark.
Habilita las APIs de Data Lineage y Dataplex.
Roles obligatorios
Para obtener los permisos que necesitas para usar el linaje de datos en Dataproc Serverless para Spark, pídele a tu administrador que te otorgue los siguientes roles de IAM en la cuenta de servicio de la VM del clúster de Dataproc:
-
Consulta el linaje en Dataplex o usa la API de Data Lineage:
Visualizador de linaje de datos (
roles/datalineage.viewer
) -
Genera el linaje de forma manual con la API:
Productor de eventos de linaje de datos (
roles/datalineage.producer
) -
Edita el linaje con la API:
Editor de linaje de datos (
roles/datalineage.editor
) -
Realizar todas las operaciones en el linaje:
Administrador de linaje de datos (
roles/datalineage.admin
)
Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.
También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.
Habilita el linaje de datos a nivel del proyecto
Puedes habilitar el linaje de datos a nivel del proyecto. Cuando se habilita a nivel del proyecto, todas las cargas de trabajo por lotes y las sesiones interactivas posteriores que ejecutes en el proyecto tendrán habilitado el linaje de Spark.
Cómo habilitar el linaje de datos a nivel del proyecto
Para habilitar el linaje de datos a nivel del proyecto, configura los siguientes metadatos personalizados del proyecto.
Clave | Valor |
---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_CLUSTER_SCOPES |
https://www.googleapis.com/auth/cloud-platform |
Puedes inhabilitar el linaje de datos a nivel del proyecto si estableces los metadatos DATAPROC_LINEAGE_ENABLED
en false
.
Habilita el linaje de datos para una carga de trabajo por lotes de Spark
Para habilitar el linaje de datos en una carga de trabajo por lotes, configura la propiedad spark.dataproc.lineage.enabled
en true
cuando envíes la carga de trabajo.
Ejemplo de carga de trabajo por lotes
En este ejemplo, se envía una carga de trabajo lineage-example.py
por lotes con el linaje de Spark
habilitado.
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --properties=spark.dataproc.lineage.enabled=true
lineage-example.py
lee datos de una tabla de BigQuery y, luego, escribe el resultado en una tabla de BigQuery diferente.
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
bucket = lineage-demo
spark.conf.set('temporaryCloudStorageBucket', bucket)
source = sample.source
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination = sample.destination
word_count.write.format('bigquery') \
.option('table', destination) \
.save()
Puedes ver el gráfico de linaje en la IU de Dataplex.
Habilita el linaje de datos para una sesión interactiva de Spark
Para habilitar el linaje de datos en una sesión interactiva de Spark, configura la propiedad spark.dataproc.lineage.enabled
en true
cuando crees la sesión o la plantilla de sesión.
Ejemplo de sesión interactiva
El siguiente código de notebook de PySpark configura una sesión interactiva de Dataproc Serverless con el linaje de datos de Spark habilitado que se ejecuta en una subred regional de VPC de Acceso privado a Google. Luego, crea una sesión de Spark Connect que ejecuta una consulta de recuento de palabras en un conjunto de datos público de Shakespeare de BigQuery y, luego, escribe el resultado en una tabla de BigQuery.
from dataproc_spark_session.session.spark.connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
session = Session()
# Configure the Dataproc Serverless interactive session. Enable Spark data lineage.
project_id = "sample-project-id"
region = "us-central1"
subnet_name = "sample-private-google-access-subnet"
session.environment_config.execution_config.subnetwork_uri = f"projects/{project_id}/regions/{region}/subnetworks/{subnet_name}"
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
session.runtime_config.version = "2.2"
# Create the Spark Connect session.
spark = (
DataprocSparkSession.builder
.appName("LINEAGE_BQ_TO_BQ")
.dataprocConfig(session)
.getOrCreate()
)
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination = sample.destination
word_count.write.format('bigquery') \
.option('table', destination) \
.save()
Para ver el gráfico del linaje de datos, haz clic en el nombre de la tabla de destino que aparece en el panel de navegación de la página Explorador de BigQuery y, luego, selecciona la pestaña de linaje en el panel de detalles de la tabla.
Consultar el linaje en Dataplex
Un gráfico de linaje muestra las relaciones entre los recursos de tu proyecto y los procesos que los crearon. Puedes ver la información del linaje de datos en la consola de Google Cloud o recuperar la información de la API de Data Lineage como datos JSON.
¿Qué sigue?
- Obtén más información sobre el linaje de datos.