Handling Multiple PCollections

Some Dataflow SDK transforms can take multiple PCollection objects as input or produce multiple PCollection objects as output. The Dataflow SDKs provide several different ways to bundle together multiple PCollection objects.

For PCollection objects storing the same data type, the Dataflow SDKs also provide the Flatten and Partition transforms. Flatten merges multiple PCollection objects into a single logical PCollection, while Partition splits a single PCollection into a fixed number of smaller collections.

PCollections Storing the Same Type


You can encapsulate multiple PCollection objects that all store the same data type (such as PCollection<String> in Java) using the class PCollectionList.

PCollectionList represents a collection of PCollection objects that store the same data type, such as String or Integer. The Flatten transform, for example, takes a PCollectionList and combines all the elements in all of the collections into a single logical PCollection.

Similarly, the Partition transform can split a single PCollection into a PCollectionList containing multiple PCollection objects, based on a partitioning function (dividing the input into percentile groups, for example).

The following example code shows how to create a PCollectionList from individual PCollection objects that contain String elements:

  PCollection<String> pc1 = ...;
  PCollection<String> pc2 = ...;
  PCollection<String> pc3 = ...;

  // Create a PCollectionList with three PCollections:
  PCollectionList<String> pcs = PCollectionList.of(pc1).and(pc2).and(pc3);

You can access the individual PCollection objects in a PCollectionList by index (the index is zero-based), as in the following code sample:

  PCollectionList<String> pcs = ...;

  // Get the first PCollection from the PCollectionList.
  PCollection<String> firstPc = pcs.get(0);

While all PCollection objects in a PCollectionList must contain the same data type, they need not have the same data encoding. A PCollectionList can have two PCollection<Integer> objects that use two different coders (for example, big endian and little endian).
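
To make the distinction between data type and data encoding concrete, here is a minimal plain-Java sketch. It does not use the Dataflow SDK coder classes; ByteOrderSketch, encode, and decode are hypothetical names chosen for illustration. It only shows that the same Integer value can be serialized in big-endian or little-endian byte order and still decode to the same value.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Plain-Java illustration only; this is not the Dataflow SDK coder API.
final class ByteOrderSketch {

    // Encode an int as 4 bytes using the given byte order.
    static byte[] encode(int value, ByteOrder order) {
        return ByteBuffer.allocate(Integer.BYTES).order(order).putInt(value).array();
    }

    // Decode 4 bytes back into an int using the given byte order.
    static int decode(byte[] bytes, ByteOrder order) {
        return ByteBuffer.wrap(bytes).order(order).getInt();
    }

    public static void main(String[] args) {
        byte[] big = encode(1, ByteOrder.BIG_ENDIAN);       // bytes 0, 0, 0, 1
        byte[] little = encode(1, ByteOrder.LITTLE_ENDIAN); // bytes 1, 0, 0, 0
        System.out.println(Arrays.toString(big) + " vs " + Arrays.toString(little));

        // The bytes differ, but both decode to the same Integer value.
        int fromBig = decode(big, ByteOrder.BIG_ENDIAN);
        int fromLittle = decode(little, ByteOrder.LITTLE_ENDIAN);
        System.out.println(fromBig == fromLittle);
    }
}
```

The element type stays Integer in both cases; only the byte layout changes, which is the sense in which two collections of the same type can use different coders.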

PCollections Storing Different Types

Some advanced transforms take multiple inputs and produce multiple outputs of different types. A ParDo transform with side outputs, for example, might produce multiple PCollection objects that contain different data types.


The Dataflow Java SDK uses a tagged tuple system to represent a collection of PCollection objects. These tagged tuples help maintain type safety. The PCollection objects are contained in a PCollectionTuple class. For each PCollection in the tuple, you must create an associated TupleTag. You use the TupleTag to index, retrieve, and identify each PCollection in the PCollectionTuple.

Note: You should use PCollectionTuple when representing a group of PCollection objects that may store different types, such as PCollection<String> and PCollection<Integer>. If your PCollection objects all contain the same data type, consider using PCollectionList.

In the Dataflow Java SDK, each PCollection in a PCollectionTuple is keyed by a TupleTag. TupleTag is a class included in the Dataflow Java SDK specifically for indexing heterogeneously-typed tuples. The combination of tuple classes and TupleTag objects provides a reasonable degree of type safety when using heterogeneously-typed tuples as inputs and outputs for your transforms.

When you create a PCollectionTuple, you'll also need to create a TupleTag for each PCollection that the tuple contains. The type parameter of each TupleTag must match the element type of its associated PCollection.

The TupleTag type enables tracking the static type for each PCollection in the tuple.

The following example code shows how to create a PCollectionTuple containing three PCollection objects, which respectively contain String, Integer, and Iterable<String> values:

  // The PCollections to be contained in the tuple.
  PCollection<String> pc1 = ...;
  PCollection<Integer> pc2 = ...;
  PCollection<Iterable<String>> pc3 = ...;

  // Create TupleTags for each of the PCollections to put in the PCollectionTuple.
  TupleTag<String> tag1 = new TupleTag<>();
  TupleTag<Integer> tag2 = new TupleTag<>();
  TupleTag<Iterable<String>> tag3 = new TupleTag<>();

  // Create a PCollectionTuple with the three PCollections and their associated tags.
  PCollectionTuple pcs =
      PCollectionTuple.of(tag1, pc1)
                      .and(tag2, pc2)
                      .and(tag3, pc3);

To extract a particular PCollection from a tuple, you use the method PCollectionTuple.get, passing the TupleTag you used for that collection when you created the tuple:

  // Get PCollections out of a PCollectionTuple, using the tags
  // that were used to put them in.

  PCollection<Integer> pcX = pcs.get(tag2);
  PCollection<String> pcY = pcs.get(tag1);
  PCollection<Iterable<String>> pcZ = pcs.get(tag3);

If you need to create an empty tuple, you can use the method PCollectionTuple.empty. This method creates an empty tuple associated with a given pipeline:

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollectionTuple pcTuple = PCollectionTuple.empty(p);

You can use the method PCollectionTuple.getAll to obtain a Map of all PCollection objects in a tuple, together with their associated TupleTag objects:

  Map<TupleTag<?>, PCollection<?>> allCollections = pcs.getAll();

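You can use the method PCollectionTuple.has to check whether a tuple contains a PCollection associated with a given TupleTag:

  // tag1 must be the same TupleTag<String> that was used to build pcs;
  // a newly created TupleTag would not match any PCollection in the tuple.
  boolean hasStringCollection = pcs.has(tag1);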
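
The tagged-tuple idea described in this section can be modeled in plain Java to show why a typed tag gives type-safe retrieval. The sketch below is not the Dataflow SDK: Tag and TaggedTuple are hypothetical stand-ins for TupleTag and PCollectionTuple, ordinary lists stand in for PCollection objects, and tags are compared by object identity, which is an assumption of this model.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for TupleTag; not part of the Dataflow SDK.
final class Tag<T> {
    // No fields: each Tag instance is distinguished by object identity.
}

// Hypothetical stand-in for PCollectionTuple, holding lists of mixed types.
final class TaggedTuple {
    private final Map<Tag<?>, List<?>> entries = new HashMap<>();

    // Store a list under a tag whose type parameter matches its elements.
    <T> TaggedTuple and(Tag<T> tag, List<T> values) {
        entries.put(tag, values);
        return this;
    }

    // The only unchecked cast lives here; and() guarantees that it is safe.
    @SuppressWarnings("unchecked")
    <T> List<T> get(Tag<T> tag) {
        if (!entries.containsKey(tag)) {
            throw new IllegalArgumentException("tag not found in tuple");
        }
        return (List<T>) entries.get(tag);
    }

    boolean has(Tag<?> tag) {
        return entries.containsKey(tag);
    }

    public static void main(String[] args) {
        Tag<String> words = new Tag<>();
        Tag<Integer> counts = new Tag<>();
        TaggedTuple tuple = new TaggedTuple()
            .and(words, Arrays.asList("a", "b"))
            .and(counts, Arrays.asList(1, 2, 3));

        // The tag's type parameter determines the static type of the result.
        List<Integer> c = tuple.get(counts);
        // List<String> bad = tuple.get(counts);  // would not compile

        System.out.println(c.size());
        System.out.println(tuple.has(words));
        // A freshly created tag was never used to build the tuple.
        System.out.println(tuple.has(new Tag<String>()));
    }
}
```

Because the tag carries the element type, callers never cast at the call site, and a mismatch between the tag type and the expected collection type is caught by the compiler.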

Merging PCollections with Flatten

If your pipeline has multiple PCollection objects that contain the same data type, you can merge them into a single logical PCollection using the Flatten transform.

Applying a Flatten Transform


Flatten takes a PCollectionList of any number of PCollection objects of a given type, and returns a single PCollection that contains all of the elements in the PCollection objects in that list.

The following example code shows how to apply a Flatten transform to merge multiple PCollection<String> objects into a single PCollection<String>. The example first constructs a PCollectionList containing all of the PCollection objects to be merged.

  PCollection<String> pc1 = ...;
  PCollection<String> pc2 = ...;
  PCollection<String> pc3 = ...;

  PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);

  PCollection<String> merged = collections.apply(Flatten.<String>pCollections());

Note that in Java 7, when you use the generic factory method Flatten.pCollections, you'll need to specify a type parameter that corresponds to the type of element that each input PCollection holds.
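
As a rough mental model of what Flatten produces, the following plain-Java sketch merges several lists of the same element type into one. It is only an analogy under stated assumptions: FlattenSketch and flatten are hypothetical names, lists stand in for PCollection objects, and the list order in the result should not be read as a guarantee about element order in a real pipeline.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java analogy for Flatten; not part of the Dataflow SDK.
final class FlattenSketch {

    // Combine every element of every input list into one output list.
    static <T> List<T> flatten(List<List<T>> inputs) {
        List<T> merged = new ArrayList<T>();
        for (List<T> input : inputs) {
            merged.addAll(input);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<String> pc1 = Arrays.asList("a", "b");
        List<String> pc2 = Arrays.asList("c");
        List<String> pc3 = Arrays.asList("d", "e");

        List<String> merged = flatten(Arrays.asList(pc1, pc2, pc3));
        System.out.println(merged.size());
    }
}
```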

Data Encoding in Merged Collections

By default, the coder for the output PCollection is the same as the coder for the first PCollection in the input PCollectionList. However, the input PCollection objects can each use different coders, as long as they all contain the same data type in your chosen language.

Merging Windowed Collections

When using Flatten to merge PCollection objects that have a windowing strategy applied, all of the PCollection objects you want to merge must use a compatible windowing strategy and window sizing. For example, the collections you're merging must all use identical 5-minute fixed windows, or must all use 4-minute sliding windows starting every 30 seconds.

If your pipeline attempts to use Flatten to merge PCollection objects with incompatible windows, Dataflow will generate an IllegalStateException error when your pipeline is constructed.

Splitting a PCollection with Partition

You can use the Partition core transform to divide the elements in a single logical PCollection into N partitions. Each resulting partition is a PCollection, and those partitions are bundled together as a list of PCollection objects.

You can use Partition to divide a single PCollection into logical groups, such as percentile groups. This can be useful if your pipeline needs to perform different processing for each different percentile group, for example.

Applying a Partition Transform

Partition divides the elements of a PCollection according to a partitioning function that you provide. The partitioning function contains the logic that determines how to split up the elements of the input PCollection into each resulting partition PCollection.

Note: The number of partitions must be determined at graph construction time. You can, for example, pass the number of partitions as a command-line option when you launch your pipeline (the value is then used to build your pipeline graph), but you cannot determine the number of partitions mid-pipeline (based on data calculated after your pipeline graph is constructed, for instance).


The following example code divides a PCollection of objects of type Student into percentile groups:

  PCollection<Student> students = ...;
  // Split students up into 10 partitions, by percentile:
  PCollectionList<Student> studentsByPercentile =
      students.apply(Partition.of(10, new PartitionFn<Student>() {
          @Override
          public int partitionFor(Student student, int numPartitions) {
              // getPercentile() returns a value in 0..99.
              return student.getPercentile() * numPartitions / 100;
          }
      }));

When you pass the Partition transform as an argument to apply, you'll need to provide an int value with the number of result partitions that you want, and a PartitionFn that represents the partitioning function. In the example, we define the PartitionFn in-line.

The return value from apply is a PCollectionList containing each of the resulting partitions as individual PCollection objects. You can extract each partition from the PCollectionList using the get method, as follows:

  PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);
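
To check how the partitioning arithmetic in the example maps percentiles to partition indexes, here is a small plain-Java sketch. It is not SDK code: PartitionSketch and its methods are hypothetical, integer percentiles stand in for Student objects, and lists stand in for PCollection objects. Integer division means percentiles 40 through 49 all land in partition 4 when there are 10 partitions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the percentile partitioning; not part of the Dataflow SDK.
final class PartitionSketch {

    // Same arithmetic as the example's partitioning function:
    // a percentile in 0..99 maps to an index in 0..numPartitions-1.
    static int partitionFor(int percentile, int numPartitions) {
        return percentile * numPartitions / 100;
    }

    // Distribute percentiles into numPartitions lists using partitionFor.
    static List<List<Integer>> partition(List<Integer> percentiles, int numPartitions) {
        List<List<Integer>> partitions = new ArrayList<List<Integer>>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<Integer>());
        }
        for (int p : percentiles) {
            partitions.get(partitionFor(p, numPartitions)).add(p);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<List<Integer>> byPercentile = partition(Arrays.asList(5, 42, 47, 99), 10);
        System.out.println(byPercentile.get(4)); // prints [42, 47]
    }
}
```

The number of result lists is fixed before any element is examined, which mirrors the requirement that the number of partitions be known at graph construction time.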