En esta página se explica cómo crear un clúster de Dataproc que utilice el conector de Spark Spanner para leer datos de Spanner con Apache Spark.
El conector de Spanner funciona con Spark para leer datos de la base de datos de Spanner mediante la biblioteca Java de Spanner. El conector de Spanner permite leer tablas y gráficos de Spanner en DataFrames y GraphFrames de Spark.
Costes
En este documento, se utilizan los siguientes componentes facturables de Google Cloud:
- Dataproc
- Spanner
- Cloud Storage
Para generar una estimación de costes basada en el uso previsto,
utiliza la calculadora de precios.
Antes de empezar
Antes de usar el conector de Spanner en este tutorial, configura un clúster de Dataproc y una instancia y una base de datos de Spanner.
Configurar un clúster de Dataproc
Crea un clúster de Dataproc o usa uno que ya tengas y que tenga los siguientes ajustes:
Permisos de la cuenta de servicio de la VM. La cuenta de servicio de la VM del clúster debe tener asignados los permisos de Spanner adecuados. Si usas Data Boost (Data Boost está habilitado en el código de ejemplo de Exportar tablas de Spanner), la cuenta de servicio de la VM también debe tener los permisos de gestión de identidades y accesos de Data Boost necesarios.
Permiso de acceso. El clúster debe crearse con el permiso
cloud-platform
o el permisospanner
adecuado habilitado. El ámbitocloud-platform
está habilitado de forma predeterminada en los clústeres creados con la versión 2.1 de la imagen o una posterior.En las siguientes instrucciones se muestra cómo definir el ámbito
cloud-platform
como parte de una solicitud de creación de clúster que utiliza la consola Google Cloud , la CLI de gcloud o la API de Dataproc. Para obtener más instrucciones sobre cómo crear clústeres, consulta el artículo Crear un clúster.Google Cloud consola
- En la Google Cloud consola, abre la página de Dataproc Crear un clúster.
- En el panel Gestionar seguridad de la sección Acceso al proyecto, haz clic en "Habilita el permiso de Cloud Platform en este clúster".
- Rellena o confirma los demás campos de creación del clúster y, a continuación, haz clic en Crear.
CLI de gcloud
Puedes ejecutar el siguiente comando
gcloud dataproc clusters create
para crear un clúster con el ámbitocloud-platform
habilitado.gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
Puedes especificar GceClusterConfig.serviceAccountScopes como parte de una solicitud clusters.create.
"serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
Configurar una instancia de Spanner con una tabla de base de datos Singers
Crea una instancia de Spanner
con una base de datos que contenga una tabla Singers
. Anota el ID de instancia y el ID de base de datos de Spanner.
Usar el conector de Spanner con Spark
El conector de Spanner está disponible para las versiones de Spark 3.1+
.
Especifica la versión del conector como parte de la especificación del archivo JAR del conector de Cloud Storage cuando envías una tarea a un clúster de Dataproc.
Ejemplo: envío de un trabajo de Spark de la CLI de gcloud con el conector de Spanner.
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
Haz los cambios siguientes:
CONNECTOR_VERSION: versión del conector de Spanner.
Elige la versión del conector de Spanner en la lista de versiones del repositorio de GitHub GoogleCloudDataproc/spark-spanner-connector
.
Leer tablas de Spanner
Puedes usar Python o Scala para leer datos de tablas de Spanner en un DataFrame de Spark mediante la API de fuente de datos de Spark.
PySpark
Puedes ejecutar el código de ejemplo de PySpark de esta sección en tu clúster enviando el trabajo al servicio Dataproc o ejecutándolo desde el REPL de spark-submit
en el nodo maestro del clúster.
Tarea de Dataproc
- Crea un archivo
singers.py
con un editor de texto local o en Cloud Shell con el editor de textovi
,vim
onano
preinstalado. - Después de rellenar las variables de marcador de posición, pega el siguiente código en el archivo
singers.py
. Ten en cuenta que la función Data Boost de Spanner está habilitada, lo que tiene un impacto casi nulo en la instancia principal de Spanner.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Haz los cambios siguientes:
- PROJECT_ID: tu ID de proyecto Google Cloud . Los IDs de proyecto se indican en la sección Información del proyecto del Google Cloud panel de control de la consola.
- INSTANCE_ID, DATABASE_ID y TABLE_NAME : consulta Configurar una instancia de Spanner con una tabla de base de datos
Singers
.
- Guarda el archivo
singers.py
. - Envía el trabajo al servicio de Dataproc mediante la Google Cloud consola, la CLI de gcloud o la API de Dataproc.
Ejemplo: envío de un trabajo de la CLI de gcloud con el conector de Spanner.
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Haz los cambios siguientes:
- CLUSTER_NAME: el nombre del nuevo clúster.
- REGION: una región de Compute Engine disponible para ejecutar la carga de trabajo.
- CONNECTOR_VERSION: versión del conector de Spanner.
Elige la versión del conector de Spanner en la lista de versiones del repositorio de GitHub
GoogleCloudDataproc/spark-spanner-connector
.
Tarea spark-submit
- Conéctate al nodo maestro del clúster de Dataproc mediante SSH.
- Ve a la página de Dataproc Clusters (Clústeres) en la Google Cloud consola y, a continuación, haz clic en el nombre de tu clúster.
- En la página Detalles del clúster, selecciona la pestaña Instancias de VM. A continuación, haga clic en
SSH
a la derecha del nombre del nodo maestro del clúster.Se abrirá una ventana del navegador en el directorio principal del nodo maestro.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea un archivo
singers.py
en el nodo maestro con el editor de textovi
,vim
onano
preinstalado.- Pegue el siguiente código en el archivo
singers.py
. Ten en cuenta que la función Data Boost de Spanner está habilitada, lo que tiene un impacto casi nulo en la instancia principal de Spanner.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Haz los cambios siguientes:
- PROJECT_ID: tu ID de proyecto Google Cloud . Los IDs de proyecto se indican en la sección Información del proyecto del Google Cloud panel de control de la consola.
- INSTANCE_ID, DATABASE_ID y TABLE_NAME : consulta Configurar una instancia de Spanner con una tabla de base de datos
Singers
.
- Guarda el archivo
singers.py
.
- Pegue el siguiente código en el archivo
- Ejecuta
singers.py
conspark-submit
para crear la tablaSingers
de Spanner.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
Haz los cambios siguientes:
- CONNECTOR_VERSION: versión del conector de Spanner.
Elige la versión del conector de Spanner en la lista de versiones del repositorio de GitHub
GoogleCloudDataproc/spark-spanner-connector
.
El resultado es el siguiente:
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION: versión del conector de Spanner.
Elige la versión del conector de Spanner en la lista de versiones del repositorio de GitHub
Scala
Para ejecutar el código de Scala de ejemplo en tu clúster, sigue estos pasos:
- Conéctate al nodo maestro del clúster de Dataproc mediante SSH.
- Ve a la página de Dataproc Clusters (Clústeres) en la Google Cloud consola y, a continuación, haz clic en el nombre de tu clúster.
- En la página Detalles del clúster, selecciona la pestaña Instancias de VM. A continuación, haga clic en
SSH
a la derecha del nombre del nodo maestro del clúster.Se abrirá una ventana del navegador en el directorio principal del nodo maestro.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea un archivo
singers.scala
en el nodo maestro con el editor de textovi
,vim
onano
preinstalado.- Pegue el siguiente código en el archivo
singers.scala
. Ten en cuenta que la función Data Boost de Spanner está habilitada, lo que tiene un impacto casi nulo en la instancia principal de Spanner.object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
Haz los cambios siguientes:
- PROJECT_ID: tu ID de proyecto Google Cloud . Los IDs de proyecto se indican en la sección Información del proyecto del Google Cloud panel de control de la consola.
- INSTANCE_ID, DATABASE_ID y TABLE_NAME : consulta Configurar una instancia de Spanner con una tabla de base de datos
Singers
.
- Guarda el archivo
singers.scala
.
- Pegue el siguiente código en el archivo
- Inicia el
spark-shell
REPL.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Haz los cambios siguientes:
CONNECTOR_VERSION: versión del conector de Spanner. Elige la versión del conector de Spanner en la lista de versiones del repositorio de GitHub
GoogleCloudDataproc/spark-spanner-connector
. - Ejecuta
singers.scala
con el comando:load singers.scala
para crear la tablaSingers
de Spanner. En la lista de resultados se muestran ejemplos de los resultados de Singers.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
Leer gráficos de Spanner
El conector de Spanner permite exportar el gráfico a DataFrames de nodos y aristas independientes, así como exportarlo directamente a GraphFrames
.
En el siguiente ejemplo se exporta un Spanner a un GraphFrame
.
Usa la clase SpannerGraphConnector
de Python, incluida en el archivo JAR del conector de Spanner, para leer el gráfico de Spanner.
from pyspark.sql import SparkSession connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar" spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example") .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12") .config("spark.jars", connector_jar) .getOrCreate()) spark.sparkContext.addPyFile(connector_jar) from spannergraph import SpannerGraphConnector connector = (SpannerGraphConnector() .spark(spark) .project("PROJECT_ID") .instance("INSTANCE_ID") .database("DATABASE_ID") .graph("GRAPH_ID")) g = connector.load_graph() g.vertices.show() g.edges.show()
Haz los cambios siguientes:
- CONNECTOR_VERSION: versión del conector de Spanner.
Elige la versión del conector de Spanner en la lista de versiones del repositorio de GitHub
GoogleCloudDataproc/spark-spanner-connector
. - PROJECT_ID: tu ID de proyecto Google Cloud . Los IDs de proyecto se indican en la sección Información del proyecto del panel de control de la consola Google Cloud .
- INSTANCE_ID, DATABASE_ID y TABLE_NAME Insert los IDs de instancia, base de datos y gráfico.
Para exportar nodos y aristas DataFrames
en lugar de GraphFrames, usa load_dfs
:
df_vertices, df_edges, df_id_map = connector.load_dfs()
Limpieza
Para evitar que se te apliquen cargos continuos en tu cuenta de Google Cloud , puedes detener o eliminar tu clúster de Dataproc y eliminar tu instancia de Spanner.
Siguientes pasos
- Consulta los ejemplos de
pyspark.sql.DataFrame
. - Para obtener información sobre los idiomas admitidos en Spark DataFrame, consulta lo siguiente:
- Consulta el repositorio Spark Spanner Connector en GitHub.
- Consulta los consejos para ajustar las tareas de Spark.