Parallel Processing with ParDo

ParDo is the core parallel processing operation in the Dataflow SDKs. You use ParDo for generic parallel processing. The ParDo processing style is similar to what happens inside the "Mapper" class of a Map/Shuffle/Reduce-style algorithm: ParDo takes each element in an input PCollection, performs some processing function on that element, and then emits zero, one, or multiple elements to an output PCollection.

You provide the function that ParDo performs on each of the elements of the input PCollection. The function you provide is invoked independently, and in parallel, on multiple worker instances in your Dataflow job.

ParDo is useful for a variety of data processing operations, including:

  • Filtering a data set. You can use ParDo to consider each element in a PCollection and either output that element to a new collection or discard it.
  • Formatting or converting the type of each element in a data set. You can use ParDo to format the elements in your PCollection, such as formatting key/value pairs into printable strings.
  • Extracting parts of each element in a data set. You can use ParDo to extract just a part of each element in your PCollection. This can be particularly useful for extracting individual fields from BigQuery table rows.
  • Performing computations on each element in a data set. You can use ParDo to perform simple or complex computations on every element, or certain elements, of a PCollection.

ParDo is also a common intermediate step in a pipeline. For example, you can use ParDo to assign keys to each element in a PCollection, creating key/value pairs. You can group the pairs later using a GroupByKey transform.
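As a sketch, a keying step might look like the following (the records collection and the extractCategory helper are hypothetical, shown only for illustration):

  // The input PCollection of records.
  PCollection<String> records = ...;

  // A DoFn that keys each record by a category string.
  static class KeyByCategoryFn extends DoFn<String, KV<String, String>> {
    @Override
    public void processElement(ProcessContext c) {
      String record = c.element();
      String key = extractCategory(record);  // hypothetical helper
      c.output(KV.of(key, record));
    }
  }

  // Apply the ParDo to produce key/value pairs suitable for GroupByKey.
  PCollection<KV<String, String>> keyedRecords =
      records.apply(ParDo.of(new KeyByCategoryFn()));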

Applying a ParDo Transform

To use ParDo, you apply it to the PCollection you want to transform, and save the return value as a PCollection of the appropriate type.

The argument that you provide to ParDo must be a subclass of a specific type provided by the Dataflow SDK, called DoFn. For more information on DoFn, see Creating and Specifying Processing Logic later in this section.

The following code sample shows a basic ParDo applied to a PCollection of strings, passing a DoFn-based function to compute the length of each string, and outputting the string lengths to a PCollection of integers.

Java

  // The input PCollection of Strings.
  PCollection<String> words = ...;

  // The DoFn to perform on each element in the input PCollection.
  static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

  // Apply a ParDo to the PCollection "words" to compute lengths for each word.
  PCollection<Integer> wordLengths = words.apply(
      ParDo
      .of(new ComputeWordLengthFn()));        // The DoFn to perform on each element, which
                                              // we define above.

In the example, the code calls apply on the input collection (called "words"), passing ParDo as the PTransform argument. The .of operation is where you specify the DoFn to perform on each element, called, in this case, ComputeWordLengthFn().

Creating and Specifying Processing Logic

The processing logic you provide for ParDo must be of a specific type required by the Dataflow SDK that you're using to create your pipeline.

Java

You must build a subclass of the SDK class DoFn.

The function you provide is invoked independently, and in parallel, across multiple Google Compute Engine instances.

In addition, your DoFn should not rely on any persistent state from invocation to invocation. Any given instance of your processing function in Cloud Platform might not have access to state information in any other instance of that function.

Note: The Dataflow SDK provides a variant of ParDo which you can use to pass immutable persistent data to each invocation of your user code as a side input.

Java

A DoFn processes one element at a time from the input PCollection. When you create a subclass of DoFn, you specify the type of input element and the type of output element(s) as type parameters. The following code sample shows how we might define the ComputeWordLengthFn() function from the previous example, which accepts an input String and produces an output Integer:

  static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

Your subclass of DoFn must override the element-processing method, processElement, where you provide the code to actually work with the input element. The following code sample shows the complete ComputeWordLengthFn():

  static class ComputeWordLengthFn extends DoFn<String, Integer> {
    @Override
    public void processElement(ProcessContext c) {
      String word = c.element();
      c.output(word.length());
    }
  }

You don't need to manually extract the elements from the input collection; the Dataflow Java SDK handles extracting each element and passing it to your DoFn subclass. When you override processElement, your override method must accept an object of type ProcessContext, which allows you to access the element that you want to process. You access the element that's passed to your DoFn by using the method ProcessContext.element().

If the elements in your PCollection are key/value pairs, you can access the key by using ProcessContext.element().getKey(), and the value by using ProcessContext.element().getValue().
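For example, a DoFn over key/value pairs might read both parts of each element as in the following sketch (the input and output types here are illustrative):

  // A DoFn that formats each key/value pair as a printable string.
  static class FormatEntryFn extends DoFn<KV<String, Integer>, String> {
    @Override
    public void processElement(ProcessContext c) {
      String key = c.element().getKey();
      Integer value = c.element().getValue();
      c.output(key + ": " + value);
    }
  }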

Java

The Dataflow SDK for Java automatically handles gathering the output elements into a result PCollection. You use the ProcessContext object to output the resulting element from processElement to the output collection. To output an element for the result collection, use the method ProcessContext.output().

Lightweight DoFns

The Dataflow SDKs provide language-specific ways to simplify how you provide your DoFn implementation.

Java

Often, you can create a simple DoFn argument to ParDo as an anonymous inner class instance. If your DoFn is only a few lines, it might be cleaner to specify it inline. The following code sample shows how to apply a ParDo with the ComputeWordLengthFn function as an anonymous DoFn:

  // The input PCollection.
  PCollection<String> words = ...;

  // Apply a ParDo with an anonymous DoFn to the PCollection words.
  // Save the result as the PCollection wordLengths.
  PCollection<Integer> wordLengths = words.apply(
    ParDo
      .named("ComputeWordLengths")            // the transform name
      .of(new DoFn<String, Integer>() {       // a DoFn as an anonymous inner class instance
        @Override
        public void processElement(ProcessContext c) {
          c.output(c.element().length());
        }
      }));

For transforms like the one above that apply a function to each element in the input to produce exactly one output per element, you can use the higher-level MapElements transform. This is especially concise in Java 8, as MapElements accepts a lambda function.

  // The input PCollection.
  PCollection<String> words = ...;

  // Apply a MapElements with an anonymous lambda function to the PCollection words.
  // Save the result as the PCollection wordLengths.
  PCollection<Integer> wordLengths = words.apply(
    MapElements.via((String word) -> word.length())
        .withOutputType(new TypeDescriptor<Integer>() {}));

Similarly, you can use Java 8 lambda functions with the Filter, FlatMapElements, and Partition transforms. See Pre-Written Transforms in the Dataflow SDKs for details on these transforms.
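As a sketch, a FlatMapElements transform that splits lines into words might look like the following, assuming a usage pattern analogous to the MapElements example above (each input element can produce multiple output elements):

  // The input PCollection of lines.
  PCollection<String> lines = ...;

  // Split each line into words with a Java 8 lambda.
  PCollection<String> words = lines.apply(
    FlatMapElements.via((String line) -> Arrays.asList(line.split(" ")))
        .withOutputType(new TypeDescriptor<String>() {}));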

Transform Names

Transform names appear in the execution graph when you view your pipeline in the Dataflow Monitoring Interface. Specifying an explicit name for each transform makes your steps easier to recognize in the graph.

Java

The .named operation specifies the transform name for this step in your pipeline. Transform names appear in the execution graph when you view your pipeline in the Dataflow Monitoring Interface. It is particularly important to specify an explicit name when you're using an anonymous DoFn instance with ParDo, so that you can see an easily-readable name for your step in the monitoring interface.

Side Inputs

In addition to the main input PCollection, you can provide additional inputs to a ParDo transform in the form of side inputs. A side input is an additional input that your DoFn can access each time it processes an element in the input PCollection. When you specify a side input, you create a view of some other data that can be read from within the ParDo transform's DoFn while processing each element.

Side inputs are useful if your ParDo needs to inject additional data when processing each element in the input PCollection, but the additional data needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline. For example, you can obtain a value from a remote service while your pipeline is running and use it as a side input. Or, you can use a value computed by a separate branch of your pipeline and add it as a side input to another branch's ParDo.

Representing a Side Input

Java

Side inputs are always of type PCollectionView. PCollectionView is a way of representing a PCollection as a single entity, which you can then pass as a side input to a ParDo. You can make a PCollectionView that expresses a PCollection as one of the following types:

  • View.asSingleton: Represents a PCollection as an individual value. Use this view when your side input is a single computed value; generally, you'd produce it by combining a PCollection using Combine.globally. You should typically create a singleton view using Combine.globally(...).asSingletonView().
  • View.asList: Represents a PCollection as a List. Use this view when your side input is a collection of individual values.
  • View.asMap: Represents a PCollection as a Map. Use this view when your side input consists of key/value pairs (PCollection<KV<K, V>>) and has a single value for each key.
  • View.asMultimap: Represents a PCollection as a MultiMap. Use this view when your side input consists of key/value pairs (PCollection<KV<K, V>>) and has multiple values for each key.

Note: Like other pipeline data, PCollectionView is immutable once created.
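For example, you might create a map view from a keyed PCollection as in the following sketch (the wordCounts collection is hypothetical):

  // A PCollection of key/value pairs with one value per key.
  PCollection<KV<String, Integer>> wordCounts = ...;

  // Create a view that presents the collection as a Map to a ParDo.
  final PCollectionView<Map<String, Integer>> wordCountsView =
      wordCounts.apply(View.<String, Integer>asMap());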

Passing Side Inputs to ParDo

Java

You pass side inputs to your ParDo transform by invoking .withSideInputs. Inside your DoFn, you access the side input by using the method DoFn.ProcessContext.sideInput.

The following example code creates a singleton side input from a PCollection<Integer> and passes it to a subsequent ParDo.

In the example, we have a PCollection<String> called words that represents a collection of individual words, and a PCollection<Integer> that represents word lengths. We use the latter to compute a maximum word length cutoff as a singleton value, and then pass that computed value as a side input to a ParDo that filters words based on the cutoff.

  // The input PCollection to ParDo.
  PCollection<String> words = ...;

  // A PCollection of word lengths that we'll combine into a single value.
  PCollection<Integer> wordLengths = ...; // Singleton PCollection

  // Create a singleton PCollectionView from wordLengths using Combine.globally and View.asSingleton.
  final PCollectionView<Integer> maxWordLengthCutOffView =
     wordLengths.apply(Combine.globally(new Max.MaxIntFn()).asSingletonView());


  // Apply a ParDo that takes maxWordLengthCutOffView as a side input.
  PCollection<String> wordsBelowCutOff =
      words.apply(ParDo.withSideInputs(maxWordLengthCutOffView)
                       .of(new DoFn<String, String>() {
                         @Override
                         public void processElement(ProcessContext c) {
                           String word = c.element();
                           // In our DoFn, access the side input.
                           int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
                           if (word.length() <= lengthCutOff) {
                             c.output(word);
                           }
                         }
                       }));

Side Inputs and Windowing

When you create a PCollectionView of a windowed PCollection, which may be infinite and thus cannot be compressed into a single value (or single collection class), the PCollectionView represents a single entity per window. That is, the PCollectionView represents one singleton per window, one list per window, etc.

Dataflow uses the window(s) for the main input element to look up the appropriate window for the side input element. Dataflow projects the main input element's window into the side input's window set, and then uses the side input from the resulting window. If the main input and side inputs have identical windows, the projection provides the exact corresponding window; however, if the inputs have different windows, Dataflow uses the projection to choose the most appropriate side input window.

Java

For example, if the main input is windowed using fixed-time windows of one minute, and the side input is windowed using fixed-time windows of one hour, Dataflow projects the main input window against the side input window set and selects the side input value from the appropriate hour-long side input window.

If the side input has multiple trigger firings, Dataflow uses the value from the latest trigger firing. This is particularly useful if you use a side input with a single global window and specify a trigger.

Side Outputs

While ParDo always produces a main output PCollection (as the return value from apply), you can also have your ParDo produce any number of additional output PCollections. If you choose to have multiple outputs, your ParDo will return all of the output PCollections (including the main output) bundled together. For example, in Java, the output PCollections are bundled in a type-safe PCollectionTuple.

Tags for Side Outputs

Java

To emit elements to a side output PCollection, you'll need to create a TupleTag object to identify each collection your ParDo produces. For example, if your ParDo produces three output PCollections (the main output and two side outputs), you'll need to create three associated TupleTags.

The following example code shows how to create TupleTags for a ParDo with a main output and two side outputs:

  // Input PCollection to our ParDo.
  PCollection<String> words = ...;

  // The ParDo will filter words whose length is below a cutoff and add them to
  // the main output PCollection<String>.
  // If a word is above the cutoff, the ParDo will add the word length to a side output
  // PCollection<Integer>.
  // If a word starts with the string "MARKER", the ParDo will add that word to a different
  // side output PCollection<String>.
  final int wordLengthCutOff = 10;

  // Create the TupleTags for the main and side outputs.
  // Main output.
  final TupleTag<String> wordsBelowCutOffTag =
      new TupleTag<String>(){};
  // Word lengths side output.
  final TupleTag<Integer> wordLengthsAboveCutOffTag =
      new TupleTag<Integer>(){};
  // "MARKER" words side output.
  final TupleTag<String> markedWordsTag =
      new TupleTag<String>(){};

Passing Output Tags to ParDo

Once you have specified the TupleTags for each of your ParDo outputs, you'll need to pass those tags to your ParDo by invoking .withOutputTags. You pass the tag for the main output first, and then the tags for any side outputs in a TupleTagList.

Building on our previous example, here's how we pass the three TupleTags (one for the main output and two for the side outputs) to our ParDo:

  PCollectionTuple results =
      words.apply(
          ParDo
          // Specify the tag for the main output, wordsBelowCutoffTag.
          .withOutputTags(wordsBelowCutOffTag,
          // Specify the tags for the two side outputs as a TupleTagList.
                          TupleTagList.of(wordLengthsAboveCutOffTag)
                                      .and(markedWordsTag))
          .of(new DoFn<String, String>() {
            // DoFn continues here.
            ...
          }));
Note that all of the outputs (including the main output PCollection) are bundled into the returned PCollectionTuple called results.

Emitting to Side Outputs In Your DoFn

Java

Inside your ParDo's DoFn, you can emit an element to a side output by using the method ProcessContext.sideOutput. You'll need to pass the appropriate TupleTag for the target side output collection when you call ProcessContext.sideOutput.

From our previous example, here's the DoFn emitting to the main and side outputs:

  .of(new DoFn<String, String>() {
     public void processElement(ProcessContext c) {
       String word = c.element();
       if (word.length() <= wordLengthCutOff) {
         // Emit this short word to the main output.
         c.output(word);
       } else {
         // Emit this long word's length to a side output.
         c.sideOutput(wordLengthsAboveCutOffTag, word.length());
       }
       if (word.startsWith("MARKER")) {
         // Emit this word to a different side output.
         c.sideOutput(markedWordsTag, word);
       }
     }}));

After your ParDo, you'll need to extract the resulting main and side output PCollections from the returned PCollectionTuple. See the section on PCollectionTuples for some examples that show how to extract individual PCollections from a tuple.
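As a brief sketch, extraction uses the same TupleTags you created earlier:

  // Extract each output collection from the PCollectionTuple by its tag.
  PCollection<String> wordsBelowCutOff = results.get(wordsBelowCutOffTag);
  PCollection<Integer> wordLengthsAboveCutOff = results.get(wordLengthsAboveCutOffTag);
  PCollection<String> markedWords = results.get(markedWordsTag);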
