Often in the course of a pipeline, you'll want to combine or otherwise merge collections of
values in your data. For example, you may have a collection of sales data consisting of orders
for a given month, each with a dollar value. In your pipeline, you might want to combine
all of the order values into a single value that represents the total dollar value of orders for
that month, or the largest order, or the average dollar value per order. To obtain this kind of
data, you **combine** the values in your collection.

The Dataflow SDKs contain a number of operations you can use to combine the values in your
pipeline's `PCollection` objects or to combine key-grouped values.

The `Combine` core transform encapsulates various generic methods you can use to
combine the elements or values in a `PCollection`. `Combine` has variants
that work on entire `PCollection`s, and some that combine the individual value streams
in `PCollection`s of key/value pairs. `Combine` also has subclasses for
specific numeric combining operations, such as sums, minimums,
maximums, and mean averages.

When you apply a `Combine` transform, you must provide the function that contains
the actual logic for combining the elements or values. See
Creating and Specifying a Combine Function later in this
section for more information.

In this sense, `Combine` transforms are similar to the `ParDo` transform, which applies
the logic in a processing function you provide to each element.

## Combining a PCollection into a Single Value

You can combine all of the elements in a given `PCollection` into a single value,
represented in your pipeline as a new `PCollection` containing one element. To combine
the elements of a `PCollection`, you use the global combine transform.

`Combine` behaves differently if the input `PCollection` is
divided using windowing. In that case,
global combine returns a *single element per window*.

Global combine uses a combining function that you supply to combine each element value. See Creating and Specifying a Combine Function later in this section for more details.

The Dataflow SDKs provide some pre-built combine functions for common numeric combination operations, such as sums, minimums, maximums, and mean averages. You can use these functions with global combine instead of creating your own combine function. See Using the Included Statistical Combine Operations for more information.

### Java

The following example code shows how to `apply` a `Combine.globally`
transform to produce a single sum value for a `PCollection` of type `Integer`:

```java
PCollection<Integer> pc = ...;
PCollection<Integer> sum = pc.apply(
    Combine.globally(new Sum.SumIntegerFn()));
```

In the example, `Sum.SumIntegerFn()` is the `CombineFn` that the
transform uses to combine the elements in the input `PCollection`. The resulting
`PCollection`, called `sum`, will contain one value: the sum of all the
elements in the input `PCollection`.

### Global Windowing

If your input `PCollection` uses the default
global windowing, Dataflow's default behavior is
to return a `PCollection` containing one item. That item's value comes from the
accumulator in the combine function that you specified when applying
`Combine`. For example, Dataflow's `Sum` combine function returns a zero
value (the sum of an empty input), while the `Min` combine function returns a maximal or
infinite value.

To have `Combine` instead return an empty `PCollection` if the input is
empty, specify `.withoutDefaults` when you apply your `Combine` transform,
as in the following code example:

```java
PCollection<Integer> pc = ...;
PCollection<Integer> sum = pc.apply(
    Combine.globally(new Sum.SumIntegerFn()).withoutDefaults());
```

### Non-Global Windowing

If your `PCollection` uses any
non-global windowing function, Dataflow does not
provide the default behavior. You **must** specify one of the following options when applying
`Combine`:

- Specify `.withoutDefaults`, where windows that are empty in the input `PCollection` will likewise be empty in the output collection.
- Specify `.asSingletonView`, in which the output is immediately converted to a `PCollectionView`, which will provide a default value for each empty window when used as a side input. You'll generally only need to use this option if the result of your pipeline's `Combine` is to be used as a side input later in the pipeline.

## Combining Values in a Key-Grouped Collection

After creating a key-grouped collection (for example, by using a `GroupByKey`
transform), a common pattern is to combine the collection of values associated with each key into
a single, merged value.

Drawing on the previous example from `GroupByKey`, a key-grouped
`PCollection` called `groupedWords` looks like this:

```
cat, [1,5,9]
dog, [5,2]
and, [1,2,6]
jump, [3]
tree, [2]
...
```

In the above `PCollection`, each element has a string key (for example,
"cat") and an iterable of integers for its value (in the first element, containing [1,
5, 9]), where each integer represents the line number in the original text where the key
appeared. If your pipeline's next processing step combines the values (rather
than considering them individually), you can combine the iterable of integers
to create a single, merged value to be paired with each key.

The actual logic of how to combine the values is up to you; for example, given a key-grouped collection of keys and integer values, you might choose to sum the collection of integers associated with each key (which is useful when the values represent occurrence counts, for example). In our example, because the values represent line numbers, we might want to take the min, indicating the first occurrence.
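As a plain-Java sketch of that logic (independent of the Dataflow SDK; the class and method names below are hypothetical illustrations, not SDK APIs), taking the minimum per key, and thus the first occurrence, looks like this:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MinPerKeySketch {
    // Combine each key's iterable of line numbers down to its minimum,
    // i.e. the line of the key's first occurrence in the text.
    static Map<String, Integer> firstOccurrences(Map<String, List<Integer>> grouped) {
        Map<String, Integer> result = new HashMap<>();
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            result.put(entry.getKey(), Collections.min(entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> groupedWords = new HashMap<>();
        groupedWords.put("cat", Arrays.asList(1, 5, 9));
        groupedWords.put("dog", Arrays.asList(5, 2));
        System.out.println(firstOccurrences(groupedWords).get("cat")); // 1
        System.out.println(firstOccurrences(groupedWords).get("dog")); // 2
    }
}
```

The Dataflow equivalent of this per-key reduction is shown in the SDK example that follows.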

### Java

```java
PCollection<KV<String, Integer>> occurrences = ...;
PCollection<KV<String, Iterable<Integer>>> grouped =
    occurrences.apply(GroupByKey.<String, Integer>create());
PCollection<KV<String, Integer>> firstOccurrences =
    grouped.apply(Combine.groupedValues(new Min.MinIntegerFn()));
```

### Using Combine PerKey

The Dataflow SDKs represent this entire pattern (performing a `GroupByKey` and then
merging the collection of values) as the Combine PerKey transform.

Combine PerKey performs both steps of the pattern: you `apply` it to a
`PCollection` of key/value pairs, just as
you would a `GroupByKey`. The
Combine PerKey transform then applies a combine function you supply to the set of all values
associated with each key. The combine operation produces a new `PCollection` of
key/value pairs, which contains unique keys and a single, merged value for each key.

Combine PerKey performs the `GroupByKey` transform as part
of its operation; therefore, it behaves as `GroupByKey`
does when your `PCollection` is divided using
windowing. That is, Combine PerKey combines
**per key and window**.

While you can create a `ParDo` to perform this kind of function
(namely, by key-grouping a collection and then combining the values), the more structured
semantics of `CombineFn` allow the Cloud Dataflow service to
execute the combining operation more efficiently.
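To see where the efficiency comes from, here is a plain-Java sketch (not Dataflow code; the class and method names are hypothetical): a single-pass per-key combine folds each value into a running accumulator as it arrives, rather than first materializing every key's full value list and then reducing it:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerKeySumSketch {
    // Single-pass combine: fold each value into the key's running sum
    // instead of building the full grouped list first.
    static Map<String, Double> sumPerKey(List<Map.Entry<String, Double>> records) {
        Map<String, Double> sums = new HashMap<>();
        for (Map.Entry<String, Double> record : records) {
            sums.merge(record.getKey(), record.getValue(), Double::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> sales = Arrays.asList(
            new SimpleEntry<>("amy", 10.0),
            new SimpleEntry<>("bob", 4.5),
            new SimpleEntry<>("amy", 2.5));
        System.out.println(sumPerKey(sales).get("amy")); // 12.5
    }
}
```

Because the running accumulator is small and mergeable, partial sums can also be computed on different workers and merged later, which is exactly what the service exploits.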

The combine function you supply to Combine PerKey must be an associative reduction
function or a subclass of `CombineFn`.
See Creating and Specifying
a Combine Function later in this section for more details on how to create a
`CombineFn`.

### Java

When you use `Combine.perKey`, you pass type parameters according to the
types of keys and values in your input `PCollection`, and an additional type
parameter if the merged value in your output `PCollection` is of a different type
than the values in your input `PCollection`.

You'll also need to pass to `Combine.perKey` the combine function that contains the
combination logic to apply to each value collection. You typically define your combine function
out of line.

The following example code shows how to `apply` a `Combine.perKey`
transform. In the example, the input `PCollection` is grouped by key, and the
collection of `Double` values associated with each key is combined into a single
sum value:

```java
PCollection<KV<String, Double>> salesRecords = ...;
PCollection<KV<String, Double>> totalSalesPerPerson =
    salesRecords.apply(Combine.<String, Double, Double>perKey(
        new Sum.SumDoubleFn()));
```

Note that `Combine.perKey` takes three type parameters: `String` for the
key type, and `Double` for both the input value type and the combined output type. This is
because the collection of values for each key is of type `Double`, and the resulting
combined value is also of type `Double`.

The following example code shows how to `apply` a `Combine.perKey`
transform where the combined value is of a different type than the original collection of
values per key; in this case, the input `PCollection` has keys of type
`String` and values of type `Integer`, and the combined value is a
`Double` that represents the mean average of all the values per key.

```java
PCollection<KV<String, Integer>> playerAccuracy = ...;
PCollection<KV<String, Double>> avgAccuracyPerPlayer =
    playerAccuracy.apply(Combine.<String, Integer, Double>perKey(
        new MeanInts()));
```

Often Java is able to infer these type parameters from the type of the `CombineFn`.
Java 8 is better at this inference than earlier versions of Java.

## Creating and Specifying a Combine Function

When you use a `Combine` transform, regardless of variant, you'll need to provide some
processing logic that specifies how to combine the multiple elements into a single value.

When you design a combine function, note that the function is not necessarily invoked exactly
once on all values with a given key. Because the input data (including the value collection)
may be distributed across multiple worker instances, the combining function might be called
multiple times to perform partial combining on subsets of the value collection. Generally
speaking, the combine might be repeatedly applied in a tree structure. Because the tree structure
is unspecified, *your combining function should be commutative and associative*.
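To make the requirement concrete, here is a small plain-Java check (the helper names are hypothetical, not SDK APIs): addition gives the same answer whether the values are folded left-to-right or combined as a tree of partial results, while subtraction, which is neither associative nor commutative, does not:

```java
import java.util.function.IntBinaryOperator;

public class AssociativityCheck {
    // Reduce left-to-right: f(f(f(v1, v2), v3), v4).
    static int foldLeft(IntBinaryOperator f, int[] v) {
        int acc = v[0];
        for (int i = 1; i < v.length; i++) acc = f.applyAsInt(acc, v[i]);
        return acc;
    }

    // Reduce as a balanced tree: f(f(v1, v2), f(v3, v4)),
    // mimicking how a runner may combine partial results in any shape.
    static int foldTree(IntBinaryOperator f, int[] v, int lo, int hi) {
        if (hi - lo == 1) return v[lo];
        int mid = (lo + hi) / 2;
        return f.applyAsInt(foldTree(f, v, lo, mid), foldTree(f, v, mid, hi));
    }

    public static void main(String[] args) {
        int[] v = {1, 5, 9, 3};
        IntBinaryOperator sum = (a, b) -> a + b;
        IntBinaryOperator diff = (a, b) -> a - b;
        System.out.println(foldLeft(sum, v) == foldTree(sum, v, 0, v.length));   // true
        System.out.println(foldLeft(diff, v) == foldTree(diff, v, 0, v.length)); // false
    }
}
```

A runner is free to pick either shape (or any other), which is why a non-associative or non-commutative combining function can silently produce different results from run to run.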

You have a few options when creating a combine function. Simple combine operations, such as
sums, can usually be implemented as a simple function from a set of
values to a single value of the same type. Your function must be associative and commutative.
In other words, for a combining function *f* and a set of values
*v₁, ..., vₙ*, it should be the case that, logically:

*f(v₁, v₂, ..., vₙ) = f(v₁, v₂, ..., f(vₖ, ..., vₙ)) = f(f(v₁, v₂), ..., f(vₖ, ..., vₙ)) = f(f(vₖ, ..., vₙ), ..., f(v₂, v₁))*

More complex combination operations might require you to create a subclass of `CombineFn` that
has an accumulation type distinct from the input/output type. Finally, the Dataflow SDK
provides some common functions that handle different statistical combinations, such as
`Sum`, `Min`, `Max`, and `Mean`. See
Using the Included Statistical Combine Operations for more
information.

### Simple Combinations Using Simple Functions

### Java

For simple combine functions, such as summing a collection of `Integer` values
into a single `Integer`, you can create a simple function that implements the
`SerializableFunction` interface. Your `SerializableFunction` should
convert an `Iterable<V>` to a single value of type `V`.

The following example code shows a simple combine function to sum a collection of
`Integer` values; the function `SumInts` implements the interface
`SerializableFunction`:

```java
public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
  @Override
  public Integer apply(Iterable<Integer> input) {
    int sum = 0;
    for (int item : input) {
      sum += item;
    }
    return sum;
  }
}
```

### Advanced Combinations using CombineFn

For more complex combine functions, you can define a subclass of the SDK class
`CombineFn`. You should use `CombineFn` when your combining function must
perform additional pre- or post-processing, when it might change the output type, when it
requires a more sophisticated accumulator, or when it must take the key into account.

Consider the distributed nature of the computation. The full
key/collection-of-values for each element might not exist on the same Compute Engine
instance at any given moment. You should use `CombineFn` for any calculation that
cannot be performed properly without considering all of the values.

For example, consider computing the mean average of the collection of values associated with a
given key. To compute the mean, your combine function must sum all of the values, and then
divide the resulting sum by the number of values it has added. However, if the division
were to take place in multiple distributed locations, the resulting mean average would be
incorrect. Using `CombineFn` lets the Cloud Dataflow service accumulate both the
running sum and the number of elements seen, and save the final computation (in this case, the
division) until all of the elements have been accumulated.
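The error is easy to reproduce in plain Java (no Dataflow involved; the class below is a hypothetical illustration): averaging two partial averages weights each partition equally regardless of how many values it holds, while merging (sum, count) accumulators and dividing once at the end stays correct:

```java
public class DistributedMeanSketch {
    // WRONG: average the partial averages of two partitions.
    // Each partition gets equal weight no matter how many values it holds.
    static double meanOfPartialMeans(int[] part1, int[] part2) {
        return (mean(part1) + mean(part2)) / 2.0;
    }

    // RIGHT: merge (sum, count) accumulators, divide once at the end.
    static double meanViaAccumulators(int[] part1, int[] part2) {
        int sum = 0, count = 0;
        for (int v : part1) { sum += v; count++; }
        for (int v : part2) { sum += v; count++; }
        return ((double) sum) / count;
    }

    static double mean(int[] values) {
        int sum = 0;
        for (int v : values) sum += v;
        return ((double) sum) / values.length;
    }

    public static void main(String[] args) {
        int[] part1 = {2, 4, 6, 8};   // four values on one worker
        int[] part2 = {10};           // one value on another worker
        System.out.println(meanViaAccumulators(part1, part2)); // 6.0
        System.out.println(meanOfPartialMeans(part1, part2));  // 7.5, incorrect
    }
}
```

The (sum, count) pair is exactly the accumulator that a mean-computing `CombineFn` carries between workers.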

A general combining operation consists of four operations. When you create a subclass of
`CombineFn`, you'll need to provide these four operations by overriding the
corresponding methods:

- Create Accumulator
- Add Input
- Merge Accumulators
- Extract Output

If, for any reason, your combine function needs access to the key in the
key/collection-of-values pair, you can use `KeyedCombineFn` instead.
`KeyedCombineFn` passes in the key as an additional type parameter.

**Create Accumulator** creates a new "local" accumulator. In the example case, taking
a mean average, a local accumulator tracks the running sum of values (the numerator value for
our final average division) and the number of values summed so far (the denominator value).
It may be called any number of times in a distributed fashion.

**Add Input** adds an input element to an accumulator, returning the accumulator value.
In our example, it would update the sum and increment the count. It may also be invoked
in parallel.

**Merge Accumulators** merges several accumulators into a single accumulator; this is
how data in multiple accumulators are combined before the final calculation. In the case of the
mean average computation, the accumulators representing each portion of the division are merged
together. It may be called again on its outputs any number of times.

**Extract Output** performs the final computation. In the case of computing a mean
average, this means dividing the combined sum of all the values by the number of values summed.
It is called once on the final, merged accumulator.

The following example code shows how to define a `CombineFn` to compute a mean
average:

### Java

```java
public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
  public static class Accum {
    int sum = 0;
    int count = 0;
  }

  @Override
  public Accum createAccumulator() { return new Accum(); }

  @Override
  public Accum addInput(Accum accum, Integer input) {
    accum.sum += input;
    accum.count++;
    return accum;
  }

  @Override
  public Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum accum : accums) {
      merged.sum += accum.sum;
      merged.count += accum.count;
    }
    return merged;
  }

  @Override
  public Double extractOutput(Accum accum) {
    return ((double) accum.sum) / accum.count;
  }
}

PCollection<Integer> pc = ...;
PCollection<Double> average = pc.apply(Combine.globally(new AverageFn()));
```

See the Java API reference documentation for `CombineFn` for full details on the required
type parameters and generics you'll need to subclass `CombineFn`.

The accumulator type `AccumT` must be encodable.
See Data Encoding for details on how to specify
a default coder for a data type.

### Using the Included Statistical Combination Functions

### Java

The Dataflow SDKs contain several classes that provide basic mathematical combination functions.
You can use combine functions from these classes as the combine function when using
`Combine.globally` or `Combine.perKey`. The functions include:

- `Sum`: add all values together to produce a single sum value.
- `Min`: keep only the minimum value from all available values.
- `Max`: keep only the maximum value from all available values.
- `Mean`: compute a single mean average value from all available values.

Each class defines combine functions for `Integer`, `Long`, and
`Double` types (excepting the `Mean` class, which defines a generic
accumulating combine function, to which you'll need to pass a type parameter).

For example, you can use the `Sum.SumIntegerFn` to sum a collection of
`Integer` values (per key, in this example):

```java
PCollection<KV<String, Integer>> playerScores = ...;
PCollection<KV<String, Integer>> totalPointsPerPlayer =
    playerScores.apply(Combine.<String, Integer, Integer>perKey(
        new Sum.SumIntegerFn()));
```

Likewise, you can use `Max.MaxLongFn` to compute the maximum `Long` value
in a `PCollection<Long>` as shown:

```java
PCollection<Long> waitTimes = ...;
PCollection<Long> longestWaitTime = waitTimes.apply(
    Combine.globally(new Max.MaxLongFn()));
```

The `Sum`, `Min`, `Max`, and `Mean` classes are all
defined in the package `com.google.cloud.dataflow.sdk.transforms`. See the Cloud Dataflow
Java API reference for more information.