Hadoop on Google Cloud Platform

Writing a MapReduce Job with the BigQuery Connector


BigQueryInputFormat class

The input format provides Hadoop with the selected BigQuery records, making each one accessible as a JsonObject.

The BigQueryInputFormat class performs three main operations:

  • Using a user-specified query to select the appropriate BigQuery objects
  • Splitting the results of the query evenly among the Hadoop nodes
  • Parsing the splits into Java objects to pass to the mapper. The Hadoop Mapper class receives a JsonObject representation of each selected BigQuery object (see the sketch after this list).
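
As a concrete illustration of the last item, the following is a minimal sketch (not part of the connector sample) of how code might read typed fields out of the JsonObject that represents a single record. The field names assume the publicdata:samples.shakespeare table; adjust them to your own table's schema.

import com.google.gson.JsonObject;

// Illustrative helper only: shows how the columns of a BigQuery record arrive
// as entries of a Gson JsonObject. Field names assume publicdata:samples.shakespeare.
class ShakespeareRecordExample {
  static String describe(JsonObject record) {
    String word = record.get("word").getAsString();
    long wordCount = record.get("word_count").getAsLong();
    String corpus = record.get("corpus").getAsString();
    return word + " appears " + wordCount + " times in " + corpus;
  }
}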

The input format provides access to BigQuery records through an extension of the Hadoop InputFormat class. In order to configure the input format correctly, a few lines must be added to the main Hadoop job. In particular, several parameters must be set in the Hadoop configuration and the InputFormat class must be set to BigQueryInputFormat. Below is an example of the parameters that need to be set and the lines of code that need to be inserted to correctly configure the input format.

Input Parameters

fullyQualifiedInputTableId
    The BigQuery table to read from, in the form [optional projectId]:[datasetId].[tableId]. Example: "publicdata:samples.shakespeare"

projectId
    The BigQuery projectId under which all of the input operations occur. Example: "my-first-cloud-project"

// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

...

// Configure input parameters.
BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId);

// Set InputFormat.
job.setInputFormatClass(BigQueryInputFormat.class);

In the above code, job refers to the org.apache.hadoop.mapreduce.Job that represents the Hadoop job to run, and conf refers to the org.apache.hadoop.Configuration for that job.
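
For context, here is a minimal driver sketch, not taken from the connector documentation, showing where conf and job might come from. The class name MyBigQueryJob and the table values are placeholders. Note that the BigQuery configuration calls should be made before the Job is constructed, because the Job copies the configuration it is given.

import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryInputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class MyBigQueryJob {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Configure the BigQuery input while conf is still a plain Configuration;
    // the Job constructed below takes a copy of this configuration.
    conf.set(BigQueryConfiguration.PROJECT_ID_KEY, "my-first-cloud-project");
    BigQueryConfiguration.configureBigQueryInput(conf, "publicdata:samples.shakespeare");

    Job job = new Job(conf, "bigquery-input-sample");
    job.setJarByClass(MyBigQueryJob.class);
    job.setInputFormatClass(BigQueryInputFormat.class);

    // Mapper, Reducer, and output configuration would follow here.
  }
}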

When the job is complete, call BigQueryInputFormat.cleanupJob(job) to delete the temporary Google Cloud Storage files created while reading.

...

job.waitForCompletion(true);

// Make sure to clean up the Google Cloud Storage export paths if desired, and possibly an
// intermediate input table if we did sharded export and thus didn't clean it up at setup time.
BigQueryInputFormat.cleanupJob(job);

Mapper

The BigQueryInputFormat class reads from BigQuery and passes the BigQuery objects one at a time as input to the Hadoop Mapper function. The inputs take the form of a LongWritable and a JsonObject: the LongWritable tracks the record number, and the JsonObject contains the JSON-formatted BigQuery record. The Mapper should accept the LongWritable and JsonObject pair as input. An example of a Mapper for a WordCount job is shown below.

  // The configuration key used to specify the BigQuery field name
  // ("column name").
  public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
      "mapred.bq.samples.wordcount.word.key";

  // Default value for the configuration entry specified by
  // WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
  // publicdata:samples.shakespeare or 'repository_name'
  // in publicdata:samples.github_timeline.
  public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";


  /**
   * The mapper function for word count.
   */
  public static class Map
      extends Mapper <LongWritable, JsonObject, Text, LongWritable> {
    private static final LongWritable oneWritable = new LongWritable(1);
    private Text word = new Text();
    private String wordKey;

    @Override
    public void setup(Context context)
        throws IOException, InterruptedException {
      // Find the runtime-configured key for the field name we're looking for
      // in the map task.
      wordKey = context.getConfiguration().get(WORDCOUNT_WORD_FIELDNAME_KEY,
          WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
    }

    @Override
    public void map(LongWritable key, JsonObject value, Context context)
        throws IOException, InterruptedException {
      JsonElement countElement = value.get(wordKey);
      if (countElement != null) {
        String wordInRecord = countElement.getAsString();
        word.set(wordInRecord);
        context.write(word, oneWritable);
      }
    }
  }

BigQueryOutputFormat class

The output format provides Hadoop with the ability to write JsonObject values directly into a BigQuery table. The BigQueryOutputFormat provides access to BigQuery records through an extension of the Hadoop OutputFormat class. In order to configure the output format correctly, a few lines must be added to the main Hadoop job. In particular, several parameters must be set in the Hadoop configuration and the OutputFormat class must be set to BigQueryOutputFormat. Below is an example of the parameters that need to be set and the lines of code that need to be inserted to correctly configure the output format.

Output Parameters

projectId
    The BigQuery projectId under which all of the output operations occur. Example: "my-first-cloud-project"

fullyQualifiedOutputTableId
    The BigQuery table to write the final job results to, in the form [optional projectId]:[datasetId].[tableId]. The datasetId must already exist in your project. A dataset named [outputDatasetId]_hadoop_temporary will be created in BigQuery to hold temporary results; make sure it does not conflict with an existing dataset. Example: "test_output_dataset.wordcount_output" or "my-first-cloud-project:test_output_dataset.wordcount_output"

outputTableSchema
    A description of the output JsonObjects in BigQuery table schema format. Example: "[{'name': 'Name','type': 'STRING'},{'name': 'Number','type': 'INTEGER'}]"

// Configure output parameters
String outputTableSchema =
    "[{'name': 'Word','type': 'STRING'},{'name': 'Number','type': 'INTEGER'}]";
BigQueryConfiguration.configureBigQueryOutput(
    conf, fullyQualifiedOutputTableId, outputTableSchema);

// Set OutputFormat.
job.setOutputFormatClass(BigQueryOutputFormat.class);

In the above code, job refers to the org.apache.hadoop.mapreduce.Job that represents the Hadoop job to run, and conf refers to the org.apache.hadoop.Configuration for that job.

Reducer

The BigQueryOutputFormat class writes to BigQuery. It takes a key and a JsonObject value as input and writes only the JsonObject value to BigQuery; the key is ignored. The JsonObject should contain a JSON-formatted BigQuery record. The Reducer should therefore output a key (of any type) and a JsonObject value pair. An example of a Reducer for a WordCount job is shown below.

  /**
   * Reducer function for word count.
   */
  public static class Reduce
      extends Reducer<Text, LongWritable, Text, JsonObject> {
    private static final Text dummyText = new Text("ignored");

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      JsonObject jsonObject = new JsonObject();
      jsonObject.addProperty("Word", key.toString());
      long count = 0;
      for (LongWritable val : values) {
        count = count + val.get();
      }
      jsonObject.addProperty("Number", count);
      // Key does not matter.
      context.write(dummyText, jsonObject);
    }
  }

Complete Code for a sample WordCount job

The code below is an example of a simple WordCount job that aggregates word counts from objects in BigQuery. To see the script to run this code, go to Running with the BigQuery connector for Hadoop.

package com.google.cloud.hadoop.io.bigquery.samples;

import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryOutputFormat;
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Sample program to run the Hadoop Wordcount example over tables in BigQuery.
 */
public class WordCount {
  // Logger.
  protected static final Logger LOG = LoggerFactory.getLogger(WordCount.class);

  // The configuration key used to specify the BigQuery field name
  // ("column name").
  public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
      "mapred.bq.samples.wordcount.word.key";

  // Default value for the configuration entry specified by
  // WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
  // publicdata:samples.shakespeare or 'repository_name'
  // in publicdata:samples.github_timeline.
  public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";

  // Guava might not be available, so define a null / empty helper:
  private static boolean isStringNullOrEmpty(String toTest) {
    return toTest == null || "".equals(toTest);
  }

  /**
   * The mapper function for word count.
   */
  public static class Map
      extends Mapper<LongWritable, JsonObject, Text, LongWritable> {
    private static final LongWritable oneWritable = new LongWritable(1);
    private Text word = new Text();
    private String wordKey;

    @Override
    public void setup(Context context)
        throws IOException, InterruptedException {
      // Find the runtime-configured key for the field name we're looking for in the map task.
      wordKey = context.getConfiguration().get(WORDCOUNT_WORD_FIELDNAME_KEY,
          WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
    }

    @Override
    public void map(LongWritable key, JsonObject value, Context context)
        throws IOException, InterruptedException {
      JsonElement countElement = value.get(wordKey);
      if (countElement != null) {
        String wordInRecord = countElement.getAsString();
        word.set(wordInRecord);
        context.write(word, oneWritable);
      }
    }
  }

  /**
   * Reducer function for word count.
   */
  public static class Reduce
      extends Reducer<Text, LongWritable, Text, JsonObject> {
    private static final Text dummyText = new Text("ignored");

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      JsonObject jsonObject = new JsonObject();
      jsonObject.addProperty("Word", key.toString());
      long count = 0;
      for (LongWritable val : values) {
        count = count + val.get();
      }
      jsonObject.addProperty("Count", count);
      // Key does not matter.
      context.write(dummyText, jsonObject);
    }
  }

  /**
   * Configures and runs the main Hadoop job. Takes a String[] of 4 parameters.
   *
   * @param args a String[] containing projectId, fullyQualifiedInputTableId,
   *     fieldName, and fullyQualifiedOutputTableId.
   * @throws IOException on IO Error.
   * @throws InterruptedException on Interrupt.
   * @throws ClassNotFoundException if not all classes are present.
   */
  public static void main(String[] args)
      throws IOException, InterruptedException, ClassNotFoundException {

    GenericOptionsParser parser = new GenericOptionsParser(args);
    args = parser.getRemainingArgs();

    if (args.length != 4) {
      System.out.println("Usage: hadoop jar bigquery_wordcount.jar "
          + "[projectId] [fullyQualifiedInputTableId] [fieldName] [fullyQualifiedOutputTableId]");
      String indent = "    ";
      System.out.println(indent
          + "projectId - Project under which to issue the BigQuery operations. "
          + "Also serves as the default project for table IDs which don't explicitly specify a "
          + "project for the table.");
      System.out.println(indent
          + "fullyQualifiedInputTableId - Input table ID of the form "
          + ":.");
      System.out.println(indent
          + "fieldName - Name of the field to count in the input table, e.g. 'word' in "
          + "publicdata:samples.shakespeare or 'repository_name' in "
          + "publicdata:samples.github_timeline.");
      System.out.println(indent
          + "fullyQualifiedOutputTableId - Output table ID of the form "
          + ":.");
      System.exit(1);
    }

    // Global parameters from args.
    String projectId = args[0];

    // Set InputFormat parameters from args.
    String fullyQualifiedInputTableId = args[1];
    String fieldName = args[2];

    // Set OutputFormat parameters from args.
    String fullyQualifiedOutputTableId = args[3];

    // Default OutputFormat parameters for this sample.
    String outputTableSchema =
        "[{'name': 'Word','type': 'STRING'},{'name': 'Count','type': 'INTEGER'}]";
    String jobName = "wordcount";

    JobConf conf = new JobConf(parser.getConfiguration(), WordCount.class);

    // Set the job-level projectId.
    conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

    // Make sure the required export-bucket setting is present.
    if (isStringNullOrEmpty(conf.get(BigQueryConfiguration.GCS_BUCKET_KEY))) {
      LOG.warn("Missing config for '{}'; trying to default to fs.gs.system.bucket.",
               BigQueryConfiguration.GCS_BUCKET_KEY);
      String systemBucket = conf.get("fs.gs.system.bucket");
      if (isStringNullOrEmpty(systemBucket)) {
        LOG.error("Also missing fs.gs.system.bucket; value must be specified.");
        System.exit(1);
      } else {
        LOG.info("Setting '{}' to '{}'", BigQueryConfiguration.GCS_BUCKET_KEY, systemBucket);
        conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, systemBucket);
      }
    } else {
      LOG.info("Using export bucket '{}' as specified in '{}'",
          conf.get(BigQueryConfiguration.GCS_BUCKET_KEY), BigQueryConfiguration.GCS_BUCKET_KEY);
    }

    // Configure input and output for BigQuery access.
    BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId);
    BigQueryConfiguration.configureBigQueryOutput(
        conf, fullyQualifiedOutputTableId, outputTableSchema);

    // Set the field we're querying for the count.
    LOG.info("Setting '{}' to '{}'", WORDCOUNT_WORD_FIELDNAME_KEY, fieldName);
    conf.set(WORDCOUNT_WORD_FIELDNAME_KEY, fieldName);

    Job job = new Job(conf, jobName);
    job.setJarByClass(WordCount.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    // Set input and output classes.
    job.setInputFormatClass(GsonBigQueryInputFormat.class);
    job.setOutputFormatClass(BigQueryOutputFormat.class);

    job.waitForCompletion(true);

    // Make sure to clean up the Google Cloud Storage export paths.
    GsonBigQueryInputFormat.cleanupJob(job);
  }
}