Usar el conector de Spark Spanner

En esta página se explica cómo crear un clúster de Dataproc que utilice el conector de Spark Spanner para leer datos de Spanner con Apache Spark.

El conector de Spanner funciona con Spark para leer datos de la base de datos de Spanner mediante la biblioteca Java de Spanner. El conector de Spanner permite leer tablas y gráficos de Spanner en DataFrames y GraphFrames de Spark.

Costes

En este documento, se utilizan los siguientes componentes facturables de Google Cloud:

  • Dataproc
  • Spanner
  • Cloud Storage

Para generar una estimación de costes basada en el uso previsto, utiliza la calculadora de precios.

Los usuarios nuevos Google Cloud pueden disfrutar de una prueba gratuita.

Antes de empezar

Antes de usar el conector de Spanner en este tutorial, configura un clúster de Dataproc y una instancia y una base de datos de Spanner.

Configurar un clúster de Dataproc

Crea un clúster de Dataproc o usa uno que ya tengas y que tenga los siguientes ajustes:

Configurar una instancia de Spanner con una tabla de base de datos Singers

Crea una instancia de Spanner con una base de datos que contenga una tabla Singers. Anota el ID de instancia y el ID de base de datos de Spanner.

Usar el conector de Spanner con Spark

El conector de Spanner está disponible para las versiones de Spark 3.1+. Especifica la versión del conector como parte de la especificación del archivo JAR del conector de Cloud Storage cuando envías una tarea a un clúster de Dataproc.

Ejemplo: envío de un trabajo de Spark de la CLI de gcloud con el conector de Spanner.

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

Haz los cambios siguientes:

CONNECTOR_VERSION: versión del conector de Spanner. Elige la versión del conector de Spanner en la lista de versiones del repositorio de GitHub GoogleCloudDataproc/spark-spanner-connector.

Leer tablas de Spanner

Puedes usar Python o Scala para leer datos de tablas de Spanner en un DataFrame de Spark mediante la API de fuente de datos de Spark.

PySpark

Puedes ejecutar el código de ejemplo de PySpark de esta sección en tu clúster enviando el trabajo al servicio Dataproc o ejecutándolo desde el REPL de spark-submit en el nodo maestro del clúster.

Tarea de Dataproc

  1. Crea un archivo singers.py con un editor de texto local o en Cloud Shell con el editor de texto vi, vim o nano preinstalado.
    1. Después de rellenar las variables de marcador de posición, pega el siguiente código en el archivo singers.py. Ten en cuenta que la función Data Boost de Spanner está habilitada, lo que tiene un impacto casi nulo en la instancia principal de Spanner.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Haz los cambios siguientes:

      1. PROJECT_ID: tu ID de proyecto Google Cloud . Los IDs de proyecto se indican en la sección Información del proyecto del Google Cloud panel de control de la consola.
      2. INSTANCE_ID, DATABASE_ID y TABLE_NAME : consulta Configurar una instancia de Spanner con una tabla de base de datos Singers.
    2. Guarda el archivo singers.py.
  2. Envía el trabajo al servicio de Dataproc mediante la Google Cloud consola, la CLI de gcloud o la API de Dataproc.

    Ejemplo: envío de un trabajo de la CLI de gcloud con el conector de Spanner.

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
          

    Haz los cambios siguientes:

    1. CLUSTER_NAME: el nombre del nuevo clúster.
    2. REGION: una región de Compute Engine disponible para ejecutar la carga de trabajo.
    3. CONNECTOR_VERSION: versión del conector de Spanner. Elige la versión del conector de Spanner en la lista de versiones del repositorio de GitHub GoogleCloudDataproc/spark-spanner-connector.

