Job de MapReduce do Hadoop com o Bigtable

Neste exemplo, usamos o Hadoop para executar um job de MapReduce simples que conta o número de vezes que uma palavra aparece em um arquivo de texto. O job de MapReduce usa o Bigtable para armazenar os resultados da operação de mapeamento. O código deste exemplo está no repositório do GitHub GoogleCloudPlatform/cloud-bigtable-examples, no diretório java/dataproc-wordcount.

Configurar a autenticação

Para usar os exemplos Java desta página em um ambiente de desenvolvimento local, instale e inicialize a CLI gcloud e configure o Application Default Credentials com suas credenciais de usuário.

    Instale a CLI do Google Cloud.

    Ao usar um provedor de identidade (IdP) externo, primeiro faça login na gcloud CLI com sua identidade federada.

    If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

Confira mais informações em Set up authentication for a local development environment.

Visão geral do exemplo de código

O exemplo de código oferece uma interface da linha de comando simples que usa um ou mais arquivos de texto e um nome de tabela como entrada, localiza todas as palavras que aparecem no arquivo e conta quantas vezes cada palavra aparece. A lógica do MapReduce aparece na classe WordCountHBase.

Primeiro, um mapeador tokeniza o conteúdo do arquivo de texto e gera pares de chave-valor, em que a chave é uma palavra do arquivo de texto e o valor é 1:

public static class TokenizerMapper extends
    Mapper<Object, Text, ImmutableBytesWritable, IntWritable> {

  private final static IntWritable one = new IntWritable(1);

  @Override
  public void map(Object key, Text value, Context context) throws IOException,
      InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    ImmutableBytesWritable word = new ImmutableBytesWritable();
    while (itr.hasMoreTokens()) {
      word.set(Bytes.toBytes(itr.nextToken()));
      context.write(word, one);
    }
  }
}

Com um redutor, são somados os valores para cada chave e gravados os resultados em uma tabela do Cloud Bigtable que você especificou. Cada chave de linha é uma palavra do arquivo de texto. Cada linha contém uma coluna cf:count, que contém o número de vezes que a chave de linha aparece no arquivo de texto.

public static class MyTableReducer extends
    TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

  @Override
  public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = sum(values);
    Put put = new Put(key.get());
    put.addColumn(COLUMN_FAMILY, COUNT_COLUMN_NAME, Bytes.toBytes(sum));
    context.write(null, put);
  }

  public int sum(Iterable<IntWritable> values) {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    return i;
  }
}