Organiza tus páginas con colecciones
Guarda y categoriza el contenido según tus preferencias.
El spark-bigquery-connector
se usa con Apache Spark
para leer y escribir datos desde y hacia BigQuery.
En este instructivo, se proporciona un código de ejemplo que usa el conector spark-bigquery dentro de una aplicación de Spark.
Si deseas obtener instrucciones para crear un clúster, consulta el
Guías de inicio rápido de Dataproc.
Haz que el conector esté disponible para tu aplicación
Puedes hacer que el conector spark-bigquery esté disponible para tu aplicación
de una de las siguientes maneras:
gcloud dataproc jobs submit spark \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
-- job-args
Calcula los costos
En este documento, usarás los siguientes componentes facturables de Google Cloud:
Dataproc
BigQuery
Cloud Storage
Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios.
Es posible que los usuarios nuevos de Google Cloud califiquen para obtener una prueba gratuita.
El conector escribe los datos en BigQuery
almacenar en búfer todos los datos
en una tabla temporal de Cloud Storage. Luego,
copia todos los datos a BigQuery en una sola operación. El conector intenta borrar los archivos temporales una vez que la operación de carga de BigQuery se realiza correctamente, y lo vuelve a hacer cuando la aplicación Spark finaliza.
Si el trabajo falla, quita las tareas temporales restantes
Archivos de Cloud Storage. Por lo general, los eventos temporales de BigQuery
se encuentran en gs://[bucket]/.spark-bigquery-[jobid]-[UUID].
Cómo configurar la facturación
De forma predeterminada, el proyecto asociado con las credenciales o la cuenta de servicio está
factura por el uso de la API. Para facturar un proyecto diferente, establece la siguiente configuración: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").
También se puede agregar a una operación de lectura o escritura, de la siguiente manera: .option("parentProject", "<BILLED-GCP-PROJECT>").
Ejecuta el código
Antes de ejecutar este ejemplo, crea un conjunto de datos llamado “wordcount_dataset” o cambia el conjunto de datos de salida en el código a un conjunto de datos de BigQuery existente en tu proyecto de Google Cloud.
Usa el comando de bq para crear el wordcount_dataset:
bq mk wordcount_dataset
Usa el comando de Google Cloud CLI
para crear un bucket de Cloud Storage, que se usará para exportar a
BigQuery:
gcloud storage buckets create gs://[bucket]
Scala
Examina el código y reemplaza el marcador de posición [bucket] por
en el bucket de Cloud Storage
que creaste anteriormente.
/*
* Remove comment if you are not running in spark-shell.
*
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("spark-bigquery-demo")
.getOrCreate()
*/
// Use the Cloud Storage bucket for temporary BigQuery export data used
// by the connector.
val bucket = "[bucket]"
spark.conf.set("temporaryGcsBucket", bucket)
// Load data in from BigQuery. See
// https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
// for option information.
val wordsDF =
(spark.read.format("bigquery")
.option("table","bigquery-public-data:samples.shakespeare")
.load()
.cache())
wordsDF.createOrReplaceTempView("words")
// Perform word count.
val wordCountDF = spark.sql(
"SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
wordCountDF.show()
wordCountDF.printSchema()
// Saving the data to BigQuery.
(wordCountDF.write.format("bigquery")
.option("table","wordcount_dataset.wordcount_output")
.save())
Ejecuta el código en tu clúster
Usa SSH para conectarte al nodo instancia principal del clúster de Dataproc
Ve a la sección
Clústeres de Dataproc
de la consola de Google Cloud y, luego, haz clic en el nombre de tu clúster
.
En la página >Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en
SSH a la derecha del nombre del nodo de la instancia principal del clúster
Se abrirá una ventana del navegador en tu directorio principal del nodo principal.
Crea wordcount.scala con el vi preinstalado.
vim, o nano, editor de texto y, luego, pega en Scala
código de la
Lista de código de escalar
nano wordcount.scala
Inicia el REPL spark-shell.
$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
...
Using Scala version ...
Type in expressions to have them evaluated.
Type :help for more information.
...
Spark context available as sc.
...
SQL context available as sqlContext.
scala>
Ejecuta wordcount.scala con el comando :load wordcount.scala para crear la tabla de BigQuery wordcount_output. La lista de salida muestra 20 líneas del resultado del recuento de palabras.
Para obtener una vista previa de la tabla de resultados, abre la
BigQuery
selecciona la tabla wordcount_output y, luego, haz clic
Vista previa.
PySpark
Examina el código y reemplaza el marcador de posición [bucket] por
en el bucket de Cloud Storage
que creaste anteriormente.
#!/usr/bin/env python
"""BigQuery I/O PySpark example."""
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.master('yarn') \
.appName('spark-bigquery-demo') \
.getOrCreate()
# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = "[bucket]"
spark.conf.set('temporaryGcsBucket', bucket)
# Load data from BigQuery.
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')
# Perform word count.
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
word_count.show()
word_count.printSchema()
# Save the data to BigQuery
word_count.write.format('bigquery') \
.option('table', 'wordcount_dataset.wordcount_output') \
.save()
Ejecuta el código en tu clúster
Usa SSH para conectarte al nodo instancia principal del clúster de Dataproc
Ve a la sección
Clústeres de Dataproc
de la consola de Google Cloud y, luego, haz clic en el nombre de tu clúster
.
En la página Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en
SSH a la derecha del nombre del nodo de la instancia principal del clúster
Se abrirá una ventana del navegador en tu directorio principal del nodo principal.
Crea wordcount.py con el vi preinstalado.
Editor de texto vim o nano; luego, pégalo en PySpark
código de la
Lista de código de PySpark
nano wordcount.py
Ejecuta el conteo de palabras con spark-submit para crear la tabla de BigQuery wordcount_output. La lista de salida muestra 20 líneas del resultado del recuento de palabras.