Tarea spark-submit

  1. Conéctate al nodo maestro del clúster de Dataproc mediante SSH.
    1. Ve a la página de Dataproc Clusters (Clústeres) en la Google Cloud consola y, a continuación, haz clic en el nombre de tu clúster.
    2. En la página Detalles del clúster, selecciona la pestaña Instancias de VM. A continuación, haga clic en SSH a la derecha del nombre del nodo maestro del clúster.
      Captura de pantalla de la página de detalles del clúster de Dataproc en la consola Google Cloud , que muestra el botón SSH que se usa para conectarse al nodo maestro del clúster.

      Se abrirá una ventana del navegador en el directorio principal del nodo maestro.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Crea un archivo singers.py en el nodo maestro con el editor de texto vi, vim o nano preinstalado.
    1. Pegue el siguiente código en el archivo singers.py. Ten en cuenta que la función Data Boost de Spanner está habilitada, lo que tiene un impacto casi nulo en la instancia principal de Spanner.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Haz los cambios siguientes:

      1. PROJECT_ID: tu ID de proyecto Google Cloud . Los IDs de proyecto se indican en la sección Información del proyecto del Google Cloud panel de control de la consola.
      2. INSTANCE_ID, DATABASE_ID y TABLE_NAME : consulta Configurar una instancia de Spanner con una tabla de base de datos Singers.
    2. Guarda el archivo singers.py.
  3. Ejecuta singers.py con spark-submit para crear la tabla Singers de Spanner.
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    Haz los cambios siguientes:

    1. CONNECTOR_VERSION: versión del conector de Spanner. Elige la versión del conector de Spanner en la lista de versiones del repositorio de GitHub GoogleCloudDataproc/spark-spanner-connector.

    El resultado es el siguiente:

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

Para ejecutar el código de Scala de ejemplo en tu clúster, sigue estos pasos:

  1. Conéctate al nodo maestro del clúster de Dataproc mediante SSH.
    1. Ve a la página de Dataproc Clusters (Clústeres) en la Google Cloud consola y, a continuación, haz clic en el nombre de tu clúster.
    2. En la página Detalles del clúster, selecciona la pestaña Instancias de VM. A continuación, haga clic en SSH a la derecha del nombre del nodo maestro del clúster. Página de detalles del clúster de Dataproc en la consola Google Cloud .

      Se abrirá una ventana del navegador en el directorio principal del nodo maestro.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Crea un archivo singers.scala en el nodo maestro con el editor de texto vi, vim o nano preinstalado.
    1. Pegue el siguiente código en el archivo singers.scala. Ten en cuenta que la función Data Boost de Spanner está habilitada, lo que tiene un impacto casi nulo en la instancia principal de Spanner.
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      Haz los cambios siguientes:

      1. PROJECT_ID: tu ID de proyecto Google Cloud . Los IDs de proyecto se indican en la sección Información del proyecto del Google Cloud panel de control de la consola.
      2. INSTANCE_ID, DATABASE_ID y TABLE_NAME : consulta Configurar una instancia de Spanner con una tabla de base de datos Singers.
    2. Guarda el archivo singers.scala.
  3. Inicia el spark-shell REPL.
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    Haz los cambios siguientes:

    CONNECTOR_VERSION: versión del conector de Spanner. Elige la versión del conector de Spanner en la lista de versiones del repositorio de GitHub GoogleCloudDataproc/spark-spanner-connector.

  4. Ejecuta singers.scala con el comando :load singers.scala para crear la tabla Singers de Spanner. En la lista de resultados se muestran ejemplos de los resultados de Singers.
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Leer gráficos de Spanner

El conector de Spanner permite exportar el gráfico a DataFrames de nodos y aristas independientes, así como exportarlo directamente a GraphFrames.

En el siguiente ejemplo se exporta un Spanner a un GraphFrame. Usa la clase SpannerGraphConnector de Python, incluida en el archivo JAR del conector de Spanner, para leer el gráfico de Spanner.

from pyspark.sql import SparkSession

connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar"

spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example")
         .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12")
         .config("spark.jars", connector_jar)
         .getOrCreate())
spark.sparkContext.addPyFile(connector_jar)

from spannergraph import SpannerGraphConnector

connector = (SpannerGraphConnector()
             .spark(spark)
             .project("PROJECT_ID")
             .instance("INSTANCE_ID")
             .database("DATABASE_ID")
             .graph("GRAPH_ID"))

g = connector.load_graph()
g.vertices.show()
g.edges.show()

Haz los cambios siguientes:

  • CONNECTOR_VERSION: versión del conector de Spanner. Elige la versión del conector de Spanner en la lista de versiones del repositorio de GitHub GoogleCloudDataproc/spark-spanner-connector.
  • PROJECT_ID: tu ID de proyecto Google Cloud . Los IDs de proyecto se indican en la sección Información del proyecto del panel de control de la consola Google Cloud .
  • INSTANCE_ID, DATABASE_ID y TABLE_NAME Insert los IDs de instancia, base de datos y gráfico.

Para exportar nodos y aristas DataFrames en lugar de GraphFrames, usa load_dfs:

df_vertices, df_edges, df_id_map = connector.load_dfs()

Limpieza

Para evitar que se te apliquen cargos continuos en tu cuenta de Google Cloud , puedes detener o eliminar tu clúster de Dataproc y eliminar tu instancia de Spanner.

Siguientes pasos