Utilizzare la derivazione dei dati con Serverless per Apache Spark

Questo documento descrive come attivare data lineage su Google Cloud batch workload e sessioni interattive serverless per Apache Spark a livello di progetto, batch workload o sessione interattiva.

Panoramica

La derivazione dei dati è una funzionalità di Dataplex Universal Catalog che consente di monitorare il modo in cui i dati vengono trasferiti nei sistemi: da dove provengono, dove vengono inviati e a quali trasformazioni sono sottoposti.

Google Cloud Serverless per i workload e le sessioni Apache Spark acquisisce gli eventi di lineage e li pubblica nell'API Data Lineage di Dataplex Universal Catalog. Serverless per Apache Spark si integra con l'API Data Lineage tramite OpenLineage, utilizzando il plug-in OpenLineage Spark.

Puoi accedere alle informazioni sulla derivazione tramite Dataplex Universal Catalog, utilizzando i grafici di derivazione e l'API Data Lineage. Per saperne di più, consulta Visualizzare i grafici di derivazione nel Catalogo universale Dataplex.

Disponibilità, funzionalità e limitazioni

La derivazione dei dati, che supporta le origini dati BigQuery e Cloud Storage, è disponibile per i workload e le sessioni eseguiti con le versioni del runtime Serverless per Apache Spark 1.1, 1.2 e 2.2, con le seguenti eccezioni e limitazioni:

  • La derivazione dei dati non è disponibile per i carichi di lavoro o le sessioni SparkR o Spark Streaming.

Prima di iniziare

  1. Nella pagina di selezione del progetto nella console Google Cloud , seleziona il progetto da utilizzare per i carichi di lavoro o le sessioni di Serverless per Apache Spark.

    Vai al selettore dei progetti

  2. Abilita l'API Data Lineage.

    Abilita le API

Ruoli obbligatori

Se il tuo workload batch utilizza il service account Serverless for Apache Spark predefinito, ha il ruolo Dataproc Worker, che attiva la lineage dei dati. Non sono necessari ulteriori interventi.

Tuttavia, se il tuo batch utilizza un account di servizio personalizzato per attivare la derivazione dei dati, devi concedere un ruolo obbligatorio al service account personalizzato come spiegato nel paragrafo seguente.

Per ottenere le autorizzazioni necessarie per utilizzare la derivazione dei dati con Dataproc, chiedi all'amministratore di concederti i seguenti ruoli IAM sul account di servizio personalizzato del carico di lavoro batch:

Per ulteriori informazioni sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.

Abilitare la derivazione dei dati a livello di progetto

Puoi abilitare la derivazione dei dati a livello di progetto. Se abilitata a livello di progetto, tutte le sessioni batch e interattive successive eseguite nel progetto avranno la lineage Spark abilitata.

Come attivare la derivazione dei dati a livello di progetto

Per attivare la derivazione dei dati a livello di progetto, imposta i seguenti metadati di progetto personalizzati.

Chiave Valore
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

Puoi disattivare la derivazione dei dati a livello di progetto impostando i metadati DATAPROC_LINEAGE_ENABLED su false.

Abilita la derivazione dei dati per un workload batch Spark

Puoi attivare la derivazione dei dati su un workload batch impostando la proprietà spark.dataproc.lineage.enabled su true quando invii il workload.

Esempio di carico di lavoro batch

Questo esempio invia un carico di lavoro batch lineage-example.py con la derivazione Spark attivata.

gcloud dataproc batches submit pyspark lineage-example.py \
    --region=REGION \
    --deps-bucket=gs://BUCKET \
    --properties=spark.dataproc.lineage.enabled=true

lineage-example.py legge i dati da una tabella BigQuery pubblica e poi scrive l'output in una nuova tabella in un set di dati BigQuery esistente. Utilizza un bucket Cloud Storage per l'archiviazione temporanea.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .option('writeMethod', 'direct') \
  .save()

Effettua le seguenti sostituzioni:

  • REGION: seleziona una regione per eseguire il workload.

  • BUCKET: il nome di un bucket Cloud Storage esistente in cui archiviare le dipendenze.

  • PROJECT_ID, DATASET e TABLE: inserisci l'ID progetto, il nome di un set di dati BigQuery esistente e il nome di una nuova tabella da creare nel set di dati (la tabella non deve esistere).

Puoi visualizzare il grafico della derivazione nella UI di Dataplex Universal Catalog.

Grafico di derivazione Spark

Abilita la derivazione dei dati per una sessione interattiva Spark

Puoi attivare la derivazione dei dati in una sessione interattiva Spark impostando la proprietà spark.dataproc.lineage.enabled su true quando crei la sessione o il modello di sessione.

Esempio di sessione interattiva

Il seguente codice del notebook PySpark configura una sessione interattiva di Serverless per Apache Spark con la derivazione dei dati Spark abilitata. Poi crea una sessione Spark Connect che esegue una query di conteggio parole su un set di dati pubblico di BigQuery Shakespeare e scrive l'output in una nuova tabella in un set di dati BigQuery esistente.

# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session

session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"

# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()

# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
           'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

Effettua le seguenti sostituzioni:

  • PROJECT_ID, DATASET e TABLE: inserisci l'ID progetto, il nome di un set di dati BigQuery esistente e il nome di una nuova tabella da creare nel set di dati (la tabella non deve esistere).

Puoi visualizzare il grafico della derivazione dei dati facendo clic sul nome della tabella di destinazione elencato nel riquadro di navigazione della pagina Esplora di BigQuery, quindi selezionando la scheda Derivazione nel riquadro dei dettagli della tabella.

Grafico di derivazione Spark

Visualizza la derivazione nel Catalogo universale Dataplex

Un grafico di derivazione mostra le relazioni tra le risorse del progetto e i processi che le hanno create. Puoi visualizzare le informazioni sulla derivazione dei dati nella console Google Cloud o recuperare le informazioni dall'API Data Lineage come dati JSON.

Passaggi successivi