Hadoop MapReduce job with Bigtable

This example uses Hadoop to perform a simple MapReduce job that counts the number of times a word appears in a text file. The MapReduce job uses Bigtable to store the results of the map operation. The code for this example is in the GitHub repository GoogleCloudPlatform/cloud-bigtable-examples, in the directory java/dataproc-wordcount.

Set up authentication

To use the Java samples on this page from a local development environment, install and initialize the gcloud CLI, and then set up Application Default Credentials with your user credentials.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. Create local authentication credentials for your Google Account:

    gcloud auth application-default login

For more information, see Set up authentication for a local development environment.

Overview of the code sample

The code sample provides a simple command-line interface that takes one or more text files and a table name as input, finds all of the words that appear in the file, and counts how many times each word appears. The MapReduce logic appears in the WordCountHBase class.

First, a mapper tokenizes the text file's contents and generates key-value pairs, where the key is a word from the text file and the value is 1:

public static class TokenizerMapper extends
    Mapper<Object, Text, ImmutableBytesWritable, IntWritable> {

  private final static IntWritable one = new IntWritable(1);

  @Override
  public void map(Object key, Text value, Context context) throws IOException,
      InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    ImmutableBytesWritable word = new ImmutableBytesWritable();
    while (itr.hasMoreTokens()) {
      word.set(Bytes.toBytes(itr.nextToken()));
      context.write(word, one);
    }
  }
}

A reducer then sums the values for each key and writes the results to a Bigtable table that you specified. Each row key is a word from the text file. Each row contains a cf:count column, which contains the number of times the row key appears in the text file.

public static class MyTableReducer extends
    TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

  @Override
  public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = sum(values);
    Put put = new Put(key.get());
    put.addColumn(COLUMN_FAMILY, COUNT_COLUMN_NAME, Bytes.toBytes(sum));
    context.write(null, put);
  }

  public int sum(Iterable<IntWritable> values) {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    return i;
  }
}