Utilizzare il connettore Spanner con Spark

Questa pagina mostra come creare un cluster Dataproc che utilizza il connettore Spark Spanner per leggere i dati da Spanner utilizzando Apache Spark.

Calcolare i costi

In questo documento utilizzi i seguenti componenti fatturabili di Google Cloud:

  • Dataproc
  • Spanner
  • Cloud Storage

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi Google Cloud utenti potrebbero avere diritto a una prova gratuita.

Prima di iniziare

Prima di utilizzare il connettore Spanner in questo tutorial, configura un cluster Dataproc, un'istanza e un database Spanner.

Configura un cluster Dataproc

Crea un cluster Dataproc o utilizza un cluster Dataproc esistente con le seguenti impostazioni:

  • Autorizzazioni dell'account di servizio della VM. All'account di servizio VM del cluster devono essere assegnate le autorizzazioni Spanner appropriate. Se utilizzi Data Boost (Data Boost è abilitato nel codice di esempio in Leggi i dati da Spanner), l'account di servizio della VM deve disporre anche delle autorizzazioni IAM di Data Boost richieste.

  • Ambito di accesso. Il cluster deve essere creato con l'ambito cloud-platform o l'ambito spanner appropriato abilitato. L'ambito cloud-platform è attivato per impostazione predefinita per i cluster creati con la versione dell'immagine 2.1 o successiva.

    Le istruzioni riportate di seguito spiegano come impostare l'ambito cloud-platform nell'ambito di una richiesta di creazione di un cluster che utilizza la console, l'interfaccia a riga di comando gcloud o l'API Dataproc. Google Cloud Per ulteriori istruzioni sulla creazione di cluster, consulta Creare un cluster.

    Google Cloud console

    1. Nella Google Cloud console, apri la pagina Dataproc Crea un cluster.
    2. Nel pannello Gestisci la sicurezza della sezione Accesso al progetto, fai clic su Attiva l'ambito cloud-platform per questo cluster.
    3. Compila o conferma gli altri campi per la creazione del cluster, quindi fai clic su Crea.

    Interfaccia a riga di comando gcloud

    Puoi eseguire il seguente comando gcloud dataproc clusters create per creare un cluster con l'ambito cloud-platform abilitato.

    gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

    API

    Puoi specificare GceClusterConfig.serviceAccountScopes nell'ambito di una richiesta clusters.create.

        "serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
    

Configura un'istanza Spanner con una tabella di database Singers

Crea un'istanza Spanner con un database che contiene una tabella Singers. Prendi nota dell'ID istanza e dell'ID database Spanner.

Utilizzare il connettore Spanner con Spark

Il connettore Spanner è disponibile per le versioni di Spark 3.1+. Specifica la versione del connettore come parte della specifica del file JAR del connettore Cloud Storage quando invii un job a un cluster Dataproc.

Esempio: invio di un job Spark con l'interfaccia a riga di comando gcloud e il connettore Spanner.

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

Sostituisci quanto segue:

CONNECTOR_VERSION: versione del connettore Spanner. Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub GoogleCloudDataproc/spark-spanner-connector.

Leggere i dati da Spanner

Puoi utilizzare Python o Scala per leggere i dati da Spanner in un DataFrame Spark utilizzando l'API di origine dati Spark.

PySpark

Puoi eseguire il codice PySpark di esempio in questa sezione sul tuo cluster inviando il job al servizio Dataproc o eseguendolo dalla REPL spark-submit sul nodo principale del cluster.

