En este documento se describe cómo habilitar el linaje de datos en las cargas de trabajo por lotes y las sesiones interactivas deGoogle Cloud Serverless para Apache Spark a nivel de proyecto, carga de trabajo por lotes o sesión interactiva.
Información general
El linaje de los datos es una función de Dataplex Universal Catalog que te permite hacer un seguimiento de cómo se mueven los datos por tus sistemas: de dónde proceden, a dónde se transfieren y qué transformaciones se les aplican.
Google Cloud Sin servidor para cargas de trabajo y sesiones de Apache Spark, captura eventos de linaje y los publica en la API Data Lineage de Dataplex Universal Catalog. Serverless para Apache Spark se integra con la API Data Lineage a través de OpenLineage mediante el plugin OpenLineage Spark.
Puedes acceder a la información del linaje a través de Dataplex Universal Catalog mediante gráficos de linaje y la API Data Lineage. Para obtener más información, consulta Ver gráficos de linaje en Dataplex Universal Catalog.
Disponibilidad, funciones y limitaciones
El linaje de datos, que admite fuentes de datos de BigQuery y Cloud Storage, está disponible para las cargas de trabajo y las sesiones que se ejecutan con las versiones del tiempo de ejecución de Serverless para Apache Spark 1.1
, 1.2
y 2.2
, con las siguientes excepciones y limitaciones:
- El linaje de datos no está disponible para las cargas de trabajo ni las sesiones de SparkR o Spark Streaming.
Antes de empezar
En la página del selector de proyectos de la Google Cloud consola, selecciona el proyecto que quieras usar para tus cargas de trabajo o sesiones de Serverless para Apache Spark.
Habilita la API Data Lineage.
Roles obligatorios
Si tu carga de trabajo por lotes usa la cuenta de servicio predeterminada de Serverless para Apache Spark, tiene el rol Dataproc Worker
, que habilita el linaje de datos. No es necesario que hagas nada más.
Sin embargo, si tu carga de trabajo por lotes usa una cuenta de servicio personalizada para habilitar el linaje de datos, debes conceder un rol obligatorio a la cuenta de servicio personalizada, tal como se explica en el siguiente párrafo.
Para obtener los permisos que necesitas para usar el linaje de datos con Dataproc, pide a tu administrador que te conceda los siguientes roles de gestión de identidades y accesos en tu cuenta de servicio personalizada de carga de trabajo por lotes:
-
Asigna uno de los siguientes roles:
-
Trabajador de Dataproc (
roles/dataproc.worker
) -
Editor de linaje de datos (
roles/datalineage.editor
) -
Productor de linaje de datos (
roles/datalineage.producer
) -
Administrador de linaje de datos (
roles/datalineage.admin
)
-
Trabajador de Dataproc (
Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.
También puedes conseguir los permisos necesarios a través de roles personalizados u otros roles predefinidos.
Habilitar el linaje de datos a nivel de proyecto
Puede habilitar el linaje de datos a nivel de proyecto. Si se habilita a nivel de proyecto, todas las cargas de trabajo por lotes y las sesiones interactivas posteriores que ejecutes en el proyecto tendrán habilitado el linaje de Spark.
Cómo habilitar el linaje de datos a nivel de proyecto
Para habilitar el linaje de datos a nivel de proyecto, define los siguientes metadatos de proyecto personalizados.
Clave | Valor |
---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_CLUSTER_SCOPES |
https://www.googleapis.com/auth/cloud-platform |
Para inhabilitar el linaje de datos a nivel de proyecto, asigna el valor false
a los metadatos DATAPROC_LINEAGE_ENABLED
.
Habilitar el linaje de datos de una carga de trabajo por lotes de Spark
Para habilitar el linaje de datos en una carga de trabajo por lotes, defina la propiedad spark.dataproc.lineage.enabled
en true
cuando envíe la carga de trabajo.
Ejemplo de carga de trabajo por lotes
En este ejemplo se envía una carga de trabajo por lotes lineage-example.py
con el linaje de Spark habilitado.
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --deps-bucket=gs://BUCKET \ --properties=spark.dataproc.lineage.enabled=true
lineage-example.py
lee datos de una tabla pública de BigQuery y, a continuación, escribe la salida en una tabla nueva de un conjunto de datos de BigQuery. Utiliza un segmento de Cloud Storage para el almacenamiento temporal.
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.option('writeMethod', 'direct') \
.save()
Haz las siguientes sustituciones:
REGION: selecciona una región para ejecutar tu carga de trabajo.
BUCKET: nombre de un segmento de Cloud Storage en el que se almacenarán las dependencias.
PROJECT_ID, DATASET y TABLE: inserta el ID de tu proyecto, el nombre de un conjunto de datos de BigQuery y el nombre de una tabla que quieras crear en el conjunto de datos (la tabla no debe existir).
Puedes ver el gráfico de linaje en la interfaz de usuario de Dataplex Universal Catalog.
Habilitar el linaje de datos en una sesión interactiva de Spark
Puedes habilitar el linaje de datos en una sesión interactiva de Spark
asignando el valor true
a la propiedad spark.dataproc.lineage.enabled
cuando
crees la sesión o la plantilla de sesión.
Ejemplo de sesión interactiva
El siguiente código de cuaderno de PySpark configura una sesión interactiva de Serverless para Apache Spark con el linaje de datos de Spark habilitado. A continuación, crea una sesión de Spark Connect que ejecuta una consulta de recuento de palabras en un conjunto de datos público de Shakespeare de BigQuery y, después, escribe el resultado en una tabla nueva de un conjunto de datos de BigQuery.
# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session
session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
Haz las siguientes sustituciones:
- PROJECT_ID, DATASET y TABLE: inserta el ID de tu proyecto, el nombre de un conjunto de datos de BigQuery y el nombre de una tabla que quieras crear en el conjunto de datos (la tabla no debe existir).
Para ver el gráfico de linaje de datos, haga clic en el nombre de la tabla de destino que aparece en el panel de navegación de la página Explorador de BigQuery y, a continuación, seleccione la pestaña Linaje en el panel de detalles de la tabla.
Ver el linaje en Dataplex Universal Catalog
Un gráfico de linaje muestra las relaciones entre los recursos de tu proyecto y los procesos que los crearon. Puedes ver información sobre el linaje de datos en la consola Google Cloud o recuperar la información de la API Data Lineage como datos JSON.