GroupByKey and Join

The GroupByKey core transform is a parallel reduction operation used to process collections of key/value pairs. You use GroupByKey with an input PCollection of key/value pairs that represents a multimap, where the collection contains multiple pairs that have the same key, but different values. The GroupByKey transform lets you gather together all of the values in the multimap that share the same key.

GroupByKey is analogous to the Shuffle phase of a Map/Shuffle/Reduce-style algorithm. You use GroupByKey to collect all of the values associated with a unique key.

GroupByKey is a good way to aggregate data that has something in common. For example, you might want to group together customer orders from the same postal code (where the "key" is the postal code of each order, and the "value" is the rest of the order record). Or you might want to group together all user queries by user ID, or by the time of day they occurred.

You can also join multiple collections of key/value pairs, when those collections share a common key (even if the value types are different). There are two ways to perform a join. One way is by using the CoGroupByKey transform, which lets you group together all of the values (of any type) in multiple collections that share a common key. The other way is to join multiple collections of key/value pairs by using a ParDo with one or more side inputs. In some cases, this second way can be more efficient than using a CoGroupByKey.

Joins are useful when you have multiple data sets, possibly from multiple sources, that provide information about related things. For example, let's say you have two different files with auction data: one file has the auction ID, bid data, and pricing data; the other file has the auction ID and an item description. You can join those two data sets, using the auction ID as a common key and the other attached data as the associated values. After the join, you have one data set that contains all the information (bid, price, and item) associated with each auction ID.

GroupByKey

Let's examine the mechanics of GroupByKey with a simple example case, where our data set consists of words from a text file and the line on which they appear. We want to group together all the line numbers (values) that share the same word (key), letting us see all the places in the text where one particular word appears.

Our input is a PCollection of key/value pairs where each word is a key, and the value is a line number in the file where the word appears. Here's a list of the key/value pairs in our input collection:

  cat, 1
  dog, 5
  and, 1
  jump, 3
  tree, 2
  cat, 5
  dog, 2
  and, 2
  cat, 9
  and, 6
  ...

The GroupByKey transform gathers up all of the values with the same key, and creates a new pair consisting of the unique key and a collection of all the values that were associated with that key in the input PCollection. If we were to apply GroupByKey on the collection of key/value pairs above, the output collection would look like this:

  cat, [1,5,9]
  dog, [5,2]
  and, [1,2,6]
  jump, [3]
  tree, [2]
  ...

Thus, GroupByKey transforms a multimap, which is a map of multiple keys to individual values, into a uni-map, which is a map of unique keys to collections of values.
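The multimap-to-uni-map semantics can be illustrated with plain java.util collections. This is a standalone sketch of what GroupByKey computes on the example data above, not actual Dataflow SDK code:

```java
import java.util.*;

public class GroupByKeyDemo {
  // Group a list of (word, lineNumber) pairs by word, preserving
  // the order in which the values appear in the input.
  static Map<String, List<Integer>> groupByKey(List<Map.Entry<String, Integer>> pairs) {
    Map<String, List<Integer>> grouped = new LinkedHashMap<>();
    for (Map.Entry<String, Integer> pair : pairs) {
      grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
             .add(pair.getValue());
    }
    return grouped;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> input = List.of(
        Map.entry("cat", 1), Map.entry("dog", 5), Map.entry("and", 1),
        Map.entry("jump", 3), Map.entry("tree", 2), Map.entry("cat", 5),
        Map.entry("dog", 2), Map.entry("and", 2), Map.entry("cat", 9),
        Map.entry("and", 6));
    // prints {cat=[1, 5, 9], dog=[5, 2], and=[1, 2, 6], jump=[3], tree=[2]}
    System.out.println(groupByKey(input));
  }
}
```

In the Dataflow service this reduction runs in parallel across workers; the sketch only shows the input-to-output relationship.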

Java

A Note on Representing Key/Value Pairs:
In the Dataflow Java SDK, you represent a key/value pair with an object of type KV<K, V>. KV is a specialized class that holds a key of type K and a value of type V.

A common processing pattern for key-grouped collections (the result of a GroupByKey transform) is to combine the values associated with each key into a single, merged value associated with that key. The Dataflow SDKs encapsulate this entire pattern (key-grouping, and then combining the values for each key) as the Combine transform. See Combining Collections and Values for more information.

Applying a GroupByKey Transform

You must apply a GroupByKey to an input PCollection of key/value pairs. GroupByKey returns a new PCollection of key/value pairs; in the new collection, the keys are unique, and each associated value element is actually a value stream that contains one or more values associated with the key.

Java

The following example code shows how to apply GroupByKey to a PCollection of KV<K, V> objects, where each element pair represents a key of type K and a single value of type V.

The return value of GroupByKey is a new PCollection of type KV<K, Iterable<V>>, where each element pair represents a key and a collection of values as a Java Iterable.

  // A PCollection of key/value pairs: words and line numbers.
  PCollection<KV<String, Integer>> wordsAndLines = ...;

  // Apply a GroupByKey transform to the PCollection "wordsAndLines".
  PCollection<KV<String, Iterable<Integer>>> groupedWords = wordsAndLines.apply(
    GroupByKey.<String, Integer>create());

Note: Java 8 can infer the parameter types of GroupByKey.create, but they may need to be specified explicitly in earlier versions of Java.

GroupByKey with Windowing

Java

GroupByKey behaves differently when the input PCollection is divided into multiple windows rather than a single global window.

The GroupByKey transform also considers the window to which each element belongs when performing the reduction. Each element's window (as determined by the element's timestamp) essentially acts as a secondary key, so GroupByKey with windowing groups by both key and window. When all elements belong to a single global window, GroupByKey reduces to the simple semantics described above.

An element's window is potentially more powerful than an ordinary secondary key: elements can belong to more than one window, and overlapping windows may be merged, allowing you to create more complex groupings.

Let's apply windowing to our previous example:

  cat, 1 (window 0)
  dog, 5 (window 0)
  and, 1 (window 0)

  jump, 3 (window 1)
  tree, 2 (window 1)
  cat, 5  (window 1)

  dog, 2 (window 2)
  and, 2 (window 2)
  cat, 9 (window 2)
  and, 6 (window 2)
  ...

GroupByKey gathers up all the elements with the same key and window, generating an output collection like this:

  cat, [1] (window 0)
  dog, [5] (window 0)
  and, [1] (window 0)

  jump, [3] (window 1)
  tree, [2] (window 1)
  cat, [5]  (window 1)

  dog, [2]   (window 2)
  and, [2,6] (window 2)
  cat, [9]   (window 2)

Note that the window now affects the output groups; key/value pairs in different windows are not grouped together.
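Grouping by both key and window can be sketched in plain Java by treating the (key, window) pair as a composite grouping key. This is an illustration of the semantics only; window assignment and merging in the SDK are driven by element timestamps and the windowing strategy:

```java
import java.util.*;

public class WindowedGroupByKeyDemo {
  // A windowed element: a key/value pair tagged with the window it belongs to.
  record WindowedValue(String key, int value, int window) {}

  // Group by (key, window): the window acts as a secondary grouping key.
  static Map<List<Object>, List<Integer>> groupByKeyAndWindow(List<WindowedValue> elements) {
    Map<List<Object>, List<Integer>> grouped = new LinkedHashMap<>();
    for (WindowedValue e : elements) {
      grouped.computeIfAbsent(List.of(e.key(), e.window()), k -> new ArrayList<>())
             .add(e.value());
    }
    return grouped;
  }

  public static void main(String[] args) {
    List<WindowedValue> input = List.of(
        new WindowedValue("cat", 1, 0), new WindowedValue("dog", 5, 0),
        new WindowedValue("and", 1, 0), new WindowedValue("jump", 3, 1),
        new WindowedValue("tree", 2, 1), new WindowedValue("cat", 5, 1),
        new WindowedValue("dog", 2, 2), new WindowedValue("and", 2, 2),
        new WindowedValue("cat", 9, 2), new WindowedValue("and", 6, 2));
    // Reproduces the windowed listing above, e.g. "and, [2, 6] (window 2)".
    groupByKeyAndWindow(input).forEach((kw, vals) ->
        System.out.println(kw.get(0) + ", " + vals + " (window " + kw.get(1) + ")"));
  }
}
```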

Joins with CoGroupByKey

The CoGroupByKey transform performs a relational join of two or more data sets. CoGroupByKey groups together the values from multiple PCollections of key/value pairs, where each PCollection in the input has the same key type.

Java

For type safety, Dataflow requires you to pass each PCollection as part of a KeyedPCollectionTuple. You'll need to declare a TupleTag for each input PCollection that you want to pass to CoGroupByKey.

CoGroupByKey bundles the joined output together in a CoGbkResult object.

Let's use a simple example to demonstrate the mechanics of CoGroupByKey. We have two input collections to bundle into a KeyedPCollectionTuple. The first is a PCollection<KV<K, V1>>, so we'll assign it a TupleTag<V1> called tag1. The key/value pairs in this PCollection are:

  key1 ↦ v1
  key2 ↦ v2
  key2 ↦ v3

The second is a PCollection<KV<K, V2>>, so we'll assign it a TupleTag<V2> called tag2. This collection contains:

  key1 ↦ x1
  key1 ↦ x2
  key3 ↦ x3

The resulting CoGbkResult collection contains all the data associated with each unique key from any of the input collections. The returned data type is PCollection<KV<K, CoGbkResult>>, with the following contents:

  key1 -> {
    tag1 ↦ [v1]
    tag2 ↦ [x1, x2]
  }
  key2 -> {
    tag1 ↦ [v2, v3]
    tag2 ↦ []
  }
  key3 -> {
    tag1 ↦ []
    tag2 ↦ [x3]
  }

After applying CoGroupByKey, you can look up the data from each collection by using the appropriate TupleTag.
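The shape of the CoGbkResult output above can be sketched with plain java.util maps, using the strings "tag1" and "tag2" in place of TupleTag objects. This is an illustration of the semantics, not the SDK itself:

```java
import java.util.*;

public class CoGroupByKeyDemo {
  // For each key seen in either input, collect the values from both
  // inputs, indexed by the tag of the input they came from.
  static Map<String, Map<String, List<String>>> coGroupByKey(
      List<Map.Entry<String, String>> input1,
      List<Map.Entry<String, String>> input2) {
    Map<String, Map<String, List<String>>> result = new TreeMap<>();
    for (Map.Entry<String, String> e : input1) {
      tagged(result, e.getKey()).get("tag1").add(e.getValue());
    }
    for (Map.Entry<String, String> e : input2) {
      tagged(result, e.getKey()).get("tag2").add(e.getValue());
    }
    return result;
  }

  // Every key gets an (initially empty) value list per tag, so a key
  // missing from one input still appears with an empty list for that tag.
  static Map<String, List<String>> tagged(
      Map<String, Map<String, List<String>>> result, String key) {
    return result.computeIfAbsent(key, k -> {
      Map<String, List<String>> byTag = new LinkedHashMap<>();
      byTag.put("tag1", new ArrayList<>());
      byTag.put("tag2", new ArrayList<>());
      return byTag;
    });
  }

  public static void main(String[] args) {
    List<Map.Entry<String, String>> input1 = List.of(
        Map.entry("key1", "v1"), Map.entry("key2", "v2"), Map.entry("key2", "v3"));
    List<Map.Entry<String, String>> input2 = List.of(
        Map.entry("key1", "x1"), Map.entry("key1", "x2"), Map.entry("key3", "x3"));
    // prints {key1={tag1=[v1], tag2=[x1, x2]}, key2={tag1=[v2, v3], tag2=[]},
    //         key3={tag1=[], tag2=[x3]}}
    System.out.println(coGroupByKey(input1, input2));
  }
}
```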

Applying CoGroupByKey

Java

CoGroupByKey accepts a tuple of keyed PCollections (PCollection<KV<K, V>>) as input. As output, CoGroupByKey returns a PCollection of KV<K, CoGbkResult> pairs, which groups the values from all the input PCollections by their common keys. You can access a specific collection's values within a CoGbkResult by using the TupleTag that you supplied with that collection.

Here's an example that joins two different data sets (perhaps from different sources) that have been read into separate PCollections:

  // Each data set is represented by key-value pairs in separate PCollections.
  // Both data sets share a common key type ("K").
  PCollection<KV<K, V1>> pc1 = ...;
  PCollection<KV<K, V2>> pc2 = ...;

  // Create tuple tags for the value types in each collection.
  final TupleTag<V1> tag1 = new TupleTag<V1>();
  final TupleTag<V2> tag2 = new TupleTag<V2>();

  // Merge collection values into a CoGbkResult collection.
  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
    KeyedPCollectionTuple.of(tag1, pc1)
                         .and(tag2, pc2)
                         .apply(CoGroupByKey.<K>create());

In the resulting PCollection<KV<K, CoGbkResult>>, each key (of type K) maps to its own CoGbkResult. In other words, CoGbkResult is a map from TupleTag<T> to Iterable<T>.

Here is another example of using CoGroupByKey, followed by a ParDo that consumes the resulting CoGbkResult; the ParDo might, for example, format the data for later processing:

  // Each BigQuery table of key-value pairs is read into separate PCollections.
  // Each shares a common key ("K").
  PCollection<KV<K, V1>> pt1 = ...;
  PCollection<KV<K, V2>> pt2 = ...;

  // Create tuple tags for the value types in each collection.
  final TupleTag<V1> t1 = new TupleTag<V1>();
  final TupleTag<V2> t2 = new TupleTag<V2>();

  // Merge collection values into a CoGbkResult collection.
  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
    KeyedPCollectionTuple.of(t1, pt1)
                         .and(t2, pt2)
                         .apply(CoGroupByKey.<K>create());

  // Access results and do something.
  PCollection<T> finalResultCollection =
    coGbkResultCollection.apply(ParDo.of(
      new DoFn<KV<K, CoGbkResult>, T>() {
        @Override
        public void processElement(ProcessContext c) {
          KV<K, CoGbkResult> e = c.element();
          // Get all collection 1 values
          Iterable<V1> pt1Vals = e.getValue().getAll(t1);
          // Now get collection 2 values
          V2 pt2Val = e.getValue().getOnly(t2);
          ... Do Something ....
          c.output(...some T...);
        }
      }));

Using CoGroupByKey with Unbounded PCollections

Java

When using CoGroupByKey to group PCollections that have a windowing strategy applied, all of the PCollections you want to group must use the same windowing strategy and window sizing. For example, all of the collections you're merging must use identical 5-minute fixed windows, or 4-minute sliding windows starting every 30 seconds.

If your pipeline attempts to use CoGroupByKey to merge PCollections with incompatible windows, Dataflow will generate an IllegalStateException error when your pipeline is constructed.

Joins with ParDo and Side Inputs

Instead of using CoGroupByKey, you can apply a ParDo with one or more side inputs to perform a join. A side input is an additional input, other than the main input, that you make available to a ParDo transform. Your DoFn can access the side input each time it processes an element in the input PCollection.

Performing a join this way can be more efficient than using CoGroupByKey in some cases, such as:

  • The PCollections you are joining are disproportionate in size, and the smaller PCollection can fit into memory.
  • You have a large table that you want to join against multiple times at different places in the pipeline. Instead of performing a CoGroupByKey several times, you can create one side input and pass it to multiple ParDos. For example, you might join two tables to generate a third, then join the first table with the third, and so on.

To perform a join this way, apply a ParDo to one of the PCollections and pass the other PCollection(s) as side inputs. The following code sample shows how to perform such a join.

Java

  // Each BigQuery table of key-value pairs is read into separate PCollections.
  PCollection<KV<K1, V1>> mainInput = ...;
  PCollection<KV<K2, V2>> sideInput = ...;

  // Create a view from your side input data.
  final PCollectionView<Map<K2, V2>> view = sideInput.apply(View.<K2, V2>asMap());

  // Access the side input and perform the join logic.
  PCollection<T> joinedData =
    mainInput.apply("SideInputJoin", ParDo.withSideInputs(view).of(
      new DoFn<KV<K1, V1>, T>() {
        @Override
        public void processElement(ProcessContext c) {
          K2 sideInputKey = ... transform c.element().getKey() into K2 ...
          V2 sideInputValue = c.sideInput(view).get(sideInputKey);
          V1 mainInputValue = c.element().getValue();
          ... Do Something ...
          c.output(...some T...);
        }
      }));
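The lookup at the heart of this map-side join can be sketched without the SDK: each main-input element is matched against a small in-memory map, which is exactly what c.sideInput(view).get(...) does above. The keys and values here are hypothetical placeholder data:

```java
import java.util.*;

public class SideInputJoinDemo {
  // Join each main-input element against a small lookup map, mirroring
  // the side-input join: elements with no match in the map are dropped
  // (an inner-join policy; other policies are equally possible).
  static List<String> join(List<Map.Entry<String, Integer>> mainInput,
                           Map<String, String> sideInput) {
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, Integer> e : mainInput) {
      String sideValue = sideInput.get(e.getKey());  // like c.sideInput(view).get(key)
      if (sideValue != null) {
        out.add(e.getKey() + ": " + e.getValue() + " / " + sideValue);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> main = List.of(
        Map.entry("a", 1), Map.entry("b", 2), Map.entry("c", 3));
    Map<String, String> side = Map.of("a", "alpha", "b", "beta");
    // prints [a: 1 / alpha, b: 2 / beta]
    System.out.println(join(main, side));
  }
}
```

This is efficient when the side-input map is small enough to hold in memory on each worker, which is precisely the situation described in the first bullet above.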
