Utiliser le connecteur Spanner avec Spark

Cette page explique comment créer un cluster Dataproc qui utilise le connecteur Spark Spanner pour lire des données à partir de Spanner à l'aide d'Apache Spark.

Calculer les coûts

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

  • Dataproc
  • Spanner
  • Cloud Storage

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Avant de commencer

Avant d'utiliser le connecteur Spanner dans ce tutoriel, configurez un cluster Dataproc, une instance et une base de données Spanner.

Configurer un cluster Dataproc

Créez un cluster Dataproc ou utilisez un cluster Dataproc existant avec les paramètres suivants:

Configurer une instance Spanner avec une table de base de données Singers

Créez une instance Spanner avec une base de données contenant une table Singers. Notez l'ID de l'instance Spanner et l'ID de la base de données.

Utiliser le connecteur Spanner avec Spark

Le connecteur Spanner est disponible pour les versions Spark 3.1+. Vous spécifiez la version du connecteur dans la spécification du fichier JAR du connecteur Cloud Storage lorsque vous envoyez une tâche à un cluster Dataproc.

Exemple:Envoi d'une tâche Spark avec la CLI gcloud avec le connecteur Spanner.

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

Remplacez les éléments suivants :

CONNECTOR_VERSION: version du connecteur Spanner. Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub GoogleCloudDataproc/spark-spanner-connector.

Lire des données depuis Spanner

Vous pouvez utiliser Python ou Scala pour lire les données de Spanner dans un DataFrame Spark à l'aide de l'API de source de données Spark.

Vous pouvez exécuter l'exemple de code PySpark de cette section sur votre cluster en envoyant la tâche au service Dataproc ou en l'exécutant à partir de l'REPL spark-submit sur le nœud maître du cluster.

  1. Créez un fichier singers.py à l'aide d'un éditeur de texte local ou dans Cloud Shell à l'aide de l'éditeur de texte vi, vim ou nano préinstallé.
    1. Collez le code suivant dans le fichier singers.py. Notez que la fonctionnalité Data Boost de Spanner est activée, ce qui a un impact quasi nul sur l'instance Spanner principale.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
          

      Remplacez les éléments suivants :

      1. PROJECT_ID: ID de votre Google Cloud projet. Les ID de projet sont listés dans la section Project info (Informations sur le projet) du tableau de bord de la console Google Cloud.
      2. INSTANCE_ID, DATABASE_ID et TABLE_NAME : consultez la section Configurer une instance Spanner avec une table de base de données Singers.
    2. Enregistrez le fichier singers.py.
  2. Envoyez la tâche au service Dataproc à l'aide de la console Google Cloud, de gcloud CLI ou de l'API Dataproc.

    Exemple:Envoi d'une tâche avec la CLI gcloud avec le connecteur Spanner.

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION
          

    Remplacez les éléments suivants :

    1. CLUSTER_NAME : nom du nouveau cluster.
    2. REGION: région Compute Engine disponible pour exécuter la charge de travail.
    3. CONNECTOR_VERSION: version du connecteur Spanner. Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub GoogleCloudDataproc/spark-spanner-connector.
  1. Connectez-vous au nœud maître du cluster Dataproc à l'aide de SSH.
    1. Accédez à la page Clusters (Clusters) de Dataproc dans la console Google Cloud, puis cliquez sur le nom de votre cluster.
    2. Sur la page Détails du cluster, sélectionnez l'onglet "Instances de VM". Cliquez ensuite sur SSH à droite du nom du nœud maître du cluster.
      Page "Détails du cluster Dataproc" dans Cloud Console.

      Une fenêtre de navigateur s'ouvre dans votre répertoire de base sur le nœud maître.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Créez un fichier singers.py sur le nœud maître à l'aide de l'éditeur de texte vi, vim ou nano préinstallé.
    1. Collez le code suivant dans le fichier singers.py. Notez que la fonctionnalité Data Boost de Spanner est activée, ce qui a un impact quasi nul sur l'instance Spanner principale.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Remplacez les éléments suivants :

      1. PROJECT_ID: ID de votre Google Cloud projet. Les ID de projet sont listés dans la section Project info (Informations sur le projet) du tableau de bord de la console Google Cloud.
      2. INSTANCE_ID, DATABASE_ID et TABLE_NAME : consultez la section Configurer une instance Spanner avec une table de base de données Singers.
    2. Enregistrez le fichier singers.py.
  3. Exécutez singers.py avec spark-submit pour créer la table Spanner Singers.
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    Remplacez les éléments suivants :

    1. CONNECTOR_VERSION: version du connecteur Spanner. Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub GoogleCloudDataproc/spark-spanner-connector.

    Voici le résultat :

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Pour exécuter l'exemple de code Scala sur votre cluster, procédez comme suit:

  1. Connectez-vous au nœud maître du cluster Dataproc à l'aide de SSH.
    1. Accédez à la page Clusters (Clusters) de Dataproc dans la console Google Cloud, puis cliquez sur le nom de votre cluster.
    2. Sur la page Détails du cluster, sélectionnez l'onglet "Instances de VM". Cliquez ensuite sur SSH à droite du nom du nœud maître du cluster.
      Page "Détails du cluster Dataproc" dans Cloud Console.

      Une fenêtre de navigateur s'ouvre dans votre répertoire de base sur le nœud maître.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Créez un fichier singers.scala sur le nœud maître à l'aide de l'éditeur de texte vi, vim ou nano préinstallé.
    1. Collez le code suivant dans le fichier singers.scala. Notez que la fonctionnalité Data Boost de Spanner est activée, ce qui a un impact quasi nul sur l'instance Spanner principale.
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      Remplacez les éléments suivants :

      1. PROJECT_ID: ID de votre Google Cloud projet. Les ID de projet sont listés dans la section Project info (Informations sur le projet) du tableau de bord de la console Google Cloud.
      2. INSTANCE_ID, DATABASE_ID et TABLE_NAME : consultez la section Configurer une instance Spanner avec une table de base de données Singers.
    2. Enregistrez le fichier singers.scala.
  3. Lancez le REPL spark-shell.
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    Remplacez les éléments suivants :

    CONNECTOR_VERSION: version du connecteur Spanner. Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub GoogleCloudDataproc/spark-spanner-connector.

  4. Exécutez singers.scala avec la commande :load singers.scala pour créer la table Spanner Singers. La liste de sortie affiche des exemples de la sortie de Singers.
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Nettoyage

Pour éviter que des frais ne continuent d'être facturés sur votre compte Google Cloud , vous pouvez arrêter ou supprimer votre cluster Dataproc, puis supprimer votre instance Spanner.

Pour en savoir plus