Example: Hadoop MapReduce job with Bigtable

This example uses Hadoop to run a simple MapReduce job that counts the number of times each word appears in a text file. The job stores its results in Bigtable. The code for this example is in the GitHub repository GoogleCloudPlatform/cloud-bigtable-examples, in the directory java/dataproc-wordcount.

Overview of the code sample

The code sample provides a simple command-line interface that takes one or more text files and a table name as input, finds all of the words that appear in the files, and counts how many times each word appears. The MapReduce logic appears in the WordCountHBase class.

First, a mapper tokenizes the text file's contents and generates key-value pairs, where the key is a word from the text file and the value is 1:

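// Imports used by this snippet.
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public static class TokenizerMapper extends
    Mapper<Object, Text, ImmutableBytesWritable, IntWritable> {

  // Reused for every emitted pair; each occurrence of a word counts as 1.
  private static final IntWritable one = new IntWritable(1);

  @Override
  public void map(Object key, Text value, Context context) throws IOException,
      InterruptedException {
    // Split the input text on whitespace.
    StringTokenizer itr = new StringTokenizer(value.toString());
    ImmutableBytesWritable word = new ImmutableBytesWritable();
    while (itr.hasMoreTokens()) {
      // Emit (word, 1), with the word encoded as bytes.
      word.set(Bytes.toBytes(itr.nextToken()));
      context.write(word, one);
    }
  }
}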
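To make the mapper's output concrete, the following is a minimal sketch that reproduces the tokenizing loop in plain Java, without Hadoop. The class name MapStepSketch, the method mapLine, and the sample sentence are hypothetical and are not part of the code sample; strings and Integer values stand in for ImmutableBytesWritable and IntWritable.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

// Hypothetical stand-in for the map step; not part of the code sample.
public class MapStepSketch {

  // Mirrors the loop in TokenizerMapper.map: one (word, 1) pair per token.
  static List<Map.Entry<String, Integer>> mapLine(String line) {
    List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
    // With no delimiter argument, StringTokenizer splits on whitespace only.
    StringTokenizer itr = new StringTokenizer(line);
    while (itr.hasMoreTokens()) {
      pairs.add(new SimpleEntry<>(itr.nextToken(), 1));
    }
    return pairs;
  }

  public static void main(String[] args) {
    // Print each emitted pair as "word<TAB>1".
    for (Map.Entry<String, Integer> pair : mapLine("the cat saw the Cat.")) {
      System.out.println(pair.getKey() + "\t" + pair.getValue());
    }
  }
}
```

Because the tokenizer splits only on whitespace and does not normalize case or strip punctuation, "cat" and "Cat." are emitted as different keys. The repeated word "the" is emitted twice, each time with the value 1; combining those values is left to the reducer.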

A reducer then sums the values for each key and writes the results to the Bigtable table that you specify. Each row key is a word from the text file. Each row contains a cf:count column, which stores the number of times the row key appears in the text file:

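// Imports used by this snippet.
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;

public static class MyTableReducer extends
    TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

  @Override
  public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = sum(values);
    // The word is the row key; the count goes in the cf:count column.
    // COLUMN_FAMILY and COUNT_COLUMN_NAME are defined elsewhere in the sample (not shown here).
    Put put = new Put(key.get());
    put.addColumn(COLUMN_FAMILY, COUNT_COLUMN_NAME, Bytes.toBytes(sum));
    // The Put already carries the row key, so no separate output key is passed.
    context.write(null, put);
  }

  // Adds up the 1s that the mapper emitted for a single word.
  public int sum(Iterable<IntWritable> values) {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    return i;
  }
}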
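The reduce step can be sketched in plain Java to show what ends up in each cf:count cell. This is an illustration under stated assumptions, not the sample's code: the class name ReduceStepSketch and the methods encodeCount and decodeCount are hypothetical, and ByteBuffer is used here in place of the HBase helper Bytes.toBytes(int), which encodes an int as 4 big-endian bytes.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical stand-in for the reduce step; not part of the code sample.
public class ReduceStepSketch {

  // Same logic as MyTableReducer.sum, on plain Integer values.
  static int sum(Iterable<Integer> values) {
    int total = 0;
    for (int value : values) {
      total += value;
    }
    return total;
  }

  // Encodes the count as 4 big-endian bytes (ByteBuffer's default byte order).
  static byte[] encodeCount(int count) {
    return ByteBuffer.allocate(Integer.BYTES).putInt(count).array();
  }

  // Decodes a 4-byte big-endian cell value back into an int.
  static int decodeCount(byte[] cell) {
    return ByteBuffer.wrap(cell).getInt();
  }

  public static void main(String[] args) {
    // The values that the reducer would receive for a word seen three times.
    int count = sum(Arrays.asList(1, 1, 1));
    byte[] cell = encodeCount(count);
    System.out.println(Arrays.toString(cell));
    System.out.println(decodeCount(cell));
  }
}
```

The point of the sketch is that the stored count is a binary integer rather than the text "3", so a client that reads the table needs to decode the cell value as a 4-byte integer instead of interpreting it as a string.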