Ejemplo: Trabajo de Hadoop MapReduce con Cloud Bigtable

En este ejemplo, se usa Hadoop para ejecutar un trabajo simple de MapReduce que cuente la cantidad de veces que una palabra aparece en un archivo. El trabajo MapReduce usa Cloud Bigtable para almacenar los resultados de la operación de asignación. El código de este ejemplo se encuentra en el repositorio de GitHub GoogleCloudPlatform/cloud-bigtable-examples, en el directorio java/dataproc-wordcount.

Descripción general de la muestra de código

La muestra de código proporciona una interfaz simple de la línea de comandos que toma uno o más archivos de texto y un nombre de tabla como entradas, busca todas las palabras que aparecen en el archivo y cuenta cuántas veces aparece cada una. La lógica de MapReduce aparece en la clase WordCountHBase.

En primer lugar, un asignador convierte en token el contenido del archivo de texto y genera pares clave-valor, en los que la clave es una palabra del archivo de texto y el valor es 1:

public static class TokenizerMapper extends
    Mapper<Object, Text, ImmutableBytesWritable, IntWritable> {

  private final static IntWritable one = new IntWritable(1);

  @Override
  public void map(Object key, Text value, Context context) throws IOException,
      InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    ImmutableBytesWritable word = new ImmutableBytesWritable();
    while (itr.hasMoreTokens()) {
      word.set(Bytes.toBytes(itr.nextToken()));
      context.write(word, one);
    }
  }
}

Luego, un reductor suma los valores de cada clave y escribe los resultados en una tabla de Cloud Bigtable que especifiques. Cada clave de fila es una palabra del archivo de texto. Cada fila contiene una columna cf:count, que indica la cantidad de veces que una clave de fila aparece en el archivo de texto.

public static class MyTableReducer extends
    TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

  @Override
  public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = sum(values);
    Put put = new Put(key.get());
    put.addColumn(COLUMN_FAMILY, COUNT_COLUMN_NAME, Bytes.toBytes(sum));
    context.write(null, put);
  }

  public int sum(Iterable<IntWritable> values) {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    return i;
  }
}
¿Te sirvió esta página? Envíanos tu opinión:

Enviar comentarios sobre…

Documentación de Cloud Bigtable