Ce document explique comment activer la lignée de données sur les charges de travail par lot et les sessions interactives Dataproc sans serveur pour Spark au niveau du projet, de la charge de travail par lot ou de la session interactive.
Présentation
La traçabilité des données est une fonctionnalité Dataplex qui vous permet de suivre la manière dont les données transitent par vos systèmes: leur origine, la cible de transmission, et les transformations qui leur sont appliquées.
Les charges de travail et les sessions Dataproc sans serveur pour Spark capturent les événements de traçabilité et les publient dans l'API Data Lineage de Dataplex. Dataproc sans serveur pour Spark s'intègre à l'API Data Lineage via OpenLineage, à l'aide du plug-in Spark OpenLineage.
Vous pouvez accéder aux informations sur la traçabilité via Dataplex, à l'aide de graphiques de traçabilité et de l'API Data Lineage. Pour en savoir plus, consultez Afficher des graphiques de traçabilité dans Dataplex.
Disponibilité, fonctionnalités et limites
La lignée de données, qui est compatible avec les sources de données BigQuery et Cloud Storage, est disponible pour les charges de travail et les sessions exécutées avec les versions d'exécution Dataproc sans serveur pour Spark 1.1
, 1.2
et 2.2
, avec les exceptions et les limites suivantes:
- La traçabilité des données n'est pas disponible pour les sessions ou les charges de travail de streaming SparkR ou Spark.
Avant de commencer
Sur la page de sélection du projet dans la console Google Cloud, sélectionnez le projet à utiliser pour vos sessions ou charges de travail Dataproc Serverless for Spark.
Activez les API Data Lineage et Dataplex.
Rôles requis
Pour obtenir les autorisations nécessaires pour utiliser la lignée de données dans Dataproc sans serveur pour Spark, demandez à votre administrateur de vous accorder les rôles IAM suivants sur le compte de service de VM du cluster Dataproc:
-
Pour afficher la traçabilité dans Dataplex ou utiliser l'API Data Lineage :
Lecteur de traçabilité des données (
roles/datalineage.viewer
) -
Générer manuellement la traçabilité à l'aide de l'API :
Producteur d'événements de traçabilité des données (
roles/datalineage.producer
) -
Modifier la traçabilité à l'aide de l'API :
Éditeur de traçabilité des données (
roles/datalineage.editor
) -
Effectuez toutes les opérations sur la traçabilité :
Administrateur de la traçabilité des données (
roles/datalineage.admin
)
Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.
Vous pouvez également obtenir les autorisations requises via des rôles personnalisés ou d'autres rôles prédéfinis.
Activer la traçabilité des données au niveau du projet
Vous pouvez activer la traçabilité des données au niveau du projet. Lorsqu'il est activé au niveau du projet, le lignage Spark est activé pour toutes les charges de travail par lot et sessions interactives ultérieures que vous exécutez dans le projet.
Activer la généalogie des données au niveau du projet
Pour activer la lignée des données au niveau du projet, définissez les métadonnées de projet personnalisées suivantes.
Clé | Valeur |
---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_CLUSTER_SCOPES |
https://www.googleapis.com/auth/cloud-platform |
Vous pouvez désactiver la lignée des données au niveau du projet en définissant les métadonnées DATAPROC_LINEAGE_ENABLED
sur false
.
Activer la traçabilité des données pour une charge de travail par lot Spark
Vous pouvez activer la lignée de données sur une charge de travail par lot en définissant la propriété spark.dataproc.lineage.enabled
sur true
lorsque vous envoyez la charge de travail.
Exemple de charge de travail par lot
Cet exemple envoie une charge de travail lineage-example.py
par lot avec la lignée Spark activée.
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --properties=spark.dataproc.lineage.enabled=true
lineage-example.py
lit les données d'une table BigQuery, puis écrit la sortie dans une autre table BigQuery.
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
bucket = lineage-demo
spark.conf.set('temporaryCloudStorageBucket', bucket)
source = sample.source
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination = sample.destination
word_count.write.format('bigquery') \
.option('table', destination) \
.save()
Vous pouvez afficher le graphique de la traçabilité dans l'interface utilisateur de Dataplex.
Activer la traçabilité des données pour une session interactive Spark
Vous pouvez activer la lignée de données sur une session interactive Spark en définissant la propriété spark.dataproc.lineage.enabled
sur true
lorsque vous créez la session ou le modèle de session.
Exemple de session interactive
Le code de notebook PySpark suivant configure une session interactive Dataproc sans serveur avec la lignée de données Spark activée, exécutée sur un sous-réseau régional VPC avec accès privé à Google. Il crée ensuite une session Spark Connect qui exécute une requête de décompte de mots sur un ensemble de données Shakespeare BigQuery public, puis écrit la sortie dans une table BigQuery.
from dataproc_spark_session.session.spark.connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
session = Session()
# Configure the Dataproc Serverless interactive session. Enable Spark data lineage.
project_id = "sample-project-id"
region = "us-central1"
subnet_name = "sample-private-google-access-subnet"
session.environment_config.execution_config.subnetwork_uri = f"projects/{project_id}/regions/{region}/subnetworks/{subnet_name}"
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
session.runtime_config.version = "2.2"
# Create the Spark Connect session.
spark = (
DataprocSparkSession.builder
.appName("LINEAGE_BQ_TO_BQ")
.dataprocConfig(session)
.getOrCreate()
)
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination = sample.destination
word_count.write.format('bigquery') \
.option('table', destination) \
.save()
Pour afficher le graphique de la lignée des données, cliquez sur le nom de la table de destination indiqué dans le volet de navigation de la page Explorer de BigQuery, puis sélectionnez l'onglet "Lignée" dans le volet d'informations sur la table.
Afficher la traçabilité dans Dataplex
Un graphique de traçabilité affiche les relations entre les ressources de votre projet et les processus qui les ont créées. Vous pouvez afficher des informations sur la lignée des données dans la console Google Cloud ou récupérer les informations de l'API Data Lineage sous forme de données JSON.
Étape suivante
- En savoir plus sur la traçabilité des données