Usa el conector spark-bigquery-connector
con Apache Spark
para leer y escribir datos desde y hacia BigQuery.
En este instructivo, se muestra una aplicación de PySpark que usa spark-bigquery-connector
.
Usa el conector de BigQuery con tu carga de trabajo
Consulta Dataproc Serverless para las versiones del entorno de ejecución de Spark para determinar la versión del conector de BigQuery que está instalada en la versión del entorno de ejecución de tu carga de trabajo por lotes. Si no aparece el conector, consulta la siguiente sección para obtener instrucciones sobre cómo hacerlo disponible para las aplicaciones.
Cómo usar el conector con la versión 2.0 del entorno de ejecución de Spark
El conector de BigQuery no está instalado en la versión 2.0 del entorno de ejecución de Spark. Cuando uses la versión 2.0 del entorno de ejecución de Spark, puedes hacer que el conector esté disponible para tu aplicación de una de las siguientes maneras:
- Usa el parámetro
jars
para apuntar a un archivo jar del conector cuando envíes tu carga de trabajo por lotes de Dataproc Serverless para Spark. En el siguiente ejemplo, se especifica un archivo jar del conector (consulta el repositorio GoogleCloudDataproc/spark-bigquery-connector en GitHub para obtener una lista de los archivos jar de conectores disponibles).- Ejemplo de Google Cloud CLI:
gcloud dataproc batches submit pyspark \ --region=region \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \ ... other args
- Ejemplo de Google Cloud CLI:
- Incluye el archivo jar del conector en tu aplicación Spark como una dependencia. (consulta Compilación en el conector)
Calcula los costos
En este instructivo, se usan componentes facturables de Google Cloud, que incluyen lo siguiente:
- Dataproc sin servidores
- BigQuery
- Cloud Storage
Usa la calculadora de precios para generar una estimación de los costos según el uso previsto. Los usuarios nuevos de Cloud Platform podrían ser aptos para una prueba gratuita.
BigQuery E/S
Este ejemplo lee los datos de BigQuery en un DataFrame de Spark para realizar un recuento de palabras mediante la API de fuente de datos estándar.
El conector escribe el resultado de recuento de palabras en BigQuery de la siguiente manera:
Almacenamiento en búfer de los datos en archivos temporales en tu bucket de Cloud Storage
Copia los datos en una operación de tu bucket de Cloud Storage a BigQuery
Borrar los archivos temporales en Cloud Storage después de que se complete la operación de carga de BigQuery (los archivos temporales también se borran después de que finaliza la aplicación de Spark) Si la eliminación falla, deberás borrar cualquier archivo temporal no deseado de Cloud Storage, que por lo general se ubica en
gs://your-bucket/.spark-bigquery-jobid-UUID
.
Configura la facturación
De forma predeterminada, el proyecto asociado con las credenciales o la cuenta de servicio se factura por el uso de la API. Para facturar un proyecto diferente, establece la siguiente configuración: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
También se puede agregar a una operación de lectura o escritura, de la siguiente manera: .option("parentProject", "<BILLED-GCP-PROJECT>")
.
Envía una carga de trabajo por lotes de conteo de palabras de PySpark
- Crea el
wordcount_dataset
con la herramienta de línea de comandos bq en una terminal local o en Cloud Shell.bq mk wordcount_dataset
- Crea un bucket de Cloud Storage con Google Cloud CLI en una terminal local o en Cloud Shell.
gcloud storage buckets create gs://your-bucket
- Examina el código.
#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[your-bucket-name]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Copia el elemento
wordcount.py
de forma local en un editor de texto el código de PySpark desde Lista de código de PySpark, reemplaza el [your-bucket] con el nombre del bucket de Cloud Storage que creaste. - Envía la carga de trabajo por lotes de PySpark:
Resultado de la terminal de muestra:gcloud dataproc batches submit pyspark wordcount.py \ --region=region \ --deps-bucket=your-bucket
... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Para obtener una vista previa de la tabla de salida en la consola de Google Cloud, abre la pestaña BigQuery selecciona la tablawordcount_output
y, luego, haz clic Vista previa.
Más información
- BigQuery Storage y Spark SQL: Python
- Crea un archivo de definición de tablas para una fuente de datos externa
- Consulta datos particionados de forma externa