Bigtable を使用した Hadoop MapReduce ジョブ

この例では、Hadoop を使用して、テキスト ファイル内に出現する単語の数をカウントする簡単な MapReduce ジョブを実行します。この MapReduce ジョブでは、Bigtable を使用して、マップ オペレーションの結果を格納します。この例のコードは、GitHub リポジトリ GoogleCloudPlatform/cloud-bigtable-examplesjava/dataproc-wordcount ディレクトリにあります。

認証を設定する

ローカル開発環境でこのページの Java サンプルを使用するには、gcloud CLI をインストールして初期化し、ユーザー認証情報を使用してアプリケーションのデフォルト認証情報を設定します。

    Google Cloud CLI をインストールします。

    外部 ID プロバイダ(IdP)を使用している場合は、まずフェデレーション ID を使用して gcloud CLI にログインする必要があります。

    If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

詳細については Set up authentication for a local development environment をご覧ください。

コードサンプルの概要

このコードサンプルは、1 つ以上のテキスト ファイルと表の名前を入力として受け取るコマンドライン インターフェースを提供し、そのファイルに現れるすべての単語を見つけ、各単語の出現回数をカウントします。MapReduce ロジックは WordCountHBase クラスに記述されています。

最初に、マッパーがテキスト ファイルの内容をトークン化し、Key-Value ペアを生成します。キーはテキスト ファイル内の単語で、値は 1 です。

public static class TokenizerMapper extends
    Mapper<Object, Text, ImmutableBytesWritable, IntWritable> {

  private final static IntWritable one = new IntWritable(1);

  @Override
  public void map(Object key, Text value, Context context) throws IOException,
      InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    ImmutableBytesWritable word = new ImmutableBytesWritable();
    while (itr.hasMoreTokens()) {
      word.set(Bytes.toBytes(itr.nextToken()));
      context.write(word, one);
    }
  }
}

次に、レデューサが各キーの値を合計し、その結果を、指定した Bigtable テーブルに書き込みます。各行のキーは、テキスト ファイルから取り出した単語です。各行には cf:count 列があります。行キーがテキスト ファイルに出現する回数をこの列に格納します。

public static class MyTableReducer extends
    TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

  @Override
  public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = sum(values);
    Put put = new Put(key.get());
    put.addColumn(COLUMN_FAMILY, COUNT_COLUMN_NAME, Bytes.toBytes(sum));
    context.write(null, put);
  }

  public int sum(Iterable<IntWritable> values) {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    return i;
  }
}