Questo documento descrive come attivare la trasparenza dei dati su Dataproc Serverless per i carichi di lavoro batch e le sessioni interattive di Spark a livello di progetto, carico di lavoro batch o sessione interattiva.
Panoramica
La derivazione dei dati è una funzionalità di Dataplex che consente di monitorare il modo in cui i dati vengono spostati nei sistemi: da dove provengono, dove vengono inviati e a quali trasformazioni sono sottoposti.
I carichi di lavoro e le sessioni Dataproc Serverless per Spark acquisiscono gli eventi di derivazione e li pubblicano nell'API Data Lineage di Dataplex. Dataproc Serverless per Spark si integra con l'API Data Lineage tramite OpenLineage, utilizzando il plug-in OpenLineage Spark.
Puoi accedere alle informazioni sulla derivazione tramite Dataplex utilizzando grafici di derivazione e l'API Data Lineage. Per saperne di più, consulta Visualizzare i grafici della derivazione in Dataplex.
Disponibilità, funzionalità e limitazioni
La cronologia dei dati, che supporta le origini dati BigQuery e Cloud Storage, è disponibile per i carichi di lavoro e le sessioni eseguiti con le versioni di runtime Dataproc Serverless per Spark 1.1
, 1.2
e 2.2
, con le seguenti eccezioni e limitazioni:
- La derivazione dei dati non è disponibile per i workload o le sessioni di streaming SparkR o Spark.
Prima di iniziare
Nella pagina di selezione del progetto nella console Google Cloud, seleziona il progetto da utilizzare per i carichi di lavoro o le sessioni Dataproc Serverless per Spark.
Abilita l'API Data Lineage e l'API Dataplex.
Ruoli obbligatori
Per ottenere le autorizzazioni necessarie per utilizzare la consistenza dei dati in Dataproc Serverless per Spark, chiedi all'amministratore di concederti i seguenti ruoli IAM nell'account di servizio VM del cluster Dataproc:
-
Visualizza la derivazione in Dataplex o utilizza l'API Data Lineage:
Data Lineage Viewer (
roles/datalineage.viewer
) -
Genera la cronologia manualmente utilizzando l'API:
Data Lineage Events Producer (
roles/datalineage.producer
) -
Modificare la struttura utilizzando l'API:
Data Lineage Editor (
roles/datalineage.editor
) -
Esegui tutte le operazioni sulla derivazione:
Amministratore di Data Lineage (
roles/datalineage.admin
)
Per saperne di più sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.
Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.
Attivare la derivazione dei dati a livello di progetto
Puoi attivare la derivazione dei dati a livello di progetto. Se questa opzione è attivata a livello di progetto, tutti i carichi di lavoro batch e le sessioni interattive successivi eseguiti nel progetto avranno attivato la consistenza Spark.
Come attivare la derivazione dei dati a livello di progetto
Per attivare la definizione della struttura dei dati a livello di progetto, imposta i seguenti metadati del progetto personalizzati.
Chiave | Valore |
---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_CLUSTER_SCOPES |
https://www.googleapis.com/auth/cloud-platform |
Puoi disattivare la definizione dell'origine dati a livello di progetto impostando i metadati DATAPROC_LINEAGE_ENABLED
su false
.
Abilita la derivazione dei dati per un carico di lavoro batch Spark
Puoi attivare la consistenza dei dati su un carico di lavoro batch impostando la proprietà spark.dataproc.lineage.enabled
su true
quando invii il carico di lavoro.
Esempio di carico di lavoro batch
Questo esempio invia un carico di lavoro batch lineage-example.py
con la concatenazione Spark attivata.
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --properties=spark.dataproc.lineage.enabled=true
lineage-example.py
legge i dati da una tabella BigQuery e poi scrive l'output in un'altra tabella BigQuery.
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
bucket = lineage-demo
spark.conf.set('temporaryCloudStorageBucket', bucket)
source = sample.source
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination = sample.destination
word_count.write.format('bigquery') \
.option('table', destination) \
.save()
Puoi visualizzare il grafico della struttura in Dataplex.
Attivare la derivazione dei dati per una sessione interattiva Spark
Puoi attivare la concatenazione dei dati in una sessione interattiva Spark impostando la proprietà spark.dataproc.lineage.enabled
su true
quando crei la sessione o il modello di sessione.
Esempio di sessione interattiva
Il seguente codice del notebook PySpark configura una sessione interattiva Dataproc Serverless con la concatenazione dei dati Spark abilitata in esecuzione in una subnet regionale VPC con accesso privato Google. Quindi crea una sessione Spark Connect che esegue una query di conteggio parole su un set di dati pubblico di BigQuery su Shakespeare e poi scrive l'output in una tabella BigQuery.
from dataproc_spark_session.session.spark.connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
session = Session()
# Configure the Dataproc Serverless interactive session. Enable Spark data lineage.
project_id = "sample-project-id"
region = "us-central1"
subnet_name = "sample-private-google-access-subnet"
session.environment_config.execution_config.subnetwork_uri = f"projects/{project_id}/regions/{region}/subnetworks/{subnet_name}"
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
session.runtime_config.version = "2.2"
# Create the Spark Connect session.
spark = (
DataprocSparkSession.builder
.appName("LINEAGE_BQ_TO_BQ")
.dataprocConfig(session)
.getOrCreate()
)
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination = sample.destination
word_count.write.format('bigquery') \
.option('table', destination) \
.save()
Puoi visualizzare il grafico della struttura dei dati facendo clic sul nome della tabella di destinazione elencato nel riquadro di navigazione della pagina Esplora di BigQuery e poi selezionando la scheda della struttura nel riquadro dei dettagli della tabella.
Visualizza la derivazione in Dataplex
Un grafico di derivazione mostra le relazioni tra le risorse del progetto e i processi che le hanno create. Puoi visualizzare le informazioni sulla cronologia dei dati nella console Google Cloud o recuperare le informazioni dall'API Data Lineage come dati JSON.
Passaggi successivi
- Scopri di più sulla derivazione dei dati.