Job de MapReduce do Hadoop com o Bigtable

Neste exemplo, usamos o Hadoop para executar um job de MapReduce simples que conta o número de vezes que uma palavra aparece em um arquivo de texto. O job do MapReduce usa o Bigtable para armazenar os resultados da operação de mapeamento. O código deste exemplo está no repositório do GitHub GoogleCloudPlatform/cloud-bigtable-examples, no diretório java/dataproc-wordcount.

Configurar a autenticação

Para usar os exemplos Java desta página em um ambiente de desenvolvimento local, instale e inicialize o gcloud CLI e e configure o Application Default Credentials com suas credenciais de usuário.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

Confira mais informações em Set up authentication for a local development environment.

Visão geral do exemplo de código

O exemplo de código oferece uma interface da linha de comando simples que usa um ou mais arquivos de texto e um nome de tabela como entrada, localiza todas as palavras que aparecem no arquivo e conta quantas vezes cada palavra aparece. A lógica do MapReduce aparece na classe WordCountHBase.

Primeiro, um mapeador tokeniza o conteúdo do arquivo de texto e gera pares de chave-valor, em que a chave é uma palavra do arquivo de texto e o valor é 1:

public static class TokenizerMapper extends
    Mapper<Object, Text, ImmutableBytesWritable, IntWritable> {

  private final static IntWritable one = new IntWritable(1);

  @Override
  public void map(Object key, Text value, Context context) throws IOException,
      InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    ImmutableBytesWritable word = new ImmutableBytesWritable();
    while (itr.hasMoreTokens()) {
      word.set(Bytes.toBytes(itr.nextToken()));
      context.write(word, one);
    }
  }
}

Com um redutor, são somados os valores para cada chave e gravados os resultados em uma tabela do Cloud Bigtable que você especificou. Cada chave de linha é uma palavra do arquivo de texto. Cada linha contém uma coluna cf:count, que contém o número de vezes que a chave de linha aparece no arquivo de texto.

public static class MyTableReducer extends
    TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

  @Override
  public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = sum(values);
    Put put = new Put(key.get());
    put.addColumn(COLUMN_FAMILY, COUNT_COLUMN_NAME, Bytes.toBytes(sum));
    context.write(null, put);
  }

  public int sum(Iterable<IntWritable> values) {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    return i;
  }
}