Utiliser le connecteur Spark Spanner

Cette page explique comment créer un cluster Dataproc qui utilise le connecteur Spark Spanner pour lire des données depuis Spanner à l'aide d'Apache Spark.

Le connecteur Spanner fonctionne avec Spark pour lire les données de la base de données Spanner à l'aide de la bibliothèque Java Spanner. Le connecteur Spanner permet de lire les tables et les graphiques Spanner dans les DataFrames et les GraphFrames Spark.

Coûts

Dans ce document, vous utilisez les composants facturables de Google Cloudsuivants :

  • Dataproc
  • Spanner
  • Cloud Storage

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût.

Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Avant de commencer

Avant d'utiliser le connecteur Spanner dans ce tutoriel, configurez un cluster Dataproc ainsi qu'une instance et une base de données Spanner.

Configurer un cluster Dataproc

Créez un cluster Dataproc ou utilisez un cluster Dataproc existant avec les paramètres suivants :

Configurer une instance Spanner avec une table de base de données "Singers"

Créez une instance Spanner avec une base de données contenant une table Singers. Notez l'ID de l'instance Spanner et l'ID de la base de données.

Utiliser le connecteur Spanner avec Spark

Le connecteur Spanner est disponible pour les versions Spark 3.1+. Vous spécifiez la version du connecteur dans la spécification du fichier JAR du connecteur Cloud Storage lorsque vous envoyez un job à un cluster Dataproc.

Exemple : Envoi d'un job Spark gcloud CLI avec le connecteur Spanner.

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

Remplacez les éléments suivants :

CONNECTOR_VERSION : version du connecteur Spanner. Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub GoogleCloudDataproc/spark-spanner-connector.

Lire des tables Spanner

Vous pouvez utiliser Python ou Scala pour lire les données de table Spanner dans un DataFrame Spark à l'aide de l'API de source de données Spark.

PySpark

Vous pouvez exécuter l'exemple de code PySpark de cette section sur votre cluster en envoyant le job au service Dataproc ou en exécutant le job à partir du REPL spark-submit sur le nœud maître du cluster.

Job Dataproc

  1. Créez un fichier singers.py à l'aide d'un éditeur de texte local ou dans Cloud Shell à l'aide de l'éditeur de texte préinstallé vi, vim ou nano.
    1. Après avoir renseigné les variables d'espace réservé, collez le code suivant dans le fichier singers.py. Notez que la fonctionnalité Data Boost de Spanner est activée, ce qui a un impact quasiment nul sur l'instance Spanner principale.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Remplacez les éléments suivants :

      1. PROJECT_ID : ID de votre projet Google Cloud . Les ID de projet sont listés dans la section Informations sur le projet du tableau de bord de la console Google Cloud .
      2. INSTANCE_ID, DATABASE_ID et TABLE_NAME : consultez Configurer une instance Spanner avec une table de base de données Singers.
    2. Enregistrez le fichier singers.py.
  2. Envoyez le job au service Dataproc à l'aide de la console Google Cloud , de gcloud CLI ou de l'API Dataproc.

    Exemple : Envoi d'un job gcloud CLI avec le connecteur Spanner.

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
          

    Remplacez les éléments suivants :

    1. CLUSTER_NAME : nom du nouveau cluster.
    2. REGION : Région Compute Engine disponible pour exécuter la charge de travail.
    3. CONNECTOR_VERSION : version du connecteur Spanner. Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub GoogleCloudDataproc/spark-spanner-connector.

Tâche spark-submit

  1. Connectez-vous au nœud maître du cluster Dataproc à l'aide de SSH.
    1. Accédez à la page Clusters de Dataproc dans la console Google Cloud , puis cliquez sur le nom de votre cluster.
    2. Sur la page Détails du cluster, sélectionnez l'onglet "Instances de VM". Cliquez ensuite sur SSH à droite du nom du nœud maître du cluster.
      Capture d'écran de la page "Détails du cluster Dataproc" dans la console Google Cloud , montrant le bouton SSH utilisé pour se connecter au nœud maître du cluster.

      Une fenêtre de navigateur s'ouvre dans votre répertoire de base sur le nœud maître.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Créez un fichier singers.py sur le nœud maître à l'aide de l'éditeur de texte préinstallé vi, vim ou nano.
    1. Collez le code suivant dans le fichier singers.py. Notez que la fonctionnalité Data Boost de Spanner est activée, ce qui a un impact quasiment nul sur l'instance Spanner principale.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Remplacez les éléments suivants :

      1. PROJECT_ID : ID de votre projet Google Cloud . Les ID de projet sont listés dans la section Informations sur le projet du tableau de bord de la console Google Cloud .
      2. INSTANCE_ID, DATABASE_ID et TABLE_NAME : consultez Configurer une instance Spanner avec une table de base de données Singers.
    2. Enregistrez le fichier singers.py.
  3. Exécutez singers.py avec spark-submit pour créer la table Spanner Singers.
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    Remplacez les éléments suivants :

    1. CONNECTOR_VERSION : version du connecteur Spanner. Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub GoogleCloudDataproc/spark-spanner-connector.

    Voici le résultat :

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

