Datenabfolge mit Dataproc Serverless verwenden

In diesem Dokument wird beschrieben, wie Sie die Datenabstammung in Dataproc Serverless für Spark-Batcharbeitslasten und interaktive Sitzungen auf Projekt-, Batcharbeitslast- oder interaktiver Sitzung-Ebene aktivieren.

Übersicht

Die Datenherkunft ist eine Dataplex-Funktion, mit der Sie verfolgen können, wie sich Daten durch Ihre Systeme bewegen – woher sie kommen, wohin sie übergeben werden und welche Transformationen auf sie angewendet werden.

Mit Dataproc Serverless für Spark-Arbeitslasten und ‑Sitzungen werden Herkunftsereignisse erfasst und in der Data Lineage API von Dataplex veröffentlicht. Dataproc Serverless for Spark lässt sich über OpenLineage mit der Data Lineage API verbinden. Dazu wird das OpenLineage Spark-Plug-in verwendet.

Sie können über Dataplex auf Herkunftsinformationen zugreifen, indem Sie Herkunftsdiagramme und die Data Lineage API verwenden. Weitere Informationen finden Sie unter Herkunftsdiagramme in Dataplex ansehen.

Verfügbarkeit, Funktionen und Einschränkungen

Die Datenabfolge, die BigQuery- und Cloud Storage-Datenquellen unterstützt, ist für Arbeitslasten und Sitzungen verfügbar, die mit den Dataproc Serverless for Spark-Laufzeitversionen 1.1, 1.2 und 2.2 ausgeführt werden, mit folgenden Ausnahmen und Einschränkungen:

  • Die Datenabfolge ist für SparkR- oder Spark-Streaming-Arbeitslasten oder ‑sitzungen nicht verfügbar.

Hinweise

  1. Wählen Sie in der Google Cloud Console auf der Seite zur Projektauswahl das Projekt aus, das Sie für Ihre Dataproc Serverless for Spark-Arbeitslasten oder ‑sitzungen verwenden möchten.

    Zur Projektauswahl

  2. Aktivieren Sie die Data Lineage API und die Dataplex API.

    APIs aktivieren

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen für das VM-Dienstkonto des Dataproc-Clusters zuzuweisen, um die Berechtigungen zu erhalten, die Sie zur Verwendung der Datenabfolge in Dataproc Serverless für Spark benötigen:

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Datenabfolge auf Projektebene aktivieren

Sie können die Datenherkunft auf Projektebene aktivieren. Wenn die Option auf Projektebene aktiviert ist, ist die Spark-Abstammung für alle nachfolgenden Batcharbeitslasten und interaktiven Sitzungen aktiviert, die Sie im Projekt ausführen.

Datenabfolge auf Projektebene aktivieren

Wenn Sie die Datenabfolge auf Projektebene aktivieren möchten, legen Sie die folgenden benutzerdefinierten Projektmetadaten fest.

Schlüssel Wert
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

Sie können die Datenabfolge auf Projektebene deaktivieren, indem Sie die Metadaten DATAPROC_LINEAGE_ENABLED auf false festlegen.

Datenabfolge für eine Spark-Batcharbeitslast aktivieren

Sie können die Datenabfolge für eine Batcharbeitslast aktivieren, indem Sie die Eigenschaft spark.dataproc.lineage.enabled beim Einreichen der Arbeitslast auf true festlegen.

Beispiel für eine Batch-Arbeitslast

In diesem Beispiel wird eine Batch-lineage-example.py-Arbeitslast mit aktivierter Spark-Abstammung gesendet.

gcloud dataproc batches submit pyspark lineage-example.py \
    --region=REGION \
    --properties=spark.dataproc.lineage.enabled=true

lineage-example.py liest Daten aus einer BigQuery-Tabelle und schreibt die Ausgabe in eine andere BigQuery-Tabelle.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = lineage-demo
spark.conf.set('temporaryCloudStorageBucket', bucket)

source = sample.source
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination = sample.destination
word_count.write.format('bigquery') \
  .option('table', destination) \
  .save()

Sie können das Lineage-Diagramm in der Dataplex-Benutzeroberfläche aufrufen.

Spark-Abstammungsdiagramm

Datenabfolge für eine interaktive Spark-Sitzung aktivieren

Sie können die Datenabfolge in einer interaktiven Spark-Sitzung aktivieren, indem Sie die spark.dataproc.lineage.enabled-Eigenschaft beim Erstellen der Sitzung oder Sitzungsvorlage auf true festlegen.

Beispiel für eine interaktive Sitzung

Im folgenden PySpark-Notebook-Code wird eine interaktive Dataproc Serverless-Sitzung mit aktivierter Spark-Datenabstammung konfiguriert, die in einem regionalen VPC-Subnetz mit privatem Google-Zugriff ausgeführt wird. Anschließend wird eine Spark Connect-Sitzung erstellt, in der eine Wortzählung für ein öffentliches BigQuery-Shakespeare-Dataset ausgeführt und die Ausgabe in eine BigQuery-Tabelle geschrieben wird.

from dataproc_spark_session.session.spark.connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session = Session()

# Configure the Dataproc Serverless interactive session. Enable Spark data lineage.
project_id = "sample-project-id"
region = "us-central1"
subnet_name = "sample-private-google-access-subnet"
session.environment_config.execution_config.subnetwork_uri = f"projects/{project_id}/regions/{region}/subnetworks/{subnet_name}"
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
session.runtime_config.version = "2.2"

# Create the Spark Connect session.
spark = (
   DataprocSparkSession.builder
     .appName("LINEAGE_BQ_TO_BQ")
     .dataprocConfig(session)
     .getOrCreate()
)
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
           'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

# Output the results to a BigQuery destination table.
destination = sample.destination
word_count.write.format('bigquery') \
  .option('table', destination) \
  .save()

Sie können sich das Datenabstammungsdiagramm ansehen, indem Sie auf der BigQuery-Seite Explorer im Navigationsbereich auf den Namen der Zieltabelle klicken und dann im Bereich mit den Tabellendetails den Tab „Abstammung“ auswählen.

Spark-Abstammungsdiagramm

Lineage in Dataplex ansehen

Ein Liniendiagramm zeigt die Beziehungen zwischen Ihren Projektressourcen und den Prozessen, mit denen sie erstellt wurden. Sie können Informationen zur Datenherkunft in der Google Cloud Console aufrufen oder sie als JSON-Daten über die Data Lineage API abrufen.

Nächste Schritte