Usa el conector de Spanner de Spark

En esta página, se muestra cómo crear un clúster de Dataproc que usa el conector de Spark Spanner para leer datos de Spanner con Apache Spark.

El conector de Spanner funciona con Spark para leer datos de la base de datos de Spanner con la biblioteca de Java de Spanner. El conector de Spanner admite la lectura de tablas y gráficos de Spanner en DataFrames y GraphFrames de Spark.

Costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

  • Dataproc
  • Spanner
  • Cloud Storage

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios.

Es posible que los usuarios de Google Cloud nuevos cumplan con los requisitos para acceder a una prueba gratuita.

Antes de comenzar

Antes de usar el conector de Spanner en este instructivo, configura un clúster de Dataproc y una instancia y una base de datos de Spanner.

Configura un clúster de Dataproc

Crea un clúster de Dataproc o usa uno existente que tenga los siguientes parámetros de configuración:

  • Permisos de la cuenta de servicio de la VM A la cuenta de servicio de la VM del clúster se le deben asignar los permisos de Spanner adecuados. Si usas Data Boost (Data Boost está habilitado en el código de ejemplo de Export Spanner tables), la cuenta de servicio de la VM también debe tener los permisos de IAM de Data Boost necesarios.

  • Es el permiso de acceso. El clúster debe crearse con el alcance cloud-platform o el alcance spanner adecuado habilitado. El alcance de cloud-platform está habilitado de forma predeterminada para los clústeres creados con la versión de imagen 2.1 o posterior.

    En las siguientes instrucciones, se muestra cómo establecer el alcance de cloud-platform como parte de una solicitud de creación de clúster que usa la consola de Google Cloud , gcloud CLI o la API de Dataproc. Para obtener instrucciones adicionales sobre la creación de clústeres, consulta Crea un clúster.

    Consola deGoogle Cloud

    1. En la Google Cloud consola, abre la página Crear un clúster de Dataproc.
    2. En el panel Administrar seguridad de la sección Acceso al proyecto, haz clic en "Habilita el alcance de la plataforma de nube para este clúster".
    3. Completa o confirma los demás campos de creación del clúster y, luego, haz clic en Crear.

    gcloud CLI

    Puedes ejecutar el siguiente comando gcloud dataproc clusters create para crear un clúster con el alcance cloud-platform habilitado.

    gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

    API

    Puedes especificar GceClusterConfig.serviceAccountScopes como parte de una solicitud clusters.create.

        "serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
    

Configura una instancia de Spanner con una tabla de base de datos de Singers

Crea una instancia de Spanner con una base de datos que contenga una tabla Singers. Anota el ID de la instancia y el ID de la base de datos de Spanner.

Usa el conector de Spanner con Spark

El conector de Spanner está disponible para las versiones 3.1+ de Spark. Especificas la versión del conector como parte de la especificación del archivo JAR del conector de Cloud Storage cuando envías un trabajo a un clúster de Dataproc.

Ejemplo: Envío de un trabajo de Spark con gcloud CLI y el conector de Spanner.

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

Reemplaza lo siguiente:

CONNECTOR_VERSION: Versión del conector de Spanner. Elige la versión del conector de Spanner en la lista de versiones del repositorio de GoogleCloudDataproc/spark-spanner-connector de GitHub.

Leer tablas de Spanner

Puedes usar Python o Scala para leer datos de tablas de Spanner en un DataFrame de Spark con la API de fuente de datos de Spark.

PySpark

Puedes ejecutar el código de ejemplo de PySpark en esta sección en tu clúster enviando el trabajo al servicio de Dataproc o ejecutándolo desde el REPL de spark-submit en el nodo instancia principal del clúster.

Trabajo de Dataproc

  1. Crea un archivo singers.py con un editor de texto local o en Cloud Shell con el editor de texto preinstalado vi, vim o nano.
    1. Después de completar las variables de marcador de posición, pega el siguiente código en el archivo singers.py. Ten en cuenta que la función Data Boost de Spanner está habilitada, lo que tiene un impacto casi nulo en la instancia principal de Spanner.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Reemplaza lo siguiente:

      1. PROJECT_ID: El ID de tu proyecto de Google Cloud . Los IDs del proyecto se enumeran en la sección Información del proyecto en el Google Cloud panel de la consola.
      2. INSTANCE_ID, DATABASE_ID y TABLE_NAME : Consulta Cómo configurar una instancia de Spanner con la tabla de base de datos Singers.
    2. Guarda el archivo singers.py.
  2. Envía el trabajo al servicio de Dataproc con la consola de Google Cloud , gcloud CLI o la API de Dataproc.

    Ejemplo: Envío de un trabajo de gcloud CLI con el conector de Spanner.

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
          

    Reemplaza lo siguiente:

    1. CLUSTER_NAME el nombre del clúster nuevo.
    2. REGION: Es una región de Compute Engine disponible para ejecutar la carga de trabajo.
    3. CONNECTOR_VERSION: Versión del conector de Spanner. Elige la versión del conector de Spanner en la lista de versiones del repositorio de GitHub GoogleCloudDataproc/spark-spanner-connector.

