Utiliser le connecteur Spanner avec Spark

Cette page explique comment utiliser le connecteur Spark Spanner pour lire les données de Spanner à l'aide d'Apache Spark

Calculer les coûts

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

  • Dataproc
  • Spanner
  • Cloud Storage

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Avant de commencer

Avant d'exécuter le tutoriel, assurez-vous de connaître la version du connecteur et d'obtenir un URI de connecteur.

Spécifier l'URI du fichier JAR du connecteur

Les versions des connecteurs Spark Spanner sont répertoriées dans le dépôt GitHub GoogleCloudDataproc/spark-spanner-connector.

Spécifiez le fichier JAR du connecteur en remplaçant les informations de version du connecteur dans la chaîne d'URI suivante:

gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar

Le connecteur est disponible pour les versions de Spark 3.1+.

Exemple de gcloud CLI:

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \
    -- job-args
  

Préparer la base de données Spanner

Si vous ne disposez pas d'une table Spanner, vous pouvez suivre le tutoriel pour en créer une. Vous disposez ensuite d'un ID d'instance, d'un ID de base de données et d'une table Singers.

Créer un cluster Dataproc

Tout cluster Dataproc utilisant le connecteur doit disposer des champs d'application spanner ou cloud-platform. Les clusters Dataproc disposent du niveau d'accès par défaut cloud-platform pour l'image 2.1 ou version ultérieure. Si vous utilisez une ancienne version, vous pouvez utiliser la console Google Cloud, Google Cloud CLI et l'API Dataproc pour créer un cluster Dataproc.

Console

  1. Dans la console Google Cloud, ouvrez la page Dataproc Créer un cluster.
  2. Dans l'onglet "Gérer la sécurité", cliquez sur "Active le champ d'application de Cloud Platform pour ce cluster" dans la section "Accès au projet".
  3. Renseignez ou confirmez les autres champs de création du cluster, puis cliquez sur "Créer".

Google Cloud CLI

gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

API

Vous pouvez spécifier GceClusterConfig.serviceAccountScopes dans le cadre d'une requête clusters.create. Exemple :
        "serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
    

Vous devez vous assurer que l'autorisation Spanner correspondante est attribuée au compte de service de la VM Dataproc. Si vous utilisez Data Boost dans le tutoriel, reportez-vous à la section sur l'autorisation IAM Data Boost.

Lire des données de Spanner

Vous pouvez utiliser Scala et Python pour lire les données de Spanner dans un DataFrame Spark à l'aide de l'API de source de données Spark.

Scala

  1. Examinez le code et remplacez les espaces réservés [projectId], [instanceId], [databaseId] et [table] par l'ID de projet, l'ID d'instance, l'ID de base de données et la table que vous avez créée précédemment. L'option enableDataBoost active la fonctionnalité Spanner Data Boost, qui a un impact proche de zéro sur l'instance Spanner principale.
    object singers {
      def main(): Unit = {
        /*
         * Remove comment if you are not running in spark-shell.
         *
        import org.apache.spark.sql.SparkSession
        val spark = SparkSession.builder()
          .appName("spark-spanner-demo")
          .getOrCreate()
        */
    
        // Load data in from Spanner. See
        // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
        // for option information.
        val singersDF =
          (spark.read.format("cloud-spanner")
            .option("projectId", "[projectId]")
            .option("instanceId", "[instanceId]")
            .option("databaseId", "[databaseId]")
            .option("enableDataBoost", true)
            .option("table", "[table]")
            .load()
            .cache())
    
        singersDF.createOrReplaceTempView("Singers")
    
        // Load the Singers table.
        val result = spark.sql("SELECT * FROM Singers")
        result.show()
        result.printSchema()
      }
    }
    
    
  2. Exécuter le code sur votre cluster
    1. Utiliser SSH pour se connecter au nœud maître du cluster Dataproc
      1. Accédez à la page Clusters Dataproc dans la console Google Cloud, puis cliquez sur le nom de votre cluster
        Page des clusters Dataproc dans la console Cloud.
      2. Sur la page >Détails du cluster, sélectionnez l'onglet "Instances de VM". Ensuite, cliquez sur SSH à droite du nom du nœud maître du cluster
        Page d'informations du cluster Dataproc dans la console Cloud

        Une fenêtre de navigateur s'ouvre dans votre répertoire d'accueil sur le nœud maître.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Créez singers.scala avec l'éditeur de texte vi, vim ou nano préinstallé, puis collez le code Scala dans la liste de codes Scala.
      nano singers.scala
        
    3. Lancez la REPL spark-shell.
      $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
      
    4. Exécutez singers.scala avec la commande :load singers.scala pour créer la table Spanner Singers. La liste des résultats affiche des exemples issus de la sortie de la table "Singers" (Chanteurs).
      > :load singers.scala
      Loading singers.scala...
      defined object singers
      > singers.main()
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
       

PySpark

  1. Examinez le code et remplacez les espaces réservés [projectId], [instanceId], [databaseId] et [table] par l'ID de projet, l'ID d'instance, l'ID de base de données et la table que vous avez créée précédemment. L'option enableDataBoost active la fonctionnalité Spanner Data Boost, qui a un impact proche de zéro sur l'instance Spanner principale.
    #!/usr/bin/env python
    
    """Spanner PySpark read example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-spanner-demo') \
      .getOrCreate()
    
    # Load data from Spanner.
    singers = spark.read.format('cloud-spanner') \
      .option("projectId", "[projectId]") \
      .option("instanceId", "[instanceId]") \
      .option("databaseId", "[databaseId]") \
      .option("enableDataBoost", "true") \
      .option("table", "[table]") \
      .load()
    singers.createOrReplaceTempView('Singers')
    
    # Read from Singers
    result = spark.sql('SELECT * FROM Singers')
    result.show()
    result.printSchema()
    
    
  2. Exécutez le code sur votre cluster
    1. Utiliser SSH pour se connecter au nœud maître du cluster Dataproc
      1. Accédez à la page Clusters Dataproc dans la console Google Cloud, puis cliquez sur le nom de votre cluster
        Page "Clusters" de la console Cloud.
      2. Sur la page Détails du cluster, sélectionnez l'onglet "Instances de VM". Ensuite, cliquez sur SSH à droite du nom du nœud maître du cluster
        Sélectionnez "SSH sur la ligne du nom du cluster" sur la page "Détails du cluster" de la console Cloud.

        Une fenêtre de navigateur s'ouvre dans votre répertoire d'accueil sur le nœud principal.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Créez singers.py avec l'éditeur de texte vi, vim ou nano préinstallé, puis collez le code PySpark à partir de la liste de codes PySpark.
      nano singers.py
      
    3. Exécutez singers.py avec spark-submit pour créer la table Spanner Singers.
      spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      
      Voici le résultat :
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
      only showing top 20 rows
      

Nettoyage

Pour effectuer un nettoyage et éviter que des frais récurrents ne soient facturés sur votre compte Google Cloud pour les ressources créées dans ce tutoriel, procédez comme suit :

gcloud dataproc clusters stop CLUSTER_NAME
gcloud dataproc clusters delete CLUSTER_NAME

Pour en savoir plus