Usa el conector de Spanner con Spark

En esta página, se muestra cómo usar el Conector de Spanner de Spark para leer datos de Spanner con Apache Spark.

Calcula los costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

  • Dataproc
  • Spanner
  • Cloud Storage

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud califiquen para obtener una prueba gratuita.

Antes de comenzar

Antes de ejecutar el instructivo, asegúrate de conocer la versión del conector y obtener un URI del conector.

Cómo especificar el URI del archivo JAR del conector

Las versiones del conector de Spanner de Spark se enumeran en el repositorio de GoogleCloudDataproc/spark-spanner-connector de GitHub.

Para especificar el archivo JAR del conector, reemplaza la información de la versión del conector en la siguiente cadena de URI:

gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar

El conector está disponible para las versiones 3.1+ de Spark.

Ejemplo de gcloud CLI:

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \
    -- job-args
  

Prepara la base de datos de Spanner

Si no tienes una tabla de Spanner, puedes seguir el instructivo para crear una. Después de eso, tendrás un ID de instancia, un ID de base de datos y una tabla Singers.

Crea un clúster de Dataproc

Cualquier clúster de Dataproc que use el conector necesita los permisos spanner o cloud-platform. Los clústeres de Dataproc tienen el alcance predeterminado cloud-platform para la imagen 2.1 o posterior. Si usas una versión anterior, puedes usar la consola de Google Cloud, Google Cloud CLI y la API de Dataproc para crear un clúster de Dataproc.

Console

  1. En la consola de Google Cloud, abre la página Create a cluster de Dataproc.
  2. En la pestaña "Administrar seguridad", haz clic en "Habilita el alcance de la plataforma de nube para este clúster" en la sección "Acceso al proyecto".
  3. Completa o confirma los otros campos de creación del clúster y, luego, haz clic en “Crear”.

Google Cloud CLI

gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

API

Puedes especificar GceClusterConfig.serviceAccountScopes como parte de una solicitud clusters.create. Por ejemplo:
        "serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
    

Deberás asegurarte de que el permiso de Spanner correspondiente se asigne a la cuenta de servicio de VM de Dataproc. Si usas Data Boost en el instructivo, consulta el Permiso de IAM de Data Boost.

Lee datos de Spanner

Puedes usar Scala y Python para leer datos de Spanner en un DataFrame de Spark con la API de fuente de datos de Spark.

Scala

  1. Examina el código y reemplaza los marcadores de posición [projectId], [instanceId], [databaseId] y [table] por el ID del proyecto, el ID de la instancia, el ID de la base de datos y la tabla que creaste antes. La opción enableDataBoost habilita la función Data Boost de Spanner, que tiene un impacto casi nulo en la instancia principal de Spanner.
    object singers {
      def main(): Unit = {
        /*
         * Remove comment if you are not running in spark-shell.
         *
        import org.apache.spark.sql.SparkSession
        val spark = SparkSession.builder()
          .appName("spark-spanner-demo")
          .getOrCreate()
        */
    
        // Load data in from Spanner. See
        // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
        // for option information.
        val singersDF =
          (spark.read.format("cloud-spanner")
            .option("projectId", "[projectId]")
            .option("instanceId", "[instanceId]")
            .option("databaseId", "[databaseId]")
            .option("enableDataBoost", true)
            .option("table", "[table]")
            .load()
            .cache())
    
        singersDF.createOrReplaceTempView("Singers")
    
        // Load the Singers table.
        val result = spark.sql("SELECT * FROM Singers")
        result.show()
        result.printSchema()
      }
    }
  2. Ejecuta el código de tu clúster
    1. Usa SSH para conectarte al nodo instancia principal del clúster de Dataproc
      1. Ve a la página Clústeres de Dataproc en la consola de Google Cloud y, luego, haz clic en el nombre de tu clúster.
        Página de clústeres de Dataproc en la consola de Cloud.
      2. En la página >Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en SSH a la derecha del nombre del nodo instancia principal del clúster
        Página de detalles del clúster de Dataproc en la consola de Cloud.

        Se abrirá una ventana del navegador en tu directorio principal del nodo principal
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crea singers.scala con el editor de texto vi, vim o nano preinstalado y, luego, pega el código de Scala desde la lista de códigos de Scala.
      nano singers.scala
        
    3. Inicia el REPL de spark-shell.
      $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
      
    4. Ejecuta singers.scala con el comando :load singers.scala para crear la tabla Singers de Spanner. La lista de salida muestra ejemplos del resultado de Singers.
      > :load singers.scala
      Loading singers.scala...
      defined object singers
      > singers.main()
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
       

PySpark

  1. Examina el código y reemplaza los marcadores de posición [projectId], [instanceId], [databaseId] y [table] por el ID del proyecto, el ID de la instancia, el ID de la base de datos y la tabla que creaste antes. La opción enableDataBoost habilita la función Data Boost de Spanner, que tiene un impacto casi nulo en la instancia principal de Spanner.
    #!/usr/bin/env python
    
    """Spanner PySpark read example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-spanner-demo') \
      .getOrCreate()
    
    # Load data from Spanner.
    singers = spark.read.format('cloud-spanner') \
      .option("projectId", "[projectId]") \
      .option("instanceId", "[instanceId]") \
      .option("databaseId", "[databaseId]") \
      .option("enableDataBoost", "true") \
      .option("table", "[table]") \
      .load()
    singers.createOrReplaceTempView('Singers')
    
    # Read from Singers
    result = spark.sql('SELECT * FROM Singers')
    result.show()
    result.printSchema()
  2. Ejecuta el código en tu clúster.
    1. Usa SSH para conectarte al nodo instancia principal del clúster de Dataproc
      1. Ve a la página Clústeres de Dataproc en la consola de Google Cloud y, luego, haz clic en el nombre de tu clúster.
        Página de clústeres en la consola de Cloud
      2. En la página Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en SSH a la derecha del nombre del nodo instancia principal del clúster Selecciona SSH en la fila del nombre del clúster en la página Detalles del clúster de la consola de Cloud.
        Se abrirá una ventana del navegador en tu directorio principal del nodo principal.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crea singers.py con el editor de texto vi, vim o nano preinstalado y, luego, pega el código de PySpark desde la lista de código de PySpark.
      nano singers.py
      
    3. Ejecuta singers.py con spark-submit para crear la tabla Singers de Spanner.
      spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      
      El resultado es el siguiente:
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
      only showing top 20 rows
      

Limpieza

Sigue estos pasos para realizar la limpieza y evitar que se apliquen cargos continuos a tu cuenta de Google Cloud por los recursos que creaste en esta explicación.

gcloud dataproc clusters stop CLUSTER_NAME
gcloud dataproc clusters delete CLUSTER_NAME

Más información