Hadoop on Google Cloud Platform

Writing a MapReduce Job with the Datastore Connector

Contents

  1. DatastoreHadoopInputFormat class
  2. Input Parameters
  3. Mapper
  4. DatastoreHadoopOutputFormat class
  5. Output Parameters
  6. Reducer
  7. Complete Code for a sample WordCount job

DatastoreHadoopInputFormat class

The input connector provides Hadoop with the selected Datastore entities, exposed through wrappers of the Datastore Key and Entity classes.

The Datastore input connector for Hadoop performs three main operations:

  • Using a user-specified query to select the appropriate Datastore entities,
  • Splitting the results of the query evenly among the Hadoop nodes, and
  • Parsing the splits into Java objects to pass to the mapper. The Hadoop Mapper class receives a DatastoreKey and DatastoreEntity instance for each selected Datastore entity.

The Datastore input connector for Hadoop provides access to Datastore entities through an extension of the Hadoop InputFormat class. In order to configure the input connector correctly, a few lines must be added to the main Hadoop job. In particular, several parameters must be set in the Hadoop configuration and the InputFormat class must be set to DatastoreHadoopInputFormat. Below is an example of the parameters that need to be set and the lines of code that need to be inserted to correctly configure the input connector.

Input Parameters

job
The org.apache.hadoop.mapreduce.Job that represents the Hadoop job to run.
query
The Datastore query that specifies the input to run the job over. Example: Query.newBuilder()
datasetId
The Datastore datasetId under which all of the input connector operations occur. Example: "sample-dataset-id"

Note that your datasetId will generally be the same as your Google Cloud Project Id. For a more detailed description of using datasetIds see here.

// Set the input for the job.
DatastoreHadoopInputFormat.setInput(job, query, datasetId);

// Set InputFormat.
job.setInputFormatClass(DatastoreHadoopInputFormat.class);
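
The query argument is the text-serialized form of a Datastore Query, as used in the complete WordCount sample at the end of this page. A minimal sketch of building such a query string is shown below; the kind name "line" is assumed here purely for illustration, and the resulting String is what gets passed to setInput() above.

// Build a query that selects all entities of a given kind ("line" is assumed for illustration).
Query.Builder q = Query.newBuilder();
KindExpression.Builder kind = KindExpression.newBuilder();
kind.setName("line");
q.addKind(kind);

// Serialize the Query to the text format accepted by DatastoreHadoopInputFormat.setInput().
String query = TextFormat.printToString(q);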

Mapper

The Datastore input connector for Hadoop reads from Datastore and passes the selected entities one at a time to the Hadoop Mapper function. Each input takes the form of a key-value pair: a DatastoreKey and a DatastoreEntity. Note that DatastoreKey and DatastoreEntity wrap the Datastore Key and Entity classes; to get the Entity, call get() on the DatastoreEntity, and to get the Key, call get() on the DatastoreKey. The Mapper must accept a DatastoreKey and DatastoreEntity pair as input. An example of a Mapper for a WordCount job is shown below.

  /**
   * The mapper function for word count.
   */
  public static class Map extends Mapper<DatastoreKey, DatastoreEntity, Text, IntWritable> {
    @Override
    public void map(DatastoreKey key, DatastoreEntity value, Context context)
        throws IOException, InterruptedException {
      // Get the "word" property of the entity.
      for (Property prop : value.get().getPropertyList()) {
        if (prop.getName().equals("word")) {
          // Get the line of text from the entity.
          String line = prop.getValue().getStringValue();
          // Iterate over the words in the line.
          String[] tokenizer = line.split(" ");
          for (String token : tokenizer) {
            // For each word, add it to the output of the mapper.
            Text word = new Text();
            word.set(token.replaceAll("[^A-Za-z]", "").toLowerCase());
            context.write(word, new IntWritable(1));
          }
        }
      }
    }
  }
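
In the job driver, the Mapper must also be registered with the job. The lines below are taken from the complete sample at the end of this page; because the sample does not set separate map output classes, the job's output key and value classes also apply to the Mapper's Text and IntWritable output.

// Register the Mapper with the job.
job.setMapperClass(Map.class);
// Key/value classes; with no separate map output classes set, these also cover the map output.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);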

DatastoreHadoopOutputFormat class

The output connector provides Hadoop with the ability to write Datastore entities directly into a Datastore dataset. The Datastore output connector for Hadoop provides access to Datastore entities through an extension of the Hadoop OutputFormat class. In order to configure the output connector correctly, a few lines must be added to the main Hadoop job. In particular, several parameters must be set in the Hadoop configuration and the OutputFormat class must be set to DatastoreHadoopOutputFormat. Below is an example of the parameters that need to be set and the lines of code that need to be inserted to correctly configure the output connector.

Output Parameters

datasetId
The Datastore datasetId under which all of the output connector operations occur. Example: "sample-dataset-id"
numRecordsInBatch
The number of records to write to Datastore in each batch. Example: "1000"

Note that your datasetId will generally be the same as your Google Cloud Project Id. For a more detailed description of using datasetIds see here.

// Configure output connector parameters
DatastoreHadoopOutputFormat.setOutputSpecs(job, datasetId, numRecordsInBatch);

// Set OutputFormat.
job.setOutputFormatClass(DatastoreHadoopOutputFormat.class);

In the above code, "job" refers to the org.apache.hadoop.mapreduce.Job that represents the Hadoop job to run.
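
Both parameters are passed to setOutputSpecs() as Strings, as in the complete sample at the end of this page. A short sketch with placeholder values:

// Placeholder values for illustration; the datasetId is usually the same as your Cloud project ID.
String datasetId = "sample-dataset-id";
// Number of entities written to Datastore per batch, passed as a String.
String numRecordsInBatch = "1000";

DatastoreHadoopOutputFormat.setOutputSpecs(job, datasetId, numRecordsInBatch);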

Reducer

The Datastore output connector for Hadoop writes to Datastore. It takes a DatastoreKey and DatastoreEntity as input and writes ONLY the DatastoreEntity to Datastore. The DatastoreKey should contain a Datastore Key (you can build a DatastoreKey from a Key k by calling new DatastoreKey(k)), and the DatastoreEntity should contain a Datastore Entity (you can build a DatastoreEntity from an Entity e by calling new DatastoreEntity(e)). The Reducer should output a DatastoreKey and DatastoreEntity pair. Note that the DatastoreKey is ignored and only the DatastoreEntity is written to Datastore (the DatastoreEntity should already contain its own Key). An example of a Reducer for a WordCount job is shown below.

  /**
   * Reducer function for word count
   */
  public static class Reduce extends Reducer<Text, IntWritable, DatastoreKey, DatastoreEntity> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      // Count the total times a word appears.
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      // Create a new DatastoreEntity to be added.
      if (!key.toString().isEmpty()) {
        // Create the Path Element of the Entity.
        PathElement.Builder p = PathElement.newBuilder();
        p.setKind("wordcount");
        p.setName(key.toString());

        // Create the Key of the Entity.
        Key.Builder k = Key.newBuilder();
        k.addPathElement(p);

        // Create the Entity.
        Entity.Builder e = Entity.newBuilder();
        e.addProperty(makeProperty("count", makeValue(sum)));
        e.addProperty(makeProperty("word", makeValue(key.toString())));
        e.setKey(k);

        // Wrap the Entity as a DatastoreEntity and add it to the output.
        context.write(new DatastoreKey(), new DatastoreEntity(e.build()));
      }
    }
  }

Complete Code for a sample WordCount job

The code below is an example of a simple WordCount job that aggregates word counts from entities in Cloud Datastore. To see the script to run this code, go to Running with the Datastore connector for Hadoop.

package com.google.cloud.hadoop.io.datastore.samples;

import static com.google.api.services.datastore.client.DatastoreHelper.KEY_PROPERTY_NAME;
import static com.google.api.services.datastore.client.DatastoreHelper.makeFilter;
import static com.google.api.services.datastore.client.DatastoreHelper.makeKey;
import static com.google.api.services.datastore.client.DatastoreHelper.makeProperty;
import static com.google.api.services.datastore.client.DatastoreHelper.makeValue;

import com.google.api.services.datastore.DatastoreV1.Entity;
import com.google.api.services.datastore.DatastoreV1.Key;
import com.google.api.services.datastore.DatastoreV1.Key.PathElement;
import com.google.api.services.datastore.DatastoreV1.KindExpression;
import com.google.api.services.datastore.DatastoreV1.Property;
import com.google.api.services.datastore.DatastoreV1.PropertyFilter;
import com.google.api.services.datastore.DatastoreV1.Query;
import com.google.cloud.hadoop.io.datastore.DatastoreEntity;
import com.google.cloud.hadoop.io.datastore.DatastoreHadoopInputFormat;
import com.google.cloud.hadoop.io.datastore.DatastoreHadoopOutputFormat;
import com.google.cloud.hadoop.io.datastore.DatastoreKey;
import com.google.protobuf.TextFormat;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

/**
 * Sample program to run the Hadoop Wordcount example
 */
public class WordCount {
  // The kind of output entities for wordcount.
  static final String OUTPUT_KIND_NAME = "mapred.datastore.samples.output.kind";

  /**
   * The mapper function for word count.
   */
  public static class Map extends Mapper<DatastoreKey, DatastoreEntity, Text, IntWritable> {
    @Override
    public void map(DatastoreKey key, DatastoreEntity value, Context context) throws IOException,
        InterruptedException {
      // Iterate over Entity properties.
      for (Property prop : value.get().getPropertyList()) {
        // If the entity has a "line" property.
        if (prop.getName().equals("line")) {
          // Split line into words.
          String line = prop.getValue().getStringValue();
          String[] tokenizer = line.split(" ");
          for (String token : tokenizer) {
            Text word = new Text();
            word.set(token.replaceAll("[^A-Za-z]", "").toLowerCase());
            // Output each word and a count of 1.
            context.write(word, new IntWritable(1));
          }
        }
      }
    }
  }

  /**
   * Reducer function for word count.
   */
  public static class Reduce extends Reducer<Text, IntWritable, DatastoreKey, DatastoreEntity> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
        InterruptedException {
      // Get total count for word.
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      // If word is not empty.
      if (!key.toString().isEmpty()) {
        // Create Entity for each word containing the count and the word.
        PathElement.Builder p = PathElement.newBuilder();
        p.setKind(context.getConfiguration().get(OUTPUT_KIND_NAME));
        p.setName(key.toString());
        Key.Builder k = Key.newBuilder();
        k.addPathElement(p);
        Entity.Builder e = Entity.newBuilder();
        e.addProperty(makeProperty("count", makeValue(sum)));
        e.addProperty(makeProperty("word", makeValue(key.toString())));
        e.setKey(k);
        // Write Entity to output.
        context.write(new DatastoreKey(), new DatastoreEntity(e.build()));
      }
    }
  }

  // Print a usage statement and exit.
  private static void printUsageAndExit() {
    System.out.print(
        "Usage: hadoop jar datastore_wordcount.jar [datasetId] [inputKindName] [outputKindName]"
            + " [jobName].  " + "Please enter all parameters");
    System.exit(1);
  }

  /**
   * Configures and runs a WordCount job over the Cloud Datastore connector.
   *
   * @param args a String[] containing the datasetId, inputKindName, outputKindName, and jobName
   */
  public static void main(String[] args) throws Exception {
    GenericOptionsParser parser = new GenericOptionsParser(args);
    args = parser.getRemainingArgs();

    // Check all args entered.
    if (args.length != 4) {
      printUsageAndExit();
    }

    // Set parameters from args.
    String datasetId = args[0];
    String inputKindName = args[1];
    String outputKindName = args[2];
    String jobName = args[3];

    // Configure Map Reduce for WordCount job.
    JobConf conf = new JobConf(parser.getConfiguration(), WordCount.class);
    conf.set(OUTPUT_KIND_NAME, outputKindName);
    Job job = new Job(conf, jobName);
    job.setJarByClass(WordCount.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    // Set input and output classes.
    job.setInputFormatClass(DatastoreHadoopInputFormat.class);
    job.setOutputFormatClass(DatastoreHadoopOutputFormat.class);

    // Set WordCount query.
    Query.Builder q = Query.newBuilder();
    KindExpression.Builder kind = KindExpression.newBuilder();
    kind.setName(inputKindName);
    q.addKind(kind);
    q.setFilter(makeFilter(KEY_PROPERTY_NAME, PropertyFilter.Operator.HAS_ANCESTOR,
        makeValue(makeKey(inputKindName, WordCountSetUp.ANCESTOR_ENTITY_VALUE))));
    String query = TextFormat.printToString(q);

    // Set parameters for DatastoreHadoopInputFormat.
    DatastoreHadoopInputFormat.setInput(job, query, datasetId);

    // Set parameters for DatastoreHadoopOutputFormat.
    String numEntitiesInBatch = "100";
    DatastoreHadoopOutputFormat.setOutputSpecs(job, datasetId, numEntitiesInBatch);

    job.waitForCompletion(true);
  }
}