`ParDo` is the core parallel processing operation in the Dataflow SDKs. You use `ParDo` for generic parallel processing. The `ParDo` processing style is similar to what happens inside the "Mapper" class of a Map/Shuffle/Reduce-style algorithm: `ParDo` takes each element in an input `PCollection`, performs some processing function on that element, and then emits zero, one, or multiple elements to an output `PCollection`.
You provide the function that `ParDo` performs on each of the elements of the input `PCollection`. The function you provide is invoked independently, and in parallel, on multiple worker instances in your Dataflow job.
`ParDo` is useful for a variety of data processing operations, including:
- Filtering a data set. You can use `ParDo` to consider each element in a `PCollection` and either output that element to a new collection or discard it (see the sketch after this list).
- Formatting or converting the type of each element in a data set. You can use `ParDo` to format the elements in your `PCollection`, such as formatting key/value pairs into printable strings.
- Extracting parts of each element in a data set. You can use `ParDo` to extract just a part of each element in your `PCollection`. This can be particularly useful for extracting individual fields from BigQuery table rows.
- Performing computations on each element in a data set. You can use `ParDo` to perform simple or complex computations on every element, or certain elements, of a `PCollection`.
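To make the filtering case concrete, here's a minimal sketch of a filtering `DoFn`; the class name `FilterEmptyStringsFn` is hypothetical, not part of the SDK:

```java
// A hypothetical DoFn that outputs an element only if it is non-empty.
// Emitting nothing for an element effectively discards it.
static class FilterEmptyStringsFn extends DoFn<String, String> {
  @Override
  public void processElement(ProcessContext c) {
    String line = c.element();
    if (!line.isEmpty()) {
      c.output(line); // keep this element in the output collection
    }
  }
}
```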
`ParDo` is also a common intermediate step in a pipeline. For example, you can use `ParDo` to assign keys to each element in a `PCollection`, creating key/value pairs. You can group the pairs later using a `GroupByKey` transform.
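As a sketch of that keying pattern (the key-extraction logic and variable names here are illustrative, not from the SDK):

```java
// Hypothetical example: key each log line by its first comma-separated field,
// then group the lines by that key.
PCollection<String> logLines = ...;

PCollection<KV<String, String>> keyedLines = logLines.apply(
    ParDo.of(new DoFn<String, KV<String, String>>() {
      @Override
      public void processElement(ProcessContext c) {
        String line = c.element();
        String key = line.split(",")[0];
        c.output(KV.of(key, line));
      }
    }));

// Group the key/value pairs produced by the ParDo.
PCollection<KV<String, Iterable<String>>> groupedLines =
    keyedLines.apply(GroupByKey.<String, String>create());
```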
Applying a ParDo Transform
To use `ParDo`, you apply it to the `PCollection` you want to transform, and save the return value as a `PCollection` of the appropriate type. The argument that you provide to `ParDo` must be a subclass of a specific type provided by the Dataflow SDK, called `DoFn`. For more information on `DoFn`, see Creating and Specifying Processing Logic later in this section.
The following code sample shows a basic `ParDo` applied to a `PCollection` of strings, passing a `DoFn`-based function to compute the length of each string, and outputting the string lengths to a `PCollection` of integers.
Java
```java
// The input PCollection of Strings.
PCollection<String> words = ...;

// The DoFn to perform on each element in the input PCollection.
static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

// Apply a ParDo to the PCollection "words" to compute lengths for each word.
PCollection<Integer> wordLengths = words.apply(
    ParDo
    .of(new ComputeWordLengthFn())); // The DoFn to perform on each element, which we define above.
```
In the example, the code calls `apply` on the input collection (called `words`). `ParDo` is the `PTransform` argument. The `.of` operation is where you specify the `DoFn` to perform on each element; in this case, that's `ComputeWordLengthFn()`.
Creating and Specifying Processing Logic
The processing logic you provide for `ParDo` must be of a specific type required by the Dataflow SDK that you're using to create your pipeline.
Java
You must build a subclass of the SDK class `DoFn`.
The function you provide is invoked independently and across multiple Google Compute Engine instances. In addition, your `DoFn` should not rely on any persistent state from invocation to invocation. Any given instance of your processing function in Cloud Platform might not have access to state information in any other instance of that function.

Note: The Dataflow SDK provides a variant of `ParDo` that you can use to pass immutable persistent data to each invocation of your user code as a side input.
Java
A `DoFn` processes one element at a time from the input `PCollection`. When you create a subclass of `DoFn`, you specify the type of input element and the type of output element(s) as type parameters. The following code sample shows how we might define the `ComputeWordLengthFn()` function from the previous example, which accepts an input `String` and produces an output `Integer`:
```java
static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }
```
Your subclass of `DoFn` must override the element-processing method, `processElement`, where you provide the code to actually work with the input element. The following code sample shows the complete `ComputeWordLengthFn()`:
```java
static class ComputeWordLengthFn extends DoFn<String, Integer> {
  @Override
  public void processElement(ProcessContext c) {
    String word = c.element();
    c.output(word.length());
  }
}
```
You don't need to manually extract the elements from the input collection; the Dataflow Java SDK handles extracting each element and passing it to your `DoFn` subclass.

When you override `processElement`, your override method must accept an object of type `ProcessContext`, which allows you to access the element that you want to process. You access the element that's passed to your `DoFn` by using the method `ProcessContext.element()`.

If the elements in your `PCollection` are key/value pairs, you can access the key by using `ProcessContext.element().getKey()`, and the value by using `ProcessContext.element().getValue()`.
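For example, a `DoFn` over key/value pairs might look like the following sketch; swapping each pair's key and value is just an illustration, not an SDK pattern:

```java
// Hypothetical DoFn over key/value pairs: swap each pair's key and value.
static class SwapKeyValueFn extends DoFn<KV<String, Long>, KV<Long, String>> {
  @Override
  public void processElement(ProcessContext c) {
    // Access the key and value of the input element.
    String key = c.element().getKey();
    Long value = c.element().getValue();
    c.output(KV.of(value, key));
  }
}
```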
Java
The Dataflow SDK for Java automatically handles gathering the output elements into a result `PCollection`. You use the `ProcessContext` object to output the resulting element from `processElement` to the output collection. To output an element for the result collection, use the method `ProcessContext.output()`.
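You can call `ProcessContext.output()` any number of times in a single `processElement` invocation; this is how a `DoFn` emits zero, one, or multiple elements per input. A minimal sketch (the tokenizing logic here is illustrative):

```java
// Hypothetical DoFn that emits one output per whitespace-separated token:
// zero outputs for a blank line, multiple outputs for a multi-word line.
static class TokenizeFn extends DoFn<String, String> {
  @Override
  public void processElement(ProcessContext c) {
    for (String token : c.element().split("\\s+")) {
      if (!token.isEmpty()) {
        c.output(token); // one call to output() per emitted element
      }
    }
  }
}
```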
Lightweight DoFns
The Dataflow SDKs provide language-specific ways to simplify how you provide your `DoFn` implementation.
Java
Often, you can create a simple `DoFn` argument to `ParDo` as an anonymous inner class instance. If your `DoFn` is only a few lines, it might be cleaner to specify it inline. The following code sample shows how to apply a `ParDo` with the `ComputeWordLengthFn` function as an anonymous `DoFn`:
```java
// The input PCollection.
PCollection<String> words = ...;

// Apply a ParDo with an anonymous DoFn to the PCollection words.
// Save the result as the PCollection wordLengths.
PCollection<Integer> wordLengths = words.apply(
    ParDo
    .named("ComputeWordLengths")       // the transform name
    .of(new DoFn<String, Integer>() {  // a DoFn as an anonymous inner class instance
      @Override
      public void processElement(ProcessContext c) {
        c.output(c.element().length());
      }
    }));
```
For transforms like the one above that apply a function to each element in the input to produce exactly one output per element, you can use the higher-level `MapElements` transform. This is especially concise in Java 8, as `MapElements` accepts a lambda function.
```java
// The input PCollection.
PCollection<String> words = ...;

// Apply a MapElements with an anonymous lambda function to the PCollection words.
// Save the result as the PCollection wordLengths.
PCollection<Integer> wordLengths = words.apply(
    MapElements.via((String word) -> word.length())
        .withOutputType(new TypeDescriptor<Integer>() {}));
```
Similarly, you can use Java 8 lambda functions with the `Filter`, `FlatMapElements`, and `Partition` transforms. See Pre-Written Transforms in the Dataflow SDKs for details on these transforms.
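For instance, a `Filter` with a lambda predicate might look like the following sketch; this assumes the `Filter.byPredicate` form used in the SDK's Java 8 examples, so check the `Filter` API in your SDK version:

```java
// The input PCollection.
PCollection<String> words = ...;

// Keep only the non-empty words, using a Java 8 lambda as the predicate.
PCollection<String> nonEmptyWords = words.apply(
    Filter.byPredicate((String word) -> !word.isEmpty()));
```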
Transform Names
Transform names appear in the execution graph when you view your pipeline in the Dataflow Monitoring Interface. It is particularly important to specify explicit names for your transforms so that you can recognize them in the graph.
Java
The `.named` operation specifies the transform name for this step in your pipeline. Transform names appear in the execution graph when you view your pipeline in the Dataflow Monitoring Interface. It is particularly important to specify an explicit name when you're using an anonymous `DoFn` instance with `ParDo`, so that you can see an easily readable name for your step in the monitoring interface.
Side Inputs
In addition to the main input `PCollection`, you can provide additional inputs to a `ParDo` transform in the form of side inputs. A side input is an additional input that your `DoFn` can access each time it processes an element in the input `PCollection`. When you specify a side input, you create a view of some other data that can be read from within the `ParDo` transform's `DoFn` while processing each element.
Side inputs are useful if your `ParDo` needs to inject additional data when processing each element in the input `PCollection`, but the additional data needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline. For example, you can obtain a value from a remote service while your pipeline is running and use it as a side input. Or, you can use a value computed by a separate branch of your pipeline and add it as a side input to another branch's `ParDo`.
Representing a Side Input
Java
Side inputs are always of type `PCollectionView`. `PCollectionView` is a way of representing a `PCollection` as a single entity, which you can then pass as a side input to a `ParDo`. You can make a `PCollectionView` that expresses a `PCollection` as one of the following types:
| View Type | Usage |
|---|---|
| `View.asSingleton` | Represents a `PCollection` as an individual value; generally you'd use this after combining a `PCollection` using `Combine.globally`. Use this when your side input is a single computed value. You should typically create a singleton view using `Combine.globally(...).asSingletonView()`. |
| `View.asList` | Represents a `PCollection` as a `List`. Use this view when your side input is a collection of individual values. |
| `View.asMap` | Represents a `PCollection` as a `Map`. Use this view when your side input consists of key/value pairs (`PCollection<KV<K, V>>`) and has a single value for each key. |
| `View.asMultimap` | Represents a `PCollection` as a `MultiMap`. Use this view when your side input consists of key/value pairs (`PCollection<KV<K, V>>`) and has multiple values for each key. |
Note: Like other pipeline data, `PCollectionView` is immutable once created.
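As a sketch of how such a view is created (the collection and variable names are illustrative):

```java
// A PCollection of key/value pairs with a single value per key.
PCollection<KV<String, Integer>> wordCounts = ...;

// Create a Map-based view of the collection for use as a side input.
final PCollectionView<Map<String, Integer>> wordCountsView =
    wordCounts.apply(View.<String, Integer>asMap());
```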
Passing Side Inputs to ParDo
Java
You pass side inputs to your `ParDo` transform by invoking `.withSideInputs`. Inside your `DoFn`, you access the side input by using the method `DoFn.ProcessContext.sideInput`.
The following example code creates a singleton side input from a `PCollection<Integer>` and passes it to a subsequent `ParDo`.

In the example, we have a `PCollection<String>` called `words` that represents a collection of individual words, and a `PCollection<Integer>` that represents word lengths; we can use the latter to compute a maximum word length cutoff as a singleton value, and then pass that computed value as a side input to a `ParDo` that filters `words` based on the cutoff.
```java
// The input PCollection to ParDo.
PCollection<String> words = ...;

// A PCollection of word lengths that we'll combine into a single value.
PCollection<Integer> wordLengths = ...; // Singleton PCollection

// Create a singleton PCollectionView from wordLengths using Combine.globally and View.asSingleton.
final PCollectionView<Integer> maxWordLengthCutOffView =
    wordLengths.apply(Combine.globally(new Max.MaxIntFn()).asSingletonView());

// Apply a ParDo that takes maxWordLengthCutOffView as a side input.
PCollection<String> wordsBelowCutOff =
    words.apply(ParDo.withSideInputs(maxWordLengthCutOffView)
        .of(new DoFn<String, String>() {
          @Override
          public void processElement(ProcessContext c) {
            String word = c.element();
            // In our DoFn, access the side input.
            int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
            if (word.length() <= lengthCutOff) {
              c.output(word);
            }
          }
        }));
```
Side Inputs and Windowing
When you create a `PCollectionView` of a windowed `PCollection`, which may be infinite and thus cannot be compressed into a single value (or single collection class), the `PCollectionView` represents a single entity per window. That is, the `PCollectionView` represents one singleton per window, one list per window, and so on.
Dataflow uses the window(s) for the main input element to look up the appropriate window for the side input element. Dataflow projects the main input element's window into the side input's window set, and then uses the side input from the resulting window. If the main input and side inputs have identical windows, the projection provides the exact corresponding window; however, if the inputs have different windows, Dataflow uses the projection to choose the most appropriate side input window.
Java
For example, if the main input is windowed using fixed-time windows of one minute, and the side input is windowed using fixed-time windows of one hour, Dataflow projects the main input window against the side input window set and selects the side input value from the appropriate hour-long side input window.
If the side input has multiple trigger firings, Dataflow uses the value from the latest trigger firing. This is particularly useful if you use a side input with a single global window and specify a trigger.
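As a sketch of that setup, building on the earlier side input example (the window durations and variable names are illustrative):

```java
// Main input: words in one-minute fixed windows.
PCollection<String> windowedWords = words.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));

// Side input: one singleton maximum per one-hour fixed window.
final PCollectionView<Integer> hourlyMaxLengthView =
    wordLengths
        .apply(Window.<Integer>into(FixedWindows.of(Duration.standardHours(1))))
        .apply(Combine.globally(new Max.MaxIntFn()).asSingletonView());

// For each main input element, Dataflow selects the side input value from
// the hour-long window corresponding to the element's one-minute window.
```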
Side Outputs
While `ParDo` always produces a main output `PCollection` (as the return value from `apply`), you can also have your `ParDo` produce any number of additional output `PCollection`s. If you choose to have multiple outputs, your `ParDo` will return all of the output `PCollection`s (including the main output) bundled together. For example, in Java, the output `PCollection`s are bundled in a type-safe `PCollectionTuple`.
Tags for Side Outputs
Java
To emit elements to a side output `PCollection`, you'll need to create a `TupleTag` object to identify each collection your `ParDo` produces. For example, if your `ParDo` produces three output `PCollection`s (the main output and two side outputs), you'll need to create three associated `TupleTag`s.

The following example code shows how to create `TupleTag`s for a `ParDo` with a main output and two side outputs:
```java
// Input PCollection to our ParDo.
PCollection<String> words = ...;

// The ParDo will filter words whose length is below a cutoff and add them to
// the main output PCollection<String>.
// If a word is above the cutoff, the ParDo will add the word length to a side output
// PCollection<Integer>.
// If a word starts with the string "MARKER", the ParDo will add that word to a different
// side output PCollection<String>.
final int wordLengthCutOff = 10;

// Create the TupleTags for the main and side outputs.
// Main output.
final TupleTag<String> wordsBelowCutOffTag = new TupleTag<String>(){};
// Word lengths side output.
final TupleTag<Integer> wordLengthsAboveCutOffTag = new TupleTag<Integer>(){};
// "MARKER" words side output.
final TupleTag<String> markedWordsTag = new TupleTag<String>(){};
```
Passing Output Tags to ParDo
Once you have specified the `TupleTag`s for each of your `ParDo` outputs, you'll need to pass those tags to your `ParDo` by invoking `.withOutputTags`. You pass the tag for the main output first, and then the tags for any side outputs in a `TupleTagList`.
Building on our previous example, here's how we pass the three `TupleTag`s (one for the main output and two for the side outputs) to our `ParDo`:
```java
PCollectionTuple results = words.apply(
    ParDo
        // Specify the tag for the main output, wordsBelowCutOffTag.
        .withOutputTags(wordsBelowCutOffTag,
            // Specify the tags for the two side outputs as a TupleTagList.
            TupleTagList.of(wordLengthsAboveCutOffTag)
                .and(markedWordsTag))
        .of(new DoFn<String, String>() {
          // DoFn continues here.
          ...
        }));
```
Note that all of the outputs (including the main output `PCollection`) are bundled into the returned `PCollectionTuple` called `results`.
Emitting to Side Outputs In Your DoFn
Java
Inside your `ParDo`'s `DoFn`, you can emit an element to a side output by using the method `ProcessContext.sideOutput`. You'll need to pass the appropriate `TupleTag` for the target side output collection when you call `ProcessContext.sideOutput`.
From our previous example, here's the `DoFn` emitting to the main and side outputs:
```java
.of(new DoFn<String, String>() {
  @Override
  public void processElement(ProcessContext c) {
    String word = c.element();
    if (word.length() <= wordLengthCutOff) {
      // Emit this short word to the main output.
      c.output(word);
    } else {
      // Emit this long word's length to a side output.
      c.sideOutput(wordLengthsAboveCutOffTag, word.length());
    }
    if (word.startsWith("MARKER")) {
      // Emit this word to a different side output.
      c.sideOutput(markedWordsTag, word);
    }
  }
}));
```
After your `ParDo`, you'll need to extract the resulting main and side output `PCollection`s from the returned `PCollectionTuple`. See the section on PCollectionTuples for some examples that show how to extract individual `PCollection`s from a tuple.