En esta página, se muestra cómo usar el Conector de Spanner de Spark para leer datos de Spanner con Apache Spark.
Calcula los costos
En este documento, usarás los siguientes componentes facturables de Google Cloud:
- Dataproc
- Spanner
- Cloud Storage
Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios.
Antes de comenzar
Antes de ejecutar el instructivo, asegúrate de conocer la versión del conector y obtener un URI del conector.
Cómo especificar el URI del archivo JAR del conector
Las versiones del conector de Spark Spanner se enumeran en GitHub Repositorio de GoogleCloud Dataproc/spark-spanner-connector.
Especifica el archivo JAR del conector reemplazando la versión del conector
información en la siguiente cadena de URI:
gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
El conector está disponible para las versiones 3.1+
de Spark
Ejemplo de gcloud CLI:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \ -- job-args
Prepara la base de datos de Spanner
Si no tienes una tabla de Spanner, puedes seguir el instructivo para crear una. Después de eso, tendrás un ID de instancia,
un ID de base de datos y una tabla Singers
.
Crea un clúster de Dataproc
Cualquier clúster de Dataproc que use el conector necesita los permisos spanner
o cloud-platform
. Los clústeres de Dataproc tienen el alcance predeterminado cloud-platform
para la imagen 2.1 o posterior. Si usas una versión anterior, puedes usar la consola de Google Cloud, Google Cloud CLI y la API de Dataproc para crear un clúster de Dataproc.
Console
- En la consola de Google Cloud, abre Dataproc Página Crea un clúster
- En la pestaña "Administrar seguridad", haz clic en "Habilita el alcance de la plataforma de nube para este clúster" en la sección "Acceso al proyecto".
- Completa o confirma los otros campos de creación de clústeres y, luego, haz clic en "Crear".
Google Cloud CLI
gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
Puedes especificar GceClusterConfig.serviceAccountScopes como parte de una solicitud clusters.create. Por ejemplo:"serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
Deberás asegurarte de que el permiso de Spanner correspondiente esté asignado a la cuenta de servicio de la VM de Dataproc. Si usas Data Boost en el instructivo, consulta Permiso de IAM de Data Boost.
Lee datos de Spanner
Puedes usar Scala y Python para leer datos de Spanner en un DataFrame de Spark con la API de fuente de datos de Spark.
Scala
- Examina el código y reemplaza los marcadores de posición [projectId], [instanceId], [databaseId] y [table] por el ID del proyecto, el ID de la instancia, el ID de la base de datos y la tabla que creaste antes. La opción enableDataBoost habilita la función Data Boost de Spanner, que tiene un impacto casi nulo en la instancia principal de Spanner.
object singers { def main(): Unit = { /* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "[projectId]") .option("instanceId", "[instanceId]") .option("databaseId", "[databaseId]") .option("enableDataBoost", true) .option("table", "[table]") .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
- Ejecuta el código de tu clúster
- Usa SSH para conectarte al nodo instancia principal del clúster de Dataproc
- Ve a la página Clústeres de Dataproc en la consola de Google Cloud y, luego, haz clic en el nombre de tu clúster.
- En la página >Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en
SSH
a la derecha del nombre del nodo principal del clúster
Se abrirá una ventana del navegador en tu directorio principal del nodo principalConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea
singers.scala
con el editor de textovi
,vim
onano
preinstalado y, luego, pega el código de Scala desde la lista de códigos de Scala.nano singers.scala
- Inicia el REPL de
spark-shell
.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
- Ejecuta singers.scala con el comando
:load singers.scala
para crear la tablaSingers
de Spanner. La lista de salida muestra ejemplos del resultado de Singers.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
PySpark
- Examina el código y reemplaza el marcador de posición [projectId], [instanceId], [databaseId] y [table] por
el ID del proyecto, el ID de la instancia, el ID de la base de datos y la tabla que creaste anteriormente. La opción enableDataBoost habilita la función Data Boost de Spanner, que tiene un impacto casi nulo en la instancia principal de Spanner.
#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "[projectId]") \ .option("instanceId", "[instanceId]") \ .option("databaseId", "[databaseId]") \ .option("enableDataBoost", "true") \ .option("table", "[table]") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
- Ejecuta el código en tu clúster
- Usa SSH para conectarte al nodo instancia principal del clúster de Dataproc
- Ve a la sección Clústeres de Dataproc de la consola de Google Cloud y, luego, haz clic en el nombre de tu clúster
- En la página Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en
SSH
a la derecha del nombre del nodo principal del clúster
Se abrirá una ventana del navegador en tu directorio principal del nodo principalConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea
singers.py
con el editor de textovi
,vim
onano
preinstalado y, luego, pega el código de PySpark desde la lista de código de PySpark.nano singers.py
- Ejecuta cantantes.py con
spark-submit
para crear el Spanner TablaSingers
. El resultado es el siguiente:spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- Usa SSH para conectarte al nodo instancia principal del clúster de Dataproc
Limpieza
Sigue estos pasos para realizar una limpieza y evitar que se apliquen cargos continuos a tu cuenta de Google Cloud por los recursos creados en esta explicación.
gcloud dataproc clusters stop CLUSTER_NAME gcloud dataproc clusters delete CLUSTER_NAME