Utilizzare il connettore Spark Spanner

Questa pagina mostra come creare un cluster Dataproc che utilizza il connettore Spark Spanner per leggere i dati da Spanner utilizzando Apache Spark.

Il connettore Spanner funziona con Spark per leggere i dati dal database Spanner utilizzando la libreria Java di Spanner. Il connettore Spanner supporta la lettura di tabelle e grafi di Spanner in DataFrames e GraphFrames di Spark.

Costi

In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:

  • Dataproc
  • Spanner
  • Cloud Storage

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il calcolatore prezzi.

I nuovi utenti di Google Cloud potrebbero avere diritto a una prova senza costi.

Prima di iniziare

Prima di utilizzare il connettore Spanner in questo tutorial, configura un cluster Dataproc e un'istanza e un database Spanner.

Configura un cluster Dataproc

Crea un cluster Dataproc o utilizza un cluster Dataproc esistente con le seguenti impostazioni:

Configura un'istanza Spanner con una tabella del database Singers

Crea un'istanza Spanner con un database che contiene una tabella Singers. Prendi nota dell'ID istanza e dell'ID database Spanner.

Utilizzare il connettore Spanner con Spark

Il connettore Spanner è disponibile per le versioni di Spark 3.1+. Specifichi la versione del connettore come parte della specifica del file JAR del connettore Cloud Storage quando invii un job a un cluster Dataproc.

Esempio:invio di job Spark gcloud CLI con il connettore Spanner.

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

Sostituisci quanto segue:

CONNECTOR_VERSION: versione del connettore Spanner. Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub GoogleCloudDataproc/spark-spanner-connector.

Leggere le tabelle Spanner

Puoi utilizzare Python o Scala per leggere i dati delle tabelle Spanner in un DataFrame Spark utilizzando l'API Spark Data Source.

PySpark

Puoi eseguire il codice PySpark di esempio in questa sezione sul tuo cluster inviando il job al servizio Dataproc o eseguendolo dal REPL spark-submit sul nodo master del cluster.

Job Dataproc

  1. Crea un file singers.py utilizzando un editor di testo locale o in Cloud Shell utilizzando l'editor di testo vi, vim o nano preinstallato.
    1. Dopo aver compilato le variabili segnaposto, incolla il seguente codice nel file singers.py. Tieni presente che la funzionalità Data Boost di Spanner è abilitata e ha un impatto quasi nullo sull'istanza Spanner principale.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Sostituisci quanto segue:

      1. PROJECT_ID: il tuo ID progetto Google Cloud . Gli ID progetto sono elencati nella sezione Informazioni sul progetto della Google Cloud console Dashboard.
      2. INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulta Configura un'istanza Spanner con la tabella del database Singers.
    2. Salva il file singers.py.
  2. Invia il job al servizio Dataproc utilizzando la console Google Cloud , gcloud CLI o l'API Dataproc.

    Esempio:invio di job gcloud CLI con il connettore Spanner.

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
          

    Sostituisci quanto segue:

    1. CLUSTER_NAME: il nome del nuovo cluster.
    2. REGION: una regione di Compute Engine disponibile per eseguire il carico di lavoro.
    3. CONNECTOR_VERSION: versione del connettore Spanner. Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub GoogleCloudDataproc/spark-spanner-connector.

