En esta página, se muestra cómo usar el conector de Spark para Spanner a fin de leer datos de Spanner con Apache Spark
Calcula los costos
En este documento, usarás los siguientes componentes facturables de Google Cloud:
- Dataproc
- Spanner
- Cloud Storage
Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios.
Antes de comenzar
Antes de ejecutar el instructivo, asegúrate de conocer la versión del conector y obtener un URI de conector.
Cómo especificar el URI del archivo JAR del conector
Las versiones del conector de Spark Spanner se enumeran en el repositorio de GoogleCloud Dataproc/spark-spanner-connector de GitHub.
Para especificar el archivo JAR del conector, sustituye la información de la versión del conector en la siguiente string de URI:
gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
El conector está disponible para las versiones 3.1+
de Spark.
Ejemplo de gcloud CLI:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \ -- job-args
Prepara la base de datos de Spanner
Si no tienes una tabla de Spanner, puedes seguir el instructivo para crearla. Luego, tendrás un ID de instancia, un ID de base de datos y una tabla Singers
.
Crea un clúster de Dataproc
Cualquier clúster de Dataproc que use el conector necesita los permisos spanner
o cloud-platform
. Los clústeres de Dataproc tienen un permiso predeterminado de cloud-platform
para la imagen 2.1 o superior. Si usas una versión anterior, puedes usar la consola de Google Cloud, Google Cloud CLI y la API de Dataproc para crear un clúster de Dataproc.
Console
- En la consola de Google Cloud, abre la página Crear un clúster de Dataproc.
- En la pestaña “Administrar seguridad”, haz clic en “Habilita el permiso cloud-platform para este clúster” en la sección “Acceso al proyecto”.
- Completa o confirma los otros campos de creación del clúster y, luego, haz clic en “Crear”.
Google Cloud CLI
gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
Puedes especificar GceClusterConfig.serviceAccountScopes como parte de una solicitud clusters.create. Por ejemplo:"serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
Deberás asegurarte de que el permiso de Spanner correspondiente esté asignado a la cuenta de servicio de la VM de Dataproc. Si usas Data Boost en el instructivo, consulta el permiso de IAM de Data Boost.
Lee datos de Spanner
Puedes usar Scala y Python para leer datos de Spanner en un DataFrame de Spark con la API de fuente de datos de Spark.
Scala
- Examina el código y reemplaza los marcadores de posición [projectId], [instanceId], [databaseId] y [table] por el ID del proyecto, el ID de la instancia, el ID de la base de datos y la tabla que creaste antes. La opción enableDataBoost habilita la función Data Boost de Spanner, que tiene
un impacto casi nulo en la instancia principal de Spanner.
object singers { def main(): Unit = { /* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "[projectId]") .option("instanceId", "[instanceId]") .option("databaseId", "[databaseId]") .option("enableDataBoost", true) .option("table", "[table]") .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
- Ejecuta el código de tu clúster
- Usa SSH para conectarte al nodo de la instancia principal del clúster de Dataproc
- Ve a la página Clústeres de Dataproc en la consola de Google Cloud y, luego, haz clic en el nombre de tu clúster.
- En la página >Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en
SSH
a la derecha del nombre del nodo de la instancia principal del clúster
Se abrirá una ventana del navegador en tu directorio principal en el nodo principalConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea
singers.scala
con el editor de textovi
,vim
onano
preinstalado y, luego, pega el código de Scala de la lista de código de Scala.nano singers.scala
- Inicia el REPL
spark-shell
.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
- Ejecuta cantantes.scala con el comando
:load singers.scala
para crear la tablaSingers
de Spanner. En la lista de salida, se muestran ejemplos de Singers.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
PySpark
- Examina el código y reemplaza los marcadores de posición [projectId], [instanceId], [databaseId] y [table] por el ID del proyecto, el ID de la instancia, el ID de la base de datos y la tabla que creaste antes. La opción enableDataBoost habilita la función Data Boost de Spanner, que tiene
un impacto casi nulo en la instancia principal de Spanner.
#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "[projectId]") \ .option("instanceId", "[instanceId]") \ .option("databaseId", "[databaseId]") \ .option("enableDataBoost", "true") \ .option("table", "[table]") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
- Ejecuta el código en tu clúster
- Usa SSH para conectarte al nodo de la instancia principal del clúster de Dataproc
- Ve a la página Clústeres de Dataproc en la consola de Google Cloud y, luego, haz clic en el nombre de tu clúster.
- En la página Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en
SSH
a la derecha del nombre del nodo instancia principal del clúster
Se abrirá una ventana del navegador en tu directorio principal, en el nodo principalConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea
singers.py
con el editor de texto preinstalado devi
,vim
onano
y, luego, pega el código de PySpark de la lista de códigos de PySpark.nano singers.py
- Ejecuta cantantes.py con
spark-submit
para crear la tablaSingers
de Spanner.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
El resultado es el siguiente:... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- Usa SSH para conectarte al nodo de la instancia principal del clúster de Dataproc
Limpieza
Sigue estos pasos para realizar una limpieza y evitar que se apliquen cargos continuos a tu cuenta de Google Cloud por los recursos creados en esta explicación.
gcloud dataproc clusters stop CLUSTER_NAME gcloud dataproc clusters delete CLUSTER_NAME