Often in the course of a pipeline, you'll want to combine or otherwise merge collections of values in your data. For example, you may have a collection of sales data consisting of orders for a given month, each with a dollar value. In your pipeline, you might want to combine all of the order values into a single value that represents the total dollar value of orders for that month, or the largest order, or the average dollar value per order. To obtain this kind of data, you combine the values in your collection.
The Dataflow SDKs contain a number of operations you can use to combine the values in your pipeline's PCollection objects or to combine key-grouped values.
The Combine core transform encapsulates various generic methods you can use to combine the elements or values in a PCollection. Combine has variants that work on entire PCollections, and some that combine the individual value streams in PCollections of key/value pairs. Combine also has subclasses for specific numeric combining operations, such as sums, minimums, maximums, and mean averages.
When you apply a Combine
transform, you must provide the function that contains
the actual logic for combining the elements or values. See
Creating and Specifying a Combine Function later in this
section for more information.
In this sense, Combine
transforms are similar to the
ParDo
transform that applies the logic in a processing function you provide to each
element.
Combining a PCollection into a Single Value
You can combine all of the elements in a given PCollection into a single value, represented in your pipeline as a new PCollection containing one element. To combine the elements of a PCollection, you use the global combine transform. Combine behaves differently if the input PCollection is divided using windowing; in that case, global combine returns a single element per window.
Global combine uses a combining function that you supply to combine each element value. See Creating and Specifying a Combine Function later in this section for more details.
The Dataflow SDKs provide some pre-built combine functions for common numeric combination operations, such as sums, minimums, maximums, and mean averages. You can use these functions with global combine instead of creating your own combine function. See Using the Included Statistical Combination Functions for more information.
Java
The following example code shows how to apply a Combine.globally transform to produce a single sum value for a PCollection of type Integer:
PCollection<Integer> pc = ...;
PCollection<Integer> sum = pc.apply(
    Combine.globally(new Sum.SumIntegerFn()));
In the example, Sum.SumIntegerFn() is the CombineFn that the transform uses to combine the elements in the input PCollection. The resulting PCollection, called sum, will contain one value: the sum of all the elements in the input PCollection.
Global Windowing
If your input PCollection uses the default global windowing, Dataflow's default behavior is to return a PCollection containing one item. That item's value comes from the accumulator in the combine function that you specified when applying Combine. For example, Dataflow's Sum combine function returns a zero value (the sum of an empty input), while the Min combine function returns a maximal or infinite value.
To have Combine instead return an empty PCollection if the input is empty, specify .withoutDefaults when you apply your Combine transform, as in the following code example:
PCollection<Integer> pc = ...;
PCollection<Integer> sum = pc.apply(
    Combine.globally(new Sum.SumIntegerFn()).withoutDefaults());
Non-Global Windowing
If your PCollection uses any non-global windowing function, Dataflow does not provide the default behavior. You must specify one of the following options when applying Combine:
- Specify .withoutDefaults, in which case windows that are empty in the input PCollection will likewise be empty in the output collection.
- Specify .asSingletonView, in which case the output is immediately converted to a PCollectionView, which will provide a default value for each empty window when used as a side input. You'll generally only need to use this option if the result of your pipeline's Combine is to be used as a side input later in the pipeline. Both options are shown in the sketch below.
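The following sketch shows both options applied to a windowed PCollection of Integer values; the variable names here are illustrative, not part of the SDK:
PCollection<Integer> windowedInts = ...;  // uses a non-global windowing function

// Option 1: windows that are empty in the input produce no output element.
PCollection<Integer> sumPerWindow = windowedInts.apply(
    Combine.globally(new Sum.SumIntegerFn()).withoutDefaults());

// Option 2: produce a PCollectionView for use as a side input;
// empty windows supply the combine function's default value (0 for Sum).
PCollectionView<Integer> sumView = windowedInts.apply(
    Combine.globally(new Sum.SumIntegerFn()).asSingletonView());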
Combining Values in a Key-Grouped Collection
After creating a key-grouped collection (for example, by using a GroupByKey transform), a common pattern is to combine the collection of values associated with each key into a single, merged value. Drawing on the previous example from GroupByKey, a key-grouped PCollection called groupedWords looks like this:
cat, [1,5,9]
dog, [5,2]
and, [1,2,6]
jump, [3]
tree, [2]
...
In the above PCollection, each element has a string key (for example, "cat") and an iterable of integers for its value (the first element's iterable contains [1, 5, 9]), where each integer represents the line number in the original text where the key appeared. If your pipeline's next processing step considers the values together (rather than individually), you can combine the iterable of integers to create a single, merged value to be paired with each key.
The actual logic of how to combine the values is up to you; for example, given a key-grouped collection of keys and integer values, you might choose to sum the collection of integers associated with each key (which is useful when the values represent occurrence counts, for example). In our example, because the values represent line numbers, we might want to take the min, indicating the first occurrence.
Java
PCollection<KV<String, Integer>> occurrences = ...;

PCollection<KV<String, Iterable<Integer>>> grouped =
    occurrences.apply(GroupByKey.<String, Integer>create());

PCollection<KV<String, Integer>> firstOccurrences = grouped.apply(
    Combine.groupedValues(new Min.MinIntegerFn()));
Using Combine PerKey
The Dataflow SDKs represent this entire pattern (a GroupByKey followed by merging the collection of values) as the Combine PerKey transform. Combine PerKey performs both steps of the pattern: you apply it to a PCollection of key/value pairs, just as you would a GroupByKey. The Combine PerKey transform then applies a combine function you supply to the set of all values associated with each key. The combine operation produces a new PCollection of key/value pairs, which contains unique keys and a single, merged value for each key.
Combine PerKey performs the GroupByKey
transform as part
of its operation; therefore, it behaves as GroupByKey
does when your PCollection
is divided using
windowing. That is, Combine PerKey combines
per key and window.
While you could create a ParDo to perform this kind of operation (namely, by key-grouping a collection and then combining the values), the more structured semantics of the Combine transform allow the Cloud Dataflow service to execute the combining operation more efficiently.
The combine function you supply to Combine PerKey must be an associative reduction function or a subclass of CombineFn. See Creating and Specifying a Combine Function later in this section for more details on how to create a CombineFn.
Java
When you use Combine.perKey, you pass type parameters according to the types of keys and values in your input PCollection, and an additional type parameter if the merged value in your output PCollection is of a different type than the values in your input PCollection. You'll also need to pass to Combine.perKey the combine function that contains the combination logic to apply to each value collection. You typically define your combine function out of line.
The following example code shows how to apply a Combine.perKey transform. In the example, the input PCollection is grouped by key, and the collection of Double values associated with each key is combined into a single sum value:
PCollection<KV<String, Double>> salesRecords = ...;
PCollection<KV<String, Double>> totalSalesPerPerson =
    salesRecords.apply(Combine.<String, Double, Double>perKey(
        new Sum.SumDoubleFn()));
Note that Combine.perKey takes three type parameters: String for the key type, Double for the type of the input values, and Double for the type of the combined output value. In this example, the collection of values for each key is of type Double, and the resulting combined value is also of type Double.
The following example code shows how to apply a Combine.perKey transform where the combined value is of a different type than the original collection of values per key. In this case, the input PCollection has keys of type String and values of type Integer, and the combined value is a Double that represents the mean average of all the values per key.
PCollection<KV<String, Integer>> playerAccuracy = ...;
PCollection<KV<String, Double>> avgAccuracyPerPlayer =
    playerAccuracy.apply(Combine.<String, Integer, Double>perKey(
        new MeanInts()));
Often, Java is able to infer these type parameters from the type of the CombineFn. Java 8 is better at this inference than earlier versions of Java.
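For instance, with Java 8 you can often omit the explicit type parameters from the earlier per-key sum and let the compiler infer them, as in this sketch; whether inference succeeds depends on your Java version and compiler:
PCollection<KV<String, Double>> salesRecords = ...;
// Type parameters inferred from salesRecords and Sum.SumDoubleFn.
PCollection<KV<String, Double>> totalSalesPerPerson =
    salesRecords.apply(Combine.perKey(new Sum.SumDoubleFn()));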
Creating and Specifying a Combine Function
When you use a Combine
transform, regardless of variant, you'll need to provide some
processing logic that specifies how to combine the multiple elements into a single value.
When you design a combine function, note that the function is not necessarily invoked exactly once on all values with a given key. Because the input data (including the value collection) may be distributed across multiple worker instances, the combining function might be called multiple times to perform partial combining on subsets of the value collection. Generally speaking, the combine might be repeatedly applied in a tree structure. Because the tree structure is unspecified, your combining function should be commutative and associative.
You have a few options when creating a combine function. Simple combine operations, such as sums, can usually be implemented as a simple function from a set of values to a single value of the same type. Your function must be associative and commutative. In other words, for a combining function f and a set of values v1, ..., vn, any grouping of the values into partial combines must produce the same result, for example f(v1, v2, ..., vn) = f(f(v1, ..., vk), f(vk+1, ..., vn)), and the result must not change when the values are reordered, for example f(v1, v2, ..., vn) = f(vn, ..., v2, v1).
More complex combination operations might require you to create a subclass of CombineFn that has an accumulation type distinct from the input/output type. Finally, the Dataflow SDK provides some common functions that handle different statistical combinations, such as Sum, Min, Max, and Mean. See Using the Included Statistical Combination Functions for more information.
Simple Combinations Using Simple Functions
Java
For simple combine functions, such as summing a collection of Integer values into a single Integer, you can create a simple function that implements the SerializableFunction interface. Your SerializableFunction should convert an Iterable<V> to a single value of type V.
The following example code shows a simple combine function to sum a collection of Integer values; the function SumInts implements the interface SerializableFunction:
public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
  @Override
  public Integer apply(Iterable<Integer> input) {
    int sum = 0;
    for (int item : input) {
      sum += item;
    }
    return sum;
  }
}
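You can then pass an instance of this function to a global or per-key combine. For example, a minimal sketch (the input collection here is hypothetical):
PCollection<Integer> lineNumbers = ...;
// The SerializableFunction-based combiner is used much like a CombineFn.
PCollection<Integer> total = lineNumbers.apply(
    Combine.globally(new SumInts()));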
Advanced Combinations Using CombineFn
For more complex combine functions, you can define a subclass of the SDK class CombineFn. You should use CombineFn when your combining function needs to perform additional pre- or post-processing, change the output type, use a more sophisticated accumulator, or take the key into account.
Consider the distributed nature of the computation. The full
key/collection-of-values for each element might not exist on the same Compute Engine
instance at any given moment. You should use CombineFn
for any calculation that
cannot be performed properly without considering all of the values.
For example, consider computing the mean average of the collection of values associated with a
given key. To compute the mean, your combine function must sum all of the values, and then
divide the resulting sum by the number of values it has added. However, if the division
were to take place in multiple distributed locations, the resulting mean average would be
incorrect. Using CombineFn
lets the Cloud Dataflow service accumulate both the
running sum and the number of elements seen, and save the final computation (in this case, the
division) until all of the elements have been accumulated.
A general combining operation consists of four operations. When you create a subclass of CombineFn, you'll need to provide these four operations by overriding the corresponding methods:
- Create Accumulator
- Add Input
- Merge Accumulators
- Extract Output
If, for any reason, your combine function needs access to the key in the key/collection-of-values pair, you can use KeyedCombineFn instead. KeyedCombineFn takes the key type as an additional type parameter and passes the key to the combining operations.
Create Accumulator creates a new "local" accumulator. In the example case of computing a mean average, a local accumulator tracks the running sum of values (the numerator for our final average division) and the number of values summed so far (the denominator). It may be called any number of times in a distributed fashion.
Add Input adds an input element to an accumulator, returning the accumulator value. In our example, it would update the sum and increment the count. It may also be invoked in parallel.
Merge Accumulators merges several accumulators into a single accumulator; this is how data in multiple accumulators are combined before the final calculation. In the case of the mean average computation, the accumulators representing each portion of the division are merged together. It may be called again on its outputs any number of times.
Extract Output performs the final computation. In the case of computing a mean average, this means dividing the combined sum of all the values by the number of values summed. It is called once on the final, merged accumulator.
The following example code shows how to define a CombineFn
to compute a mean
average:
Java
public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
  public static class Accum {
    int sum = 0;
    int count = 0;
  }

  @Override
  public Accum createAccumulator() { return new Accum(); }

  @Override
  public Accum addInput(Accum accum, Integer input) {
    accum.sum += input;
    accum.count++;
    return accum;
  }

  @Override
  public Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum accum : accums) {
      merged.sum += accum.sum;
      merged.count += accum.count;
    }
    return merged;
  }

  @Override
  public Double extractOutput(Accum accum) {
    return ((double) accum.sum) / accum.count;
  }
}

PCollection<Integer> pc = ...;
PCollection<Double> average = pc.apply(Combine.globally(new AverageFn()));
See the Java API reference documentation for CombineFn for full details on the type parameters and generics required when you subclass CombineFn.
The accumulator type AccumT must be encodable. See Data Encoding for details on how to specify a default coder for a data type.
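For example, one way to make the accumulator from the AverageFn example encodable is to annotate the accumulator class with the SDK's @DefaultCoder annotation; this is a sketch, assuming you are willing to serialize the accumulator with AvroCoder (both classes live in the com.google.cloud.dataflow.sdk.coders package):
// Inside AverageFn; the rest of the CombineFn is unchanged.
@DefaultCoder(AvroCoder.class)
public static class Accum {
  int sum = 0;
  int count = 0;
}
Alternatively, you can override CombineFn's getAccumulatorCoder method to supply a coder for the accumulator type explicitly.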
Using the Included Statistical Combination Functions
Java
The Dataflow SDKs contain several classes that provide basic mathematical combination functions. You can use combine functions from these classes as the combine function when using Combine.globally or Combine.perKey. The functions include:
- Sum: add all values together to produce a single sum value.
- Min: keep only the minimum value from all available values.
- Max: keep only the maximum value from all available values.
- Mean: compute a single mean average value from all available values.
Each class defines combine functions for Integer
, Long
, and
Double
types (excepting the Mean
class, which defines a generic
accumulating combine function, to which you'll need to pass a type parameter).
For example, you can use the Sum.SumIntegerFn
to sum a collection of
Integer
values (per key, in this example):
PCollection<KV<String, Integer>> playerScores = ...;
PCollection<KV<String, Integer>> totalPointsPerPlayer =
    playerScores.apply(Combine.<String, Integer, Integer>perKey(
        new Sum.SumIntegerFn()));
Likewise, you can use Max.MaxLongFn
to compute the maximum Long
value
in a PCollection<Long>
as shown:
PCollection<Long> waitTimes = ...;
PCollection<Long> longestWaitTime = waitTimes.apply(
    Combine.globally(new Max.MaxLongFn()));
The Sum, Min, Max, and Mean classes are all defined in the package com.google.cloud.dataflow.sdk.transforms. See the Cloud Dataflow Java API reference for more information.