使用 Bigtable 執行 Hadoop MapReduce 工作

本範例使用 Hadoop 執行簡單的 MapReduce 工作,以計算文字檔中某個字詞的出現次數。MapReduce 工作會使用 Bigtable 儲存對應作業的結果。這個範例的程式碼位於 GitHub 存放區 GoogleCloudPlatform/cloud-bigtable-examplesjava/dataproc-wordcount 目錄中。

設定驗證方法

如要在本機開發環境中使用本頁的 Java 範例,請安裝並初始化 gcloud CLI,然後使用使用者憑證設定應用程式預設憑證。

    安裝 Google Cloud CLI。

    如果您使用外部識別資訊提供者 (IdP),請先 使用聯合身分登入 gcloud CLI

    If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

詳情請參閱 Set up authentication for a local development environment

程式碼範例總覽

這個程式碼範例提供簡易的指令列介面,可接受一或多個文字檔和一個資料表名稱做為輸入,並找出檔案中出現的所有單字,以及計算每個單字的出現次數。MapReduce 邏輯會出現在 WordCountHBase 類別中。

首先,對應程式會將文字檔的內容代碼化並產生鍵/值組合,其中鍵為文字檔中的單字,值則為 1

public static class TokenizerMapper extends
    Mapper<Object, Text, ImmutableBytesWritable, IntWritable> {

  private final static IntWritable one = new IntWritable(1);

  @Override
  public void map(Object key, Text value, Context context) throws IOException,
      InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    ImmutableBytesWritable word = new ImmutableBytesWritable();
    while (itr.hasMoreTokens()) {
      word.set(Bytes.toBytes(itr.nextToken()));
      context.write(word, one);
    }
  }
}

然後,縮減器會加總每個鍵的值,並將結果寫入您指定的 Bigtable 資料表。每一資料列索引鍵均為文字檔中的一個字。每一資料列包含一個 cf:count 欄位,其中包含資料列索引鍵在文字檔中出現的次數。

public static class MyTableReducer extends
    TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

  @Override
  public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = sum(values);
    Put put = new Put(key.get());
    put.addColumn(COLUMN_FAMILY, COUNT_COLUMN_NAME, Bytes.toBytes(sum));
    context.write(null, put);
  }

  public int sum(Iterable<IntWritable> values) {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    return i;
  }
}