Utiliser le connecteur Spanner avec Spark

Cette page explique comment créer un cluster Dataproc qui utilise le connecteur Spark Spanner pour lire des données à partir de Spanner à l'aide d'Apache Spark.

Calculer les coûts

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

  • Dataproc
  • Spanner
  • Cloud Storage

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Avant de commencer

Avant d'utiliser le connecteur Spanner dans ce tutoriel, configurez un cluster Dataproc, une instance et une base de données Spanner.

Configurer un cluster Dataproc

Créez un cluster Dataproc ou utilisez un cluster Dataproc existant avec les paramètres suivants:

  • Autorisations du compte de service de VM. Le compte de service de VM du cluster doit être associé aux autorisations Spanner appropriées. Si vous utilisez Data Boost (Data Boost est activé dans l'exemple de code de la section Lire des données à partir de Spanner), le compte de service de la VM doit également disposer des autorisations IAM Data Boost requises.

  • Niveau d'accès. Le cluster doit être créé avec le champ d'application cloud-platform ou le champ d'application spanner approprié activé. Le champ d'application cloud-platform est activé par défaut pour les clusters créés avec la version d'image 2.1 ou une version ultérieure.

    Les instructions suivantes vous expliquent comment définir l'étendue cloud-platform dans le cadre d'une requête de création de cluster qui utilise la console Google Cloud, gcloud CLI ou l'API Dataproc. Pour obtenir des instructions supplémentaires sur la création de clusters, consultez la page Créer un cluster.

    Console Google Cloud

    1. Dans la console Google Cloud, ouvrez la page Dataproc Créer un cluster.
    2. Dans le panneau Gérer la sécurité, dans la section Accès au projet, cliquez sur "Active le champ d'application Cloud Platform pour ce cluster".
    3. Renseignez ou confirmez les autres champs de création de cluster, puis cliquez sur Créer.

    CLI gcloud

    Vous pouvez exécuter la commande gcloud dataproc clusters create suivante pour créer un cluster avec la portée cloud-platform activée.

    gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

    API

    Vous pouvez spécifier GceClusterConfig.serviceAccountScopes dans le cadre d'une requête clusters.create.

        "serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
    

Configurer une instance Spanner avec une table de base de données Singers

Créez une instance Spanner avec une base de données contenant une table Singers. Notez l'ID de l'instance Spanner et l'ID de la base de données.

Utiliser le connecteur Spanner avec Spark

Le connecteur Spanner est disponible pour les versions Spark 3.1+. Vous spécifiez la version du connecteur dans la spécification du fichier JAR du connecteur Cloud Storage lorsque vous envoyez une tâche à un cluster Dataproc.

Exemple:Envoi d'une tâche Spark avec la CLI gcloud avec le connecteur Spanner.

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

Remplacez les éléments suivants :

CONNECTOR_VERSION: version du connecteur Spanner. Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub GoogleCloudDataproc/spark-spanner-connector.

Lire des données depuis Spanner

Vous pouvez utiliser Python ou Scala pour lire les données de Spanner dans un DataFrame Spark à l'aide de l'API de source de données Spark.

PySpark

Vous pouvez exécuter l'exemple de code PySpark de cette section sur votre cluster en envoyant la tâche au service Dataproc ou en l'exécutant à partir de l'REPL spark-submit sur le nœud maître du cluster.

Tâche Dataproc

  1. Créez un fichier singers.py à l'aide d'un éditeur de texte local ou dans Cloud Shell à l'aide de l'éditeur de texte vi, vim ou nano préinstallé.
    1. Collez le code suivant dans le fichier singers.py. Notez que la fonctionnalité Data Boost de Spanner est activée, ce qui a un impact quasi nul sur l'instance Spanner principale.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
          

      Remplacez les éléments suivants :

      1. PROJECT_ID: ID de votre Google Cloud projet. Les ID de projet sont listés dans la section Project info (Informations sur le projet) du tableau de bord de la console Google Cloud.
      2. INSTANCE_ID, DATABASE_ID et TABLE_NAME : consultez la section Configurer une instance Spanner avec une table de base de données Singers.
    2. Enregistrez le fichier singers.py.
  2. Envoyez la tâche au service Dataproc à l'aide de la console Google Cloud, de gcloud CLI ou de l'API Dataproc.

    Exemple:Envoi d'une tâche avec la CLI gcloud avec le connecteur Spanner.

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION
          

    Remplacez les éléments suivants :

    1. CLUSTER_NAME : nom du nouveau cluster.
    2. REGION: région Compute Engine disponible pour exécuter la charge de travail.
    3. CONNECTOR_VERSION: version du connecteur Spanner. Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub GoogleCloudDataproc/spark-spanner-connector.

Job spark-submit

  1. Connectez-vous au nœud maître du cluster Dataproc à l'aide de SSH.
    1. Accédez à la page Clusters (Clusters) de Dataproc dans la console Google Cloud, puis cliquez sur le nom de votre cluster.
    2. Sur la page Détails du cluster, sélectionnez l'onglet "Instances de VM". Cliquez ensuite sur SSH à droite du nom du nœud maître du cluster.
      Page "Détails du cluster Dataproc" dans Cloud Console.

      Une fenêtre de navigateur s'ouvre dans votre répertoire de base sur le nœud maître.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Créez un fichier singers.py sur le nœud maître à l'aide de l'éditeur de texte vi, vim ou nano préinstallé.
    1. Collez le code suivant dans le fichier singers.py. Notez que la fonctionnalité Data Boost de Spanner est activée, ce qui a un impact quasi nul sur l'instance Spanner principale.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Remplacez les éléments suivants :

      1. PROJECT_ID: ID de votre Google Cloud projet. Les ID de projet sont listés dans la section Project info (Informations sur le projet) du tableau de bord de la console Google Cloud.
      2. INSTANCE_ID, DATABASE_ID et TABLE_NAME : consultez la section Configurer une instance Spanner avec une table de base de données Singers.
    2. Enregistrez le fichier singers.py.
  3. Exécutez singers.py avec spark-submit pour créer la table Spanner Singers.
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    Remplacez les éléments suivants :

    1. CONNECTOR_VERSION: version du connecteur Spanner. Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub GoogleCloudDataproc/spark-spanner-connector.

    Voici le résultat :

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

Pour exécuter l'exemple de code Scala sur votre cluster, procédez comme suit:

  1. Connectez-vous au nœud maître du cluster Dataproc à l'aide de SSH.
    1. Accédez à la page Clusters (Clusters) de Dataproc dans la console Google Cloud, puis cliquez sur le nom de votre cluster.
    2. Sur la page Détails du cluster, sélectionnez l'onglet "Instances de VM". Cliquez ensuite sur SSH à droite du nom du nœud maître du cluster.
      Page "Détails du cluster Dataproc" dans Cloud Console.

      Une fenêtre de navigateur s'ouvre dans votre répertoire de base sur le nœud maître.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Créez un fichier singers.scala sur le nœud maître à l'aide de l'éditeur de texte vi, vim ou nano préinstallé.
    1. Collez le code suivant dans le fichier singers.scala. Notez que la fonctionnalité Data Boost de Spanner est activée, ce qui a un impact quasi nul sur l'instance Spanner principale.
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      Remplacez les éléments suivants :

      1. PROJECT_ID: ID de votre Google Cloud projet. Les ID de projet sont listés dans la section Project info (Informations sur le projet) du tableau de bord de la console Google Cloud.
      2. INSTANCE_ID, DATABASE_ID et TABLE_NAME : consultez la section Configurer une instance Spanner avec une table de base de données Singers.
    2. Enregistrez le fichier singers.scala.
  3. Lancez le REPL spark-shell.
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    Remplacez les éléments suivants :

    CONNECTOR_VERSION: version du connecteur Spanner. Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub GoogleCloudDataproc/spark-spanner-connector.

  4. Exécutez singers.scala avec la commande :load singers.scala pour créer la table Spanner Singers. La liste de sortie affiche des exemples de la sortie de Singers.
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Nettoyage

Pour éviter que des frais ne continuent d'être facturés sur votre compte Google Cloud , vous pouvez arrêter ou supprimer votre cluster Dataproc, puis supprimer votre instance Spanner.

Pour en savoir plus