Use the BigQuery connector with Spark

You can use the BigQuery connector with Apache Spark to read data from and write data to BigQuery. This page provides example code that uses the BigQuery connector with Spark. See the Cloud Dataproc Quickstarts for instructions on creating a cluster.
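
If you do not yet have a cluster, you can create one with the gcloud command-line tool. The command below is a minimal sketch; the cluster name and region are placeholders, and you may need additional flags (image version, machine types, and so on) for your environment:

gcloud dataproc clusters create example-cluster --region=us-central1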

Reading and writing data from BigQuery

This example reads data from BigQuery into Spark using SparkContext.newAPIHadoopRDD (see the Spark documentation for more information), performs a word count, and writes the results back to BigQuery using PairRDDFunctions.saveAsNewAPIHadoopDataset.

Before trying this example, either create a dataset named "wordcount_dataset" or change the output dataset in the code to an existing BigQuery dataset in your Google Cloud Platform project. Here is the bq command to create the dataset:

bq mk wordcount_dataset
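
If your default bq project is not the project you want to write to, you can also qualify the dataset with a project ID (the project ID below is a placeholder):

bq mk --dataset your-project-id:wordcount_dataset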

Scala

Examine the code

IndirectBigQueryOutputFormat provides Hadoop with the ability to write JsonObject values directly into a BigQuery table. This class provides access to BigQuery records through an extension of the Hadoop OutputFormat class. To use it, several parameters must be set in the Hadoop configuration, and the OutputFormat class must be set to IndirectBigQueryOutputFormat. The example below shows the parameters to set and the lines of code needed to use IndirectBigQueryOutputFormat correctly.

import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat
import com.google.gson.JsonObject
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// Assumes you have a spark context (sc) -- running from spark-shell REPL.
// Marked as transient since configuration is not Serializable. This should
// only be necessary in spark-shell REPL.
@transient
val conf = sc.hadoopConfiguration

// Input parameters.
val fullyQualifiedInputTableId = "publicdata:samples.shakespeare"
val projectId = conf.get("fs.gs.project.id")
val bucket = conf.get("fs.gs.system.bucket")

// Input configuration.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)
BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId)

// Output parameters.
val outputTableId = projectId + ":wordcount_dataset.wordcount_output"
// Temporary Cloud Storage output path that is deleted when the job completes.
val outputGcsPath = ("gs://" + bucket + "/hadoop/tmp/bigquery/wordcountoutput")

// Output configuration.
// Let BigQuery auto-detect the output schema.
BigQueryOutputConfiguration.configureWithAutoSchema(
    conf,
    outputTableId,
    outputGcsPath,
    BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
    classOf[TextOutputFormat[_,_]])

// (Optional) Configure the KMS key used to encrypt the output table.
BigQueryOutputConfiguration.setKmsKeyName(
    conf, "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");

conf.set("mapreduce.job.outputformat.class",
         classOf[IndirectBigQueryOutputFormat[_,_]].getName)

// Truncate the table before writing output to allow multiple runs.
conf.set(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY,
         "WRITE_TRUNCATE")

// Helper to convert JsonObjects to (word, count) tuples.
def convertToTuple(record: JsonObject): (String, Long) = {
  val word = record.get("word").getAsString.toLowerCase
  val count = record.get("word_count").getAsLong
  (word, count)
}

// Helper to convert (word, count) tuples to JsonObjects.
def convertToJson(pair: (String, Long)): JsonObject = {
  val word = pair._1
  val count = pair._2
  val jsonObject = new JsonObject()
  jsonObject.addProperty("word", word)
  jsonObject.addProperty("word_count", count)
  jsonObject
}

// Load data from BigQuery.
val tableData = sc.newAPIHadoopRDD(
    conf,
    classOf[GsonBigQueryInputFormat],
    classOf[LongWritable],
    classOf[JsonObject])

// Perform word count.
val wordCounts = (tableData
    .map(entry => convertToTuple(entry._2))
    .reduceByKey(_ + _))

// Display 10 results.
wordCounts.take(10).foreach(l => println(l))

// Write data back into a new BigQuery table.
// IndirectBigQueryOutputFormat discards keys, so set key to null.
(wordCounts
    .map(pair => (null, convertToJson(pair)))
    .saveAsNewAPIHadoopDataset(conf))

Run the code on your cluster

  1. SSH into the Cloud Dataproc cluster's master node.
    1. In the GCP Console, go to your project's Cloud Dataproc Clusters page, then click the name of your cluster.
    2. On the cluster detail page, select the VM Instances tab, then click SSH to the right of the name of your cluster's master node.

      A browser window opens at your home directory on the master node.
          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Create wordcount.scala with the pre-installed vi, vim, or nano text editor, then paste in the Scala code from the Scala code listing.
    nano wordcount.scala
      
  3. Launch the spark-shell REPL (remove the --jars parameter if you are using Cloud Dataproc 1.0-1.2).
    $ spark-shell --jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop2-latest.jar
    ...
    Using Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    ...
    Spark context available as sc.
    ...
    SQL context available as sqlContext.
    scala>
    
  4. Run wordcount.scala with the :load wordcount.scala command to create the BigQuery wordcount_output table. The output listing displays 10 lines from the wordcount output.
    :load wordcount.scala
    ...
    (pinnace,3)
    (bone,21)
    (lug,2)
    (vailing,2)
    (bombast,3)
    (gaping,11)
    (hem,5)
    ('non,1)
    (stinks,1)
    (forsooth,48)
    

    To preview the output table, open your project's BigQuery page, select the wordcount_output table, and then click Preview.
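
    You can also inspect the results from the command line. The following bq query is a sketch that assumes the output table was written to wordcount_dataset.wordcount_output in your default project:

    bq query --use_legacy_sql=false 'SELECT word, word_count FROM wordcount_dataset.wordcount_output ORDER BY word_count DESC LIMIT 10'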

PySpark

Examine the code

#!/usr/bin/python
"""BigQuery I/O PySpark example."""
from __future__ import absolute_import
import json
import pprint
import subprocess
import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext()

# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)

conf = {
    # Input Parameters.
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'publicdata',
    'mapred.bq.input.dataset.id': 'samples',
    'mapred.bq.input.table.id': 'shakespeare',
}

# Output Parameters.
output_dataset = 'wordcount_dataset'
output_table = 'wordcount_output'

# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

# Perform word count.
word_counts = (
    table_data
    .map(lambda record: json.loads(record[1]))
    .map(lambda x: (x['word'].lower(), int(x['word_count'])))
    .reduceByKey(lambda x, y: x + y))

# Display 10 results.
pprint.pprint(word_counts.take(10))

# Stage data formatted as newline-delimited JSON in Cloud Storage.
output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
output_files = output_directory + '/part-*'

sql_context = SQLContext(sc)
(word_counts
 .toDF(['word', 'word_count'])
 .write.format('json').save(output_directory))

# Shell out to bq CLI to perform BigQuery import.
subprocess.check_call(
    'bq load --source_format NEWLINE_DELIMITED_JSON '
    '--replace '
    '--autodetect '
    '{dataset}.{table} {files}'.format(
        dataset=output_dataset, table=output_table, files=output_files
    ).split())

# Manually clean up the staging directories, otherwise the temporary
# BigQuery export and output files will remain in Cloud Storage indefinitely.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
    output_path, True)
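
If a run fails before the cleanup code at the end of the script executes, you can remove the staged files by hand. The gsutil command below is a sketch; the bucket and paths correspond to the input_directory and output_directory variables above:

gsutil -m rm -r gs://your-bucket/hadoop/tmp/bigquery/pyspark_input gs://your-bucket/hadoop/tmp/bigquery/pyspark_output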

Run the code on your cluster

  1. SSH into the Cloud Dataproc cluster's master node.
    1. In the GCP Console, go to your project's Cloud Dataproc Clusters page, then click the name of your cluster.
    2. On the cluster detail page, select the VM Instances tab, then click SSH to the right of the name of your cluster's master node.

      A browser window opens at your home directory on the master node.
          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Create wordcount.py with the pre-installed vi, vim, or nano text editor, then paste in the PySpark code from the PySpark code listing.
    nano wordcount.py
      
  3. Run wordcount.py with spark-submit to create the BigQuery wordcount_output table. The output listing displays 10 lines from the wordcount output.
    spark-submit wordcount.py
    ...
    (pinnace,3)
    (bone,21)
    (lug,2)
    (vailing,2)
    (bombast,3)
    (gaping,11)
    (hem,5)
    ('non,1)
    (stinks,1)
    (forsooth,48)
    

    To preview the output table, open your project's BigQuery page, select the wordcount_output table, and then click Preview.
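
    You can also preview the table from the command line with bq head; this sketch assumes the default project and the table name used in the script:

    bq head -n 10 wordcount_dataset.wordcount_output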