将 Hadoop MapReduce 作业与 Bigtable 搭配使用

此示例使用 Hadoop 执行简单的 MapReduce 作业,该作业用于统计某一单词在文本文件中出现的次数。MapReduce 作业 使用 Bigtable 存储映射操作的结果。该示例的代码位于 GitHub 代码库 GoogleCloudPlatform/cloud-bigtable-examplesjava/dataproc-wordcount 目录中。

设置身份验证

如需在本地开发环境中使用本页面上的 Java 示例,请安装并初始化 gcloud CLI,然后使用您的用户凭据设置应用默认凭据。

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

如需了解详情,请参阅 Set up authentication for a local development environment

代码示例概览

代码示例提供了一个简单的命令行界面,该界面使用一个或多个文本文件和一个表名作为输入,查找文件中出现的所有单词,并统计每个单词出现的次数。MapReduce 逻辑显示在 WordCountHBase中。

首先,映射器将文本文件的内容令牌化并生成键值对,其中键为文本文件中的一个单词,值为 1

public static class TokenizerMapper extends
    Mapper<Object, Text, ImmutableBytesWritable, IntWritable> {

  private final static IntWritable one = new IntWritable(1);

  @Override
  public void map(Object key, Text value, Context context) throws IOException,
      InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    ImmutableBytesWritable word = new ImmutableBytesWritable();
    while (itr.hasMoreTokens()) {
      word.set(Bytes.toBytes(itr.nextToken()));
      context.write(word, one);
    }
  }
}

然后,缩减器对每个键的值进行求和,并将所得结果写入您指定的 Bigtable 表中。每个行键对应文本文件中的一个字词。每行都有一个 cf:count 列,其中包含该行键在文本文件中出现的次数。

public static class MyTableReducer extends
    TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

  @Override
  public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = sum(values);
    Put put = new Put(key.get());
    put.addColumn(COLUMN_FAMILY, COUNT_COLUMN_NAME, Bytes.toBytes(sum));
    context.write(null, put);
  }

  public int sum(Iterable<IntWritable> values) {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    return i;
  }
}