Usa el conector de Spanner con Spark

En esta página, se muestra cómo usar el conector de Spark para Spanner a fin de leer datos de Spanner con Apache Spark

Calcula los costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

  • Dataproc
  • Spanner
  • Cloud Storage

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud califiquen para obtener una prueba gratuita.

Antes de comenzar

Antes de ejecutar el instructivo, asegúrate de conocer la versión del conector y obtener un URI de conector.

Cómo especificar el URI del archivo JAR del conector

Las versiones del conector de Spark Spanner se enumeran en el repositorio de GoogleCloud Dataproc/spark-spanner-connector de GitHub.

Para especificar el archivo JAR del conector, sustituye la información de la versión del conector en la siguiente string de URI:

gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar

El conector está disponible para las versiones 3.1+ de Spark.

Ejemplo de gcloud CLI:

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \
    -- job-args
  

Prepara la base de datos de Spanner

Si no tienes una tabla de Spanner, puedes seguir el instructivo para crearla. Luego, tendrás un ID de instancia, un ID de base de datos y una tabla Singers.

Crea un clúster de Dataproc

Cualquier clúster de Dataproc que use el conector necesita los permisos spanner o cloud-platform. Los clústeres de Dataproc tienen un permiso predeterminado de cloud-platform para la imagen 2.1 o superior. Si usas una versión anterior, puedes usar la consola de Google Cloud, Google Cloud CLI y la API de Dataproc para crear un clúster de Dataproc.

Console

  1. En la consola de Google Cloud, abre la página Crear un clúster de Dataproc.
  2. En la pestaña “Administrar seguridad”, haz clic en “Habilita el permiso cloud-platform para este clúster” en la sección “Acceso al proyecto”.
  3. Completa o confirma los otros campos de creación del clúster y, luego, haz clic en “Crear”.

Google Cloud CLI

gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

API

Puedes especificar GceClusterConfig.serviceAccountScopes como parte de una solicitud clusters.create. Por ejemplo:
        "serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
    

Deberás asegurarte de que el permiso de Spanner correspondiente esté asignado a la cuenta de servicio de la VM de Dataproc. Si usas Data Boost en el instructivo, consulta el permiso de IAM de Data Boost.

Lee datos de Spanner

Puedes usar Scala y Python para leer datos de Spanner en un DataFrame de Spark con la API de fuente de datos de Spark.

Scala

  1. Examina el código y reemplaza los marcadores de posición [projectId], [instanceId], [databaseId] y [table] por el ID del proyecto, el ID de la instancia, el ID de la base de datos y la tabla que creaste antes. La opción enableDataBoost habilita la función Data Boost de Spanner, que tiene un impacto casi nulo en la instancia principal de Spanner.
    object singers {
      def main(): Unit = {
        /*
         * Remove comment if you are not running in spark-shell.
         *
        import org.apache.spark.sql.SparkSession
        val spark = SparkSession.builder()
          .appName("spark-spanner-demo")
          .getOrCreate()
        */
    
        // Load data in from Spanner. See
        // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
        // for option information.
        val singersDF =
          (spark.read.format("cloud-spanner")
            .option("projectId", "[projectId]")
            .option("instanceId", "[instanceId]")
            .option("databaseId", "[databaseId]")
            .option("enableDataBoost", true)
            .option("table", "[table]")
            .load()
            .cache())
    
        singersDF.createOrReplaceTempView("Singers")
    
        // Load the Singers table.
        val result = spark.sql("SELECT * FROM Singers")
        result.show()
        result.printSchema()
      }
    }
    
    
  2. Ejecuta el código de tu clúster
    1. Usa SSH para conectarte al nodo de la instancia principal del clúster de Dataproc
      1. Ve a la página Clústeres de Dataproc en la consola de Google Cloud y, luego, haz clic en el nombre de tu clúster.
        Página de clústeres de Dataproc en la consola de Cloud.
      2. En la página >Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en SSH a la derecha del nombre del nodo de la instancia principal del clúster
        Página de detalles del clúster de Dataproc en la consola de Cloud.

        Se abrirá una ventana del navegador en tu directorio principal en el nodo principal
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crea singers.scala con el editor de texto vi, vim o nano preinstalado y, luego, pega el código de Scala de la lista de código de Scala.
      nano singers.scala
        
    3. Inicia el REPL spark-shell.
      $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
      
    4. Ejecuta cantantes.scala con el comando :load singers.scala para crear la tabla Singers de Spanner. En la lista de salida, se muestran ejemplos de Singers.
      > :load singers.scala
      Loading singers.scala...
      defined object singers
      > singers.main()
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
       

PySpark

  1. Examina el código y reemplaza los marcadores de posición [projectId], [instanceId], [databaseId] y [table] por el ID del proyecto, el ID de la instancia, el ID de la base de datos y la tabla que creaste antes. La opción enableDataBoost habilita la función Data Boost de Spanner, que tiene un impacto casi nulo en la instancia principal de Spanner.
    #!/usr/bin/env python
    
    """Spanner PySpark read example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-spanner-demo') \
      .getOrCreate()
    
    # Load data from Spanner.
    singers = spark.read.format('cloud-spanner') \
      .option("projectId", "[projectId]") \
      .option("instanceId", "[instanceId]") \
      .option("databaseId", "[databaseId]") \
      .option("enableDataBoost", "true") \
      .option("table", "[table]") \
      .load()
    singers.createOrReplaceTempView('Singers')
    
    # Read from Singers
    result = spark.sql('SELECT * FROM Singers')
    result.show()
    result.printSchema()
    
    
  2. Ejecuta el código en tu clúster
    1. Usa SSH para conectarte al nodo de la instancia principal del clúster de Dataproc
      1. Ve a la página Clústeres de Dataproc en la consola de Google Cloud y, luego, haz clic en el nombre de tu clúster.
        Página Clústeres en la consola de Cloud.
      2. En la página Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en SSH a la derecha del nombre del nodo instancia principal del clúster
        Selecciona SSH en la fila del nombre del clúster en la página Detalles del clúster en la consola de Cloud.

        Se abrirá una ventana del navegador en tu directorio principal, en el nodo principal
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crea singers.py con el editor de texto preinstalado de vi, vim o nano y, luego, pega el código de PySpark de la lista de códigos de PySpark.
      nano singers.py
      
    3. Ejecuta cantantes.py con spark-submit para crear la tabla Singers de Spanner.
      spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      
      El resultado es el siguiente:
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
      only showing top 20 rows
      

Limpieza

Sigue estos pasos para realizar una limpieza y evitar que se apliquen cargos continuos a tu cuenta de Google Cloud por los recursos creados en esta explicación.

gcloud dataproc clusters stop CLUSTER_NAME
gcloud dataproc clusters delete CLUSTER_NAME

Para más información