Job spark-submit

  1. Connettiti al nodo master del cluster Dataproc utilizzando SSH.
    1. Vai alla pagina Cluster di Dataproc nella console Google Cloud , quindi fai clic sul nome del cluster.
    2. Nella pagina Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su SSH a destra del nome del nodo master del cluster.
      Screenshot della pagina dei dettagli del cluster Dataproc nella console Google Cloud , che mostra il pulsante SSH utilizzato per connettersi al nodo master del cluster.

      Si apre una finestra del browser nella directory home del nodo master.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Crea un file singers.py sul nodo master utilizzando l'editor di testo vi, vim o nano preinstallato.
    1. Incolla il seguente codice nel file singers.py. Tieni presente che la funzionalità Data Boost di Spanner è abilitata e ha un impatto quasi nullo sull'istanza Spanner principale.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Sostituisci quanto segue:

      1. PROJECT_ID: il tuo ID progetto Google Cloud . Gli ID progetto sono elencati nella sezione Informazioni sul progetto della Google Cloud console Dashboard.
      2. INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulta Configura un'istanza Spanner con la tabella del database Singers.
    2. Salva il file singers.py.
  3. Esegui singers.py con spark-submit per creare la tabella Spanner Singers.
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    Sostituisci quanto segue:

    1. CONNECTOR_VERSION: versione del connettore Spanner. Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub GoogleCloudDataproc/spark-spanner-connector.

    L'output è:

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

Per eseguire il codice Scala di esempio sul cluster, completa i seguenti passaggi:

  1. Connettiti al nodo master del cluster Dataproc utilizzando SSH.
    1. Vai alla pagina Cluster di Dataproc nella console Google Cloud , quindi fai clic sul nome del cluster.
    2. Nella pagina Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su SSH a destra del nome del nodo master del cluster. Pagina dei dettagli del cluster Dataproc nella console Google Cloud .

      Si apre una finestra del browser nella directory home del nodo master.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Crea un file singers.scala sul nodo master utilizzando l'editor di testo vi, vim o nano preinstallato.
    1. Incolla il seguente codice nel file singers.scala. Tieni presente che la funzionalità Spanner Data Boost è abilitata e ha un impatto quasi nullo sull'istanza Spanner principale.
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      Sostituisci quanto segue:

      1. PROJECT_ID: il tuo ID progetto Google Cloud . Gli ID progetto sono elencati nella sezione Informazioni sul progetto della Google Cloud console Dashboard.
      2. INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulta Configura un'istanza Spanner con la tabella del database Singers.
    2. Salva il file singers.scala.
  3. Avvia il REPL spark-shell.
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    Sostituisci quanto segue:

    CONNECTOR_VERSION: versione del connettore Spanner. Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub GoogleCloudDataproc/spark-spanner-connector.

  4. Esegui singers.scala con il comando :load singers.scala per creare la tabella Spanner Singers. L'elenco dell'output mostra esempi dell'output di Singers.
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Leggere i grafici di Spanner

Il connettore Spanner supporta l'esportazione del grafico in DataFrames di nodi e archi separati, nonché l'esportazione direttamente in GraphFrames.

L'esempio seguente esporta un database Spanner in un GraphFrame. Utilizza la classe SpannerGraphConnectorPython, inclusa nel file JAR del connettore Spanner, per leggere il grafico Spanner.

from pyspark.sql import SparkSession

connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar"

spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example")
         .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12")
         .config("spark.jars", connector_jar)
         .getOrCreate())
spark.sparkContext.addPyFile(connector_jar)

from spannergraph import SpannerGraphConnector

connector = (SpannerGraphConnector()
             .spark(spark)
             .project("PROJECT_ID")
             .instance("INSTANCE_ID")
             .database("DATABASE_ID")
             .graph("GRAPH_ID"))

g = connector.load_graph()
g.vertices.show()
g.edges.show()

Sostituisci quanto segue:

  • CONNECTOR_VERSION: versione del connettore Spanner. Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub GoogleCloudDataproc/spark-spanner-connector.
  • PROJECT_ID: il tuo ID progetto Google Cloud . Gli ID progetto sono elencati nella sezione Informazioni sul progetto della Google Cloud console Dashboard.
  • INSTANCE_ID, DATABASE_ID e TABLE_NAME Inserisci l'ID istanza, database e grafico.

Per esportare nodi e archi DataFrames anziché GraphFrames, utilizza load_dfs:

df_vertices, df_edges, df_id_map = connector.load_dfs()

Esegui la pulizia

Per evitare addebiti continui al tuo account Google Cloud , puoi interrompere o eliminare il cluster Dataproc e eliminare l'istanza Spanner.

Passaggi successivi