Utilizzare il connettore Spanner con Spark

Questa pagina mostra come utilizzare il connettore Spark Spanner per leggere i dati di Spanner utilizzando Apache Spark

Calcolo dei costi

In questo documento, utilizzi i seguenti componenti fatturabili di Google Cloud:

  • Dataproc
  • Spanner
  • Cloud Storage

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud potrebbero essere idonei per una prova gratuita.

Prima di iniziare

Prima di eseguire il tutorial, accertati di conoscere la versione del connettore e di ottenere l'URI del connettore.

Come specificare l'URI del file JAR del connettore

Le versioni del connettore Spark Spanner sono elencate nel repository GoogleCloudDataproc/spark-spanner-connector.

Specifica il file JAR del connettore sostituendo le informazioni sulla versione del connettore nella seguente stringa URI:

gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar

Il connettore è disponibile per le versioni Spark 3.1+

Esempio della gcloud CLI:

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \
    -- job-args
  

Preparazione del database Spanner

Se non hai una tabella Spanner, puoi seguire il tutorial per creare una tabella Spanner. Successivamente, avrai un ID istanza, un ID database e una tabella Singers.

Crea cluster Dataproc

Qualsiasi cluster Dataproc che utilizza il connettore richiede gli ambiti spanner o cloud-platform. I cluster Dataproc hanno l'ambito predefinito cloud-platform per l'immagine 2.1 o superiore. Se utilizzi una versione precedente, puoi usare la console Google Cloud, Google Cloud CLI e l'API Dataproc per creare un cluster Dataproc.

Console

  1. Nella console Google Cloud, apri la pagina di Dataproc Crea un cluster
  2. Nella scheda "Gestisci sicurezza", fai clic su "Abilita l'ambito cloud-platform per questo cluster" nella sezione "Accesso al progetto".
  3. Completa la compilazione o la conferma degli altri campi per la creazione del cluster, quindi fai clic su "Crea".

Google Cloud CLI

gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

API

Puoi specificare GceClusterConfig.serviceAccountScopes come parte di una richiesta clusters.create. Ad esempio:
        "serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
    

Dovrai assicurarti che l'autorizzazione Spanner corrispondente sia assegnata all'account di servizio VM Dataproc. Se utilizzi Data Boost nel tutorial, fai riferimento all'autorizzazione IAM di Data Boost.

Lettura dati da Spanner

Puoi utilizzare Scala e Python per leggere i dati da Spanner in un dataframe Spark utilizzando l'API per origine dati Spark.

Scala

  1. Esamina il codice e sostituisci i segnaposto [projectId], [instanceId], [databaseId] e [table] con l'ID progetto, l'ID istanza, l'ID database e la tabella che hai creato in precedenza. L'opzione EnableDataBoost abilita la funzionalità Data Boost di Spanner, che ha un impatto quasi pari a zero sull'istanza principale di Spanner.
    object singers {
      def main(): Unit = {
        /*
         * Remove comment if you are not running in spark-shell.
         *
        import org.apache.spark.sql.SparkSession
        val spark = SparkSession.builder()
          .appName("spark-spanner-demo")
          .getOrCreate()
        */
    
        // Load data in from Spanner. See
        // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
        // for option information.
        val singersDF =
          (spark.read.format("cloud-spanner")
            .option("projectId", "[projectId]")
            .option("instanceId", "[instanceId]")
            .option("databaseId", "[databaseId]")
            .option("enableDataBoost", true)
            .option("table", "[table]")
            .load()
            .cache())
    
        singersDF.createOrReplaceTempView("Singers")
    
        // Load the Singers table.
        val result = spark.sql("SELECT * FROM Singers")
        result.show()
        result.printSchema()
      }
    }
    
    
  2. Esegui il codice sul tuo cluster
    1. Utilizza SSH per connetterti al nodo master del cluster Dataproc
      1. Vai alla pagina Cluster di Dataproc nella console Google Cloud, quindi fai clic sul nome del cluster
        Pagina Cluster Dataproc nella console Cloud.
      2. Nella pagina >Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su SSH a destra del nome del nodo master del cluster
        Pagina dei dettagli del cluster Dataproc nella console Cloud.

        Si apre una finestra del browser nella tua directory home sul nodo master
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crea singers.scala con l'editor di testo vi, vim o nano preinstallato, poi incolla il codice Scala dall'elenco dei codici Scala
      nano singers.scala
        
    3. Avvia il REPL spark-shell.
      $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
      
    4. Esegui singers.scala con il comando :load singers.scala per creare la tabella Singers di Spanner. L'elenco degli output mostra esempi dell'output Singers.
      > :load singers.scala
      Loading singers.scala...
      defined object singers
      > singers.main()
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
       

PySpark

  1. Esamina il codice e sostituisci i segnaposto [projectId], [instanceId], [databaseId] e [table] con l'ID progetto, l'ID istanza, l'ID database e la tabella che hai creato in precedenza. L'opzione EnableDataBoost abilita la funzionalità Data Boost di Spanner, che ha un impatto quasi pari a zero sull'istanza principale di Spanner.
    #!/usr/bin/env python
    
    """Spanner PySpark read example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-spanner-demo') \
      .getOrCreate()
    
    # Load data from Spanner.
    singers = spark.read.format('cloud-spanner') \
      .option("projectId", "[projectId]") \
      .option("instanceId", "[instanceId]") \
      .option("databaseId", "[databaseId]") \
      .option("enableDataBoost", "true") \
      .option("table", "[table]") \
      .load()
    singers.createOrReplaceTempView('Singers')
    
    # Read from Singers
    result = spark.sql('SELECT * FROM Singers')
    result.show()
    result.printSchema()
    
    
  2. Esegui il codice sul tuo cluster
    1. Utilizza SSH per connetterti al nodo master del cluster Dataproc
      1. Vai alla pagina Cluster di Dataproc nella console Google Cloud, quindi fai clic sul nome del cluster
        pagina Cluster nella console Cloud.
      2. Nella pagina Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su SSH a destra del nome del nodo master del cluster
        Seleziona SSH sulla riga del nome del cluster nella pagina Dettagli cluster nella console Cloud.

        Si apre una finestra del browser nella tua directory home sul nodo principale
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crea singers.py con l'editor di testo vi, vim o nano preinstallato, poi incolla il codice PySpark dall'elenco di codici PySpark
      nano singers.py
      
    3. Esegui singers.py con spark-submit per creare la tabella Singers di Spanner.
      spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      
      L'output è:
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
      only showing top 20 rows
      

esegui la pulizia

Per eseguire la pulizia ed evitare che al tuo account Google Cloud vengano addebitati costi per le risorse create in questa procedura dettagliata, segui questi passaggi.

gcloud dataproc clusters stop CLUSTER_NAME
gcloud dataproc clusters delete CLUSTER_NAME

Per maggiori informazioni