Use the spark-bigquery-connector with Apache Spark to read and write data from and to BigQuery. This tutorial demonstrates a PySpark application that uses the spark-bigquery-connector.
Use the BigQuery connector with your workload
See Dataproc Serverless for Spark runtime releases to determine the BigQuery connector version that is installed in your batch workload runtime version. If the connector is not listed, see the next section for instructions on how to make the connector available to applications.
How to use the connector with Spark runtime version 2.0
The BigQuery connector is not installed in Spark runtime version 2.0. When using Spark runtime version 2.0, you can make the connector available to your application in one of the following ways:
- Use the jars parameter to point to a connector jar file when you submit your Dataproc Serverless for Spark batch workload. The following example specifies a connector jar file (see the GoogleCloudDataproc/spark-bigquery-connector repository on GitHub for a list of available connector jar files).
  Google Cloud CLI example:
      gcloud dataproc batches submit pyspark \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \
          ... other args
- Include the connector jar file in your Spark application as a dependency (see Compiling against the connector).
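As a rough illustration of the first option, the following Python sketch assembles the same gcloud command as an argument list, so that the jar URI can be built from a Scala version and a connector version. The helper names connector_jar_uri and build_submit_args are hypothetical, and the version 0.0.0 is a placeholder rather than a real release; only the flags and the jar naming pattern come from the example above.

```python
import shlex


def connector_jar_uri(connector_version, scala_version="2.13"):
    """Return a jar URI that follows the naming pattern shown above."""
    return (
        "gs://spark-lib/bigquery/"
        f"spark-bigquery-with-dependencies_{scala_version}-{connector_version}.jar"
    )


def build_submit_args(script, region, connector_version):
    """Assemble gcloud arguments for a PySpark batch that adds the connector jar."""
    return [
        "gcloud", "dataproc", "batches", "submit", "pyspark", script,
        f"--region={region}",
        f"--jars={connector_jar_uri(connector_version)}",
    ]


if __name__ == "__main__":
    # "REGION" and "0.0.0" are placeholders; substitute real values before use.
    print(shlex.join(build_submit_args("wordcount.py", "REGION", "0.0.0")))
```

The sketch only prints the command. Check the connector repository for a jar that matches your runtime's Scala version before submitting anything.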
Calculate costs
This tutorial uses billable components of Google Cloud, including:
- Dataproc Serverless
- BigQuery
- Cloud Storage
Use the Pricing Calculator to generate a cost estimate based on your projected usage. New Cloud Platform users may be eligible for a free trial.
BigQuery I/O
This example reads data from BigQuery into a Spark DataFrame to perform a word count using the standard data source API.
The connector writes the wordcount output to BigQuery in three steps:
- It buffers the data into temporary files in your Cloud Storage bucket.
- It copies the data in one operation from your Cloud Storage bucket into BigQuery.
- It deletes the temporary files in Cloud Storage after the BigQuery load operation completes (temporary files are also deleted after the Spark application terminates). If deletion fails, you will need to delete any unwanted temporary Cloud Storage files, which are typically placed in gs://YOUR_BUCKET/.spark-bigquery-JOB_ID-UUID.
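If the cleanup fails, leftover directories can be recognized by their .spark-bigquery- prefix. The sketch below is a plain Python filter over object names. It assumes you have already listed the objects in your bucket by some other means, and the function name leftover_temp_dirs is hypothetical.

```python
TEMP_PREFIX = ".spark-bigquery-"


def leftover_temp_dirs(object_names):
    """Return the sorted top-level connector temp directories among object names.

    Object names are paths relative to the bucket root, for example
    ".spark-bigquery-JOB_ID-UUID/part-00000".
    """
    dirs = set()
    for name in object_names:
        top_level = name.split("/", 1)[0]
        if top_level.startswith(TEMP_PREFIX):
            dirs.add(top_level)
    return sorted(dirs)


if __name__ == "__main__":
    # Made-up object names, for illustration only.
    names = [
        ".spark-bigquery-JOB_ID-UUID/part-00000",
        ".spark-bigquery-JOB_ID-UUID/part-00001",
        "dependencies/wordcount.py",
    ]
    print(leftover_temp_dirs(names))
```

The function only identifies candidates. Review the list before deleting anything, because a running job may still be using its temporary directory.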
Configure billing
By default, the project associated with the credentials or service account is billed for API usage. To bill a different project, set the following configuration: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").
You can also add the setting to a single read or write operation, as follows: .option("parentProject", "<BILLED-GCP-PROJECT>").
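A minimal sketch of the per-operation form, assuming an existing SparkSession and DataFrame are passed in. The helper names read_billed_to and write_billed_to are hypothetical; parentProject and table are the connector options used in this tutorial. The write helper also assumes that temporaryGcsBucket is already set in the session configuration, as wordcount.py does later in this tutorial.

```python
def read_billed_to(spark, table, billed_project):
    """Read a BigQuery table, billing API usage to billed_project."""
    return (
        spark.read.format("bigquery")
        .option("parentProject", billed_project)
        .option("table", table)
        .load()
    )


def write_billed_to(df, table, billed_project):
    """Write df to a BigQuery table, billing API usage to billed_project."""
    (
        df.write.format("bigquery")
        .option("parentProject", billed_project)
        .option("table", table)
        .save()
    )
```

Setting the option on individual operations lets one application bill different reads and writes to different projects, whereas spark.conf.set applies to the whole session.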
Submit a PySpark wordcount batch workload
Run a Spark batch workload that counts the number of words in a public dataset.
- Open a local terminal or Cloud Shell.
- Create the wordcount_dataset with the bq command-line tool in a local terminal or in Cloud Shell.
      bq mk wordcount_dataset
- Create a Cloud Storage bucket with the Google Cloud CLI.
      gcloud storage buckets create gs://YOUR_BUCKET
  Replace YOUR_BUCKET with the name of the Cloud Storage bucket you created.
- Create the file wordcount.py locally in a text editor by copying the following PySpark code.
      #!/usr/bin/python
      """BigQuery I/O PySpark example."""
      from pyspark.sql import SparkSession

      spark = SparkSession \
        .builder \
        .appName('spark-bigquery-demo') \
        .getOrCreate()

      # Use the Cloud Storage bucket for temporary BigQuery export data used
      # by the connector.
      bucket = "YOUR_BUCKET"
      spark.conf.set('temporaryGcsBucket', bucket)

      # Load data from BigQuery.
      words = spark.read.format('bigquery') \
        .option('table', 'bigquery-public-data:samples.shakespeare') \
        .load()
      words.createOrReplaceTempView('words')

      # Perform word count.
      word_count = spark.sql(
          'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
      word_count.show()
      word_count.printSchema()

      # Saving the data to BigQuery
      word_count.write.format('bigquery') \
        .option('table', 'wordcount_dataset.wordcount_output') \
        .save()
- Submit the PySpark batch workload:
      gcloud dataproc batches submit pyspark wordcount.py \
          --region=REGION \
          --deps-bucket=YOUR_BUCKET
  Sample terminal output:
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows

      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
To preview the output table in the Google Cloud console, open your project's BigQuery page, select the wordcount_output table, and then click Preview.
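To make the aggregation in wordcount.py concrete: each input row pairs a word with a count, and the same word can appear in more than one row, so the query sums the counts per word. The plain Python sketch below mirrors SELECT word, SUM(word_count) ... GROUP BY word on a few made-up rows. It does not use Spark, the function name sum_word_counts is hypothetical, and the sample values are illustrative rather than taken from the dataset.

```python
from collections import Counter


def sum_word_counts(rows):
    """Sum word_count per word, like SUM(word_count) ... GROUP BY word."""
    totals = Counter()
    for word, word_count in rows:
        totals[word] += word_count
    return dict(totals)


if __name__ == "__main__":
    # Made-up (word, word_count) rows, not real dataset values.
    rows = [("hope", 3), ("steel", 1), ("hope", 4)]
    print(sum_word_counts(rows))  # {'hope': 7, 'steel': 1}
```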
For more information
- BigQuery Storage & Spark SQL - Python
- Creating a table definition file for an external data source
- Use externally partitioned data