Usa el conector spark-bigquery con Apache Spark para leer y escribir datos desde y hacia BigQuery.
En este tutorial se muestra una aplicación de PySpark que usa spark-bigquery-connector
.
Usar el conector de BigQuery con su carga de trabajo
Consulta las versiones de tiempo de ejecución de Serverless para Apache Spark para determinar la versión del conector de BigQuery que está instalada en la versión de tiempo de ejecución de tu carga de trabajo por lotes. Si el conector no aparece en la lista, consulta la siguiente sección para obtener instrucciones sobre cómo hacer que el conector esté disponible para las aplicaciones.
Cómo usar el conector con la versión 2.0 del tiempo de ejecución de Spark
El conector de BigQuery no está instalado en la versión 2.0 del tiempo de ejecución de Spark. Si usas la versión 2.0 del tiempo de ejecución de Spark, puedes hacer que el conector esté disponible para tu aplicación de una de las siguientes formas:
- Usa el parámetro
jars
para apuntar a un archivo JAR de conector cuando envíes tu Google Cloud carga de trabajo por lotes de Serverless para Apache Spark. Google Cloud En el siguiente ejemplo se especifica un archivo JAR de conector (consulta el repositorio GoogleCloudDataproc/spark-bigquery-connector en GitHub para ver una lista de los archivos JAR de conectores disponibles).- Ejemplo de Google Cloud CLI:
gcloud dataproc batches submit pyspark \ --region=region \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \ ... other args
- Ejemplo de Google Cloud CLI:
- Incluye el archivo JAR del conector en tu aplicación Spark como dependencia (consulta Compilar con el conector).
Calcular los costes
En este tutorial se usan componentes facturables de Google Cloud, como los siguientes:
- Serverless para Apache Spark
- BigQuery
- Cloud Storage
Si quieres generar una estimación de costes en función del uso previsto, usa la calculadora de precios.
E/S de BigQuery
En este ejemplo se leen datos de BigQuery y se insertan en un DataFrame de Spark para contar palabras mediante la API de fuente de datos estándar.
El conector escribe el resultado de WordCount en BigQuery de la siguiente manera:
Almacenando los datos en archivos temporales de tu segmento de Cloud Storage
Copiar los datos en una sola operación desde tu segmento de Cloud Storage a BigQuery
Eliminar los archivos temporales de Cloud Storage una vez que se haya completado la operación de carga de BigQuery (los archivos temporales también se eliminan cuando finaliza la aplicación Spark). Si la eliminación falla, tendrás que eliminar los archivos temporales de Cloud Storage que no quieras, que normalmente se encuentran en
gs://YOUR_BUCKET/.spark-bigquery-JOB_ID-UUID
.
Configurar la facturación
De forma predeterminada, el proyecto asociado a las credenciales o a la cuenta de servicio se factura por el uso de la API. Para facturar un proyecto diferente, define la siguiente configuración: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
También puedes añadir a una operación de lectura o escritura, como se indica a continuación:
.option("parentProject", "<BILLED-GCP-PROJECT>")
.
Enviar una carga de trabajo por lotes de recuento de palabras de PySpark
Ejecuta una carga de trabajo por lotes de Spark que cuenta el número de palabras de un conjunto de datos público.
- Abre un terminal local o Cloud Shell.
- Crea el
wordcount_dataset
con la herramienta de línea de comandos bq en un terminal local o en Cloud Shell.bq mk wordcount_dataset
- Crea un segmento de Cloud Storage con Google Cloud CLI.
Sustituyegcloud storage buckets create gs://YOUR_BUCKET
YOUR_BUCKET
por el nombre del segmento de Cloud Storage que has creado. - Crea el archivo
wordcount.py
localmente en un editor de texto copiando el siguiente código de PySpark.#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "YOUR_BUCKET" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Envía la carga de trabajo por lotes de PySpark:
Ejemplo de salida de terminal:gcloud dataproc batches submit pyspark wordcount.py \ --region=REGION \ --deps-bucket=YOUR_BUCKET
... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Para previsualizar la tabla de resultados en la Google Cloud consola, abre la página de BigQuery de tu proyecto, selecciona la tablawordcount_output
y haz clic en Vista previa.
Más información
- Almacenamiento de BigQuery y Spark SQL - Python
- Crear un archivo de definición de tabla para una fuente de datos externa
- Usar datos con particiones externas