Utilizzare il connettore Spanner con Spark

Questa pagina mostra come utilizzare il connettore Spark Spanner per leggere i dati da Spanner utilizzando Apache Spark

Calcolo dei costi

In questo documento utilizzi i seguenti componenti fatturabili di Google Cloud:

  • Dataproc
  • Spanner
  • Cloud Storage

Per generare una stima dei costi basata sull'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud potrebbero essere idonei per una prova gratuita.

Prima di iniziare

Prima di eseguire il tutorial, assicurati di conoscere la versione del connettore e di avere un URI del connettore.

Come specificare l'URI del file JAR del connettore

Le versioni del connettore Spark Spanner sono elencate nel repository GitHub GoogleCloudDataproc/spark-spanner-connector.

Specifica il file JAR del connettore sostituendo le informazioni sulla versione del connettore nella seguente stringa URI:

gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar

Il connettore è disponibile per le versioni di Spark 3.1+

Esempio della gcloud CLI:

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \
    -- job-args
  

Prepara il database Spanner

Se non hai una tabella Spanner, puoi seguire il tutorial per crearne una. Dopodiché avrai un ID istanza, un ID database e una tabella Singers.

Crea cluster Dataproc

Qualsiasi cluster Dataproc che utilizza il connettore richiede gli ambiti spanner o cloud-platform. I cluster Dataproc hanno l'ambito predefinito cloud-platform per l'immagine 2.1 o successive. Se utilizzi una versione precedente, puoi utilizzare la console Google Cloud, Google Cloud CLI e l'API Dataproc per creare un cluster Dataproc.

Console

  1. Nella console Google Cloud, apri la pagina Dataproc Crea un cluster
  2. Nella scheda "Gestisci la sicurezza", fai clic su "Attiva l'ambito cloud-platform per questo cluster" nella sezione "Accesso al progetto".
  3. Completa la compilazione o la conferma degli altri campi per la creazione del cluster, poi fai clic su "Crea".

Google Cloud CLI

gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

API

Puoi specificare GceClusterConfig.serviceAccountScopes nell'ambito di una richiesta clusters.create. Ad esempio:
        "serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
    

Devi assicurarti che l'autorizzazione Spanner corrispondente sia assegnata all'account di servizio VM Dataproc. Se utilizzi Data Boost nel tutorial, consulta l'autorizzazione IAM Data Boost.

Leggere i dati da Spanner

Puoi utilizzare Scala e Python per leggere i dati da Spanner in un DataFrame Spark utilizzando l'API Spark Data Source.

Scala

  1. Esamina il codice e sostituisci i segnaposto [projectId], [instanceId], [databaseId] e [table] con l'ID progetto, l'ID istanza, l'ID database e la tabella che hai creato in precedenza. L'opzione enableDataBoost attiva la funzionalità Data Boost di Spanner, che ha un impatto quasi nullo sull'istanza Spanner principale.
    object singers {
      def main(): Unit = {
        /*
         * Remove comment if you are not running in spark-shell.
         *
        import org.apache.spark.sql.SparkSession
        val spark = SparkSession.builder()
          .appName("spark-spanner-demo")
          .getOrCreate()
        */
    
        // Load data in from Spanner. See
        // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
        // for option information.
        val singersDF =
          (spark.read.format("cloud-spanner")
            .option("projectId", "[projectId]")
            .option("instanceId", "[instanceId]")
            .option("databaseId", "[databaseId]")
            .option("enableDataBoost", true)
            .option("table", "[table]")
            .load()
            .cache())
    
        singersDF.createOrReplaceTempView("Singers")
    
        // Load the Singers table.
        val result = spark.sql("SELECT * FROM Singers")
        result.show()
        result.printSchema()
      }
    }
  2. Esegui il codice sul cluster
    1. Utilizza SSH per connetterti al nodo master del cluster Dataproc.
      1. Vai alla pagina Cluster Dataproc nella console Google Cloud, quindi fai clic sul nome del cluster
        Pagina Cluster Dataproc nella console Cloud.
      2. Nella pagina >Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su SSH a destra del nome del nodo master del cluster
        Pagina dei dettagli del cluster Dataproc nella console Cloud.

        Si apre una finestra del browser nella home directory sul nodo principale
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crea singers.scala con l'editor di testo vi, vim o nano preinstallato, quindi incolla il codice Scala dall'elenco di codice Scala
      nano singers.scala
        
    3. Avvia la REPL di spark-shell.
      $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
      
    4. Esegui singers.scala con il comando :load singers.scala per creare la tabella Singers di Spanner. L'elenco di output mostra esempi dell'output di Singers.
      > :load singers.scala
      Loading singers.scala...
      defined object singers
      > singers.main()
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
       

PySpark

  1. Esamina il codice e sostituisci i segnaposto [projectId], [instanceId], [databaseId] e [table] con l'ID progetto, l'ID istanza, l'ID database e la tabella che hai creato in precedenza. L'opzione enableDataBoost attiva la funzionalità Data Boost di Spanner, che ha un impatto quasi nullo sull'istanza Spanner principale.
    #!/usr/bin/env python
    
    """Spanner PySpark read example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-spanner-demo') \
      .getOrCreate()
    
    # Load data from Spanner.
    singers = spark.read.format('cloud-spanner') \
      .option("projectId", "[projectId]") \
      .option("instanceId", "[instanceId]") \
      .option("databaseId", "[databaseId]") \
      .option("enableDataBoost", "true") \
      .option("table", "[table]") \
      .load()
    singers.createOrReplaceTempView('Singers')
    
    # Read from Singers
    result = spark.sql('SELECT * FROM Singers')
    result.show()
    result.printSchema()
  2. Esegui il codice sul tuo cluster
    1. Utilizza SSH per connetterti al nodo master del cluster Dataproc.
      1. Vai alla pagina Cluster Dataproc nella console Google Cloud, quindi fai clic sul nome del cluster
        Pagina Cluster nella console Cloud.
      2. Nella pagina Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su SSH a destra del nome del nodo master del cluster
        Seleziona SSH nella riga del nome del cluster nella pagina Dettagli cluster della console Cloud.

        Si apre una finestra del browser nella home directory sul nodo principale
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crea singers.py con l'editor di testo vi, vim o nano preinstallato, quindi incolla il codice PySpark dalla lista di codice PySpark
      nano singers.py
      
    3. Esegui singers.py con spark-submit per creare la tabella Singers di Spanner.
      spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      
      L'output è:
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
      only showing top 20 rows
      

Esegui la pulizia

Per eseguire la pulizia ed evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse create in questa procedura dettagliata, segui questi passaggi.

gcloud dataproc clusters stop CLUSTER_NAME
gcloud dataproc clusters delete CLUSTER_NAME

Per ulteriori informazioni