WordCount Example Pipeline

If you haven't already, please perform the steps in the Quickstart before continuing.

The WordCount examples demonstrate how to set up a processing pipeline that can read text, tokenize the text lines into individual words, and perform a frequency count on each of those words. The Dataflow SDKs contain a series of four successively more detailed WordCount examples that build on each other. The input text for all the examples is a set of Shakespeare's texts.

Each WordCount example introduces different concepts in the Dataflow SDKs.

  • Minimal WordCount demonstrates the basic principles involved in building a Dataflow pipeline.
  • WordCount introduces some of the more common best practices in creating re-usable and maintainable pipelines.
  • Debugging WordCount introduces logging and debugging practices.
  • Windowed WordCount demonstrates how you can use Dataflow's programming model to handle both bounded and unbounded datasets.

Begin by understanding Minimal WordCount, the simplest of the examples. Once you feel comfortable with the basic principles in building a pipeline, continue on to learn best practices for writing Dataflow programs in WordCount. Then, read through Debugging WordCount to understand how to utilize common practices for logging and debugging. Lastly, learn how to use the same pattern of computation across both finite and infinite datasets in Windowed WordCount.

MinimalWordCount

Minimal WordCount demonstrates a simple pipeline that can read a block of text from a file in Google Cloud Storage, apply transforms to tokenize and count the words, and write the data to an output file in a Cloud Storage bucket. This example hard-codes the locations for its input and output files and doesn't perform any error checking; it is intended to show you only the "bare bones" of creating a Dataflow pipeline. In later examples, we will parameterize the pipeline's input and output sources and show other best practices.

Java

Key Concepts:
  1. Creating the Pipeline
  2. Applying transforms to the Pipeline
    • Reading input (in this example: reading text files)
    • Applying ParDo transforms
    • Applying SDK-provided transforms (in this example: Count)
    • Writing output (in this example: writing to Google Cloud Storage)
  3. Running the Pipeline

The following sections explain these concepts in detail along with excerpts of the relevant code from the Minimal WordCount pipeline.

Creating the Pipeline

The first step in creating a Cloud Dataflow pipeline is to create a PipelineOptions object. This object lets us set various options for our pipeline, such as the pipeline runner that will execute our pipeline, the ID of our project, and the staging location where the pipeline stores its files (used to make your jars accessible in the cloud). In this example we set these options programmatically; more often, however, pipeline options are set by using command-line arguments.

In our example, we specified BlockingDataflowPipelineRunner as the PipelineRunner, in order to have our pipeline executed in the cloud using the Google Cloud Dataflow service. There are other options you can set for executing your pipeline in the cloud. You can also omit this option completely, in which case the default runner will execute your pipeline locally. These are demonstrated in the next two WordCount examples, and are discussed in further detail in Specifying Execution Parameters.

Java

DataflowPipelineOptions options = PipelineOptionsFactory.create()
    .as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("SET-YOUR-PROJECT-ID-HERE");
// The 'gs' URI means that this is a Google Cloud Storage path
options.setStagingLocation("gs://SET-YOUR-BUCKET-NAME-HERE");
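
If you just want to run the pipeline locally while developing, you could instead set the local runner explicitly. This is a minimal sketch; as noted below, omitting the runner option entirely has the same effect, because the local runner is the default.

// Sketch: execute the pipeline on the local machine instead of the Dataflow service.
// Project ID and staging location are not required for local execution.
options.setRunner(DirectPipelineRunner.class);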

The next step is to create a Pipeline object with the options we've just constructed. The Pipeline object builds up the graph of transformations to be executed, associated with that particular pipeline.

Java

Pipeline p = Pipeline.create(options);

See Pipelines for a detailed discussion of the pipeline object and how it works.

Applying Pipeline Transforms

The Minimal WordCount pipeline contains several transforms to read data into the pipeline, manipulate or otherwise transform the data, and write out the results. Each transform represents an operation in the pipeline.

Each transform takes some kind of input (data or otherwise), and produces some output data. The input and output data is represented by the SDK class PCollection. PCollection is a special class provided by the Dataflow SDK that you can use to represent a data set of virtually any size, including infinite data sets.

Figure 1 shows the pipeline's data flow:

Figure 1: The pipeline data flow. TextIO.Read creates a PCollection from data stored in an input data file; the CountWords transform produces a PCollection of word counts from the raw text PCollection; TextIO.Write writes the formatted word counts to an output data file.

The Minimal WordCount pipeline contains five transforms:

  1. A text file Read transform is applied to the Pipeline object itself, and produces a PCollection as output. Each element in the output PCollection represents one line of text from the input file.

    Java

      p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/kinglear.txt"))

  2. A ParDo transform that invokes a DoFn (defined in-line as an anonymous class) on each element, tokenizing the text lines into individual words. The input for this transform is the PCollection of text lines generated by the previous TextIO.Read transform. The ParDo transform outputs a new PCollection, where each element represents an individual word in the text.

    Java

    You can give your transform a transform name that will appear in the Dataflow Monitoring Interface by using the .named() operation, as we've done in this example. When the Dataflow service executes your pipeline, the monitoring interface indicates when each ParDo transform is being executed.

      .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
         @Override
         public void processElement(ProcessContext c) {
           for (String word : c.element().split("[^a-zA-Z']+")) {
             if (!word.isEmpty()) {
               c.output(word);
             }
           }
         }
      }))

  3. The SDK-provided Count transform is a generic transform that takes a PCollection of any type and returns a PCollection of key/value pairs. Each key represents a unique element from the input collection, and each value represents the number of times that key appeared in the input collection.

    In this pipeline, the input for Count is the PCollection of individual words generated by the previous ParDo, and the output is a PCollection of key/value pairs where each key represents a unique word in the text and the associated value is its occurrence count.

    Java

      .apply(Count.<String>perElement())

  4. Next is a transform that formats each of the key/value pairs of unique words and occurrence counts into a printable string suitable for writing to an output file.

    Java

    MapElements is a higher-level composite transform that encapsulates a simple ParDo; for each element in the input PCollection, MapElements applies a function that produces exactly one output element. Here, MapElements invokes a SimpleFunction (defined in-line as an anonymous class) that does the formatting. As input, this MapElements takes the PCollection of key/value pairs generated by Count, and produces a new PCollection of printable strings.

      .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
         @Override
         public String apply(KV<String, Long> element) {
           return element.getKey() + ": " + element.getValue();
         }
      }))

  5. A text file Write transform. This transform takes the final PCollection of formatted Strings as input and writes each element to an output text file. Each element in the input PCollection represents one line of text in the resulting output file.

    Java

      .apply(TextIO.Write.to("gs://my-bucket/counts.txt"));

    Note that the Write transform produces a trivial result value of type PDone, which in this case is ignored.
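
Putting these steps together, the body of Minimal WordCount chains the transforms roughly as follows. This is a sketch assembled from the snippets above rather than a verbatim copy of the example source; running the pipeline is covered in the next section.

p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/kinglear.txt"))
 .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
    @Override
    public void processElement(ProcessContext c) {
      // Tokenize each line into individual words.
      for (String word : c.element().split("[^a-zA-Z']+")) {
        if (!word.isEmpty()) {
          c.output(word);
        }
      }
    }
 }))
 .apply(Count.<String>perElement())
 .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
    @Override
    public String apply(KV<String, Long> element) {
      // Format each word/count pair as "word: count".
      return element.getKey() + ": " + element.getValue();
    }
 }))
 .apply(TextIO.Write.to("gs://my-bucket/counts.txt"));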

Running the Pipeline

Run the pipeline by calling the run method, which sends your pipeline to be executed by the pipeline runner that you specified when you created your pipeline.

Java

p.run();
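
run() also returns a result object that you can inspect after submission; the concrete type depends on the runner you chose. A minimal sketch:

// With BlockingDataflowPipelineRunner, run() blocks until the job finishes.
PipelineResult result = p.run();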

WordCount Example

This WordCount example introduces a few recommended programming practices that can make your pipeline easier to read, write, and maintain. While not explicitly required, they can make your pipeline's execution more flexible, aid in testing your pipeline, and help make your pipeline's code reusable.

This section assumes that you have a good understanding of the basic concepts in building a pipeline. If you feel that you aren't at that point yet, read the above section, Minimal WordCount.

Java

New Concepts:
  1. Applying ParDo with an explicit DoFn
  2. Creating Composite Transforms
  3. Using Parameterizable PipelineOptions

The following sections explain these key concepts in detail, and break down the pipeline code into smaller sections.

Specifying Explicit DoFns

When using ParDo transforms, you need to specify the processing operation that gets applied to each element in the input PCollection. This processing operation is a subclass of the SDK class DoFn. The example pipeline in the previous (Minimal WordCount) section creates the DoFn subclasses for each ParDo inline, as anonymous inner class instances.

However, it's often a good idea to define the DoFn at the global level, which makes it easier to unit test and can make the ParDo code more readable.

As mentioned in the previous (Minimal WordCount) example, when you execute your pipeline, the Dataflow Monitoring Interface indicates when each ParDo transform is being executed. The Dataflow service automatically generates transform names for the ParDo transforms from the name of the DoFn you pass. For example, the ParDo that applies the FormatAsTextFn() appears in the monitoring interface as ParDo(FormatAsText).

Java

In this example, the DoFns are defined as static classes:

/** A DoFn that converts a Word and Count into a printable string. */
static class FormatAsTextFn extends DoFn<KV<String, Long>, String> {
  ...

  @Override
  public void processElement(ProcessContext c) {
    ...
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  // Our pipeline passes an instance of static FormatAsTextFn() to the ParDo transform.
  p.apply(...)
   .apply(...)
   .apply(ParDo.of(new FormatAsTextFn()))
   ...
}
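
For instance, the anonymous tokenizing DoFn from Minimal WordCount can be promoted to a static class in the same way. The following sketch is assembled from that earlier inline snippet; the class name ExtractWordsFn matches the one referenced by the CountWords transform below.

/** A DoFn that tokenizes lines of text into individual words. */
static class ExtractWordsFn extends DoFn<String, String> {
  @Override
  public void processElement(ProcessContext c) {
    // Split the line on runs of non-letter characters, keeping apostrophes.
    for (String word : c.element().split("[^a-zA-Z']+")) {
      if (!word.isEmpty()) {
        c.output(word);
      }
    }
  }
}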

See Parallel Processing with ParDo for a detailed discussion of creating and specifying DoFn subclasses for your ParDo transforms.

Creating Composite Transforms

If you have a processing operation that consists of multiple transforms or ParDo steps, you can create it as a subclass of PTransform. Creating a PTransform subclass allows you to create complex reusable transforms, can make your pipeline's structure more clear and modular, and makes unit testing easier.

Making your pipeline's logical structure explicit, using PTransform subclasses, can also make it easier to monitor your pipeline. When the Dataflow service builds your pipeline's final, optimized structure, the Dataflow Monitoring Interface will use the transforms you've built to more accurately reflect how your pipeline is structured.

Java

In this example, two transforms are encapsulated as the PTransform subclass CountWords. CountWords contains the ParDo that runs ExtractWordsFn and the SDK-provided Count transform.

When CountWords is defined, we specify its ultimate input and output; the input is the PCollection<String> for the extraction operation, and the output is the PCollection<KV<String, Long>> produced by the count operation.

public static class CountWords extends PTransform<PCollection<String>,
    PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> apply(PCollection<String> lines) {

    // Convert lines of text into individual words.
    PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

    // Count the number of times each word occurs.
    PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

    return wordCounts;
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  p.apply(...)
   .apply(new CountWords())
   ...
}

Using Parameterizable PipelineOptions

In the previous example, Minimal WordCount, we set various execution options when we created our pipeline. In this example, we define our own custom configuration options by extending PipelineOptions.

You can add your own arguments to be processed by the command-line parser, and specify default values for them. You can then access the option values in your pipeline code.

In the Minimal WordCount example, we hard-coded the pipeline options. Here, the pipeline constructs its PipelineOptions the most common way: by parsing command-line arguments.

Java

public static interface WordCountOptions extends PipelineOptions {
  @Description("Path of the file to read from")
  @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
  String getInputFile();
  void setInputFile(String value);
  ...
}

public static void main(String[] args) {
  WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);
  ...
}
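
The parsed option values can then be used wherever the pipeline needs them. A minimal sketch, assuming the pipeline reads its input text with TextIO as in the earlier examples:

// Read from the location given on the command line, or the @Default.String value if omitted.
PCollection<String> lines = p.apply(TextIO.Read.from(options.getInputFile()));

When launching the program, the argument is supplied as, for example, --inputFile=gs://my-bucket/my-input.txt (the path here is purely illustrative).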

Debugging WordCount Example

The Debugging WordCount example demonstrates some best practices for instrumenting your pipeline code. You can use the Dataflow Monitoring Interface and Aggregators to gain additional visibility into your pipeline as it runs.

Java

You can also use the SDK's DataflowAssert to test your transforms' output at different stages of the pipeline.

Java

New Concepts:
  1. Viewing logs in the Dataflow Monitoring Interface
  2. Controlling Dataflow worker log levels
  3. Creating Aggregators
  4. Testing your Pipeline via DataflowAssert

The following sections explain these key concepts in detail, and break down the pipeline code into smaller sections.

Viewing logs in the Dataflow Monitoring Interface

Google Cloud Logging aggregates the logs from all of your Dataflow job's workers to a single location in the Google Cloud Platform Console. You can use the Dataflow Monitoring Interface to view the logs from all of the Compute Engine instances that Dataflow has spun up to complete your Dataflow job. You can add logging statements into your pipeline's DoFn instances that will appear in the Monitoring Interface as your pipeline runs.

Java

The following SLF4J logger uses the fully qualified class name of FilterTextFn as the logger name. All log statements emitted by this logger will be referenced by this name and will be visible in the Dataflow Monitoring Interface, given appropriate log level settings.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebuggingWordCount {
  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...
    private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class);
    ...

    public void processElement(ProcessContext c) {
      if (filter.matcher(c.element().getKey()).matches()) {
        // Log at the "DEBUG" level each element that we match. When executing this pipeline
        // using the Dataflow service, these log lines will appear in the Dataflow Monitoring UI
        // only if the log level is set to "DEBUG" or lower.
        LOG.debug("Matched: " + c.element().getKey());
        ...
      } else {
        // Log at the "TRACE" level each element that is not matched. Different log levels
        // can be used to control the verbosity of logging providing an effective mechanism
        // to filter less important information.
        LOG.trace("Did not match: " + c.element().getKey());
        ...
      }
    }
  }
}

Controlling Dataflow worker log levels

Java

Dataflow workers that execute user code are configured to log to Cloud Logging by default at INFO log level and higher. You may override log levels for specific logging namespaces by specifying:

--workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}

For example, by specifying

--workerLogLevelOverrides={"com.example":"DEBUG"}

when you execute this pipeline using the Dataflow service, the Monitoring Interface will contain only DEBUG or higher level logs for the com.example package, in addition to the default INFO or higher level logs.

In addition, you can override the default Dataflow worker logging configuration by specifying

--defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>

For example, by specifying

--defaultWorkerLogLevel=DEBUG

when executing this pipeline with the Dataflow service, the Monitoring Interface will contain all DEBUG or higher level logs. Note that changing the default worker log level to TRACE or DEBUG will significantly increase the amount of logging information.

Learn more about logging within Cloud Dataflow.

Creating Aggregators

A custom aggregator can track values in your pipeline as it runs. Those values will be displayed in the Dataflow Monitoring Interface when the pipeline is run using the Dataflow service.

Aggregators may not become visible until the system begins executing the ParDo transform that created them, or until their initial value is changed. They are then viewable in the monitoring interface at the bottom of the Job Summary.

The custom aggregators below track the number of matched and unmatched words.

Java

public class DebuggingWordCount {
  /** A DoFn that filters for a specific key based upon a regular expression. */
  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...

    private final Aggregator<Long, Long> matchedWords =
        createAggregator("matchedWords", new Sum.SumLongFn());
    private final Aggregator<Long, Long> unmatchedWords =
        createAggregator("unmatchedWords", new Sum.SumLongFn());

    @Override
    public void processElement(ProcessContext c) {
      if (filter.matcher(c.element().getKey()).matches()) {
        ...
        matchedWords.addValue(1L);
        ...
      } else {
        ...
        unmatchedWords.addValue(1L);
      }
    }
  }
}

Aggregators in Batch and Streaming pipelines

Aggregators in batch pipelines provide consistent results: they are committed exactly once for successful bundles and are not committed for failing bundles.

In streaming pipelines, aggregators provide more lenient semantics. Contributions from successful bundles are best-effort, and contributions from failed bundles may be reflected in the final value.

Testing your Pipeline via DataflowAssert

Java

DataflowAssert is a set of convenient PTransforms in the style of Hamcrest collection matchers that can be used when writing Pipeline level tests to validate the contents of PCollections. DataflowAssert is best used in unit tests with small data sets, but is demonstrated here as a teaching tool.

Below, we verify that the set of filtered words matches our expected counts. Note that DataflowAssert does not provide any output, and that successful completion of the pipeline implies that the expectations were met. Learn more about how to test your pipeline and see DebuggingWordCountTest for an example unit test.

public static void main(String[] args) {
  ...
  List<KV<String, Long>> expectedResults = Arrays.asList(
        KV.of("Flourish", 3L),
        KV.of("stomach", 1L));
  DataflowAssert.that(filteredWords).containsInAnyOrder(expectedResults);
  ...
}
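
For context, filteredWords in this snippet is the output of applying FilterTextFn to the word counts produced by CountWords. Below is a hedged sketch of that wiring; the constructor argument shown is an illustrative filter pattern, and the real example may supply it differently (for instance, through a pipeline option).

// Keep only the word counts whose key matches the filter pattern.
PCollection<KV<String, Long>> filteredWords =
    wordCounts.apply(ParDo.of(new FilterTextFn("Flourish|stomach")));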

WindowedWordCount

Java

This example, WindowedWordCount, counts words in text just as the previous examples did, but introduces several advanced concepts. The input for WindowedWordCount can be a fixed data set (like in the previous examples), or an unbounded stream of data.

The Dataflow SDK lets you create a single pipeline that can handle both bounded and unbounded types of input. If the input is unbounded, then all PCollections in the pipeline will be unbounded as well; the same goes for bounded input.

Before reading this section, make sure you are familiar and comfortable with the basic principles of building a pipeline.

New Concepts:
  1. Reading unbounded and bounded input
  2. Adding timestamps to data
  3. Windowing
  4. Writing unbounded and bounded output

The following sections explain these key concepts in detail, and break down the pipeline code into smaller sections.

Reading Unbounded and Bounded Input

The input for WindowedWordCount can be either bounded or unbounded. If your input has a fixed number of elements, it's considered a 'bounded' data set. If your input is continuously updating, then it's considered 'unbounded'. See Bounded and Unbounded PCollections to read more about input types.

In this example, you can choose whether the input will be bounded or unbounded. Recall that the input for all the examples is a set of Shakespeare's texts: a finite, bounded input. However, to illustrate the new concepts in this example, the unbounded case uses a replay of Shakespeare's texts as a stream of input data.

In this example, if your input is unbounded, then the input will be read from a Google Cloud Pub/Sub topic; in that case, the Read transform applied to the pipeline is a PubsubIO.Read. Otherwise, the input will be read from Google Cloud Storage.

public static void main(String[] args) throws IOException {
    ...
    PCollection<String> input;
    if (options.isUnbounded()) {
      LOG.info("Reading from PubSub.");
      // Read from the Pub/Sub topic. A topic will be created if it wasn't specified as an arg.
      input = pipeline.apply(PubsubIO.Read.topic(options.getPubsubTopic()));

    } else {
      // Else, this is a bounded pipeline. Read from the Google Cloud Storage file.
      input = pipeline.apply(TextIO.Read.from(options.getInputFile()))
      ...
    }
    ...
}

Adding Timestamps to Data

Each element in a PCollection has an associated timestamp. The timestamp for each element is assigned by the source that creates the PCollection. In this example, if you choose unbounded input for your pipeline, the timestamps will come from the Pub/Sub data source. If you choose bounded input, the DoFn named AddTimestampFn (invoked by ParDo) will set a timestamp for each element in the PCollection.

public static void main(String[] args) throws IOException {
  ...
  input = pipeline
    .apply(...)
    // Add an element timestamp, using an artificial time.
    .apply(ParDo.of(new AddTimestampFn()));
}

Below is the code for AddTimestampFn, a DoFn invoked by ParDo, that sets the timestamp for each data element given the element itself. For example, if the elements were log lines, this ParDo could parse the time out of the log string and set it as the element's timestamp. There are no timestamps inherent in the works of Shakespeare, so in this case we've made up random timestamps just to illustrate the concept. Each line of the input text gets a random associated timestamp sometime within a 2-hour period.

static class AddTimestampFn extends DoFn<String, String> {
  private static final Duration RAND_RANGE = Duration.standardHours(2);
  private final Instant minTimestamp;

  AddTimestampFn() {
    this.minTimestamp = new Instant(System.currentTimeMillis());
  }

  @Override
  public void processElement(ProcessContext c) {
    // Generate a timestamp that falls somewhere in the past 2 hours.
    long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
    Instant randomTimestamp = minTimestamp.plus(randMillis);
    // Set the data element with that timestamp.
    c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
  }
}

You can read more about timestamps in PCollection Element Timestamps.

Windowing

The Dataflow SDK uses a concept called Windowing to subdivide a PCollection according to the timestamps of its individual elements. Dataflow transforms that aggregate multiple elements process each PCollection as a succession of multiple, finite windows, even though the entire collection itself may be of infinite size (unbounded).

The WindowedWordCount example applies fixed-time windowing, wherein each window represents a fixed time interval. The fixed window size for this example defaults to 1 minute (you can change this with a command-line option). The pipeline then applies the CountWords transform.

PCollection<KV<String, Long>> wordCounts = input
  .apply(Window.<String>into(
     FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
  .apply(new WordCount.CountWords());

Writing Unbounded and Bounded Output

Since our input might be either bounded or unbounded, the same is true of our output PCollection, and we need to make sure that we choose an appropriate sink. Some output sinks support only bounded output or only unbounded output. For example, a text file is a sink that can receive only bounded data. BigQuery, by contrast, is a sink that supports both bounded and unbounded input.

In this example, we write the results to a BigQuery table: the word counts are first formatted as BigQuery table rows and then written to BigQuery using BigQueryIO.Write.

wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
  .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));
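
FormatAsTableRowFn and getSchema() are not shown here: the DoFn converts each word/count pair into a BigQuery TableRow, and getSchema() returns the matching TableSchema. Below is a rough sketch of such a formatting DoFn; the field names are chosen for illustration and are not necessarily the example's actual schema.

/** Sketch of a formatting DoFn; field names are illustrative. */
static class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> {
  @Override
  public void processElement(ProcessContext c) {
    TableRow row = new TableRow()
        .set("word", c.element().getKey())
        .set("count", c.element().getValue())
        // The element's timestamp places the row within its window.
        .set("window_timestamp", c.timestamp().toString());
    c.output(row);
  }
}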
