Datenabfolge in Dataproc aktivieren

In diesem Dokument wird beschrieben, wie Sie die Datenabstammung für Ihre Dataproc Spark-Jobs entweder auf Projekt- oder Clusterebene aktivieren.

Übersicht

Die Datenherkunft ist eine Dataplex-Funktion, mit der Sie verfolgen können, wie sich Daten durch Ihre Systeme bewegen – woher sie kommen, wohin sie übergeben werden und welche Transformationen auf sie angewendet werden.

Die Datenabfolge ist für alle Dataproc Spark-Jobs mit Ausnahme von SparkR mit Dataproc Compute Engine-Images der Version 2.0.74 und höher sowie 2.1.22 und höher verfügbar. Außerdem werden BigQuery- und Cloud Storage-Datenquellen unterstützt.

Nachdem Sie die Funktion in Ihrem Dataproc-Cluster aktiviert haben, erfassen Dataproc-Spark-Jobs Datenabstammungsereignisse und veröffentlichen sie in der Data Lineage API von Dataplex. Dataproc wird über OpenLineage mit der Data Lineage API über das OpenLineage Spark-Plug-in integriert.

Sie können über Dataplex auf Informationen zur Datenabfolge zugreifen. Dazu haben Sie folgende Möglichkeiten:

Beschränkungen

Die Datenabfolge ist für SparkR- oder Spark-Streamingjobs nicht verfügbar.

Hinweise

  1. Wählen Sie in der Google Cloud Console auf der Projektauswahlseite das Projekt aus, das den Dataproc-Cluster enthält, dessen Herkunft Sie verfolgen möchten.

    Zur Projektauswahl

  2. Aktivieren Sie die Data Lineage API und die Data Catalog API.

    APIs aktivieren

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen für das VM-Dienstkonto des Dataproc-Clusters zuzuweisen, um die Berechtigungen zu erhalten, die Sie zur Verwendung der Datenabfolge in Dataproc benötigen:

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Datenabfolge auf Projektebene aktivieren

Sie können die Datenherkunft auf Projektebene aktivieren. Für unterstützte Spark-Jobs, die in Clustern ausgeführt werden, die nach der Aktivierung der Datenherkunft in einem Projekt erstellt wurden, ist die Datenherkunft aktiviert. Für Jobs, die auf vorhandenen Clustern ausgeführt werden, also Clustern, die erstellt wurden, bevor die Datenabfolge auf Projektebene aktiviert wurde, ist die Datenabfolge nicht aktiviert.

Datenabfolge auf Projektebene aktivieren

Wenn Sie die Datenabfolge auf Projektebene aktivieren möchten, legen Sie die folgenden benutzerdefinierten Projektmetadaten fest:

Schlüssel Wert
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

Sie können die Datenabfolge auf Projektebene deaktivieren, indem Sie die Metadaten DATAPROC_LINEAGE_ENABLED auf false festlegen.

Datenabfolge auf Clusterebene aktivieren

Sie können die Datenabfolge beim Erstellen eines Clusters aktivieren, damit für alle unterstützten Spark-Jobs, die an den Cluster gesendet werden, die Datenabfolge aktiviert ist.

Data Lineage auf Clusterebene aktivieren

Wenn Sie die Datenabfolge in einem Cluster aktivieren möchten, erstellen Sie einen Dataproc-Cluster, bei dem die Clustereigenschaft dataproc:dataproc.lineage.enabled auf true festgelegt ist.

Cluster mit der Image-Version 2.0:Für die Datenabfolge ist der Zugriffsbereich cloud-platform für Dataproc-Cluster-VMs erforderlich. Bei Dataproc-Clustern mit der Image-Version 2.1 und höher ist cloud-platform aktiviert. Wenn Sie beim Erstellen eines Clusters die Dataproc-Imageversion 2.0 angeben, legen Sie den Umfang auf cloud-platform fest.

Beispiel für die gcloud CLI:

gcloud dataproc clusters create CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --properties 'dataproc:dataproc.lineage.enabled=true'

Datenabfolge für einen Job deaktivieren

Wenn Sie die Datenabfolge auf Clusterebene aktivieren, können Sie sie für einen bestimmten Job deaktivieren, indem Sie beim Einreichen des Jobs das Attribut spark.extraListeners mit einem leeren Wert („""“) übergeben.

Nach der Aktivierung kann die Datenabfolge im Cluster nicht mehr deaktiviert werden. Wenn Sie die Datenabfolge für alle Clusterjobs entfernen möchten, können Sie den Cluster ohne das Attribut dataproc:dataproc.lineage.enabled neu erstellen.

Spark-Job senden

Wenn Sie einen Spark-Job in einem Dataproc-Cluster einreichen, der mit aktivierter Datenabfolge erstellt wurde, erfasst Dataproc die Informationen zur Datenabfolge und meldet sie an die Data Lineage API.

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME

Hinweise:

  • Das Hinzufügen der Properties spark.openlineage.namespace und spark.openlineage.appName, die zum eindeutigen Identifizieren des Jobs verwendet werden, ist optional. Wenn Sie diese Eigenschaften nicht hinzufügen, verwendet Dataproc die folgenden Standardwerte:
    • Standardwert für spark.openlineage.namespace: PROJECT_ID
    • Standardwert für spark.openlineage.appName: spark.app.name

Herkunftsdiagramme in Dataplex ansehen

In einem Liniendiagramm werden die Beziehungen zwischen Ihren Projektressourcen und den Prozessen dargestellt, mit denen sie erstellt wurden. Sie können Informationen zur Datenherkunft in Form einer Grafikvisualisierung in der Google Cloud Console aufrufen oder sie in JSON-Datenformat über die Data Lineage API abrufen.

Weitere Informationen finden Sie unter Herkunftsdiagramme in der Dataplex-Benutzeroberfläche ansehen.

Beispiel:

Im folgenden Spark-Job werden Daten aus einer BigQuery-Tabelle gelesen und in eine andere BigQuery-Tabelle geschrieben.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = lineage-ol-test
spark.conf.set('temporaryGcsBucket', bucket)

source = sample.source
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination = sample.destination
word_count.write.format('bigquery') \
  .option('table', destination) \
  .save()

Der Spark-Job erstellt das folgende Herkunftsdiagramm in der Dataplex-Benutzeroberfläche:

Beispiel für ein Stammbaumdiagramm

Nächste Schritte