PCollection

The Dataflow SDKs use a specialized class called PCollection to represent data in a pipeline. A PCollection represents a multi-element data set.

You can think of a PCollection as "pipeline" data. Dataflow's transforms use PCollections as inputs and outputs; as such, if you want to work with data in your pipeline, it must be in the form of a PCollection. Each PCollection is owned by a specific Pipeline object, and only that Pipeline object can use it.

IMPORTANT: This document contains information about unbounded PCollections and Windowing. These concepts apply to the Dataflow Java SDK only and are not yet available in the Dataflow Python SDK.

PCollection Characteristics

A PCollection represents a potentially large, immutable "bag" of elements. There is no upper limit on how many elements a PCollection can contain; any given PCollection might fit in memory, or it might represent a very large data set backed by a persistent data store.

Java

The elements of a PCollection can be of any type, but they must all be of the same type. However, Dataflow needs to be able to encode each individual element as a byte string in order to support distributed processing. The Dataflow SDKs provide a Data Encoding mechanism that includes built-in encodings for commonly used types, as well as support for specifying custom encodings as needed. Creating a valid encoding for an arbitrary type can be challenging, but you can construct a custom encoding for simple structured types.

An important data type for large scale data processing is the key/value pair. The Dataflow SDKs use the class KV<K, V> to represent key/value pairs.
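
For illustration, here is a minimal sketch of constructing and reading back a key/value pair with KV; the key and value shown are made up:

  // Build a key/value pair and read back its parts.
  KV<String, Integer> wordCount = KV.of("fortune", 1);  // key "fortune", value 1
  String word = wordCount.getKey();      // "fortune"
  Integer count = wordCount.getValue();  // 1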

Python

An important data type for large scale data processing is the key/value pair. The Dataflow Python SDK uses 2-tuples to represent key/value pairs.

PCollection Limitations

A PCollection has several key aspects in which it differs from a regular collection class:

  • A PCollection is immutable. Once created, you cannot add, remove, or change individual elements.
  • A PCollection does not support random access to individual elements.
  • A PCollection belongs to the pipeline in which it is created. You cannot share a PCollection between Pipeline objects.

A PCollection may be physically backed by data in existing storage, or it may represent data that has not yet been computed. As such, the data in a PCollection is immutable. You can use a PCollection in computations that generate new pipeline data (as a new PCollection); however, you cannot change the elements of an existing PCollection once it has been created.
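
As a brief illustration of this immutability, applying a transform produces a new PCollection rather than modifying the input. The following is a minimal Java sketch, assuming an existing PCollection of Strings named lines:

  // Transforms do not modify the input PCollection; they produce a new one.
  PCollection<String> upperCaseLines = lines.apply(
      ParDo.of(new DoFn<String, String>() {
        @Override
        public void processElement(ProcessContext c) {
          // Emit an upper-cased copy of each element; the original element is untouched.
          c.output(c.element().toUpperCase());
        }
      }));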

A PCollection does not store data, per se; remember that a PCollection may have too many elements to fit in local memory where your Dataflow program is running. When you create or transform a PCollection, data isn't copied or moved in memory as with some regular container classes. Instead, a PCollection represents a potentially very large data set in the cloud.

Bounded and Unbounded PCollections

A PCollection's size can be either bounded or unbounded, and the boundedness (or unboundedness) is determined when you create the PCollection. Some root transforms create bounded PCollections, while others create unbounded ones; it depends on the source of your input data.

Bounded PCollections

Your PCollection is bounded if it represents a fixed data set, which has a known size that doesn't change. An example of a fixed data set might be "server logs from the month of October", or "all orders processed last week." TextIO and BigQueryIO root transforms create bounded PCollections.

Data sources that create bounded PCollections include:

Java

  • TextIO
  • BigQueryIO
  • DatastoreIO
  • Custom bounded data sources you create using the Custom Source API

Python

  • TextIO
  • BigQueryIO
  • Custom bounded data sources you create using the Custom Source API

Data sinks that accept bounded PCollections include:

Java

  • TextIO
  • BigQueryIO
  • DatastoreIO
  • Custom bounded data sinks you create using the Custom Sink API

Python

  • TextIO
  • BigQueryIO
  • Custom bounded data sinks you create using the Custom Sink API

Unbounded PCollections

Your PCollection is unbounded if it represents a continuously updating data set, or streaming data. An example of a continuously updating data set might be "server logs as they are generated" or "all new orders as they are processed." PubsubIO root transforms create unbounded PCollections.

Some sources, particularly those that create unbounded PCollections (such as PubsubIO), automatically append a timestamp to each element of the collection.

Data sources that create unbounded PCollections include:

  • PubsubIO
  • Custom unbounded data sources you create using the Custom Source API

Data sinks that accept unbounded PCollections include:

  • PubsubIO
  • BigQueryIO

Processing Characteristics

The bounded (or unbounded) nature of your PCollection affects how Dataflow processes your data. Bounded PCollections can be processed using batch jobs, which might read the entire data set once and perform processing in a job of finite length. Unbounded PCollections must be processed using streaming jobs, as the entire collection can never be available for processing at any one time.

When grouping unbounded PCollections, Dataflow requires a concept called Windowing to divide a continuously updating data set into logical windows of finite size. Dataflow processes each window as a bundle, and processing continues as the data set is generated. See the following section on Timestamps and Windowing for more information.

PCollection Element Timestamps

Each element in a PCollection has an associated timestamp. Timestamps are useful for PCollections that contain elements with an inherent notion of time. For example, a PCollection of orders to process may use the time an order was created as the element timestamp.

The timestamp for each element is initially assigned by the source that creates the PCollection. Sources that create unbounded PCollections often assign each new element a timestamp according to when it was added to the unbounded PCollection.

Java

Data sources that produce fixed data sets, such as BigQueryIO or TextIO, also assign timestamps to each element; however, these data sources typically assign the same timestamp (Long.MIN_VALUE) to each element.

You can manually assign timestamps to the elements of a PCollection. This is commonly done when elements have an inherent timestamp, but that timestamp must be calculated, for example by parsing it out of the structure of the element. To manually assign a timestamp, use a ParDo transform; within the ParDo transform, your DoFn can produce output elements with timestamps. See Assigning Timestamps for more information.
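
For illustration, here is a minimal sketch of such a ParDo in the Dataflow SDK for Java. The LogEntry element type, its getTimestampMillis() accessor, and the logEntries input collection are hypothetical stand-ins for your own data:

  // Assign each element a timestamp parsed from the element itself.
  // LogEntry and getTimestampMillis() are hypothetical stand-ins for your own type.
  static class AddTimestampsFn extends DoFn<LogEntry, LogEntry> {
    @Override
    public void processElement(ProcessContext c) {
      LogEntry entry = c.element();
      // Emit the element with the timestamp carried inside one of its fields.
      c.outputWithTimestamp(entry, new Instant(entry.getTimestampMillis()));
    }
  }

  PCollection<LogEntry> timestampedEntries =
      logEntries.apply(ParDo.of(new AddTimestampsFn()));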

Python

You can manually assign timestamps to the elements of a PCollection. This is commonly done when elements have an inherent timestamp, but that timestamp must be calculated, for example by parsing it out of the structure of the element. To manually assign a timestamp, use a ParDo transform; within the ParDo transform, your DoFn can produce output elements with timestamps.

Windowing

The timestamps associated with each element in a PCollection are used for a concept called Windowing. Windowing divides the elements of a PCollection according to their timestamps. Windowing can be used on all PCollections, but is required for some computations over unbounded PCollections in order to divide the continuous data stream into finite chunks for processing.

See the section on Windowing for more information on how to use Dataflow's Windowing concepts in your pipeline.
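
As a brief illustration, here is a minimal Java sketch of applying fixed one-minute windows to a PCollection of Strings before a grouping transform; the lines and windowedLines names are assumptions for this example:

  // Divide the collection into fixed one-minute windows based on element timestamps.
  PCollection<String> windowedLines = lines.apply(
      Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));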

Creating a PCollection

To work with a data set in a Cloud Dataflow pipeline, you'll need to create a PCollection to represent the data, wherever it is stored. The Dataflow SDKs provide two principal ways to create an initial PCollection:

  • You can read the data from an external data source, such as a file.
  • You can create a PCollection of data that's stored in an in-memory collection class.

Reading External Data

See Pipeline I/O for more information on reading data from an external data source.

Creating a PCollection from Data In Local Memory

You can create a PCollection out of data in local memory so that you can use that data in your pipeline's transforms. Typically, you use data from local memory to test your pipeline with smaller data sets, and to reduce your pipeline's dependence on external I/O while testing.

Java

To create a PCollection from an in-memory Java Collection, you apply the Create transform. Create is a root PTransform provided by the Dataflow SDK for Java. Create accepts a Java Collection and a Coder object, which specifies how the elements in the Collection should be encoded.

The following code sample creates a PCollection<String>, representing individual lines of text, from a Java List:

  // Create a Java Collection, in this case a List of Strings.
  static final List<String> LINES = Arrays.asList(
      "To be, or not to be: that is the question: ",
      "Whether 'tis nobler in the mind to suffer ",
      "The slings and arrows of outrageous fortune, ",
      "Or to take arms against a sea of troubles, ");

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<String> lines = p.apply(Create.of(LINES))
      .setCoder(StringUtf8Coder.of());   // create the PCollection

The code above uses Create.of, which produces a PCollection containing the specified elements. Note that if your pipeline uses windowing, you should use Create.timestamped instead. Create.timestamped produces a PCollection containing the specified elements with specified timestamps.
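
For illustration, here is a minimal sketch of Create.timestamped, reusing two of the lines from the example above; the Instant values are made up:

  // Create a PCollection whose elements carry explicit timestamps.
  PCollection<String> timestampedLines = p.apply(
      Create.timestamped(
          TimestampedValue.of("To be, or not to be: that is the question: ", new Instant(0)),
          TimestampedValue.of("Whether 'tis nobler in the mind to suffer ", new Instant(1000))))
      .setCoder(StringUtf8Coder.of());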

Python

To create a PCollection, you apply the Create transform. Create is a standard transform provided by the Dataflow Python SDK.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Create default pipeline options; configure these as needed for your environment.
pipeline_options = PipelineOptions()

with beam.Pipeline(options=pipeline_options) as p:

  lines = (p
           | beam.Create([
               'To be, or not to be: that is the question: ',
               'Whether \'tis nobler in the mind to suffer ',
               'The slings and arrows of outrageous fortune, ',
               'Or to take arms against a sea of troubles, ']))

Using PCollection with Custom Data Types

You can create a PCollection where the element type is a custom data type that you provide. This can be useful if you need to create a collection of your own class or structure with specific fields, like a Java class that holds a customer's name, address, and phone number.

When you create a PCollection of a custom type, you'll need to provide a Coder for that custom type. The Coder tells the Dataflow service how to serialize and deserialize the elements of your PCollection as your dataset is parallelized and partitioned out to multiple pipeline worker instances; see data encoding for more information.

Dataflow will attempt to infer a Coder for any PCollection for which you do not explicitly set a Coder. The default Coder for a custom type is SerializableCoder, which uses Java serialization. However, Dataflow recommends using AvroCoder as the Coder when possible.

You can set AvroCoder as the default coder for your data type either by registering it with your Pipeline object's CoderRegistry or by annotating your class as follows:

Java

  @DefaultCoder(AvroCoder.class)
  public class MyClass {
    ...
  }

To ensure that your custom class is compatible with AvroCoder, you might need to add some additional annotations. For example, you must annotate null fields in your data type with org.apache.avro.reflect.Nullable. See the Dataflow SDK for Java API reference documentation for AvroCoder and the package documentation for org.apache.avro.reflect for more information.
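
As mentioned above, you can also register the coder programmatically rather than through the annotation. Here is a minimal sketch, assuming the MyClass type from the preceding example and a PipelineOptions object named options:

  // Register AvroCoder as the coder to use for MyClass throughout this pipeline.
  Pipeline p = Pipeline.create(options);
  p.getCoderRegistry().registerCoder(MyClass.class, AvroCoder.class);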

Dataflow's TrafficRoutes example pipeline creates a PCollection whose element type is a custom class called StationSpeed. StationSpeed registers AvroCoder as its default coder as follows:

Java

  /**
   * This class holds information about a station reading's average speed.
   */
  @DefaultCoder(AvroCoder.class)
  static class StationSpeed {
    @Nullable String stationId;
    @Nullable Double avgSpeed;

    public StationSpeed() {}

    public StationSpeed(String stationId, Double avgSpeed) {
      this.stationId = stationId;
      this.avgSpeed = avgSpeed;
    }

    public String getStationId() {
      return this.stationId;
    }
    public Double getAvgSpeed() {
      return this.avgSpeed;
    }
  }
