The Dataflow SDKs use a specialized class called PCollection to represent data in a pipeline. A PCollection represents a multi-element data set. You can think of a PCollection as "pipeline" data. Dataflow's transforms use PCollections as inputs and outputs; as such, if you want to work with data in your pipeline, it must be in the form of a PCollection. Each PCollection is owned by a specific Pipeline object, and only that Pipeline object can use it.
IMPORTANT: This document contains information about unbounded PCollections and Windowing. These concepts apply to the Dataflow Java SDK only, and are not yet available in the Dataflow Python SDK.
PCollection Characteristics
A PCollection represents a potentially large, immutable "bag" of elements. There is no upper limit on how many elements a PCollection can contain; any given PCollection might fit in memory, or it might represent a very large data set backed by a persistent data store.
Java
The elements of a PCollection can be of any type, but they must all be of the same type. However, Dataflow needs to be able to encode each individual element as a byte string in order to support distributed processing. The Dataflow SDKs provide a Data Encoding mechanism that includes built-in encodings for commonly used types and support for specifying custom encodings as needed. Creating a valid encoding for an arbitrary type can be challenging, but you can construct a custom encoding for simple structured types.
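For instance, a minimal sketch of explicitly attaching one of the built-in encodings to a PCollection (p is assumed to be an existing Pipeline; in practice, coder inference often makes this step unnecessary for Strings):

import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.values.PCollection;

// Explicitly specify that each String element is encoded as UTF-8 bytes,
// rather than relying on coder inference.
PCollection<String> greetings =
    p.apply(Create.of("hello", "world"))
     .setCoder(StringUtf8Coder.of());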
An important data type for large-scale data processing is the key/value pair. The Dataflow SDKs use the class KV<K, V> to represent key/value pairs.
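For example, a minimal sketch of constructing and reading a key/value pair with KV (the order ID and total used here are purely illustrative):

import com.google.cloud.dataflow.sdk.values.KV;

// Pair an order ID (key) with an order total (value).
KV<String, Double> order = KV.of("order-42", 19.99);
String orderId = order.getKey();       // "order-42"
Double orderTotal = order.getValue();  // 19.99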
Python
An important data type for large-scale data processing is the key/value pair. The Dataflow Python SDK uses 2-tuples to represent key/value pairs.
PCollection Limitations
A PCollection differs from a regular collection class in several key ways:
- A PCollection is immutable. Once created, you cannot add, remove, or change individual elements.
- A PCollection does not support random access to individual elements.
- A PCollection belongs to the pipeline in which it is created. You cannot share a PCollection between Pipeline objects.
A PCollection may be physically backed by data in existing storage, or it may represent data that has not yet been computed. As such, the data in a PCollection is immutable. You can use a PCollection in computations that generate new pipeline data (as a new PCollection); however, you cannot change the elements of an existing PCollection once it has been created.
A PCollection does not store data, per se; remember that a PCollection may have too many elements to fit in local memory where your Dataflow program is running. When you create or transform a PCollection, data isn't copied or moved in memory as with some regular container classes. Instead, a PCollection represents a potentially very large data set in the cloud.
Bounded and Unbounded PCollections
A PCollection's size can be either bounded or unbounded, and the boundedness (or unboundedness) is determined when you create the PCollection. Some root transforms create bounded PCollections, while others create unbounded ones; it depends on the source of your input data.
Bounded PCollections
Your PCollection is bounded if it represents a fixed data set, which has a known size that doesn't change. An example of a fixed data set might be "server logs from the month of October" or "all orders processed last week." TextIO and BigQueryIO root transforms create bounded PCollections.
Data sources that create bounded PCollections include:
Java
- TextIO
- BigQueryIO
- DatastoreIO
- Custom bounded data sources you create using the Custom Source API
Python
- TextIO
- BigQueryIO
- Custom bounded data sources you create using the Custom Source API
Data sinks that accept bounded PCollections include:
Java
- TextIO
- BigQueryIO
- DatastoreIO
- Custom bounded data sinks you create using the Custom Sink API
Python
- TextIO
- BigQueryIO
- Custom bounded data sinks you create using the Custom Sink API
Unbounded PCollections
Your PCollection is unbounded if it represents a continuously updating data set, or streaming data. An example of a continuously updating data set might be "server logs as they are generated" or "all new orders as they are processed." PubsubIO root transforms create unbounded PCollections.
Some sources, particularly those that create unbounded PCollections (such as PubsubIO), automatically append a timestamp to each element of the collection.
Data sources that create unbounded PCollections include:
- PubsubIO
- Custom unbounded data sources you create using the Custom Source API

Data sinks that accept unbounded PCollections include:
- PubsubIO
- BigQueryIO
Processing Characteristics
The bounded (or unbounded) nature of your PCollection affects how Dataflow processes your data. Bounded PCollections can be processed using batch jobs, which read the entire data set once and perform processing in a job of finite length. Unbounded PCollections must be processed using streaming jobs, as the entire collection can never be available for processing at any one time.
When grouping unbounded PCollections, Dataflow requires a concept called Windowing to divide a continuously updating data set into logical windows of finite size. Dataflow processes each window as a bundle, and processing continues as the data set is generated. See the following sections on Timestamps and Windowing for more information.
PCollection Element Timestamps
Each element in a PCollection has an associated timestamp. Timestamps are useful for PCollections that contain elements with an inherent notion of time. For example, a PCollection of orders to process may use the time an order was created as the element timestamp.
The timestamp for each element is initially assigned by the source that creates the PCollection. Sources that create unbounded PCollections often assign each new element a timestamp according to when it was added to the unbounded PCollection.
Java
Data sources that produce fixed data sets, such as BigQueryIO or TextIO, also assign timestamps to each element; however, these data sources typically assign the same timestamp (Long.MIN_VALUE) to each element.
You can manually assign timestamps to the elements of a PCollection. This is commonly done when elements have an inherent timestamp, but that timestamp must be calculated, for example by parsing it out of the structure of the element. To manually assign a timestamp, use a ParDo transform; within the ParDo transform, your DoFn can produce output elements with timestamps. See Assigning Timestamps for more information.
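As a rough sketch of this pattern (the log-line format and the lines collection are assumptions for illustration, not part of the SDK), a DoFn that parses a timestamp out of each element and emits the element with that timestamp might look like this:

import org.joda.time.Instant;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;

// Assumes each line in an existing PCollection<String> named "lines" starts
// with epoch milliseconds followed by a space, e.g. "1408466923000 GET /home".
PCollection<String> stamped = lines.apply(
    ParDo.of(new DoFn<String, String>() {
      @Override
      public void processElement(ProcessContext c) {
        String line = c.element();
        long millis = Long.parseLong(line.substring(0, line.indexOf(' ')));
        // Emit the element with the timestamp parsed from the element itself.
        c.outputWithTimestamp(line, new Instant(millis));
      }
    }));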
Python
You can manually assign timestamps to the elements of a PCollection. This is commonly done when elements have an inherent timestamp, but that timestamp must be calculated, for example by parsing it out of the structure of the element. To manually assign a timestamp, use a ParDo transform; within the ParDo transform, your DoFn can produce output elements with timestamps.
Windowing
The timestamps associated with each element in a PCollection are used for a concept called Windowing. Windowing divides the elements of a PCollection according to their timestamps. Windowing can be used on all PCollections, but is required for some computations over unbounded PCollections in order to divide the continuous data stream into finite chunks for processing.
See the section on Windowing for more information on how to use Dataflow's Windowing concepts in your pipeline.
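As an illustrative sketch only (the one-minute window size and the events collection are arbitrary assumptions), applying fixed windows to a PCollection in the Java SDK looks roughly like this:

import org.joda.time.Duration;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.PCollection;

// Divide an existing PCollection<String> named "events" into fixed one-minute
// windows based on each element's timestamp.
PCollection<String> windowed = events.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));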
Creating a PCollection
To work with a data set in a Cloud Dataflow pipeline, you'll need to create a PCollection to represent the data, wherever it is stored. The Dataflow SDKs provide two principal ways to create an initial PCollection:
- You can read the data from an external data source, such as a file.
- You can create a PCollection of data that's stored in an in-memory collection class.
Reading External Data
See Pipeline I/O for more information on reading data from an external data source.
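For instance, a minimal sketch of reading text files into a PCollection with the Java SDK (the Cloud Storage path is a placeholder, and p is assumed to be an existing Pipeline):

import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.values.PCollection;

// Read each line of the matched files into a PCollection<String>.
// The gs:// path is a placeholder, not a real bucket.
PCollection<String> lines = p.apply(
    TextIO.Read.from("gs://my-bucket/logs/*.txt"));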
Creating a PCollection from Data In Local Memory
You can create a PCollection out of data in local memory so that you can use that data in your pipeline's transforms. Typically, you use data from local memory to test your pipeline with smaller data sets, and to reduce your pipeline's dependence on external I/O while testing.
Java
To create a PCollection from an in-memory Java Collection, you apply the Create transform. Create is a root PTransform provided by the Dataflow SDK for Java. Create accepts a Java Collection and a Coder object, which specifies how the elements in the Collection should be encoded.
The following code sample creates a PCollection of Strings, representing individual lines of text, from a Java List:
// Create a Java Collection, in this case a List of Strings.
static final List<String> LINES = Arrays.asList(
    "To be, or not to be: that is the question: ",
    "Whether 'tis nobler in the mind to suffer ",
    "The slings and arrows of outrageous fortune, ",
    "Or to take arms against a sea of troubles, ");

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

// Create the PCollection.
p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of());
The code above uses Create.of, which produces a PCollection containing the specified elements. Note that if your pipeline uses windowing, you should use Create.timestamped instead. Create.timestamped produces a PCollection containing the specified elements with specified timestamps.
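As a brief sketch (the element strings and timestamp values here are arbitrary, and p is assumed to be an existing Pipeline), Create.timestamped pairs each element with an explicit timestamp:

import org.joda.time.Instant;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.TimestampedValue;

// Each element carries its own timestamp; the Instant values are arbitrary.
PCollection<String> linesWithTimestamps = p.apply(
    Create.timestamped(
        TimestampedValue.of("first line", new Instant(0)),
        TimestampedValue.of("second line", new Instant(1000))));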
Python
To create a PCollection, you apply the Create transform. Create is a standard transform provided by the Dataflow Python SDK.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

argv = None  # if None, uses sys.argv
pipeline_options = PipelineOptions(argv)

with beam.Pipeline(options=pipeline_options) as pipeline:
  lines = (
      pipeline
      | beam.Create([
          'To be, or not to be: that is the question: ',
          "Whether 'tis nobler in the mind to suffer ",
          'The slings and arrows of outrageous fortune, ',
          'Or to take arms against a sea of troubles, ',
      ]))
Using PCollection with Custom Data Types
You can create a PCollection where the element type is a custom data type that you provide. This can be useful if you need to create a collection of your own class or structure with specific fields, like a Java class that holds a customer's name, address, and phone number.
When you create a PCollection of a custom type, you'll need to provide a Coder for that custom type. The Coder tells the Dataflow service how to serialize and deserialize the elements of your PCollection as your data set is parallelized and partitioned out to multiple pipeline worker instances; see data encoding for more information.
Dataflow will attempt to infer a Coder for any PCollection for which you do not explicitly set a Coder. The default Coder for a custom type is SerializableCoder, which uses Java serialization. However, Dataflow recommends using AvroCoder as the Coder when possible.
You can register AvroCoder as the default coder for your data type by using your Pipeline object's CoderRegistry, or by annotating your class as follows:
Java
@DefaultCoder(AvroCoder.class)
public class MyClass {
  ...
}
To ensure that your custom class is compatible with AvroCoder, you might need to add some additional annotations. For example, you must annotate fields that may be null in your data type with org.apache.avro.reflect.Nullable. See the API for Java reference documentation for AvroCoder and the package documentation for org.apache.avro.reflect for more information.
Dataflow's TrafficRoutes example pipeline creates a PCollection whose element type is a custom class called StationSpeed. StationSpeed registers AvroCoder as its default coder as follows:
Java
/**
 * This class holds information about a station reading's average speed.
 */
@DefaultCoder(AvroCoder.class)
static class StationSpeed {
  @Nullable String stationId;
  @Nullable Double avgSpeed;

  public StationSpeed() {}

  public StationSpeed(String stationId, Double avgSpeed) {
    this.stationId = stationId;
    this.avgSpeed = avgSpeed;
  }

  public String getStationId() {
    return this.stationId;
  }

  public Double getAvgSpeed() {
    return this.avgSpeed;
  }
}