Utilizzare la derivazione dei dati con Dataproc Serverless

Questo documento descrive come attivare la trasparenza dei dati per i carichi di lavoro batch di Dataproc Serverless per Spark a livello di progetto o di carico di lavoro batch.

Panoramica

La derivazione dei dati è una funzionalità di Dataplex che consente di monitorare il modo in cui i dati vengono spostati nei sistemi: da dove provengono, dove vengono inviati e a quali trasformazioni sono sottoposti.

I carichi di lavoro Dataproc Serverless per Spark acquisiscono gli eventi di derivazione e li pubblicano nell'API Data Lineage di Dataplex. Dataproc Serverless per Spark si integra con l'API Data Lineage tramite OpenLineage, utilizzando il plug-in OpenLineage Spark.

Puoi accedere alle informazioni sulla derivazione tramite Dataplex utilizzando i grafici di visualizzazione della derivazione e l'API Data Lineage. Per saperne di più, consulta Visualizzare i grafici della derivazione in Dataplex.

Disponibilità, funzionalità e limitazioni

La cronologia dei dati, che supporta le origini dati BigQuery e Cloud Storage, è disponibile per i carichi di lavoro eseguiti con le versioni del runtime Dataproc Serverless per Spark 1.1.50+, 1.2.29+ e 2.2.29+, con le seguenti eccezioni e limitazioni:

  • La derivazione dei dati non è disponibile per i workload SparkR o Spark streaming.

Prima di iniziare

  1. Nella pagina di selezione del progetto della console Google Cloud, seleziona il progetto da utilizzare per i carichi di lavoro Dataproc Serverless per Spark.

    Vai al selettore dei progetti

  2. Abilita le API Data Lineage e Data Catalog.

    Abilita le API

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per utilizzare la consistenza dei dati in Dataproc Serverless per Spark, chiedi all'amministratore di concederti i seguenti ruoli IAM nell'account di servizio VM del cluster Dataproc:

Per saperne di più sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.

Attivare la derivazione dei dati a livello di progetto

Puoi attivare la derivazione dei dati a livello di progetto. Se abilitato a livello di progetto, tutti i carichi di lavoro batch successivi eseguiti nel progetto avranno attivato la consistenza di Spark.

Come attivare la derivazione dei dati a livello di progetto

Per attivare la definizione della struttura dei dati a livello di progetto, imposta i seguenti metadati del progetto personalizzati.

Chiave Valore
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

Puoi disattivare la definizione della struttura dei dati a livello di progetto impostando i metadati DATAPROC_LINEAGE_ENABLED su false.

Abilita la derivazione dei dati per un carico di lavoro batch Spark

Puoi attivare la concatenazione dei dati su un carico di lavoro batch impostando la proprietà spark.dataproc.lineage.enabled su true quando invii il carico di lavoro.

Esempio di interfaccia a riga di comando gcloud:

gcloud dataproc batches submit pyspark FILENAME.py
    --region=REGION \
    --properties=spark.dataproc.lineage.enabled=true

Visualizzare i grafici della derivazione in Dataplex

Un grafico di visualizzazione della struttura mostra le relazioni tra le risorse del progetto e i processi che le hanno create. Puoi visualizzare le informazioni sulla cronologia dei dati in una visualizzazione di grafici nella console Google Cloud o recuperare le informazioni dall'API Data Lineage come dati JSON.

Per ulteriori informazioni, consulta Utilizzare la concatenazione dei dati con i sistemi Google Cloud .

Esempio:

Il seguente carico di lavoro Spark legge i dati da una tabella BigQuery e poi scrive l'output in un'altra tabella BigQuery.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = lineage-ol-test
spark.conf.set('temporaryGcsBucket', bucket)

source = sample.source
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination = sample.destination
word_count.write.format('bigquery') \
  .option('table', destination) \
  .save()

Questo carico di lavoro Spark crea il seguente grafico della struttura nella UI di Dataplex:

Grafo di derivazione di esempio

Passaggi successivi