Google Cloud Dataflow SDK for Java, version 1.9.1
Class ParDo
- java.lang.Object
  - com.google.cloud.dataflow.sdk.transforms.ParDo
public class ParDo extends Object
ParDo is the core element-wise transform in Google Cloud Dataflow. It invokes a user-specified function on each of the elements of the input PCollection to produce zero or more output elements, all of which are collected into the output PCollection.

Elements are processed independently, and possibly in parallel across distributed cloud resources.

The ParDo processing style is similar to what happens inside the "Mapper" or "Reducer" class of a MapReduce-style algorithm.

DoFns
The function used to process each element is specified by a DoFn<InputT, OutputT>, primarily through its processElement method. The DoFn may also override the default implementations of startBundle and finishBundle.

Conceptually, when a ParDo transform is executed, the elements of the input PCollection are first divided into some number of "bundles". These bundles are farmed out to distributed worker machines, or run locally when using the DirectPipelineRunner. Each bundle of input elements is processed as follows:

- A fresh instance of the argument DoFn is created on a worker. This may happen through deserialization or by other means. If the DoFn subclass does not override startBundle or finishBundle, this step may be optimized, because the DoFn cannot observe the start and end of a bundle.
- The DoFn's DoFn.startBundle(Context) method is called to initialize it. If this method is not overridden, the call may be optimized away.
- The DoFn's DoFn.processElement(ProcessContext) method is called on each of the input elements in the bundle.
- The DoFn's DoFn.finishBundle(Context) method is called to complete its work. After finishBundle is called, the framework never again invokes any of these three processing methods. If this method is not overridden, the call may be optimized away.
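The per-bundle sequence above can be modeled in plain Java. The sketch below is not the SDK. `SketchDoFn`, `BundleRunner` and `CountingFn` are hypothetical names, outputs are collected in a `List`, and bundles run one after another in a single process. It shows a fresh instance being created for each bundle, followed by the startBundle, processElement, finishBundle ordering.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for DoFn (NOT the SDK class); outputs go into a List.
abstract class SketchDoFn<InputT, OutputT> {
  void startBundle(List<OutputT> out) {}                      // default: no-op
  abstract void processElement(InputT element, List<OutputT> out);
  void finishBundle(List<OutputT> out) {}                     // default: no-op
}

class BundleRunner {
  // Processes each bundle with a fresh instance, in lifecycle order.
  static <InputT, OutputT> List<OutputT> runBundles(
      Supplier<SketchDoFn<InputT, OutputT>> newInstance, List<List<InputT>> bundles) {
    List<OutputT> output = new ArrayList<>();
    for (List<InputT> bundle : bundles) {
      SketchDoFn<InputT, OutputT> fn = newInstance.get(); // fresh instance per bundle
      fn.startBundle(output);
      for (InputT element : bundle) {
        fn.processElement(element, output);
      }
      fn.finishBundle(output); // no further calls are made on this instance
    }
    return output;
  }
}

// Example: upper-cases each element and emits a per-bundle count at the end.
class CountingFn extends SketchDoFn<String, String> {
  private int seen;

  @Override
  void startBundle(List<String> out) {
    seen = 0;
  }

  @Override
  void processElement(String element, List<String> out) {
    seen++;
    out.add(element.toUpperCase());
  }

  @Override
  void finishBundle(List<String> out) {
    out.add("count=" + seen);
  }
}
```

For example, `BundleRunner.runBundles(CountingFn::new, bundles)` with the bundles `["a", "b"]` and `["c"]` yields `A, B, count=2, C, count=1`. Each bundle reports only its own count, because every bundle gets a new instance.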
Each call to any of the DoFn's processing methods can produce zero or more output elements. All of the output elements from all of the DoFn instances are included in the output PCollection.

For example:
```java
PCollection<String> lines = ...;
PCollection<String> words =
    lines.apply(ParDo.of(new DoFn<String, String>() {
      @Override
      public void processElement(ProcessContext c) {
        String line = c.element();
        for (String word : line.split("[^a-zA-Z']+")) {
          // split() yields an empty leading token when a line starts with a delimiter.
          if (!word.isEmpty()) {
            c.output(word);
          }
        }
      }
    }));
PCollection<Integer> wordLengths =
    words.apply(ParDo.of(new DoFn<String, Integer>() {
      @Override
      public void processElement(ProcessContext c) {
        String word = c.element();
        Integer length = word.length();
        c.output(length);
      }
    }));
```
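On in-memory data, the two transforms above behave roughly like a flat-map followed by a map. The first step yields zero or more words per line, and the second yields exactly one length per word. The sketch below uses `java.util.stream` on plain lists rather than PCollections, and `WordLengthsSketch` is a hypothetical name. `String.split` can produce an empty leading token, which this sketch drops.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class WordLengthsSketch {
  // Like the first ParDo: each line yields zero or more words.
  static List<String> extractWords(List<String> lines) {
    return lines.stream()
        .flatMap(line -> Arrays.stream(line.split("[^a-zA-Z']+")))
        .filter(word -> !word.isEmpty()) // drop empty tokens produced by split()
        .collect(Collectors.toList());
  }

  // Like the second ParDo: exactly one output per input.
  static List<Integer> lengths(List<String> words) {
    return words.stream().map(String::length).collect(Collectors.toList());
  }
}
```

An empty line produces no words at all, which corresponds to a processElement call that emits nothing.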
Each output element has the same timestamp as its corresponding input element and is in the same windows. The output PCollection has the same WindowFn associated with it as the input.

Naming ParDo transforms

The name of a transform is used to name every node in the Pipeline graph that results from applying the transform. It is best practice to provide a name at the time of application, via PCollection.apply(String, PTransform). Otherwise, a unique name based on the transform name will be generated, and that name may not be stable across pipeline revisions.

If a ParDo is applied exactly once inline, it can be given a name via named(String). For example:

```java
PCollection<String> words =
    lines.apply(ParDo.named("ExtractWords")
                     .of(new DoFn<String, String>() { ... }));
PCollection<Integer> wordLengths =
    words.apply(ParDo.named("ComputeWordLengths")
                     .of(new DoFn<String, Integer>() { ... }));
```
Side Inputs
A ParDo processes elements from a single "main input" PCollection, but it can also take additional "side input" PCollectionViews. These side input PCollectionViews express styles of accessing PCollections computed by earlier pipeline operations. They are passed to the ParDo transform using withSideInputs(PCollectionView<?>...), and their contents are accessible to each of the DoFn operations via sideInput. For example:

```java
PCollection<String> words = ...;
PCollection<Integer> maxWordLengthCutOff = ...; // Singleton PCollection
final PCollectionView<Integer> maxWordLengthCutOffView =
    maxWordLengthCutOff.apply(View.<Integer>asSingleton());
PCollection<String> wordsBelowCutOff =
    words.apply(ParDo.withSideInputs(maxWordLengthCutOffView)
                     .of(new DoFn<String, String>() {
      @Override
      public void processElement(ProcessContext c) {
        String word = c.element();
        int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
        if (word.length() <= lengthCutOff) {
          c.output(word);
        }
      }
    }));
```
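The side-input pattern above can be modeled in plain Java. The side collection is reduced to a single value once, and that same value is then read while processing every main-input element. `SideInputSketch` is hypothetical. Its "exactly one element" check is an assumption of this sketch that loosely mirrors the idea of a singleton view. It is not the SDK's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SideInputSketch {
  // Assumption: a singleton side input must contain exactly one element.
  static <T> T asSingleton(List<T> collection) {
    if (collection.size() != 1) {
      throw new IllegalArgumentException("expected exactly one element, got " + collection.size());
    }
    return collection.get(0);
  }

  // Main input: words. Side input: the cut-off, read once and shared by every element.
  static List<String> wordsBelowCutOff(List<String> words, List<Integer> maxWordLengthCutOff) {
    final int lengthCutOff = asSingleton(maxWordLengthCutOff);
    List<String> out = new ArrayList<>();
    for (String word : words) {
      if (word.length() <= lengthCutOff) {
        out.add(word);
      }
    }
    return out;
  }
}
```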
Side Outputs
Optionally, a ParDo transform can produce multiple output PCollections: a "main output" PCollection<OutputT> plus any number of "side output" PCollections. Each output is keyed by a distinct TupleTag, and all of them are bundled in a PCollectionTuple. The TupleTags to be used for the output PCollectionTuple are specified by invoking withOutputTags(TupleTag<OutputT>, TupleTagList). Unconsumed side outputs do not need to be specified explicitly, even if the DoFn generates them. Within the DoFn, an element is added to the main output PCollection as normal, using DoFn.Context.output(OutputT), and an element is added to a side output PCollection using DoFn.Context.sideOutput(TupleTag<T>, T). For example:

```java
PCollection<String> words = ...;
// Select words whose length is below a cut off,
// plus the lengths of words that are above the cut off.
// Also select words starting with "MARKER".
final int wordLengthCutOff = 10;
// Create tags to use for the main and side outputs.
final TupleTag<String> wordsBelowCutOffTag = new TupleTag<String>(){};
final TupleTag<Integer> wordLengthsAboveCutOffTag = new TupleTag<Integer>(){};
final TupleTag<String> markedWordsTag = new TupleTag<String>(){};
PCollectionTuple results =
    words.apply(
        ParDo
        // Specify the main and consumed side output tags of the
        // PCollectionTuple result:
        .withOutputTags(wordsBelowCutOffTag,
                        TupleTagList.of(wordLengthsAboveCutOffTag)
                                    .and(markedWordsTag))
        .of(new DoFn<String, String>() {
            // Create a tag for the unconsumed side output.
            final TupleTag<String> specialWordsTag = new TupleTag<String>(){};
            @Override
            public void processElement(ProcessContext c) {
              String word = c.element();
              if (word.length() <= wordLengthCutOff) {
                // Emit this short word to the main output.
                c.output(word);
              } else {
                // Emit this long word's length to a side output.
                c.sideOutput(wordLengthsAboveCutOffTag, word.length());
              }
              if (word.startsWith("MARKER")) {
                // Emit this word to a different side output.
                c.sideOutput(markedWordsTag, word);
              }
              if (word.startsWith("SPECIAL")) {
                // Emit this word to the unconsumed side output.
                c.sideOutput(specialWordsTag, word);
              }
            }}));
// Extract the PCollection results, by tag.
PCollection<String> wordsBelowCutOff = results.get(wordsBelowCutOffTag);
PCollection<Integer> wordLengthsAboveCutOff = results.get(wordLengthsAboveCutOffTag);
PCollection<String> markedWords = results.get(markedWordsTag);
```
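The tagged-output idea above can be modeled in plain Java as a map from typed tags to output lists. `SketchTag`, `TaggedOutputs` and `SideOutputSketch` are hypothetical stand-ins for TupleTag, PCollectionTuple and the DoFn, and they run on in-memory lists. The typed `get` shows why each tag carries its element type. The unconsumed "SPECIAL" output is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical typed tag: an identity-based key that carries the element type T.
final class SketchTag<T> {
  final String id;
  SketchTag(String id) { this.id = id; }
}

// Hypothetical stand-in for PCollectionTuple: one output list per tag.
final class TaggedOutputs {
  private final Map<SketchTag<?>, List<Object>> byTag = new HashMap<>();

  <T> void emit(SketchTag<T> tag, T value) {
    byTag.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
  }

  @SuppressWarnings("unchecked")
  <T> List<T> get(SketchTag<T> tag) {
    // Safe: emit() only stores values of type T under a SketchTag<T>.
    return (List<T>) (List<?>) byTag.getOrDefault(tag, Collections.emptyList());
  }
}

final class SideOutputSketch {
  static final SketchTag<String> WORDS_BELOW_CUT_OFF = new SketchTag<>("wordsBelowCutOff");
  static final SketchTag<Integer> WORD_LENGTHS_ABOVE_CUT_OFF = new SketchTag<>("wordLengthsAboveCutOff");
  static final SketchTag<String> MARKED_WORDS = new SketchTag<>("markedWords");

  static TaggedOutputs process(List<String> words, int wordLengthCutOff) {
    TaggedOutputs out = new TaggedOutputs();
    for (String word : words) {
      if (word.length() <= wordLengthCutOff) {
        out.emit(WORDS_BELOW_CUT_OFF, word);                 // "main output"
      } else {
        out.emit(WORD_LENGTHS_ABOVE_CUT_OFF, word.length()); // side output
      }
      if (word.startsWith("MARKER")) {
        out.emit(MARKED_WORDS, word);                        // another side output
      }
    }
    return out;
  }
}
```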
Properties May Be Specified In Any Order
Several properties can be specified for a ParDo PTransform: the name, side inputs, side output tags, and the DoFn to invoke. Only the DoFn is required. The name is encouraged but optional, and side inputs and side output tags are specified only when they are needed. These properties can be specified in any order, as long as they are all specified before the ParDo PTransform is applied.

To allow these properties to be specified in any order, with some omitted, each property "setter" method is defined as a static factory method on ParDo itself. Each factory method returns an instance of one of the nested classes ParDo.Unbound or ParDo.Bound, and each of these offers property setter instance methods for setting additional properties. ParDo.Bound is used for ParDo transforms whose DoFn has been specified and whose input and output static types have been bound. ParDo.Unbound is used for ParDo transforms that have not yet had their DoFn specified. Only ParDo.Bound instances can be applied.

Another benefit of this approach is that it reduces the number of type parameters that must be specified manually. In particular, the input and output types of the ParDo PTransform are inferred automatically from the type parameters of the DoFn argument passed to of(DoFn<InputT, OutputT>).

Output Coders
By default, the Coder<OutputT> for the elements of the main output PCollection<OutputT> is inferred from the concrete type of the DoFn<InputT, OutputT>.

By default, the Coder<SideOutputT> for the elements of a side output PCollection<SideOutputT> is inferred from the concrete type of the corresponding TupleTag<SideOutputT>. For this inference to succeed, the TupleTag should be created as an instance of a trivial anonymous subclass, with {} suffixed to the constructor call. This form blocks Java's generic type parameter inference, so the <X> argument must be provided explicitly. For example:

```java
// A TupleTag to use for a side input can be written concisely:
final TupleTag<Integer> sideInputTag = new TupleTag<>();
// A TupleTag to use for a side output should be written with "{}",
// and an explicit generic parameter type:
final TupleTag<String> sideOutputTag = new TupleTag<String>(){};
```
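Plain Java reflection shows why the trailing {} matters. The sketch below uses the general "super type token" technique. An anonymous subclass records its type argument in its generic superclass, while a plain instantiation loses that argument to erasure. `TypedTag` is hypothetical, and this is not the SDK's TupleTag code. It only illustrates the Java behavior that this kind of inference can rely on.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical tag class (not the SDK's TupleTag) that tries to recover T at runtime.
class TypedTag<T> {
  // Returns T if this object's class is a subclass that fixed T; otherwise null (erased).
  Type capturedType() {
    Type superclass = getClass().getGenericSuperclass();
    if (superclass instanceof ParameterizedType) {
      return ((ParameterizedType) superclass).getActualTypeArguments()[0];
    }
    return null;
  }
}
```

`new TypedTag<String>(){}.capturedType()` returns `String.class`, while `new TypedTag<String>().capturedType()` returns `null`. Before Java 9, the diamond operator cannot be used with anonymous classes, which is one reason the type argument is spelled out.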
The {}-suffixed style of TupleTag instantiation is used in the example of multiple side outputs, above.

Serializability of DoFns
A DoFn passed to a ParDo transform must be Serializable. This allows the DoFn instance created in the "main program" to be sent in serialized form to remote worker machines and reconstituted for each bundle of elements of the input PCollection being processed. A DoFn can have instance variable state. Non-transient instance variable state is serialized in the main program and then deserialized on remote worker machines for each bundle of elements to process.

To help ensure that DoFns are properly Serializable, even local execution using the DirectPipelineRunner serializes and then deserializes DoFns before executing them on a bundle.

DoFns expressed as anonymous inner classes can be convenient. However, because of a quirk in Java's serialization rules, non-static inner or nested classes (including anonymous inner classes) automatically capture the instance of their enclosing class in their serialized state. This can pull much more than intended into the serialized state of a DoFn, or even things that aren't Serializable.

There are two ways to avoid unintended serialized state in a DoFn:

- Define the DoFn as a named, static class.
- Define the DoFn as an anonymous inner class inside of a static method.
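The enclosing-instance capture described above can be reproduced with plain Java serialization. In this sketch, `SerializableFn`, `PipelineBuilder` and `SerializationCheck` are hypothetical names. They stand in for a DoFn, a non-Serializable "main program" class, and the runner's serialization step. The first anonymous class reads an instance field, so it must capture `PipelineBuilder.this`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical Serializable function type standing in for a DoFn.
interface SerializableFn extends Serializable {
  String apply(String s);
}

// Hypothetical "main program" class; note that it is NOT Serializable.
class PipelineBuilder {
  private final String suffix;

  PipelineBuilder(String suffix) {
    this.suffix = suffix;
  }

  // Problem: an anonymous class in an instance method captures PipelineBuilder.this.
  SerializableFn anonymousInInstanceMethod() {
    return new SerializableFn() {
      @Override
      public String apply(String s) {
        return s + suffix; // reads the enclosing instance's field
      }
    };
  }

  // Fix 2: an anonymous class inside a static method has no enclosing instance.
  static SerializableFn anonymousInStaticMethod(final String suffix) {
    return new SerializableFn() {
      @Override
      public String apply(String s) {
        return s + suffix; // captures only the (Serializable) String parameter
      }
    };
  }

  // Fix 1: a named static nested class with explicit, constructor-initialized state.
  static class AppendFn implements SerializableFn {
    private final String suffix;

    AppendFn(String suffix) {
      this.suffix = suffix;
    }

    @Override
    public String apply(String s) {
      return s + suffix;
    }
  }
}

class SerializationCheck {
  // Returns true if obj can be written with standard Java serialization.
  static boolean isSerializable(Object obj) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(obj);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The first function fails to serialize because its hidden reference to `PipelineBuilder` is written along with it. The other two serialize cleanly.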
Both of these approaches ensure that no implicit enclosing instance is serialized along with the DoFn instance.

Prior to Java 8, any local variables of the enclosing method that are referenced from within an anonymous inner class must be marked as final. From Java 8 on, they need only be effectively final. If the DoFn is defined as a named static class, such variables are instead passed as explicit constructor arguments and stored in explicit instance variables.

There are three main ways to initialize the state of a DoFn instance that processes a bundle:

- Define instance variable state, initialized by the DoFn's constructor. This includes the implicit instance variables that hold final variables captured by an anonymous inner class, whose constructor is implicit. This state is automatically serialized and then deserialized into the DoFn instance created for each bundle. This method is good for state that is known when the original DoFn is created in the main program, as long as the state is not overly large.
- Compute the state as a singleton PCollection and pass it in as a side input to the DoFn. This is good if the state must be computed by the pipeline, or if the state is very large and is therefore best read from file(s) rather than sent as part of the DoFn's serialized state.
- Initialize the state in each DoFn instance, in DoFn.startBundle(Context). This is good if the initialization does not depend on information known only to the main program or computed by earlier pipeline operations, but is the same for all instances of this DoFn in all program executions. Examples include setting up empty caches or initializing constant data.
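The first and third approaches can be sketched with plain Java serialization standing in for shipping a DoFn to a worker. `CutoffFn` and `WorkerSimulation` are hypothetical, and the side-input approach is not modeled. The non-transient `cutOff` field survives the round trip. The `transient` cache does not, so it must be rebuilt in `startBundle()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical DoFn-like class (not the SDK's DoFn).
class CutoffFn implements Serializable {
  private static final long serialVersionUID = 1L;

  // Approach 1: known in the main program; set by the constructor and serialized.
  private final int cutOff;

  // Approach 3: per-instance working state; transient, so never serialized.
  private transient Map<String, Boolean> cache;

  CutoffFn(int cutOff) {
    this.cutOff = cutOff;
  }

  // Called on each fresh instance before it processes a bundle.
  void startBundle() {
    cache = new HashMap<>();
  }

  boolean keep(String word) {
    return cache.computeIfAbsent(word, w -> w.length() <= cutOff);
  }

  int cutOff() {
    return cutOff;
  }

  boolean hasCache() {
    return cache != null;
  }
}

class WorkerSimulation {
  // Simulates shipping an instance to a worker: serialize it, then deserialize a copy.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T original) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(original);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

Even if the main program had already populated a cache, the deserialized copy starts without one.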
No Global Shared State
ParDo operations are designed to run in parallel across multiple worker machines. This rules out easily sharing and updating mutable state across those machines. The Google Cloud Dataflow system does not support communicating and synchronizing updates to shared state across worker machines. Programs should therefore not access mutable static variable state in their DoFns unless they understand that the Java processes for the main program and for each worker have their own independent copies of such state, and that this state is never copied automatically between Java processes. In the absence of external communication mechanisms written by user code, all information should be communicated to DoFn instances via main and side inputs and serialized state, and all output should be communicated from a DoFn instance via main and side outputs.

Fault Tolerance
In a distributed system, things can fail: machines can crash, machines can lose the ability to communicate across the network, and so on. Individual failures are rare, but the larger the job, the greater the chance that something, somewhere, will fail. The Google Cloud Dataflow service tries to mask such failures automatically, principally by retrying failed DoFn bundles. This means that a DoFn instance might process part of a bundle, crash, and then be rerun, often on a different worker machine, on the same bundle and the same elements as before. Sometimes two or more DoFn instances run on the same bundle simultaneously, and the system takes the results of the first instance to complete successfully. Consequently, the code in a DoFn must be written so that these duplicate executions, whether sequential or concurrent, do not cause problems. If the outputs of a DoFn are a pure function of its inputs, this requirement is satisfied. However, if a DoFn's execution has external side effects, such as updates to external HTTP services, then the DoFn's code must ensure that those updates are idempotent and that concurrent updates are acceptable. This property can be difficult to achieve, so it is advisable to keep DoFns as close to pure functions as possible.

Optimization
The Google Cloud Dataflow service automatically optimizes a pipeline before it is executed. A key optimization, fusion, applies to ParDo operations. If one ParDo operation produces a PCollection that is then consumed as the main input of another ParDo operation, the two ParDo operations are fused into a single ParDo operation and run in a single pass; this is "producer-consumer fusion". Similarly, if two or more ParDo operations have the same PCollection as their main input, they are fused into a single ParDo that makes just one pass over the input PCollection; this is "sibling fusion".

If, after fusion, there are no remaining unfused references to a PCollection (for example, one between a producer ParDo and a consumer ParDo), the PCollection itself is "fused away" and is never written to disk. This saves all the I/O and space cost of constructing it.

The Google Cloud Dataflow service applies fusion as much as possible, greatly reducing the cost of executing pipelines. As a result, it is essentially "free" to write ParDo operations in a very modular, composable style, with each ParDo operation doing one clear task, and to string together sequences of ParDo operations to get the desired overall effect. Such programs can be easier to understand, unit-test, extend, evolve, and reuse in new programs. The predefined library of PTransforms that comes with Google Cloud Dataflow makes heavy use of this modular, composable style, relying on the Google Cloud Dataflow service's optimizer to "flatten out" all the compositions into highly optimized stages.

See Also:
- the web documentation for ParDo
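The retry behavior described under Fault Tolerance can be simulated in plain Java to show why external side effects should be idempotent. `ExternalStore` and `RetrySimulation` are hypothetical. The "crash" is an exception thrown after the first element, and the retry reprocesses the whole bundle. The real system discards the outputs of a failed attempt, but it cannot undo writes already made to an external service, and those writes are what this sketch models. Only sequential retries are simulated, not concurrent ones.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical external service (e.g. an HTTP endpoint) that a DoFn writes to.
class ExternalStore {
  final List<String> appended = new ArrayList<>();   // non-idempotent: every call adds a row
  final Map<String, String> byKey = new HashMap<>(); // idempotent: same key, same value
}

class RetrySimulation {
  // Processes one bundle; throws after `crashAfter` elements (negative means never).
  static void processBundle(List<String> bundle, ExternalStore store,
                            boolean idempotent, int crashAfter) {
    int processed = 0;
    for (String element : bundle) {
      if (processed == crashAfter) {
        throw new IllegalStateException("simulated worker crash");
      }
      if (idempotent) {
        store.byKey.put(element, element.toUpperCase());
      } else {
        store.appended.add(element.toUpperCase());
      }
      processed++;
    }
  }

  // The first attempt crashes after one element; the whole bundle is then retried.
  static ExternalStore runWithRetry(List<String> bundle, boolean idempotent) {
    ExternalStore store = new ExternalStore();
    try {
      processBundle(bundle, store, idempotent, 1);
    } catch (IllegalStateException crash) {
      processBundle(bundle, store, idempotent, -1);
    }
    return store;
  }
}
```

With the bundle `a, b, c`, the append-only store ends up with `A, A, B, C`, while the keyed store holds exactly three entries.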
Nested Class Summary
Nested Classes (Modifier and Type, Class and Description):
- static class ParDo.Bound<InputT,OutputT>: A PTransform that, when applied to a PCollection<InputT>, invokes a user-specified DoFn<InputT, OutputT> on all its elements, with all its outputs collected into an output PCollection<OutputT>.
- static class ParDo.BoundMulti<InputT,OutputT>: A PTransform that, when applied to a PCollection<InputT>, invokes a user-specified DoFn<InputT, OutputT> on all its elements, which can emit elements to any of the PTransform's main and side output PCollections, which are bundled into a result PCollectionTuple.
- static class ParDo.Unbound: An incomplete ParDo transform, with unbound input/output types.
- static class ParDo.UnboundMulti<OutputT>: An incomplete multi-output ParDo transform, with unbound input type.
Constructor Summary
Constructors (Constructor and Description):
- ParDo()
Method Summary
Methods (Modifier and Type, Method and Description):
- static ParDo.Unbound named(String name): Creates a ParDo PTransform with the given name.
- static <InputT,OutputT> ParDo.Bound<InputT,OutputT> of(DoFn<InputT,OutputT> fn): Creates a ParDo PTransform that will invoke the given DoFn function.
- static <InputT,OutputT> ParDo.Bound<InputT,OutputT> of(DoFnWithContext<InputT,OutputT> fn): Creates a ParDo PTransform that will invoke the given DoFnWithContext function.
- static <OutputT> ParDo.UnboundMulti<OutputT> withOutputTags(TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags): Creates a multi-output ParDo PTransform whose output PCollections will be referenced using the given main output and side output tags.
- static ParDo.Unbound withSideInputs(Iterable<? extends PCollectionView<?>> sideInputs): Creates a ParDo with the given side inputs.
- static ParDo.Unbound withSideInputs(PCollectionView<?>... sideInputs): Creates a ParDo PTransform with the given side inputs.
Method Detail
named
public static ParDo.Unbound named(String name)
Creates a ParDo PTransform with the given name.

See the discussion of naming above for more explanation.

The resulting PTransform is incomplete, and its input/output types are not yet bound. Use ParDo.Unbound.of(DoFn<InputT, OutputT>) to specify the DoFn to invoke, which will also bind the input/output types of this PTransform.
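The Unbound/Bound design described under "Properties May Be Specified In Any Order" can be sketched as an immutable staged builder. `SketchParDo` and its nested classes are hypothetical and much simpler than the SDK. Side inputs are plain strings, and the "DoFn" is a `java.util.function.Function` that emits exactly one output per element. Only `Bound` has an `apply` method, and `of(...)` is the point at which the input and output types become fixed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the ParDo / Unbound / Bound pattern; NOT the SDK's classes.
final class SketchParDo {
  private SketchParDo() {}

  // Static entry points: each starts a builder, so any property can come first.
  static Unbound named(String name) {
    return new Unbound(name, Collections.<String>emptyList());
  }

  static Unbound withSideInputs(String... sideInputs) {
    return new Unbound(null, Arrays.asList(sideInputs));
  }

  static <InputT, OutputT> Bound<InputT, OutputT> of(Function<InputT, OutputT> fn) {
    return new Unbound(null, Collections.<String>emptyList()).of(fn);
  }

  private static List<String> concat(List<String> existing, String[] more) {
    List<String> all = new ArrayList<>(existing);
    all.addAll(Arrays.asList(more));
    return Collections.unmodifiableList(all);
  }

  // No function yet: input/output types are unknown, so there is no apply().
  static final class Unbound {
    final String name;
    final List<String> sideInputs;

    Unbound(String name, List<String> sideInputs) {
      this.name = name;
      this.sideInputs = sideInputs;
    }

    Unbound named(String newName) { return new Unbound(newName, sideInputs); }

    Unbound withSideInputs(String... more) { return new Unbound(name, concat(sideInputs, more)); }

    // Supplying the function binds InputT and OutputT.
    <InputT, OutputT> Bound<InputT, OutputT> of(Function<InputT, OutputT> fn) {
      return new Bound<>(name, sideInputs, fn);
    }
  }

  // Function supplied: types are bound, further properties may still be set.
  static final class Bound<InputT, OutputT> {
    final String name;
    final List<String> sideInputs;
    final Function<InputT, OutputT> fn;

    Bound(String name, List<String> sideInputs, Function<InputT, OutputT> fn) {
      this.name = name;
      this.sideInputs = sideInputs;
      this.fn = fn;
    }

    Bound<InputT, OutputT> named(String newName) { return new Bound<>(newName, sideInputs, fn); }

    Bound<InputT, OutputT> withSideInputs(String... more) {
      return new Bound<>(name, concat(sideInputs, more), fn);
    }

    List<OutputT> apply(List<InputT> input) {
      List<OutputT> out = new ArrayList<>();
      for (InputT element : input) {
        out.add(fn.apply(element));
      }
      return out;
    }
  }
}
```

For example, `SketchParDo.named("Lengths").withSideInputs("cutoff").of(String::length)` and `SketchParDo.<String, Integer>of(String::length).withSideInputs("cutoff").named("Lengths")` configure equivalent transforms. Because every setter returns a new object, a partially configured builder can be reused safely.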

withSideInputs
public static ParDo.Unbound withSideInputs(PCollectionView<?>... sideInputs)
Creates a ParDo PTransform with the given side inputs.

Side inputs are PCollectionViews, whose contents are computed during pipeline execution and then made accessible to DoFn code via sideInput. Each invocation of the DoFn receives the same values for these side inputs.

See the discussion of Side Inputs above for more explanation.

The resulting PTransform is incomplete, and its input/output types are not yet bound. Use ParDo.Unbound.of(DoFn<InputT, OutputT>) to specify the DoFn to invoke, which will also bind the input/output types of this PTransform.

withSideInputs
public static ParDo.Unbound withSideInputs(Iterable<? extends PCollectionView<?>> sideInputs)
Creates a ParDo with the given side inputs.

Side inputs are PCollectionViews, whose contents are computed during pipeline execution and then made accessible to DoFn code via sideInput.

See the discussion of Side Inputs above for more explanation.

The resulting PTransform is incomplete, and its input/output types are not yet bound. Use ParDo.Unbound.of(DoFn<InputT, OutputT>) to specify the DoFn to invoke, which will also bind the input/output types of this PTransform.

withOutputTags
public static <OutputT> ParDo.UnboundMulti<OutputT> withOutputTags(TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags)
Creates a multi-output ParDo PTransform whose output PCollections will be referenced using the given main output and side output tags.

Each TupleTag names one main or side output PCollection<T>, together with its static element type T. This PTransform's DoFn emits elements to the main output PCollection as normal, using DoFn.Context.output(OutputT). The DoFn emits elements to a side output PCollection using DoFn.Context.sideOutput(TupleTag<T>, T), passing that side output's tag as an argument. The result of invoking this PTransform is a PCollectionTuple, from which any of the main and side output PCollections can be retrieved via PCollectionTuple.get(TupleTag<T>), passing the output's tag as an argument.

See the discussion of Side Outputs above for more explanation.

The resulting PTransform is incomplete, and its input type is not yet bound. Use ParDo.UnboundMulti.of(DoFn<InputT, OutputT>) to specify the DoFn to invoke, which will also bind the input type of this PTransform.

of
public static <InputT,OutputT> ParDo.Bound<InputT,OutputT> of(DoFn<InputT,OutputT> fn)
Creates a ParDo PTransform that will invoke the given DoFn function.

The resulting PTransform's types have been bound, with the input being a PCollection<InputT> and the output a PCollection<OutputT>, inferred from the types of the argument DoFn<InputT, OutputT>. It is ready to be applied, or further properties can be set on it first.

of
@Experimental public static <InputT,OutputT> ParDo.Bound<InputT,OutputT> of(DoFnWithContext<InputT,OutputT> fn)
Creates a ParDo PTransform that will invoke the given DoFnWithContext function.

The resulting PTransform's types have been bound, with the input being a PCollection<InputT> and the output a PCollection<OutputT>, inferred from the types of the argument DoFnWithContext<InputT, OutputT>. It is ready to be applied, or further properties can be set on it first.

DoFnWithContext is an experimental alternative to DoFn which simplifies accessing the window of the element.