Trabajo de spark-submit

  1. Conéctate al nodo instancia principal del clúster de Dataproc con SSH.
    1. Ve a la página Clústeres de Dataproc en la Google Cloud consola y, luego, haz clic en el nombre de tu clúster.
    2. En la página Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en SSH a la derecha del nombre del nodo instancia principal del clúster.
      Captura de pantalla de la página de detalles del clúster de Dataproc en la consola Google Cloud , en la que se muestra el botón SSH que se usa para conectarse al nodo instancia principal del clúster.

      Se abrirá una ventana del navegador en tu directorio principal del nodo.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Crea un archivo singers.py en el nodo principal con el editor de texto vi, vim o nano instalado previamente.
    1. Pega el siguiente código en el archivo singers.py. Ten en cuenta que la función Data Boost de Spanner está habilitada, lo que tiene un impacto casi nulo en la instancia principal de Spanner.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Reemplaza lo siguiente:

      1. PROJECT_ID: El ID de tu proyecto de Google Cloud . Los IDs del proyecto se enumeran en la sección Información del proyecto en el Google Cloud panel de la consola.
      2. INSTANCE_ID, DATABASE_ID y TABLE_NAME : Consulta Cómo configurar una instancia de Spanner con la tabla de base de datos Singers.
    2. Guarda el archivo singers.py.
  3. Ejecuta singers.py con spark-submit para crear la tabla Singers de Spanner.
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    Reemplaza lo siguiente:

    1. CONNECTOR_VERSION: Versión del conector de Spanner. Elige la versión del conector de Spanner en la lista de versiones del repositorio de GitHub GoogleCloudDataproc/spark-spanner-connector.

    El resultado es el siguiente:

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

Para ejecutar el código de ejemplo de Scala en tu clúster, completa los siguientes pasos:

  1. Conéctate al nodo instancia principal del clúster de Dataproc con SSH.
    1. Ve a la página Clústeres de Dataproc en la Google Cloud consola y, luego, haz clic en el nombre de tu clúster.
    2. En la página Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en SSH a la derecha del nombre del nodo instancia principal del clúster. Página de detalles del clúster de Dataproc en la consola de Google Cloud .

      Se abrirá una ventana del navegador en tu directorio principal del nodo.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Crea un archivo singers.scala en el nodo principal con el editor de texto vi, vim o nano instalado previamente.
    1. Pega el siguiente código en el archivo singers.scala. Ten en cuenta que la función Data Boost de Spanner está habilitada, lo que tiene un impacto casi nulo en la instancia principal de Spanner.
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      Reemplaza lo siguiente:

      1. PROJECT_ID: El ID de tu proyecto de Google Cloud . Los IDs del proyecto se enumeran en la sección Información del proyecto en el Google Cloud panel de la consola.
      2. INSTANCE_ID, DATABASE_ID y TABLE_NAME : Consulta Cómo configurar una instancia de Spanner con la tabla de base de datos Singers.
    2. Guarda el archivo singers.scala.
  3. Inicia el REPL de spark-shell.
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    Reemplaza lo siguiente:

    CONNECTOR_VERSION: Versión del conector de Spanner. Elige la versión del conector de Spanner en la lista de versiones del repositorio de GoogleCloudDataproc/spark-spanner-connector de GitHub.

  4. Ejecuta singers.scala con el comando :load singers.scala para crear la tabla Singers de Spanner. La lista de salida muestra ejemplos del resultado de Singers.
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Leer gráficos de Spanner

El conector de Spanner admite la exportación del grafo a DataFrames de nodos y aristas separados, así como la exportación directamente a GraphFrames.

En el siguiente ejemplo, se exporta un Spanner a un GraphFrame. Utiliza la clase SpannerGraphConnectorde Python, incluida en el JAR del conector de Spanner, para leer el gráfico de Spanner.

from pyspark.sql import SparkSession

connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar"

spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example")
         .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12")
         .config("spark.jars", connector_jar)
         .getOrCreate())
spark.sparkContext.addPyFile(connector_jar)

from spannergraph import SpannerGraphConnector

connector = (SpannerGraphConnector()
             .spark(spark)
             .project("PROJECT_ID")
             .instance("INSTANCE_ID")
             .database("DATABASE_ID")
             .graph("GRAPH_ID"))

g = connector.load_graph()
g.vertices.show()
g.edges.show()

Reemplaza lo siguiente:

  • CONNECTOR_VERSION: Versión del conector de Spanner. Elige la versión del conector de Spanner en la lista de versiones del repositorio de GitHub GoogleCloudDataproc/spark-spanner-connector.
  • PROJECT_ID: El ID de tu proyecto de Google Cloud . Los IDs del proyecto se enumeran en la sección Información del proyecto del panel de la consola Google Cloud .
  • INSTANCE_ID, DATABASE_ID y TABLE_NAME: Inserta los IDs de la instancia, la base de datos y el gráfico.

Para exportar nodos y bordes DataFrames en lugar de GraphFrames, usa load_dfs:

df_vertices, df_edges, df_id_map = connector.load_dfs()

Limpia

Para evitar que se apliquen cargos continuos a tu cuenta de Google Cloud , puedes detener o borrar tu clúster de Dataproc y borrar tu instancia de Spanner.

¿Qué sigue?