Pour exécuter l'exemple de code Scala sur votre cluster, procédez comme suit :

  1. Connectez-vous au nœud maître du cluster Dataproc à l'aide de SSH.
    1. Accédez à la page Clusters de Dataproc dans la console Google Cloud , puis cliquez sur le nom de votre cluster.
    2. Sur la page Détails du cluster, sélectionnez l'onglet "Instances de VM". Cliquez ensuite sur SSH à droite du nom du nœud maître du cluster. Page "Détails du cluster Dataproc" dans la console Google Cloud .

      Une fenêtre de navigateur s'ouvre dans votre répertoire de base sur le nœud maître.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Créez un fichier singers.scala sur le nœud maître à l'aide de l'éditeur de texte préinstallé vi, vim ou nano.
    1. Collez le code suivant dans le fichier singers.scala. Notez que la fonctionnalité Data Boost de Spanner est activée, ce qui a un impact quasiment nul sur l'instance Spanner principale.
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      Remplacez les éléments suivants :

      1. PROJECT_ID : ID de votre projet Google Cloud . Les ID de projet sont listés dans la section Informations sur le projet du tableau de bord de la console Google Cloud .
      2. INSTANCE_ID, DATABASE_ID et TABLE_NAME : consultez Configurer une instance Spanner avec une table de base de données Singers.
    2. Enregistrez le fichier singers.scala.
  3. Lancez le REPL spark-shell.
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    Remplacez les éléments suivants :

    CONNECTOR_VERSION : version du connecteur Spanner. Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub GoogleCloudDataproc/spark-spanner-connector.

  4. Exécutez singers.scala avec la commande :load singers.scala pour créer la table Spanner Singers. La liste de sortie affiche des exemples de la sortie "Singers".
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Lire les graphiques Spanner

Le connecteur Spanner permet d'exporter le graphique dans des DataFrames de nœuds et d'arêtes distincts, ainsi que directement dans GraphFrames.

L'exemple suivant exporte une base de données Spanner dans un GraphFrame. Il utilise la classe Python SpannerGraphConnector, incluse dans le fichier JAR du connecteur Spanner, pour lire Spanner Graph.

from pyspark.sql import SparkSession

connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar"

spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example")
         .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12")
         .config("spark.jars", connector_jar)
         .getOrCreate())
spark.sparkContext.addPyFile(connector_jar)

from spannergraph import SpannerGraphConnector

connector = (SpannerGraphConnector()
             .spark(spark)
             .project("PROJECT_ID")
             .instance("INSTANCE_ID")
             .database("DATABASE_ID")
             .graph("GRAPH_ID"))

g = connector.load_graph()
g.vertices.show()
g.edges.show()

Remplacez les éléments suivants :

  • CONNECTOR_VERSION : version du connecteur Spanner. Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub GoogleCloudDataproc/spark-spanner-connector.
  • PROJECT_ID : ID de votre projet Google Cloud . Les ID de projet sont listés dans la section Informations sur le projet du tableau de bord de la console Google Cloud .
  • INSTANCE_ID, DATABASE_ID et TABLE_NAME : insérez les ID de l'instance, de la base de données et du graphique.

Pour exporter les DataFrames de nœuds et d'arêtes au lieu de GraphFrames, utilisez plutôt load_dfs :

df_vertices, df_edges, df_id_map = connector.load_dfs()

Effectuer un nettoyage

Pour éviter que des frais ne continuent d'être facturés sur votre compte Google Cloud , vous pouvez arrêter ou supprimer votre cluster Dataproc, et supprimer votre instance Spanner.

Étapes suivantes