Bigtable を使用した Hadoop MapReduce ジョブ

この例では、Hadoop を使用して、テキスト ファイル内に出現する単語の数をカウントする簡単な MapReduce ジョブを実行します。この MapReduce ジョブでは、Bigtable を使用して、マップ オペレーションの結果を格納します。この例のコードは、GitHub リポジトリ GoogleCloudPlatform/cloud-bigtable-examplesjava/dataproc-wordcount ディレクトリにあります。

認証の設定

ローカル開発環境でこのページの Java サンプルを使用するには、gcloud CLI をインストールして初期化し、ユーザー認証情報を使用してアプリケーションのデフォルト認証情報を設定します。

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

詳細については Set up authentication for a local development environment をご覧ください。

コードサンプルの概要

このコードサンプルは、1 つ以上のテキスト ファイルと表の名前を入力として受け取るコマンドライン インターフェースを提供し、そのファイルに現れるすべての単語を見つけ、各単語の出現回数をカウントします。MapReduce ロジックは WordCountHBase クラスに記述されています。

最初に、マッパーがテキスト ファイルの内容をトークン化し、Key-Value ペアを生成します。キーはテキスト ファイル内の単語で、値は 1 です。

public static class TokenizerMapper extends
    Mapper<Object, Text, ImmutableBytesWritable, IntWritable> {

  private final static IntWritable one = new IntWritable(1);

  @Override
  public void map(Object key, Text value, Context context) throws IOException,
      InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    ImmutableBytesWritable word = new ImmutableBytesWritable();
    while (itr.hasMoreTokens()) {
      word.set(Bytes.toBytes(itr.nextToken()));
      context.write(word, one);
    }
  }
}

次に、レデューサが各キーの値を合計し、その結果を、指定した Bigtable テーブルに書き込みます。各行のキーは、テキスト ファイルから取り出した単語です。各行には cf:count 列があります。行キーがテキスト ファイルに出現する回数をこの列に格納します。

public static class MyTableReducer extends
    TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

  @Override
  public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = sum(values);
    Put put = new Put(key.get());
    put.addColumn(COLUMN_FAMILY, COUNT_COLUMN_NAME, Bytes.toBytes(sum));
    context.write(null, put);
  }

  public int sum(Iterable<IntWritable> values) {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    return i;
  }
}