Exemple : Tâche MapReduce Hadoop avec Cloud Bigtable

Cet exemple utilise Hadoop pour effectuer une tâche MapReduce simple qui compte le nombre d'occurrences d'un mot dans un fichier texte. La tâche MapReduce utilise Cloud Bigtable pour stocker les résultats de l'opération de mappage. Le code de cet exemple se trouve dans le dépôt GitHub GoogleCloudPlatform/cloud-bigtable-examples, dans le répertoire java/dataproc-wordcount.

Présentation de l'exemple de code

L'exemple de code fournit une interface de ligne de commande simple qui utilise un ou plusieurs fichiers texte et un nom de table en entrée. Il trouve tous les mots figurant dans le fichier et compte le nombre d'occurrences de chacun. La logique MapReduce est incluse dans la classe WordCountHBase.

Tout d'abord, un mappeur segmente le contenu du fichier texte et génère des paires clé/valeur, où la clé est un mot du fichier texte et la valeur est 1 :

public static class TokenizerMapper extends
    Mapper<Object, Text, ImmutableBytesWritable, IntWritable> {

  private final static IntWritable one = new IntWritable(1);

  @Override
  public void map(Object key, Text value, Context context) throws IOException,
      InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    ImmutableBytesWritable word = new ImmutableBytesWritable();
    while (itr.hasMoreTokens()) {
      word.set(Bytes.toBytes(itr.nextToken()));
      context.write(word, one);
    }
  }
}

Un réducteur additionne ensuite les valeurs de chaque clé et écrit les résultats dans une table Cloud Bigtable que vous avez spécifiée. Chaque clé de ligne est un mot du fichier texte. Chaque ligne contient une colonne cf:count qui indique le nombre d'occurrences de la clé de ligne dans le fichier texte.

public static class MyTableReducer extends
    TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

  @Override
  public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = sum(values);
    Put put = new Put(key.get());
    put.addColumn(COLUMN_FAMILY, COUNT_COLUMN_NAME, Bytes.toBytes(sum));
    context.write(null, put);
  }

  public int sum(Iterable<IntWritable> values) {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    return i;
  }
}