Utiliser la traçabilité des données avec Serverless pour Apache Spark

Ce document explique comment activer la traçabilité des données surGoogle Cloud sans serveur pour les charges de travail par lot Apache Spark et les sessions interactives au niveau du projet, de la charge de travail par lot ou de la session interactive.

Présentation

La traçabilité des données est une fonctionnalité Dataplex Universal Catalog qui vous permet de suivre la manière dont les données transitent par vos systèmes : leur origine, la cible de transmission, et les transformations qui leur sont appliquées.

Dataproc sans serveur pour les charges de travail et les sessions Apache Spark capture les événements de traçabilité et les publie dans l'API Data Lineage de Dataplex Universal Catalog.Google Cloud Serverless pour Apache Spark s'intègre à l'API Data Lineage via OpenLineage, à l'aide du plug-in OpenLineage Spark.

Vous pouvez accéder aux informations de traçabilité via Dataplex Universal Catalog, à l'aide des graphiques de traçabilité et de l'API Data Lineage. Pour en savoir plus, consultez Afficher les graphiques de traçabilité dans Dataplex Universal Catalog.

Disponibilité, fonctionnalités et limites

Le lineage de données, qui est compatible avec les sources de données BigQuery et Cloud Storage, est disponible pour les charges de travail et les sessions qui s'exécutent avec les versions du runtime Serverless pour Apache Spark 1.1, 1.2 et 2.2, avec les exceptions et limites suivantes :

  • La traçabilité des données n'est pas disponible pour les charges de travail ni les sessions SparkR ou Spark Streaming.

Avant de commencer

  1. Sur la page de sélection du projet dans la console Google Cloud , sélectionnez le projet à utiliser pour vos charges de travail ou sessions Serverless pour Apache Spark.

    Accéder au sélecteur de projet

  2. Activez l'API Data Lineage.

    Activer les API

Rôles requis

Si votre charge de travail par lot utilise le compte de service Serverless pour Apache Spark par défaut, elle dispose du rôle Dataproc Worker, qui permet le traçage des données. Aucune autre action n'est nécessaire.

Toutefois, si votre charge de travail par lot utilise un compte de service personnalisé pour activer le lineage des données, vous devez attribuer un rôle requis au compte de service personnalisé, comme expliqué dans le paragraphe suivant.

Pour obtenir les autorisations nécessaires pour utiliser le lineage de données avec Dataproc, demandez à votre administrateur de vous accorder les rôles IAM suivants sur le compte de service personnalisé de votre charge de travail par lot :

Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Vous pouvez également obtenir les autorisations requises avec des rôles personnalisés ou d'autres rôles prédéfinis.

Activer la traçabilité des données au niveau du projet

Vous pouvez activer la traçabilité des données au niveau du projet. Lorsqu'il est activé au niveau du projet, toutes les charges de travail par lot et les sessions interactives que vous exécutez dans le projet sont associées à l'activation de la lineage Spark.

Activer la traçabilité des données au niveau du projet

Pour activer le traçage des données au niveau du projet, définissez les métadonnées personnalisées suivantes pour le projet.

Clé Valeur
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

Vous pouvez désactiver le lineage des données au niveau du projet en définissant les métadonnées DATAPROC_LINEAGE_ENABLED sur false.

Activer la traçabilité des données pour une charge de travail par lot Spark

Vous pouvez activer le lineage des données sur une charge de travail par lot en définissant la propriété spark.dataproc.lineage.enabled sur true lorsque vous envoyez la charge de travail.

Exemple de charge de travail par lot

Cet exemple envoie une charge de travail lineage-example.py par lot avec l'héritage Spark activé.

gcloud dataproc batches submit pyspark lineage-example.py \
    --region=REGION \
    --deps-bucket=gs://BUCKET \
    --properties=spark.dataproc.lineage.enabled=true

lineage-example.py lit les données d'une table BigQuery publique, puis écrit la sortie dans une nouvelle table d'un ensemble de données BigQuery existant. Il utilise un bucket Cloud Storage pour le stockage temporaire.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .option('writeMethod', 'direct') \
  .save()

Effectuez les remplacements suivants :

  • REGION : sélectionnez une région pour exécuter votre charge de travail.

  • BUCKET : nom d'un bucket Cloud Storage existant dans lequel stocker les dépendances.

  • PROJECT_ID, DATASET et TABLE : insérez votre ID de projet, le nom d'un ensemble de données BigQuery existant et le nom d'une nouvelle table à créer dans l'ensemble de données (la table ne doit pas exister).

Vous pouvez afficher le graphique de traçabilité dans l'UI Dataplex Universal Catalog.

Graphique de traçabilité Spark

Activer la traçabilité des données pour une session interactive Spark

Vous pouvez activer le lineage des données dans une session Spark interactive en définissant la propriété spark.dataproc.lineage.enabled sur true lorsque vous créez la session ou le modèle de session.

Exemple de session interactive

Le code de notebook PySpark suivant configure une session interactive Serverless pour Apache Spark avec la traçabilité des données Spark activée. Il crée ensuite une session Spark Connect qui exécute une requête de décompte de mots sur un ensemble de données Shakespeare public BigQuery, puis écrit la sortie dans une nouvelle table d'un ensemble de données BigQuery existant.

# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session

session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"

# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()

# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
           'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

Effectuez les remplacements suivants :

  • PROJECT_ID, DATASET et TABLE : insérez votre ID de projet, le nom d'un ensemble de données BigQuery existant et le nom d'une nouvelle table à créer dans l'ensemble de données (la table ne doit pas exister).

Pour afficher le graphique de traçabilité des données, cliquez sur le nom de la table de destination listé dans le volet de navigation de la page Explorateur de BigQuery, puis sélectionnez l'onglet "Traçabilité" dans le volet d'informations sur la table.

Graphique de traçabilité Spark

Afficher la traçabilité dans Dataplex Universal Catalog

Un graphique de traçabilité affiche les relations entre les ressources de votre projet et les processus qui les ont créées. Vous pouvez afficher les informations sur la lignée des données dans la console Google Cloud ou les récupérer à partir de l'API Data Lineage sous forme de données JSON.

Étapes suivantes