Use the BigQuery connector with Spark

The spark-bigquery-connector is used with Apache Spark to read and write data from and to BigQuery. This tutorial provides example code that uses the spark-bigquery-connector within a Spark application. For instructions on creating a cluster, see the Dataproc Quickstarts.

Providing the connector to your application

The spark-bigquery-connector must be available to your application at runtime. This can be accomplished in one of the following ways:

  • Install the connector in the Spark jars directory.
  • Add the connector at runtime using the --jars parameter, which can be used with the Dataproc API or spark-submit.
  • Include the jar in your Scala or Java Spark application as a dependency (see Compiling against the connector).

If the connector is not available at runtime, a ClassNotFoundException is thrown.
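
For example, to make the connector available at job-submission time on Dataproc, you can pass its public Cloud Storage jar with the --jars flag. This is a sketch; the bracketed values are placeholders for your cluster name, region, application jar, and main class:

gcloud dataproc jobs submit spark \
    --cluster=[cluster-name] \
    --region=[region] \
    --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar,[your-application-jar] \
    --class=[your-main-class]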

Calculating costs

This tutorial uses billable components of Google Cloud Platform, including:

  • Dataproc
  • BigQuery
  • Cloud Storage

Use the Pricing Calculator to generate a cost estimate based on your projected usage. New Cloud Platform users may be eligible for a free trial.

Reading and writing data from BigQuery

This example reads data from BigQuery into a Spark DataFrame to perform a word count using the standard data source API.

The connector writes the data to BigQuery by first buffering all of the data into temporary files in Cloud Storage, and then copying that data into BigQuery in one operation. The connector attempts to delete the temporary files once the BigQuery load operation has succeeded, and again when the Spark application terminates. If the job fails, you may need to manually remove any remaining temporary Cloud Storage files. Typically, you'll find temporary BigQuery exports in gs://[bucket]/.spark-bigquery-[jobid]-[UUID].
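
If you do need to clean up manually, you can list and remove leftover export files with gsutil (the bracketed values are placeholders):

gsutil ls gs://[bucket]/.spark-bigquery-*
gsutil rm -r gs://[bucket]/.spark-bigquery-[jobid]-[UUID]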

Running the code

Before running this example, create a dataset named "wordcount_dataset" or change the output dataset in the code to an existing BigQuery dataset in your Google Cloud project.

Use the bq command to create the wordcount_dataset:

bq mk wordcount_dataset
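
You can verify that the dataset was created by listing the datasets in your project:

bq ls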

Use the gsutil command to create a Cloud Storage bucket, which the connector will use to stage temporary export data when writing to BigQuery:

gsutil mb gs://[bucket]
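
By default, gsutil mb creates the bucket in the US multi-region. To co-locate the bucket with your cluster and dataset, you can pass a location explicitly (us-central1 below is only an example):

gsutil mb -l us-central1 gs://[bucket]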

Scala

  1. Examine the code and replace the [bucket] placeholder with the Cloud Storage bucket you created above.
    /*
     * Uncomment the following lines if you are not running in spark-shell,
     * which creates a SparkSession for you.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data from BigQuery.
    val wordsDF = spark.read.format("bigquery")
      .option("table", "bigquery-public-data.samples.shakespeare")
      .load()
      .cache()
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Save the data to BigQuery.
    wordCountDF.write.format("bigquery")
      .option("table", "wordcount_dataset.wordcount_output")
      .save()
    
    
  2. Run the code on your cluster.
    1. SSH into the Dataproc cluster's master node.
      1. Go to your project's Dataproc Clusters page in the Cloud Console, then click the name of your cluster.
      2. On the cluster detail page, select the VM Instances tab, then click the SSH selection that appears to the right of the name of your cluster's master node.

        A browser window opens at your home directory on the master node.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Create wordcount.scala with the pre-installed vi, vim, or nano text editor, then paste in the Scala code from the Scala code listing.
      nano wordcount.scala
        
    3. Launch the spark-shell REPL.
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. Run wordcount.scala with the :load wordcount.scala command to create the BigQuery wordcount_output table. The output listing displays 20 lines from the wordcount output.
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      To preview the output table, open your project's BigQuery page, select the wordcount_output table, and then click Preview.
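
      Alternatively, you can sample the output table from the command line with the bq tool:

      bq head -n 10 wordcount_dataset.wordcount_output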

PySpark

  1. Examine the code and replace the [bucket] placeholder with the Cloud Storage bucket you created above.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery.
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
    
  2. Run the code on your cluster.
    1. SSH into the Dataproc cluster's master node.
      1. Go to your project's Dataproc Clusters page in the Cloud Console, then click the name of your cluster.
      2. On the cluster detail page, select the VM Instances tab, then click the SSH selection that appears to the right of the name of your cluster's master node.

        A browser window opens at your home directory on the master node.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Create wordcount.py with the pre-installed vi, vim, or nano text editor, then paste in the PySpark code from the PySpark code listing.
      nano wordcount.py
      
    3. Run wordcount.py with spark-submit to create the BigQuery wordcount_output table. The output listing displays 20 lines from the wordcount output.
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      To preview the output table, open your project's BigQuery page, select the wordcount_output table, and then click Preview.
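
      You can also check the results with an ad hoc query from the command line, for example:

      bq query --use_legacy_sql=false \
          'SELECT word, word_count FROM wordcount_dataset.wordcount_output ORDER BY word_count DESC LIMIT 10'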