The GroupByKey core transform is a parallel reduction operation used to process collections of key/value pairs. You use GroupByKey with an input PCollection of key/value pairs that represents a multimap, where the collection contains multiple pairs that have the same key, but different values. The GroupByKey transform lets you gather together all of the values in the multimap that share the same key.

GroupByKey is analogous to the Shuffle phase of a Map/Shuffle/Reduce-style algorithm. You use GroupByKey to collect all of the values associated with a unique key.
GroupByKey is a good way to aggregate data that has something in common. For example, you might want to group together customer orders from the same postal code (where the "key" would be the postal code of each individual order, and the "value" would be the remainder of the order). Or you might want to group together all user queries by user ID, or by the time of day they occurred.
You can also join multiple collections of key/value pairs when those collections share a common key (even if the value types are different). There are two ways to perform a join. One way is to use the CoGroupByKey transform, which lets you group together all of the values (of any type) in multiple collections that share a common key. The other way is to join multiple collections of key/value pairs by using a ParDo with one or more side inputs. In some cases, this second way can be more efficient than using a CoGroupByKey.
Joins are useful when you have multiple data sets, possibly from multiple sources, that provide information about related things. For example, let's say you have two different files with auction data: one file has the auction ID, bid data, and pricing data; the other file has the auction ID and an item description. You can join those two data sets, using the auction ID as a common key and the other attached data as the associated values. After the join, you have one data set that contains all the information (bid, price, and item) associated with each auction ID.
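To make the auction example concrete, here is a minimal plain-Java sketch of that join, using ordinary collections rather than Dataflow types. The map names and the "bid|description" output format are hypothetical, chosen only for illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class AuctionJoin {
    // Join bid/pricing records and item descriptions on a shared auction ID.
    // Returns auctionId -> "bidData|description" for IDs present in both inputs.
    public static Map<String, String> join(Map<String, String> bids,
                                           Map<String, String> items) {
        Map<String, String> joined = new HashMap<>();
        for (Map.Entry<String, String> e : bids.entrySet()) {
            String description = items.get(e.getKey());
            if (description != null) {
                joined.put(e.getKey(), e.getValue() + "|" + description);
            }
        }
        return joined;
    }
}
```

This sketch keeps only auction IDs that appear in both data sets (an inner join); a CoGroupByKey, by contrast, keeps every key seen in any input.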
GroupByKey
Let's examine the mechanics of GroupByKey with a simple example case, where our data set consists of words from a text file and the line numbers on which they appear. We want to group together all the line numbers (values) that share the same word (key), letting us see all the places in the text where a particular word appears.

Our input is a PCollection of key/value pairs where each word is a key, and the value is a line number in the file where the word appears. Here's a list of the key/value pairs in our input collection:
cat, 1
dog, 5
and, 1
jump, 3
tree, 2
cat, 5
dog, 2
and, 2
cat, 9
and, 6
...
The GroupByKey transform gathers up all of the values with the same key, and creates a new pair consisting of the unique key and a collection of all the values that were associated with that key in the input PCollection. If we were to apply GroupByKey to the collection of key/value pairs above, the output collection would look like this:
cat, [1,5,9]
dog, [5,2]
and, [1,2,6]
jump, [3]
tree, [2]
...
Thus, GroupByKey represents a transform from a multimap (a map of multiple keys to individual values) to a uni-map (a map of unique keys to collections of values).
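Outside of the Dataflow SDK, this multimap-to-uni-map transformation can be sketched with plain Java collections. This only illustrates the semantics of GroupByKey; it is not how the service actually implements the parallel shuffle:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupWords {
    // Group (word, lineNumber) pairs by word, mimicking GroupByKey semantics:
    // each unique key maps to the collection of its values, in input order.
    public static Map<String, List<Integer>> groupByKey(
            List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                   .add(pair.getValue());
        }
        return grouped;
    }
}
```

Note that the real transform makes no ordering guarantee for the values in each group; the ordered output here is an artifact of the single-threaded sketch.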
Java
A Note on Representing Key/Value Pairs: In the Dataflow Java SDK, you represent a key/value pair with an object of type KV<K, V>. KV is a specialty class with a key of type K and a value of type V.
A common processing pattern for key-grouped collections (the result of a GroupByKey transform) is to combine the values associated with each key into a single, merged value associated with that key. The Dataflow SDKs encapsulate this entire pattern (key-grouping, then combining the values for each key) as the Combine transform. See Combining Collections and Values for more information.
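As a rough illustration of that group-then-combine pattern, here is a plain-Java sketch that groups values per key and reduces each group to a single merged value. The merge function used here (counting occurrences) is an arbitrary stand-in; any associative merge such as a sum or maximum would fit the same shape:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupAndCombine {
    // Group values per key, then reduce each group to one merged value
    // (here, the number of occurrences of the key). This mirrors the
    // key-grouping-plus-combining pattern the Combine transform encapsulates.
    public static Map<String, Integer> countPerKey(
            List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> counts = new HashMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            counts.merge(pair.getKey(), 1, Integer::sum);
        }
        return counts;
    }
}
```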
Applying a GroupByKey Transform
You must apply GroupByKey to an input PCollection of key/value pairs. GroupByKey returns a new PCollection of key/value pairs; in the new collection, the keys are unique, and each associated value element is actually a value stream that contains one or more values associated with the key.
Java
The following example code shows how to apply GroupByKey to a PCollection of KV<K, V> objects, where each element pair represents a key of type K and a single value of type V. The return value of GroupByKey is a new PCollection of type KV<K, Iterable<V>>, where each element pair represents a key and a collection of values as a Java Iterable.
// A PCollection of key/value pairs: words and line numbers.
PCollection<KV<String, Integer>> wordsAndLines = ...;

// Apply a GroupByKey transform to the PCollection "wordsAndLines".
PCollection<KV<String, Iterable<Integer>>> groupedWords = wordsAndLines.apply(
    GroupByKey.<String, Integer>create());

Java 8 is able to infer the parameter types of GroupByKey.create, but they may need to be specified explicitly in earlier versions of Java.
GroupByKey with Windowing
Java
GroupByKey behaves a bit differently when the input PCollection is divided into multiple windows instead of a single global window.

The GroupByKey transform also considers the window to which each element belongs when performing the reduction. The window(s) (as determined by each key/value pair's timestamp) essentially acts as a secondary key. GroupByKey with windowing thus groups by both key and window. When all elements are part of a single global window, GroupByKey degenerates to the simple semantics described above.
While an element's window(s) acts as a secondary key for grouping, windowing can be more powerful than an ordinary secondary key: elements might belong to more than one window, and overlapping windows may be merged. This allows you to create more complex groupings.
Let's apply windowing to our previous example:
cat, 1 (window 0)
dog, 5 (window 0)
and, 1 (window 0)
jump, 3 (window 1)
tree, 2 (window 1)
cat, 5 (window 1)
dog, 2 (window 2)
and, 2 (window 2)
cat, 9 (window 2)
and, 6 (window 2)
...
GroupByKey gathers up all the elements with the same key and window, generating an output collection like this:
cat, [1] (window 0)
dog, [5] (window 0)
and, [1] (window 0)
jump, [3] (window 1)
tree, [2] (window 1)
cat, [5] (window 1)
dog, [2] (window 2)
and, [2,6] (window 2)
cat, [9] (window 2)
Note that the window now affects the output groups; key/value pairs in different windows are not grouped together.
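For non-merging windows, this composite grouping can be sketched in plain Java by using the pair of key and window as the grouping key. Here an integer window index stands in for a real window object, purely for illustration:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowedGroup {
    // Group (key, value, window) triples by the composite (key, window) key,
    // mimicking GroupByKey over a windowed PCollection with non-merging windows.
    // Each element is an Object[]{String key, Integer value, Integer window}.
    public static Map<List<Object>, List<Integer>> group(List<Object[]> elements) {
        Map<List<Object>, List<Integer>> grouped = new LinkedHashMap<>();
        for (Object[] e : elements) {
            List<Object> compositeKey = List.of(e[0], e[2]); // (key, window)
            grouped.computeIfAbsent(compositeKey, k -> new ArrayList<>())
                   .add((Integer) e[1]);
        }
        return grouped;
    }
}
```

Merging window functions (such as sessions) need more machinery than this, because the set of windows itself changes during grouping.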
Joins with CoGroupByKey
The CoGroupByKey transform performs a relational join of two or more data sets. CoGroupByKey groups together the values from multiple PCollections of key/value pairs, where each PCollection in the input has the same key type.
Java
For type safety, Dataflow requires you to pass each PCollection as part of a KeyedPCollectionTuple. You'll need to declare a TupleTag for each input PCollection that you want to pass to CoGroupByKey. CoGroupByKey bundles the joined output together in a CoGbkResult object.
Let's use a simple example to demonstrate the mechanics of a CoGroupByKey. We have two input collections to bundle into a KeyedPCollectionTuple. The first is a PCollection<KV<K, V1>>, so we'll assign a TupleTag<V1> called tag1. The key/value pairs in this PCollection are:
key1 ↦ v1
key2 ↦ v2
key2 ↦ v3
The second is a PCollection<KV<K, V2>>, so we'll assign a TupleTag<V2> called tag2. This collection contains:
key1 ↦ x1
key1 ↦ x2
key3 ↦ x3
The resulting CoGbkResult collection contains all the data associated with each unique key from any of the input collections. The returned data type is PCollection<KV<K, CoGbkResult>>, with the following contents:
key1 -> {
    tag1 ↦ [v1]
    tag2 ↦ [x1, x2]
}
key2 -> {
    tag1 ↦ [v2, v3]
    tag2 ↦ []
}
key3 -> {
    tag1 ↦ []
    tag2 ↦ [x3]
}
After applying CoGroupByKey, you can look up the data from each collection by using the appropriate TupleTag.
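Setting the SDK types aside, the co-grouping above can be sketched with plain Java collections, representing the result as a map from key to per-tag value lists. The string tag names "tag1" and "tag2" here are illustrative stand-ins for real TupleTags:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CoGroup {
    // Co-group two keyed collections: every key seen in either input appears
    // in the result, with its values from each input collected under that
    // input's tag (an empty list when the key is absent from an input).
    public static Map<String, Map<String, List<String>>> coGroupByKey(
            List<Map.Entry<String, String>> in1,
            List<Map.Entry<String, String>> in2) {
        Map<String, Map<String, List<String>>> result = new TreeMap<>();
        for (Map.Entry<String, String> e : in1) {
            perTag(result, e.getKey()).get("tag1").add(e.getValue());
        }
        for (Map.Entry<String, String> e : in2) {
            perTag(result, e.getKey()).get("tag2").add(e.getValue());
        }
        return result;
    }

    // Create the empty per-tag lists the first time a key is seen.
    private static Map<String, List<String>> perTag(
            Map<String, Map<String, List<String>>> result, String key) {
        return result.computeIfAbsent(key, k -> {
            Map<String, List<String>> tags = new HashMap<>();
            tags.put("tag1", new ArrayList<>());
            tags.put("tag2", new ArrayList<>());
            return tags;
        });
    }
}
```

Running this on the example inputs reproduces the key1/key2/key3 result shown above, including the empty lists for keys missing from one input.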
Applying CoGroupByKey
Java
CoGroupByKey accepts a tuple of keyed PCollections (PCollection<KV<K, V>>) as input. As output, CoGroupByKey returns a PCollection of the special type CoGbkResult, which groups values from all the input PCollections by their common keys. You index a CoGbkResult using the TupleTag mechanism for multiple collections: you can access the values from a specific input collection by using the TupleTag that you supplied with the initial collection.
Here's an example that joins two different data sets (perhaps from different sources) that have been read into separate PCollections:
// Each data set is represented by key-value pairs in separate PCollections.
// Both data sets share a common key type ("K").
PCollection<KV<K, V1>> pc1 = ...;
PCollection<KV<K, V2>> pc2 = ...;

// Create tuple tags for the value types in each collection.
final TupleTag<V1> tag1 = new TupleTag<V1>();
final TupleTag<V2> tag2 = new TupleTag<V2>();

// Merge collection values into a CoGbkResult collection.
PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
    KeyedPCollectionTuple.of(tag1, pc1)
                         .and(tag2, pc2)
                         .apply(CoGroupByKey.<K>create());
In the resulting PCollection<KV<K, CoGbkResult>>, each key (all of type K) will have a different CoGbkResult. In other words, CoGbkResult is a map from TupleTag<T> to Iterable<T>.
Here is another example of using CoGroupByKey, followed by a ParDo that consumes the resulting CoGbkResult; the ParDo might, for example, format the data for later processing:
// Each BigQuery table of key-value pairs is read into separate PCollections.
// Each shares a common key ("K").
PCollection<KV<K, V1>> pt1 = ...;
PCollection<KV<K, V2>> pt2 = ...;

// Create tuple tags for the value types in each collection.
final TupleTag<V1> t1 = new TupleTag<V1>();
final TupleTag<V2> t2 = new TupleTag<V2>();

// Merge collection values into a CoGbkResult collection.
PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
    KeyedPCollectionTuple.of(t1, pt1)
                         .and(t2, pt2)
                         .apply(CoGroupByKey.<K>create());

// Access results and do something.
PCollection<T> finalResultCollection = coGbkResultCollection.apply(ParDo.of(
    new DoFn<KV<K, CoGbkResult>, T>() {
      @Override
      public void processElement(ProcessContext c) {
        KV<K, CoGbkResult> e = c.element();
        // Get all collection 1 values.
        Iterable<V1> pt1Vals = e.getValue().getAll(t1);
        // Now get collection 2 values.
        V2 pt2Val = e.getValue().getOnly(t2);
        ... Do Something ...
        c.output(...some T...);
      }
    }));
Using CoGroupByKey with Unbounded PCollections
Java
When using CoGroupByKey to group PCollections that have a windowing strategy applied, all of the PCollections you want to group must use the same windowing strategy and window sizing. For example, all the collections you're merging must use (hypothetically) identical 5-minute fixed windows, or 4-minute sliding windows starting every 30 seconds.
If your pipeline attempts to use CoGroupByKey to merge PCollections with incompatible windows, Dataflow will generate an IllegalStateException error when your pipeline is constructed.
Joins with ParDo and Side Inputs
Instead of using CoGroupByKey, you can apply a ParDo with one or more side inputs to perform a join. A side input is an additional input, other than the main input, that you can provide to your ParDo. Your DoFn can access the side input each time it processes an element in the input PCollection.
Performing a join this way can be more efficient than using CoGroupByKey in some cases, such as:

- The PCollections you are joining are disproportionate in size and the smaller PCollections could fit into memory.
- You have a large table that you want to join against multiple times at different places in the pipeline. Instead of doing a CoGroupByKey several times, you can create one side input and pass it to multiple ParDos. For example, you'd like to join two tables to generate a third, then join the first table with the third, and so on.
To perform a join this way, apply a ParDo to one of the PCollections and pass the other PCollection(s) as side inputs. The following code sample shows how to perform such a join.
Java
// Each BigQuery table of key-value pairs is read into separate PCollections.
PCollection<KV<K1, V1>> mainInput = ...;
PCollection<KV<K2, V2>> sideInput = ...;

// Create a view from your side input data.
final PCollectionView<Map<K2, V2>> view = sideInput.apply(View.<K2, V2>asMap());

// Access the side input and perform the join logic.
PCollection<T> joinedData = mainInput.apply("SideInputJoin",
    ParDo.withSideInputs(view).of(new DoFn<KV<K1, V1>, T>() {
      @Override
      public void processElement(ProcessContext c) {
        K2 sideInputKey = ... transform c.element().getKey() into K2 ...;
        V2 sideInputValue = c.sideInput(view).get(sideInputKey);
        V1 mainInputValue = c.element().getValue();
        ... Do Something ...
        c.output(...some T...);
      }
    }));