En esta página, se muestra cómo crear un clúster de Dataproc que usa el conector de Spark Spanner para leer datos de Spanner con Apache Spark.
El conector de Spanner funciona con Spark para leer datos de la base de datos de Spanner con la biblioteca de Java de Spanner. El conector de Spanner admite la lectura de tablas y gráficos de Spanner en DataFrames y GraphFrames de Spark.
Costos
En este documento, usarás los siguientes componentes facturables de Google Cloud:
- Dataproc
- Spanner
- Cloud Storage
Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios.
Antes de comenzar
Antes de usar el conector de Spanner en este instructivo, configura un clúster de Dataproc y una instancia y una base de datos de Spanner.
Configura un clúster de Dataproc
Crea un clúster de Dataproc o usa uno existente que tenga los siguientes parámetros de configuración:
Permisos de la cuenta de servicio de la VM A la cuenta de servicio de la VM del clúster se le deben asignar los permisos de Spanner adecuados. Si usas Data Boost (Data Boost está habilitado en el código de ejemplo de Export Spanner tables), la cuenta de servicio de la VM también debe tener los permisos de IAM de Data Boost necesarios.
Es el permiso de acceso. El clúster debe crearse con el alcance
cloud-platform
o el alcancespanner
adecuado habilitado. El alcance decloud-platform
está habilitado de forma predeterminada para los clústeres creados con la versión de imagen 2.1 o posterior.En las siguientes instrucciones, se muestra cómo establecer el alcance de
cloud-platform
como parte de una solicitud de creación de clúster que usa la consola de Google Cloud , gcloud CLI o la API de Dataproc. Para obtener instrucciones adicionales sobre la creación de clústeres, consulta Crea un clúster.Consola deGoogle Cloud
- En la Google Cloud consola, abre la página Crear un clúster de Dataproc.
- En el panel Administrar seguridad de la sección Acceso al proyecto, haz clic en "Habilita el alcance de la plataforma de nube para este clúster".
- Completa o confirma los demás campos de creación del clúster y, luego, haz clic en Crear.
gcloud CLI
Puedes ejecutar el siguiente comando
gcloud dataproc clusters create
para crear un clúster con el alcancecloud-platform
habilitado.gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
Puedes especificar GceClusterConfig.serviceAccountScopes como parte de una solicitud clusters.create.
"serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
Configura una instancia de Spanner con una tabla de base de datos de Singers
Crea una instancia de Spanner con una base de datos que contenga una tabla Singers
. Anota el ID de la instancia y el ID de la base de datos de Spanner.
Usa el conector de Spanner con Spark
El conector de Spanner está disponible para las versiones 3.1+
de Spark.
Especificas la versión del conector como parte de la especificación del archivo JAR del conector de Cloud Storage cuando envías un trabajo a un clúster de Dataproc.
Ejemplo: Envío de un trabajo de Spark con gcloud CLI y el conector de Spanner.
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
Reemplaza lo siguiente:
CONNECTOR_VERSION: Versión del conector de Spanner.
Elige la versión del conector de Spanner en la lista de versiones del repositorio de GoogleCloudDataproc/spark-spanner-connector
de GitHub.
Leer tablas de Spanner
Puedes usar Python o Scala para leer datos de tablas de Spanner en un DataFrame de Spark con la API de fuente de datos de Spark.
PySpark
Puedes ejecutar el código de ejemplo de PySpark en esta sección en tu clúster enviando el trabajo al servicio de Dataproc o ejecutándolo desde el REPL de spark-submit
en el nodo instancia principal del clúster.
Trabajo de Dataproc
- Crea un archivo
singers.py
con un editor de texto local o en Cloud Shell con el editor de texto preinstaladovi
,vim
onano
. - Después de completar las variables de marcador de posición, pega el siguiente código en el archivo
singers.py
. Ten en cuenta que la función Data Boost de Spanner está habilitada, lo que tiene un impacto casi nulo en la instancia principal de Spanner.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Reemplaza lo siguiente:
- PROJECT_ID: El ID de tu proyecto de Google Cloud . Los IDs del proyecto se enumeran en la sección Información del proyecto en el Google Cloud panel de la consola.
- INSTANCE_ID, DATABASE_ID y TABLE_NAME : Consulta Cómo configurar una instancia de Spanner con la tabla de base de datos
Singers
.
- Guarda el archivo
singers.py
. - Envía el trabajo
al servicio de Dataproc con la consola de Google Cloud , gcloud CLI o
la API de Dataproc.
Ejemplo: Envío de un trabajo de gcloud CLI con el conector de Spanner.
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Reemplaza lo siguiente:
- CLUSTER_NAME el nombre del clúster nuevo.
- REGION: Es una región de Compute Engine disponible para ejecutar la carga de trabajo.
- CONNECTOR_VERSION: Versión del conector de Spanner.
Elige la versión del conector de Spanner en la lista de versiones del repositorio de GitHub
GoogleCloudDataproc/spark-spanner-connector
.
Trabajo de spark-submit
- Conéctate al nodo instancia principal del clúster de Dataproc con SSH.
- Ve a la página Clústeres de Dataproc en la Google Cloud consola y, luego, haz clic en el nombre de tu clúster.
- En la página Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en
SSH
a la derecha del nombre del nodo instancia principal del clúster.Se abrirá una ventana del navegador en tu directorio principal del nodo.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea un archivo
singers.py
en el nodo principal con el editor de textovi
,vim
onano
instalado previamente.- Pega el siguiente código en el archivo
singers.py
. Ten en cuenta que la función Data Boost de Spanner está habilitada, lo que tiene un impacto casi nulo en la instancia principal de Spanner.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Reemplaza lo siguiente:
- PROJECT_ID: El ID de tu proyecto de Google Cloud . Los IDs del proyecto se enumeran en la sección Información del proyecto en el Google Cloud panel de la consola.
- INSTANCE_ID, DATABASE_ID y TABLE_NAME : Consulta Cómo configurar una instancia de Spanner con la tabla de base de datos
Singers
.
- Guarda el archivo
singers.py
.
- Pega el siguiente código en el archivo
- Ejecuta
singers.py
conspark-submit
para crear la tablaSingers
de Spanner.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
Reemplaza lo siguiente:
- CONNECTOR_VERSION: Versión del conector de Spanner.
Elige la versión del conector de Spanner en la lista de versiones del repositorio de GitHub
GoogleCloudDataproc/spark-spanner-connector
.
El resultado es el siguiente:
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION: Versión del conector de Spanner.
Elige la versión del conector de Spanner en la lista de versiones del repositorio de GitHub
Scala
Para ejecutar el código de ejemplo de Scala en tu clúster, completa los siguientes pasos:
- Conéctate al nodo instancia principal del clúster de Dataproc con SSH.
- Ve a la página Clústeres de Dataproc en la Google Cloud consola y, luego, haz clic en el nombre de tu clúster.
- En la página Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en
SSH
a la derecha del nombre del nodo instancia principal del clúster.Se abrirá una ventana del navegador en tu directorio principal del nodo.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea un archivo
singers.scala
en el nodo principal con el editor de textovi
,vim
onano
instalado previamente.- Pega el siguiente código en el archivo
singers.scala
. Ten en cuenta que la función Data Boost de Spanner está habilitada, lo que tiene un impacto casi nulo en la instancia principal de Spanner.object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
Reemplaza lo siguiente:
- PROJECT_ID: El ID de tu proyecto de Google Cloud . Los IDs del proyecto se enumeran en la sección Información del proyecto en el Google Cloud panel de la consola.
- INSTANCE_ID, DATABASE_ID y TABLE_NAME : Consulta Cómo configurar una instancia de Spanner con la tabla de base de datos
Singers
.
- Guarda el archivo
singers.scala
.
- Pega el siguiente código en el archivo
- Inicia el REPL de
spark-shell
.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Reemplaza lo siguiente:
CONNECTOR_VERSION: Versión del conector de Spanner. Elige la versión del conector de Spanner en la lista de versiones del repositorio de
GoogleCloudDataproc/spark-spanner-connector
de GitHub. - Ejecuta
singers.scala
con el comando:load singers.scala
para crear la tablaSingers
de Spanner. La lista de salida muestra ejemplos del resultado de Singers.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
Leer gráficos de Spanner
El conector de Spanner admite la exportación del grafo a DataFrames de nodos y aristas separados, así como la exportación directamente a GraphFrames
.
En el siguiente ejemplo, se exporta un Spanner a un GraphFrame
.
Utiliza la clase SpannerGraphConnector
de Python, incluida en el JAR del conector de Spanner, para leer el gráfico de Spanner.
from pyspark.sql import SparkSession connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar" spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example") .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12") .config("spark.jars", connector_jar) .getOrCreate()) spark.sparkContext.addPyFile(connector_jar) from spannergraph import SpannerGraphConnector connector = (SpannerGraphConnector() .spark(spark) .project("PROJECT_ID") .instance("INSTANCE_ID") .database("DATABASE_ID") .graph("GRAPH_ID")) g = connector.load_graph() g.vertices.show() g.edges.show()
Reemplaza lo siguiente:
- CONNECTOR_VERSION: Versión del conector de Spanner.
Elige la versión del conector de Spanner en la lista de versiones del repositorio de GitHub
GoogleCloudDataproc/spark-spanner-connector
. - PROJECT_ID: El ID de tu proyecto de Google Cloud . Los IDs del proyecto se enumeran en la sección Información del proyecto del panel de la consola Google Cloud .
- INSTANCE_ID, DATABASE_ID y TABLE_NAME: Inserta los IDs de la instancia, la base de datos y el gráfico.
Para exportar nodos y bordes DataFrames
en lugar de GraphFrames, usa load_dfs
:
df_vertices, df_edges, df_id_map = connector.load_dfs()
Limpia
Para evitar que se apliquen cargos continuos a tu cuenta de Google Cloud , puedes detener o borrar tu clúster de Dataproc y borrar tu instancia de Spanner.
¿Qué sigue?
- Consulta los ejemplos de
pyspark.sql.DataFrame
. - Para obtener información sobre la compatibilidad de idiomas de Spark DataFrame, consulta lo siguiente:
- Consulta el repositorio de Spark Spanner Connector en GitHub.
- Consulta las sugerencias de ajuste para trabajos de Spark.