Combining Collections and Values

Often in the course of a pipeline, you'll want to combine or otherwise merge collections of values in your data. For example, you may have a collection of sales data consisting of orders for a given month, each with a dollar value. In your pipeline, you might want to combine all of the order values into a single value that represents the total dollar value of orders for that month, or the largest order, or the average dollar value per order. To obtain this kind of data, you combine the values in your collection.

The Dataflow SDKs contain a number of operations you can use to combine the values in your pipeline's PCollection objects or to combine key-grouped values.

The Combine core transform encapsulates various generic methods you can use to combine the elements or values in a PCollection. Combine has variants that work on entire PCollections, and some that combine the individual value streams in PCollections of key/value pairs. Combine also has subclasses for specific numeric combining operations, such as sums, minimums, maximums, and mean averages.

When you apply a Combine transform, you must provide the function that contains the actual logic for combining the elements or values. See Creating and Specifying a Combine Function later in this section for more information.

In this sense, Combine transforms are similar to ParDo, which applies the logic in a processing function you provide to each element of a PCollection.

Combining a PCollection into a Single Value

You can combine all of the elements in a given PCollection into a single value, represented in your pipeline as a new PCollection containing one element. To combine the elements of a PCollection, you use the global combine transform.

Combine behaves differently if the input PCollection is divided using windowing. In that case, global combine returns a single element per window.

Global combine uses a combining function that you supply to combine each element value. See Creating and Specifying a Combine Function later in this section for more details.

The Dataflow SDKs provide some pre-built combine functions for common numeric combination operations, such as sums, minimums, maximums, and mean averages. You can use these functions with global combine instead of creating your own combine function. See Using the Included Statistical Combine Operations for more information.

Java

The following example code shows how to apply a Combine.globally transform to produce a single sum value for a PCollection of type Integer:

  PCollection<Integer> pc = ...;
  PCollection<Integer> sum = pc.apply(
    Combine.globally(new Sum.SumIntegerFn()));

In the example, Sum.SumIntegerFn() is the CombineFn that the transform uses to combine the elements in the input PCollection. The resulting PCollection, called sum, will contain one value: the sum of all the elements in the input PCollection.
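
The resulting single-element PCollection can be consumed like any other collection. For example, a hypothetical follow-up ParDo (not part of the original example) could log the total:

  sum.apply(ParDo.of(new DoFn<Integer, Void>() {
    @Override
    public void processElement(ProcessContext c) {
      // Runs once per combined value (a single element for globally windowed input).
      System.out.println("Total: " + c.element());
    }
  }));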

Global Windowing

If your input PCollection uses the default global windowing, Dataflow's default behavior is to return a PCollection containing one item, even when the input is empty. That item's value comes from the accumulator in the combine function that you specified when applying Combine. For example, Dataflow's Sum combine function returns a zero value (the sum of an empty input), while the Min combine function returns a maximal or infinite value.

To have Combine instead return an empty PCollection if the input is empty, specify .withoutDefaults when you apply your Combine transform, as in the following code example:

  PCollection<Integer> pc = ...;
  PCollection<Integer> sum = pc.apply(
    Combine.globally(new Sum.SumIntegerFn()).withoutDefaults());

Non-Global Windowing

If your PCollection uses any non-global windowing function, Dataflow does not provide the default behavior. You must specify one of the following options when applying Combine:

  • Specify .withoutDefaults, where windows that are empty in the input PCollection will likewise be empty in the output collection.
  • Specify .asSingletonView, in which the output is immediately converted to a PCollectionView, which will provide a default value for each empty window when used as a side input. You'll generally only need to use this option if the result of your pipeline's Combine is to be used as a side input later in the pipeline.
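
For example, the following sketch (the collection name orderSizes and the DoFn are hypothetical, not from the original example) combines a collection into a singleton view and then reads it as a side input; in empty windows the side input yields the combine function's default value:

  PCollection<Integer> orderSizes = ...;
  final PCollectionView<Integer> totalView = orderSizes.apply(
    Combine.globally(new Sum.SumIntegerFn()).asSingletonView());

  PCollection<Double> fractionOfTotal = orderSizes.apply(
    ParDo.withSideInputs(totalView).of(new DoFn<Integer, Double>() {
      @Override
      public void processElement(ProcessContext c) {
        int total = c.sideInput(totalView);   // default value (0 for Sum) in empty windows
        c.output(total == 0 ? 0.0 : ((double) c.element()) / total);
      }
    }));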

Combining Values in a Key-Grouped Collection

After creating a key-grouped collection (for example, by using a GroupByKey transform), a common pattern is to combine the collection of values associated with each key into a single, merged value.

Drawing on the previous example from GroupByKey, a key-grouped PCollection called groupedWords looks like this:

  cat, [1,5,9]
  dog, [5,2]
  and, [1,2,6]
  jump, [3]
  tree, [2]
  ...

In the above PCollection, each element has a string key (for example, "cat") and an iterable of integers for its value ([1, 5, 9] in the first element), where each integer represents the line number in the original text where the key appeared. If your pipeline's next processing step combines the values (rather than considering them individually), you can combine the iterable of integers to create a single, merged value to be paired with each key.

The actual logic of how to combine the values is up to you; for example, given a key-grouped collection of keys and integer values, you might choose to sum the collection of integers associated with each key (which is useful when the values represent occurrence counts, for example). In our example, because the values represent line numbers, we might want to take the min, indicating the first occurrence.

Java

  PCollection<KV<String, Integer>> occurrences = ...;
  PCollection<KV<String, Iterable<Integer>>> grouped = occurrences.apply(GroupByKey.<String, Integer>create());
  PCollection<KV<String, Integer>> firstOccurrences = grouped.apply(
    Combine.groupedValues(new Min.MinIntegerFn()));

Using Combine PerKey

The Dataflow SDKs represent this entire pattern—GroupByKey, then merging the collection of values—as the Combine PerKey transform.

Combine PerKey performs both steps of the pattern: You apply it to a PCollection of key/value pairs, just as you would a GroupByKey. The Combine PerKey transform then applies a combine function you supply to the set of all values associated with each key. The combine operation produces a new PCollection of key/value pairs, which contains unique keys and a single, merged value for each key.

Combine PerKey performs the GroupByKey transform as part of its operation; therefore, it behaves as GroupByKey does when your PCollection is divided using windowing. That is, Combine PerKey combines per key and window.
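
For example, the following sketch (the collection name and the one-minute window size are illustrative) applies fixed windows before Combine PerKey, so the output contains one sum per key per window:

  PCollection<KV<String, Integer>> scores = ...;   // assumed to carry timestamps
  PCollection<KV<String, Integer>> windowedSums = scores
    .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply(Combine.<String, Integer, Integer>perKey(new Sum.SumIntegerFn()));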

While you can create a ParDo to perform this kind of operation (namely, key-grouping a collection and then combining the values), the more structured semantics of the Combine transform allow the Cloud Dataflow service to execute the combining operation more efficiently.
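
For contrast, a hypothetical ParDo-based version (reusing the grouped collection from the earlier example) might look like the following; it produces correct sums, but the service cannot lift it into partial, pre-shuffle combining the way it can with Combine:

  PCollection<KV<String, Integer>> summed = grouped.apply(
    ParDo.of(new DoFn<KV<String, Iterable<Integer>>, KV<String, Integer>>() {
      @Override
      public void processElement(ProcessContext c) {
        int sum = 0;
        for (Integer value : c.element().getValue()) {
          sum += value;
        }
        c.output(KV.of(c.element().getKey(), sum));
      }
    }));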

The combine function you supply to Combine PerKey must be an associative reduction function or a subclass of CombineFn. See Creating and Specifying a Combine Function later in this section for more details on how to create a CombineFn.

Java

When you use Combine.perKey, you pass type parameters according to the types of keys and values in your input PCollection, and an additional type parameter if the merged value in your output PCollection is of a different type than the values in your input PCollection.

You'll also need to pass to Combine.perKey the combine function that contains the combination logic to apply to each value collection. You typically define your combine function out of line.

The following example code shows how to apply a Combine.perKey transform. In the example, the input PCollection is grouped by key, and the collection of Double values associated with each key is combined into a single sum value:

  PCollection<KV<String, Double>> salesRecords = ...;
  PCollection<KV<String, Double>> totalSalesPerPerson =
    salesRecords.apply(Combine.<String, Double, Double>perKey(
      new Sum.SumDoubleFn()));

Note that Combine.perKey takes three type parameters: String for the key type, and Double for both the input value type and the output value type, because the collection of values for each key is of type Double and the resulting combined value is also of type Double.

The following example code shows how to apply a Combine.perKey transform where the combined value is of a different type than the original collection of values per key; in this case, the input PCollection has keys of type String and values of type Integer, and the combined value is a Double that represents the mean average of all the values per key.

  PCollection<KV<String, Integer>> playerAccuracy = ...;
  PCollection<KV<String, Double>> avgAccuracyPerPlayer =
    playerAccuracy.apply(Combine.<String, Integer, Double>perKey(
      new MeanInts()));

Often Java is able to infer these type parameters from the type of the CombineFn. Java 8 is better at this inference than earlier versions of Java.
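
For instance, when inference succeeds, the earlier per-key sum can be written without explicit type parameters (a sketch; fall back to the explicit form shown above if the compiler cannot infer the types):

  PCollection<KV<String, Double>> totalSalesPerPerson =
    salesRecords.apply(Combine.perKey(new Sum.SumDoubleFn()));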

Creating and Specifying a Combine Function

When you use a Combine transform, regardless of variant, you'll need to provide some processing logic that specifies how to combine the multiple elements into a single value.

When you design a combine function, note that the function is not necessarily invoked exactly once on all values with a given key. Because the input data (including the value collection) may be distributed across multiple worker instances, the combining function might be called multiple times to perform partial combining on subsets of the value collection. Generally speaking, the combine might be repeatedly applied in a tree structure. Because the tree structure is unspecified, your combining function should be commutative and associative.

You have a few options when creating a combine function. Simple combine operations, such as sums, can usually be implemented as a simple function from a set of values to a single value of the same type. Your function must be associative and commutative. In other words, for a combining function f and a set of values v1, ..., vn, the result must not depend on how the values are grouped or ordered; for example, f(v1, v2, ..., vn) = f(f(v1, v2), v3, ..., vn) = f(f(v1, v2), ..., f(vk, ..., vn)) = f(f(vk, ..., vn), ..., f(v2, v1)), and so on.
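
As a quick arithmetic illustration (plain Java values, not pipeline code), addition satisfies this property, which is why partial sums can safely be computed on different workers and then merged:

  int onePass = 1 + 5 + 9 + 2;          // f(v1, v2, v3, v4) = 17
  int partials = (1 + 5) + (9 + 2);     // f(f(v1, v2), f(v3, v4)) = 17
  int reordered = (2 + 9) + (5 + 1);    // f(f(v4, v3), f(v2, v1)) = 17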

More complex combination operations might require you to create a subclass of CombineFn that has an accumulation type distinct from the input/output type. Finally, the Dataflow SDK provides some common functions that handle different statistical combinations, such as Sum, Min, Max, and Mean. See Using the Included Statistical Combine Operations for more information.

Simple Combinations Using Simple Functions

Java

For simple combine functions, such as summing a collection of Integer values into a single Integer, you can create a simple function that implements the SerializableFunction interface. Your SerializableFunction should convert an Iterable<V> to a single value of type V.

The following example code shows a simple combine function to sum a collection of Integer values; the function SumInts implements the interface SerializableFunction:

  public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
    @Override
    public Integer apply(Iterable<Integer> input) {
      int sum = 0;
      for (int item : input) {
        sum += item;
      }
      return sum;
    }
  }
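
You can then pass an instance of SumInts to a Combine transform just as you would one of the SDK-provided combine functions; for example (a sketch following the earlier global combine pattern):

  PCollection<Integer> pc = ...;
  PCollection<Integer> sum = pc.apply(
    Combine.globally(new SumInts()));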

Advanced Combinations Using CombineFn

For more complex combine functions, you can define a subclass of the SDK class CombineFn. You should use CombineFn when your combining function must perform additional pre- or post-processing, might change the output type, requires a more sophisticated accumulator, or needs to take the key into account.

Consider the distributed nature of the computation. The full key/collection-of-values for each element might not exist on the same Compute Engine instance at any given moment. You should use CombineFn for any calculation that cannot be performed properly without considering all of the values.

For example, consider computing the mean average of the collection of values associated with a given key. To compute the mean, your combine function must sum all of the values, and then divide the resulting sum by the number of values it has added. However, if the division were to take place in multiple distributed locations, the resulting mean average would be incorrect. Using CombineFn lets the Cloud Dataflow service accumulate both the running sum and the number of elements seen, and save the final computation (in this case, the division) until all of the elements have been accumulated.

A general combining operation consists of four operations. When you create a subclass of CombineFn, you'll need to provide these four operations by overriding the corresponding methods:

  • Create Accumulator
  • Add Input
  • Merge Accumulators
  • Extract Output

If, for any reason, your combine function needs access to the key in the key/collection-of-values pair, you can use KeyedCombineFn instead. KeyedCombineFn passes in the key as an additional type parameter.

Create Accumulator creates a new "local" accumulator. In the example case, taking a mean average, a local accumulator tracks the running sum of values (the numerator value for our final average division) and the number of values summed so far (the denominator value). It may be called any number of times in a distributed fashion.

Add Input adds an input element to an accumulator, returning the accumulator value. In our example, it would update the sum and increment the count. It may also be invoked in parallel.

Merge Accumulators merges several accumulators into a single accumulator; this is how data in multiple accumulators is combined before the final calculation. In the case of the mean average computation, the accumulators representing each portion of the division are merged together. It may be called again on its outputs any number of times.

Extract Output performs the final computation. In the case of computing a mean average, this means dividing the combined sum of all the values by the number of values summed. It is called once on the final, merged accumulator.

The following example code shows how to define a CombineFn to compute a mean average:

Java

 public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
   public static class Accum {
     int sum = 0;
     int count = 0;
   }

   @Override
   public Accum createAccumulator() { return new Accum(); }

   @Override
   public Accum addInput(Accum accum, Integer input) {
       accum.sum += input;
       accum.count++;
       return accum;
   }

   @Override
   public Accum mergeAccumulators(Iterable<Accum> accums) {
     Accum merged = createAccumulator();
     for (Accum accum : accums) {
       merged.sum += accum.sum;
       merged.count += accum.count;
     }
     return merged;
   }

   @Override
   public Double extractOutput(Accum accum) {
     return ((double) accum.sum) / accum.count;
   }
 }

 PCollection<Integer> pc = ...;
 PCollection<Double> average = pc.apply(Combine.globally(new AverageFn()));

See the Java API reference documentation for CombineFn for full details on the required type parameters and generics you'll need to subclass CombineFn.

The accumulator type AccumT must be encodable. See Data Encoding for details on how to specify a default coder for a data type.
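
One way to make the accumulator encodable (a sketch, assuming AvroCoder can reflectively encode the accumulator's fields) is to annotate the accumulator class with a default coder:

  @DefaultCoder(AvroCoder.class)
  public static class Accum {
    int sum = 0;
    int count = 0;
  }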

Using the Included Statistical Combine Operations

Java

The Dataflow SDKs contain several classes that provide basic mathematical combination functions. You can use combine functions from these classes as the combine function when using Combine.globally or Combine.perKey. The functions include:

  • Sum: add all values together to produce a single sum value.
  • Min: keep only the minimum value from all available values.
  • Max: keep only the maximum value from all available values.
  • Mean: compute a single mean average value from all available values.

Each class defines combine functions for Integer, Long, and Double types (excepting the Mean class, which defines a generic accumulating combine function, to which you'll need to pass a type parameter).

For example, you can use the Sum.SumIntegerFn to sum a collection of Integer values (per key, in this example):

  PCollection<KV<String, Integer>> playerScores = ...;
  PCollection<KV<String, Integer>> totalPointsPerPlayer =
    playerScores.apply(Combine.<String, Integer, Integer>perKey(
      new Sum.SumIntegerFn()));

Likewise, you can use Max.MaxLongFn to compute the maximum Long value in a PCollection<Long> as shown:

  PCollection<Long> waitTimes = ...;
  PCollection<Long> longestWaitTime = waitTimes.apply(
    Combine.globally(new Max.MaxLongFn()));

The Sum, Min, Max, and Mean classes are all defined in the package com.google.cloud.dataflow.sdk.transforms. See the Cloud Dataflow Java API reference for more information.