Job Dataproc

  1. Crea un file singers.py utilizzando un editor di testo locale o in Cloud Shell utilizzando l'editor di testo vi, vim o nano preinstallato.
    1. Incolla il seguente codice nel file singers.py. Tieni presente che la funzionalità Data Boost di Spanner è attivata, il che ha un impatto quasi nullo sull'istanza Spanner principale.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
          

      Sostituisci quanto segue:

      1. PROJECT_ID: il tuo Google Cloud ID progetto. Gli ID progetto sono elencati nella sezione Informazioni sul progetto della Google Cloud dashboard della console.
      2. INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulta Configurare un'istanza Spanner con la tabella di database Singers.
    2. Salva il file singers.py.
  2. Invia il job al servizio Dataproc utilizzando la Google Cloud console, l'interfaccia a riga di comando gcloud o l'API Dataproc.

    Esempio: invio di un job con l'interfaccia a riga di comando gcloud con il connettore Spanner.

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION
          

    Sostituisci quanto segue:

    1. CLUSTER_NAME: il nome del nuovo cluster.
    2. REGION: una regione Compute Engine disponibile per eseguire il carico di lavoro.
    3. CONNECTOR_VERSION: versione del connettore Spanner. Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub GoogleCloudDataproc/spark-spanner-connector.

Job spark-submit

  1. Connettiti al nodo principale del cluster Dataproc tramite SSH.
    1. Vai alla pagina Dataproc Cluster nella Google Cloud console, quindi fai clic sul nome del cluster.
    2. Nella pagina Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su SSH a destra del nome del nodo principale del cluster.
      Pagina dei dettagli del cluster Dataproc nella console Cloud.

      Viene visualizzata una finestra del browser nella home directory sul nodo principale.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Crea un file singers.py sul nodo principale utilizzando l'editor di testo vi, vim o nano preinstallato.
    1. Incolla il seguente codice nel file singers.py. Tieni presente che la funzionalità Data Boost di Spanner è attivata, il che ha un impatto quasi nullo sull'istanza Spanner principale.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Sostituisci quanto segue:

      1. PROJECT_ID: il tuo Google Cloud ID progetto. Gli ID progetto sono elencati nella sezione Informazioni sul progetto della Google Cloud dashboard della console.
      2. INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulta Configurare un'istanza Spanner con la tabella di database Singers.
    2. Salva il file singers.py.
  3. Esegui singers.py con spark-submit per creare la tabella Spanner Singers.
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    Sostituisci quanto segue:

    1. CONNECTOR_VERSION: versione del connettore Spanner. Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub GoogleCloudDataproc/spark-spanner-connector.

    L'output è:

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

Per eseguire il codice Scala di esempio sul cluster:

  1. Connettiti al nodo principale del cluster Dataproc tramite SSH.
    1. Vai alla pagina Dataproc Cluster nella Google Cloud console e fai clic sul nome del cluster.
    2. Nella pagina Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su SSH a destra del nome del nodo principale del cluster.
      Pagina dei dettagli del cluster Dataproc nella console Cloud.

      Viene visualizzata una finestra del browser nella home directory sul nodo principale.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Crea un file singers.scala sul nodo principale utilizzando l'editor di testo vi, vim o nano preinstallato.
    1. Incolla il seguente codice nel file singers.scala. Tieni presente che la funzionalità Data Boost di Spanner è attivata, il che ha un impatto quasi nullo sull'istanza Spanner principale.
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      Sostituisci quanto segue:

      1. PROJECT_ID: il tuo Google Cloud ID progetto. Gli ID progetto sono elencati nella sezione Informazioni sul progetto della Google Cloud dashboard della console.
      2. INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulta Configurare un'istanza Spanner con la tabella di database Singers.
    2. Salva il file singers.scala.
  3. Avvia la REPL di spark-shell.
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    Sostituisci quanto segue:

    CONNECTOR_VERSION: versione del connettore Spanner. Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub GoogleCloudDataproc/spark-spanner-connector.

  4. Esegui singers.scala con il comando :load singers.scala per creare la tabella Spanner Singers. L'elenco di output mostra esempi dell'output di Singers.
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Esegui la pulizia

Per evitare addebiti continui sul tuo Google Cloud account, puoi interrompere o eliminare il tuo cluster Dataproc e eliminare l'istanza Spanner.

Per maggiori informazioni