Usar el conector de Bigtable para Spark
El conector de Bigtable Spark te permite leer y escribir datos en Bigtable. Puedes leer datos de tu aplicación Spark mediante Spark SQL y DataFrames. El conector de Bigtable para Spark admite las siguientes operaciones de Bigtable:
- Escribir datos
- Leer datos
- Cómo crear una tabla
En este documento se explica cómo convertir una tabla de DataFrames de Spark SQL en una tabla de Bigtable y, a continuación, compilar y crear un archivo JAR para enviar una tarea de Spark.
Estado de la compatibilidad con Spark y Scala
El conector de Bigtable para Spark admite las siguientes versiones de Scala:
El conector de Bigtable para Spark admite las siguientes versiones de Spark:
El conector de Bigtable Spark admite las siguientes versiones de Dataproc:
- 1.5. Clúster image-version
- Clúster de versiones de imagen 2.0
- 2.1. Clúster image-version
- 2.2 clúster de versiones de imagen
- Versión 1.0 del tiempo de ejecución de Dataproc Serverless
Calcular los costes
Si decides usar alguno de los siguientes componentes facturables de Google Cloud, se te cobrarán los recursos que utilices:
- Bigtable (no se te cobra por usar el emulador de Bigtable)
- Dataproc
- Cloud Storage
Los precios de Dataproc se aplican al uso de Dataproc en clústeres de Compute Engine. Los precios de Dataproc Serverless se aplican a las cargas de trabajo y las sesiones que se ejecutan en Dataproc Serverless para Spark.
Para generar una estimación de costes basada en el uso previsto, utiliza la calculadora de precios.
Antes de empezar
Antes de usar el conector de Bigtable para Spark, debes cumplir los siguientes requisitos previos.
Roles obligatorios
Para obtener los permisos que necesitas para usar el conector de Bigtable para Spark, pide a tu administrador que te conceda los siguientes roles de gestión de identidades y accesos (IAM) en tu proyecto:
-
Administrador de Bigtable (
roles/bigtable.admin
)(opcional): permite leer o escribir datos y crear una tabla. -
Usuario de Bigtable (
roles/bigtable.user
): permite leer o escribir datos, pero no crear una tabla.
Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.
También puedes conseguir los permisos necesarios a través de roles personalizados u otros roles predefinidos.
Si usas Dataproc o Cloud Storage, es posible que necesites permisos adicionales. Para obtener más información, consulta los permisos de Dataproc y los permisos de Cloud Storage.
Configurar Spark
Además de crear una instancia de Bigtable, también debes configurar tu instancia de Spark. Puedes hacerlo de forma local o seleccionar una de estas opciones para usar Spark con Dataproc:
- Clúster de Dataproc
- Dataproc Serverless
Para obtener más información sobre cómo elegir entre un clúster de Dataproc o una opción sin servidor, consulta la documentación Comparación entre Dataproc Serverless para Spark y Dataproc en Compute Engine .
Descargar el archivo JAR del conector
Puedes encontrar el código fuente del conector de Bigtable para Spark con ejemplos en el repositorio de GitHub del conector de Bigtable para Spark.
En función de la configuración de Spark, puedes acceder al archivo JAR de las siguientes formas:
Si ejecutas PySpark de forma local, debes descargar el archivo JAR del conector desde la ubicación de
gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
Cloud Storage.Sustituye
SCALA_VERSION
por2.12
o2.13
, que son las únicas versiones de Scala admitidas, y sustituyeCONNECTOR_VERSION
por la versión del conector que quieras usar.En el caso de los clústeres de Dataproc o de la opción sin servidor, usa el archivo JAR más reciente como artefacto que se puede añadir a tus aplicaciones de Scala o Java Spark. Para obtener más información sobre cómo usar el archivo JAR como artefacto, consulta Gestionar dependencias.
Si envías tu trabajo de PySpark a Dataproc, usa la marca
gcloud dataproc jobs submit pyspark --jars
para definir el URI de la ubicación del archivo JAR en Cloud Storage (por ejemplo,gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
).
Determinar el tipo de cálculo
En el caso de los trabajos de solo lectura, puedes usar la computación sin servidor de Data Boost, que te permite evitar que se vean afectados los clústeres que sirven tu aplicación. Tu aplicación Spark debe usar la versión 1.1.0 o una posterior del conector de Spark para usar Data Boost.
Para usar Data Boost, debes crear un perfil de aplicación de Data Boost y, a continuación, proporcionar el ID del perfil de aplicación para la spark.bigtable.app_profile.id
opción Spark al añadir la configuración de Bigtable a tu aplicación Spark. Si ya has creado un perfil de aplicación para tus trabajos de lectura de Spark y quieres seguir usándolo sin cambiar el código de tu aplicación, puedes convertirlo en un perfil de aplicación de Data Boost. Para obtener más información, consulta Convertir un perfil de aplicación.
Para obtener más información, consulte la descripción general de Data Boost de Bigtable.
En el caso de los trabajos que implican lecturas y escrituras, puedes usar los nodos del clúster de tu instancia para realizar cálculos. Para ello, especifica un perfil de aplicación estándar en tu solicitud.
Identifica o crea un perfil de aplicación que quieras usar.
Si no especificas un ID de perfil de aplicación, el conector usará el perfil de aplicación predeterminado.
Te recomendamos que uses un perfil de aplicación único para cada aplicación que ejecutes, incluida tu aplicación Spark. Para obtener más información sobre los tipos y la configuración de los perfiles de aplicaciones, consulta el resumen de los perfiles de aplicaciones. Para obtener instrucciones, consulta Crear y configurar perfiles de aplicaciones.
Añadir la configuración de Bigtable a tu aplicación Spark
En tu aplicación Spark, añade las opciones de Spark que te permitan interactuar con Bigtable.
Opciones de Spark admitidas
Usa las opciones de Spark que están disponibles en el paquete com.google.cloud.spark.bigtable
.
Nombre de la opción | Obligatorio | Valor predeterminado | Significado |
---|---|---|---|
spark.bigtable.project.id |
Sí | N/A | Define el ID del proyecto de Bigtable. |
spark.bigtable.instance.id |
Sí | N/A | Define el ID de la instancia de Bigtable. |
catalog |
Sí | N/A | Define el formato JSON que especifica el formato de conversión entre el esquema de tipo SQL de DataFrame y el esquema de la tabla de Bigtable. Consulta Crear metadatos de tabla en formato JSON para obtener más información. |
spark.bigtable.app_profile.id |
No | default |
Define el ID de perfil de aplicación de Bigtable. |
spark.bigtable.write.timestamp.milliseconds |
No | Hora actual del sistema | Define la marca de tiempo en milisegundos que se usará al escribir un DataFrame en Bigtable. Ten en cuenta que, como todas las filas del DataFrame usan la misma marca de tiempo, las filas con la misma columna de clave de fila del DataFrame se conservan como una sola versión en Bigtable, ya que comparten la misma marca de tiempo. |
spark.bigtable.create.new.table |
No | false |
Defina el valor true para crear una tabla antes de escribir en Bigtable. |
spark.bigtable.read.timerange.start.milliseconds o spark.bigtable.read.timerange.end.milliseconds |
No | N/A | Define marcas de tiempo (en milisegundos desde la época) para filtrar las celdas con una fecha de inicio y una de finalización concretas, respectivamente. |
spark.bigtable.push.down.row.key.filters |
No | true |
Defina el valor true para permitir el filtrado de claves de fila simples en el lado del servidor. El filtrado por claves de fila compuestas se implementa en el lado del cliente.Consulta Leer una fila de DataFrame específica mediante un filtro para obtener más información. |
spark.bigtable.read.rows.attempt.timeout.milliseconds |
No | 30 min | Define la duración del tiempo de espera de un intento de lectura de filas correspondiente a una partición de DataFrame en el cliente de Bigtable para Java. |
spark.bigtable.read.rows.total.timeout.milliseconds |
No | 12 h | Define la duración del tiempo de espera total de un intento de lectura de filas correspondiente a una partición de DataFrame en el cliente de Bigtable para Java. |
spark.bigtable.mutate.rows.attempt.timeout.milliseconds |
No | 1 m | Define la duración del tiempo de espera de un intento de mutación de filas correspondiente a una partición de DataFrame en el cliente de Bigtable para Java. |
spark.bigtable.mutate.rows.total.timeout.milliseconds |
No | 10 min | Define la duración del tiempo de espera total de un intento de mutación de filas correspondiente a una partición de DataFrame en el cliente de Bigtable para Java. |
spark.bigtable.batch.mutate.size |
No | 100 |
Se define como el número de mutaciones de cada lote. El valor máximo que puede definir es 100000 . |
spark.bigtable.enable.batch_mutate.flow_control |
No | false |
Defina el valor true para habilitar el control de flujo de las mutaciones por lotes. |
Crear metadatos de tabla en formato JSON
El formato de tabla de DataFrames de Spark SQL debe convertirse en una tabla de Bigtable mediante una cadena con formato JSON. Este formato JSON de cadena hace que el formato de datos sea compatible con Bigtable. Puedes transferir el formato JSON en el código de tu aplicación mediante la opción .option("catalog", catalog_json_string)
.
Por ejemplo, considere la siguiente tabla de DataFrame y la tabla de Bigtable correspondiente.
En este ejemplo, las columnas name
y birthYear
del DataFrame se agrupan en la familia de columnas info
y se les cambia el nombre a name
y birth_year
, respectivamente. Del mismo modo, la columna address
se almacena en la familia de columnas location
con el mismo nombre de columna. La columna id
del DataFrame se convierte en la clave de fila de Bigtable.
Las claves de fila no tienen un nombre de columna específico en Bigtable. En este ejemplo, id_rowkey
solo se usa para indicar al conector que se trata de la columna de claves de fila. Puede usar cualquier nombre para la columna de clave de fila y asegurarse de que usa el mismo nombre al declarar el campo "rowkey":"column_name"
en formato JSON.
DataFrame | Tabla de Bigtable = t1 | |||||||
Columnas | Clave de fila | Familias de columnas | ||||||
información | ubicación | |||||||
Columnas | Columnas | |||||||
id | name | birthYear | address | id_rowkey | name | birth_year | address |
El formato JSON del catálogo es el siguiente:
"""
{
"table": {"name": "t1"},
"rowkey": "id_rowkey",
"columns": {
"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
"name": {"cf": "info", "col": "name", "type": "string"},
"birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
"address": {"cf": "location", "col": "address", "type": "string"}
}
}
"""
Las claves y los valores que se usan en el formato JSON son los siguientes:
Clave de catálogo | Valor de catálogo | Formato JSON |
---|---|---|
tabla | Nombre de la tabla de Bigtable. | "table":{"name":"t1"} Si la tabla no existe, usa .option("spark.bigtable.create.new.table", "true") para crearla. |
rowkey | Nombre de la columna que se usará como clave de fila de Bigtable. Asegúrate de que el nombre de la columna DataFrame se usa como clave de fila (por ejemplo, id_rowkey ). También se aceptan claves compuestas como claves de fila. Por ejemplo, "rowkey":"name:address" . Este enfoque puede dar lugar a claves de fila que requieran un análisis de tabla completo para todas las solicitudes de lectura. |
"rowkey":"id_rowkey" , |
columnas | Asignación de cada columna de DataFrame a la familia de columnas ("cf" ) y al nombre de columna ("col" ) correspondientes de Bigtable. El nombre de la columna puede ser diferente del nombre de la columna de la tabla DataFrame. Los tipos de datos admitidos son string , long y binary . |
"columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}" En este ejemplo, id_rowkey es la clave de fila y info y location son las familias de columnas. |
Tipos de datos admitidos
El conector admite los tipos string
, long
y binary
(matriz de bytes) en el catálogo. Hasta que se añada la compatibilidad con otros tipos, como int
y float
, puedes convertir manualmente estos tipos de datos en arrays de bytes (BinaryType
de Spark SQL) antes de usar el conector para escribirlos en Bigtable.
Además, puedes usar Avro para serializar tipos complejos, como ArrayType
. Para obtener más información, consulta Serializar tipos de datos complejos con Apache Avro.
Escribir en Bigtable
Usa la función .write()
y las opciones admitidas para escribir tus datos en Bigtable.
Java
El siguiente código del repositorio de GitHub usa Java y Maven para escribir en Bigtable.
String catalog = "{" +
"\"table\":{\"name\":\"" + tableName + "\"," +
"\"tableCoder\":\"PrimitiveType\"}," +
"\"rowkey\":\"wordCol\"," +
"\"columns\":{" +
"\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
"\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
"}}".replaceAll("\\s+", "");
…
private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
String createNewTable) {
dataframe
.write()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.option("spark.bigtable.create.new.table", createNewTable)
.save();
}
Python
El siguiente código del repositorio de GitHub usa Python para escribir en Bigtable.
catalog = ''.join(("""{
"table":{"name":" """ + bigtable_table_name + """
", "tableCoder":"PrimitiveType"},
"rowkey":"wordCol",
"columns":{
"word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
"count":{"cf":"example_family", "col":"countCol", "type":"long"}
}
}""").split())
…
input_data = spark.createDataFrame(data)
print('Created the DataFrame:')
input_data.show()
input_data.write \
.format('bigtable') \
.options(catalog=catalog) \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.option('spark.bigtable.create.new.table', create_new_table) \
.save()
print('DataFrame was written to Bigtable.')
…
Leer desde Bigtable
Usa la función .read()
para comprobar si la tabla se ha importado correctamente a Bigtable.
Java
…
private static Dataset<Row> readDataframeFromBigtable(String catalog) {
Dataset<Row> dataframe = spark
.read()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.load();
return dataframe;
}
Python
…
records = spark.read \
.format('bigtable') \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.options(catalog=catalog) \
.load()
print('Reading the DataFrame from Bigtable:')
records.show()
Compilar un proyecto
Genera el archivo JAR que se usa para ejecutar una tarea en un clúster de Dataproc, Dataproc sin servidor o una instancia de Spark local. Puedes compilar el archivo JAR de forma local y, después, usarlo para enviar un trabajo. La ruta al archivo JAR compilado se define como la variable de entorno PATH_TO_COMPILED_JAR
cuando envías un trabajo.
Este paso no se aplica a las aplicaciones PySpark.
Gestionar dependencias
El conector de Bigtable Spark admite las siguientes herramientas de gestión de dependencias:
Compila el archivo JAR
Maven
Añade la dependencia
spark-bigtable
a tu archivo pom.xml.<dependencies> <dependency> <groupId>com.google.cloud.spark.bigtable</groupId> <artifactId>spark-bigtable_SCALA_VERSION</artifactId> <version>0.1.0</version> </dependency> </dependencies>
Añade el complemento Maven Shade a tu archivo
pom.xml
para crear un uber JAR:<plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins>
Ejecuta el comando
mvn clean install
para generar un archivo JAR.
sbt
Añade la dependencia
spark-bigtable
a tu archivobuild.sbt
:libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
Añade el complemento
sbt-assembly
a tu archivoproject/plugins.sbt
oproject/assembly.sbt
para crear un archivo JAR de Uber.addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
Ejecuta el comando
sbt clean assembly
para generar el archivo JAR.
Gradle
Añade la dependencia
spark-bigtable
a tu archivobuild.gradle
.dependencies { implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0' }
Añade el complemento Shadow a tu archivo
build.gradle
para crear un archivo uber JAR:plugins { id 'com.github.johnrengelman.shadow' version '8.1.1' id 'java' }
Consulta la documentación del complemento Shadow para obtener más información sobre la configuración y la compilación de JARs.
Enviar una tarea
Envía una tarea de Spark mediante Dataproc, Dataproc Serverless o una instancia de Spark local para iniciar tu aplicación.
Definir el entorno de ejecución
Define las siguientes variables de entorno.
#Google Cloud
export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE
#Dataproc Serverless
export BIGTABLE_SPARK_SUBNET=SUBNET
export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME
#Scala/Java
export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR
#PySpark
export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR
Haz los cambios siguientes:
- PROJECT_ID: identificador permanente del proyecto de Bigtable.
- INSTANCE_ID: identificador permanente de la instancia de Bigtable.
- TABLE_NAME: identificador permanente de la tabla.
- DATAPROC_CLUSTER: identificador permanente del clúster de Dataproc.
- DATAPROC_REGION: la región de Dataproc que contiene uno de los clústeres de tu instancia de Dataproc. Por ejemplo,
northamerica-northeast2
. - DATAPROC_ZONE: la zona en la que se ejecuta el clúster de Dataproc.
- SUBNET: ruta de recurso completa de la subred.
- GCS_BUCKET_NAME: el segmento de Cloud Storage al que se subirán las dependencias de la carga de trabajo de Spark.
- PATH_TO_COMPILED_JAR: la ruta completa o relativa al archivo JAR compilado. Por ejemplo,
/path/to/project/root/target/<compiled_JAR_name>
para Maven. - GCS_PATH_TO_CONNECTOR_JAR: el segmento de
gs://spark-lib/bigtable
Cloud Storage en el que se encuentra el archivospark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
. - PATH_TO_PYTHON_FILE: en el caso de las aplicaciones PySpark, la ruta al archivo Python que se usará para escribir datos en Bigtable y leerlos.
- LOCAL_PATH_TO_CONNECTOR_JAR: en el caso de las aplicaciones PySpark, ruta al archivo JAR del conector Spark de Bigtable descargado.
Enviar una tarea de Spark
En las instancias de Dataproc o en tu configuración local de Spark, ejecuta un trabajo de Spark para subir datos a Bigtable.
Clúster de Dataproc
Usa el archivo JAR compilado y crea un trabajo de clúster de Dataproc que lea y escriba datos desde y hacia Bigtable.
Crea un clúster de Dataproc. En el siguiente ejemplo se muestra un comando de muestra para crear un clúster de Dataproc 2.0 con Debian 10, dos nodos de trabajador y configuraciones predeterminadas.
gcloud dataproc clusters create \ $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \ --zone $BIGTABLE_SPARK_DATAPROC_ZONE \ --master-machine-type n2-standard-4 --master-boot-disk-size 500 \ --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \ --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
Envía una tarea.
Scala/Java
En el siguiente ejemplo se muestra la clase
spark.bigtable.example.WordCount
, que incluye la lógica para crear una tabla de prueba en DataFrame, escribir la tabla en Bigtable y, a continuación, contar el número de palabras de la tabla.gcloud dataproc jobs submit spark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --class=spark.bigtable.example.WordCount \ --jar=$PATH_TO_COMPILED_JAR \ -- \ $BIGTABLE_SPARK_PROJECT_ID \ $BIGTABLE_SPARK_INSTANCE_ID \ $BIGTABLE_SPARK_TABLE_NAME \
PySpark
gcloud dataproc jobs submit pyspark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --jars=$GCS_PATH_TO_CONNECTOR_JAR \ --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \ $PATH_TO_PYTHON_FILE \ -- \ --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \ --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \ --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
Dataproc Serverless
Usa el archivo JAR compilado y crea un trabajo de Dataproc que lea y escriba datos desde y hacia Bigtable con una instancia de Dataproc sin servidor.
Scala/Java
gcloud dataproc batches submit spark \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
-- \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
--jars=$GCS_PATH_TO_CONNECTOR_JAR \
--properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
-- \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
Local Spark
Usa el archivo JAR descargado y crea un trabajo de Spark que lea y escriba datos desde y hacia Bigtable con una instancia de Spark local. También puedes usar el emulador de Bigtable para enviar el trabajo de Spark.
Usar el emulador de Bigtable
Si decides usar el emulador de Bigtable, sigue estos pasos:
Ejecuta el siguiente comando para iniciar el emulador:
gcloud beta emulators bigtable start
De forma predeterminada, el emulador elige
localhost:8086
.Define la variable de entorno
BIGTABLE_EMULATOR_HOST
:export BIGTABLE_EMULATOR_HOST=localhost:8086
Para obtener más información sobre cómo usar el emulador de Bigtable, consulta Hacer pruebas con el emulador.
Enviar una tarea de Spark
Usa el comando spark-submit
para enviar un trabajo de Spark, independientemente de si usas un emulador de Bigtable local.
Scala/Java
spark-submit $PATH_TO_COMPILED_JAR \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
spark-submit \
--jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
--packages=org.slf4j:slf4j-reload4j:1.7.36 \
$PATH_TO_PYTHON_FILE \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
Verificar los datos de la tabla
Ejecuta el siguiente comando de la
cbt
CLI
para verificar que los datos se han escrito en Bigtable. La
CLI de cbt
es un componente de Google Cloud CLI. Para obtener más información, consulta el
resumen de la CLI de cbt
.
cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
read $BIGTABLE_SPARK_TABLE_NAME
Soluciones adicionales
Utilice el conector de Spark de Bigtable para soluciones específicas, como serializar tipos complejos de Spark SQL, leer filas específicas y generar métricas del lado del cliente.
Leer una fila de DataFrame específica con un filtro
Cuando usas DataFrames para leer datos de Bigtable, puedes especificar un filtro para leer solo filas concretas. Los filtros simples, como ==
, <=
y startsWith
en la columna de clave de fila, se aplican del lado del servidor para evitar un análisis de toda la tabla. Los filtros de claves de fila compuestas o los filtros complejos, como el filtro LIKE
de la columna de clave de fila, se aplican en el lado del cliente.
Si lees tablas grandes, te recomendamos que uses filtros de clave de fila sencillos para evitar realizar un análisis de toda la tabla. En la siguiente instrucción de ejemplo se muestra cómo leer usando un filtro sencillo. Asegúrate de que, en el filtro de Spark, usas el nombre de la columna DataFrame que se convierte en la clave de fila:
dataframe.filter("id == 'some_id'").show()
Cuando apliques un filtro, usa el nombre de la columna del DataFrame en lugar del nombre de la columna de la tabla de Bigtable.
Serializar tipos de datos complejos con Apache Avro
El conector de Spark de Bigtable permite usar Apache Avro para serializar tipos de Spark SQL complejos, como ArrayType
, MapType
o StructType
. Apache Avro proporciona serialización de datos para datos de registro que se suelen usar para procesar y almacenar estructuras de datos complejas.
Usa una sintaxis como "avro":"avroSchema"
para especificar que una columna de Bigtable se debe codificar con Avro. Después, puede usar .option("avroSchema", avroSchemaString)
al leer o escribir en Bigtable para especificar el esquema Avro correspondiente a esa columna en formato de cadena. Puedes usar nombres de opciones diferentes (por ejemplo, "anotherAvroSchema"
) para distintas columnas y transferir esquemas Avro para varias columnas.
def catalogWithAvroColumn = s"""{
|"table":{"name":"ExampleAvroTable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
Usar métricas del lado del cliente
Como el conector de Spark de Bigtable se basa en el cliente de Bigtable para Java, las métricas del lado del cliente están habilitadas de forma predeterminada en el conector. Consulta la documentación sobre las métricas del lado del cliente para obtener más información sobre cómo acceder a estas métricas e interpretarlas.
Usar el cliente de Bigtable para Java con funciones RDD de nivel inferior
Como el conector de Bigtable para Spark se basa en el cliente de Bigtable para Java, puedes usarlo directamente en tus aplicaciones de Spark y realizar solicitudes de lectura o escritura distribuidas en las funciones de RDD de bajo nivel, como mapPartitions
y foreachPartition
.
Para usar las clases del cliente de Bigtable para Java, añade el prefijo com.google.cloud.spark.bigtable.repackaged
a los nombres de los paquetes. Por ejemplo, en lugar de usar el nombre de clase com.google.cloud.bigtable.data.v2.BigtableDataClient
, usa com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient
.
Para obtener más información sobre el cliente de Bigtable para Java, consulta el cliente de Bigtable para Java.
Siguientes pasos
- Consulta cómo optimizar tu tarea de Spark en Dataproc.
- Usa las clases del cliente de Bigtable para Java con el conector de Bigtable para Spark.