El spark-bigquery-connector se usa con Apache Spark para leer y escribir datos desde y hacia BigQuery. En este instructivo, se proporciona un código de ejemplo que usa el conector spark-bigquery dentro de una aplicación de Spark. Para obtener instrucciones sobre cómo crear un clúster, consulta las Guías de inicio rápido de Dataproc.
Haz que el conector esté disponible para tu aplicación
Puedes hacer que el conector spark-bigquery esté disponible para tu aplicación de una de las siguientes maneras:
Instala spark-bigquery-connector en el directorio Spark jars de cada nodo mediante la acción de inicialización de los conectores de Dataproc cuando crees el clúster.
Proporciona el URI del conector cuando envíes tu trabajo:
- Consola de Google Cloud: Usa el elemento
Jars files
del trabajo de Spark en la página Submit a job de Dataproc. - CLI de gcloud: Usa la marca
gcloud dataproc jobs submit spark --jars
. - API de Dataproc: Usa el campo
SparkJob.jarFileUris
.
- Consola de Google Cloud: Usa el elemento
Incluye el jar en tu aplicación de Scala o Java Spark como una dependencia (consulta Compila con el conector).
Cómo especificar el URI del jar del conector
Las versiones del conector de BigQuery de Spark se enumeran en el repositorio de GoogleCloudDataproc/spark-bigquery-connector de GitHub.
Para especificar el jar del conector, sustituye la información de Scala y la versión del conector en la siguiente string de URI:
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
Usa
2.12
de Scala con las versiones de imágenes de Dataproc1.5+
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
Ejemplo de gcloud CLI:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \ -- job-args
Usa
2.11
de Scala con las versiones de imagen1.4
y anteriores de Dataproc:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
Ejemplo de gcloud CLI:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \ -- job-args
Calcula los costos
En este documento, usarás los siguientes componentes facturables de Google Cloud:
- Dataproc
- BigQuery
- Cloud Storage
Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios.
Cómo leer y escribir datos desde BigQuery
Este ejemplo lee los datos de BigQuery en un DataFrame de Spark para realizar un recuento de palabras mediante la API de fuente de datos estándar.
El conector escribe los datos en BigQuery. Para ello, primero almacena en búfer todos los datos en una tabla temporal de Cloud Storage. Luego, copia todos los datos en BigQuery en una sola operación. El conector intenta borrar los archivos temporales una vez que la operación de carga de BigQuery se realiza correctamente, y lo vuelve a hacer cuando la aplicación Spark finaliza.
Si el trabajo falla, quita los archivos temporales de Cloud Storage restantes. Por lo general, los archivos temporales de BigQuery se encuentran en gs://[bucket]/.spark-bigquery-[jobid]-[UUID]
.
Cómo configurar la facturación
De forma predeterminada, el proyecto asociado con las credenciales o la cuenta de servicio se factura por el uso de la API. Para facturar un proyecto diferente, establece la siguiente configuración: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
También se puede agregar a una operación de lectura o escritura, de la siguiente manera: .option("parentProject", "<BILLED-GCP-PROJECT>")
.
Ejecuta el código
Antes de ejecutar este ejemplo, crea un conjunto de datos llamado “wordcount_dataset” o cambia el conjunto de datos de salida en el código a un conjunto de datos de BigQuery existente en tu proyecto de Google Cloud.
Usa el comando de bq para crear el wordcount_dataset
:
bq mk wordcount_dataset
Usa el comando de gsutil para crear un depósito de Cloud Storage, que se usará a fin de exportar a BigQuery:
gsutil mb gs://[bucket]
Scala
- Examina el código y reemplaza el marcador de posición [bucket] por el bucket de Cloud Storage que creaste antes.
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = (spark.read.format("bigquery") .option("table","bigquery-public-data:samples.shakespeare") .load() .cache()) wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. (wordCountDF.write.format("bigquery") .option("table","wordcount_dataset.wordcount_output") .save())
- Ejecuta el código en tu clúster.
- Usa SSH para conectarte al nodo instancia principal del clúster de Dataproc.
- Ve a la página Clústeres de Dataproc en la consola de Google Cloud y, luego, haz clic en el nombre de tu clúster
- En la página >Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en
SSH
a la derecha del nombre del nodo instancia principal del clúster
Se abrirá una ventana del navegador en tu directorio principal del nodo principal.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea
wordcount.scala
con el editor de textovi
,vim
onano
preinstalado y, luego, pega el código de Scala de la lista de código de Scala.nano wordcount.scala
- Inicia el REPL
spark-shell
.$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
- Ejecuta wordcount.scala con el comando
:load wordcount.scala
para crear la tabla de BigQuerywordcount_output
. La lista de salida muestra 20 líneas del resultado del recuento de palabras.:load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Para obtener una vista previa de la tabla de salida, abre la páginaBigQuery
, selecciona la tablawordcount_output
y, luego, haz clic en Vista previa.
- Usa SSH para conectarte al nodo instancia principal del clúster de Dataproc.
PySpark
- Examina el código y reemplaza el marcador de posición [bucket] por el bucket de Cloud Storage que creaste antes.
#!/usr/bin/env python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Ejecuta el código en tu clúster
- Usa SSH para conectarte al nodo instancia principal del clúster de Dataproc.
- Ve a la página Clústeres de Dataproc en la consola de Google Cloud y, luego, haz clic en el nombre de tu clúster
- En la página Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en
SSH
a la derecha del nombre del nodo instancia principal del clúster
Se abrirá una ventana del navegador en tu directorio principal del nodo principal.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea
wordcount.py
con el editor de textovi
,vim
onano
preinstalado y, luego, pega el código de PySpark de la lista de código de PySpark.nano wordcount.py
- Ejecuta el conteo de palabras con
spark-submit
para crear la tabla de BigQuerywordcount_output
. La lista de salida muestra 20 líneas del resultado del recuento de palabras.spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Para obtener una vista previa de la tabla de salida, abre la páginaBigQuery
, selecciona la tablawordcount_output
y, luego, haz clic en Vista previa.
- Usa SSH para conectarte al nodo instancia principal del clúster de Dataproc.
Para más información
- Almacenamiento de BigQuery y Spark SQL: Python
- Crea un archivo de definición de tablas para una fuente de datos externa
- Consulta datos particionados de forma externa
- Sugerencias de ajuste para trabajos de Spark