Windowing

The Dataflow SDKs use a concept called Windowing to subdivide a PCollection according to the timestamps of its individual elements. Dataflow transforms that aggregate multiple elements, such as GroupByKey and Combine, work implicitly on a per-window basis—that is, they process each PCollection as a succession of multiple, finite windows, even though the entire collection itself may be unbounded in size.

The Dataflow SDKs use a related concept called Triggers to determine when to "close" each finite window as unbounded data arrives. Using a trigger can help to refine the windowing strategy for your PCollection to deal with late-arriving data or to provide early results. See Triggers for more information.

Windowing Basics

Windowing is most useful with an unbounded PCollection, which represents a continuously updating data set of unknown/unlimited size (e.g. streaming data). Some Dataflow transforms, such as GroupByKey and Combine, group multiple elements by a common key. Ordinarily, that grouping operation groups all of the elements that have the same key in the entire data set. With an unbounded data set, it is impossible to collect all of the elements, since new elements are constantly being added.

In the Dataflow model, any PCollection can be subdivided into logical windows. Each element in a PCollection is assigned to one or more windows according to the PCollection's windowing function, and each individual window contains a finite number of elements. Grouping transforms then consider each PCollection's elements on a per-window basis. GroupByKey, for example, implicitly groups the elements of a PCollection by key and window. Dataflow groups only the data within the same window; it doesn't group data across windows.

Windowing Constraints

Once you set the windowing function for a PCollection, the elements' windows are used the next time you apply a grouping transform to that PCollection. Dataflow performs the actual window grouping on an as-needed basis; if you set a windowing function using the Window transform, each element is assigned to a window, but the windows are not considered until you group the PCollection with GroupByKey or Combine. Where in your pipeline that grouping happens therefore determines where the windowing takes effect.

Consider the example pipeline in Figure 1 below:

Figure 1: A pipeline that applies windowing, a ParDo, and a GroupByKey, in order.

In the above pipeline, we create an unbounded PCollection by reading a set of key/value pairs using PubsubIO, and then apply a windowing function to that collection using the Window transform. We then apply a ParDo to the collection, and later group the result of that ParDo using GroupByKey. The windowing function has no effect on the ParDo transform, because the windows are not actually used until they're needed for the GroupByKey.

Subsequent transforms, however, are applied to the result of the GroupByKey; that is, to data grouped by both key and window.
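
In code, the pipeline in Figure 1 might look like the following minimal sketch. The topic path and ParseIntoKVFn are hypothetical placeholders; ParseIntoKVFn stands in for a DoFn<String, KV<String, String>> that parses each message into a key/value pair:

Java

  // Read an unbounded PCollection of messages from Pub/Sub.
  PCollection<String> messages =
      p.apply(PubsubIO.Read.topic("/topics/my-project/my-topic"));

  PCollection<KV<String, Iterable<String>>> grouped = messages
      // Assign each element to a one-minute fixed window.
      .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
      // The windowing function has no effect on this ParDo.
      .apply(ParDo.of(new ParseIntoKVFn()))
      // GroupByKey groups the elements by key and window together.
      .apply(GroupByKey.<String, String>create());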

Using Windowing With Bounded PCollections

You can use windowing with fixed-size data sets in bounded PCollections. Note, however, that windowing considers only the implicit timestamps attached to each element of a PCollection, and data sources that create fixed data sets (such as TextIO and BigQueryIO) assign the same timestamp to every element. This means that all the elements are by default part of a single, global window. Having all elements assigned to the same window will cause a pipeline to execute in classic MapReduce batch style.

To use windowing with fixed data sets, you can assign your own timestamps to each element. To assign timestamps, you use a ParDo transform with a DoFn that outputs each element with a new timestamp (see Adding Timestamps To a PCollection's Elements below for example code).

Using windowing with a bounded PCollection can affect how your pipeline processes data. For example, consider the following pipeline:

Figure 2: GroupByKey and ParDo without windowing, on a bounded collection.

In the above pipeline, we create a bounded PCollection by reading a set of key/value pairs using TextIO. We then group the collection using GroupByKey, and apply a ParDo transform to the grouped PCollection. In this example, the GroupByKey creates a collection of unique keys, and then ParDo gets applied exactly once per key.

Now, consider the same pipeline, but using a windowing function:

Figure 3: GroupByKey and ParDo with windowing, on a bounded collection.

As before, the pipeline creates a bounded PCollection of key/value pairs. We then set a windowing function for that PCollection. The GroupByKey transform now groups the elements of the PCollection by both key and window. The subsequent ParDo transform gets applied multiple times per key, once for each window.
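
In code, the pipeline in Figure 3 might look like the following sketch. ExtractTimestampedKVFn is a hypothetical DoFn<String, KV<String, String>> that parses each line into a key/value pair and emits it with a timestamp taken from the record (see Adding Timestamps To a PCollection's Elements below):

Java

  // Read a bounded PCollection from a text file.
  PCollection<String> lines = p.apply(TextIO.Read.from("gs://my-bucket/input.txt"));

  PCollection<KV<String, Iterable<String>>> grouped = lines
      // Hypothetical DoFn that attaches a timestamp parsed from each record.
      .apply(ParDo.of(new ExtractTimestampedKVFn()))
      // Assign each element to a one-hour fixed window.
      .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardHours(1))))
      // GroupByKey now produces one group per key, per window.
      .apply(GroupByKey.<String, String>create());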

Windowing Functions

The Dataflow SDKs let you define different kinds of windows to divide the elements of your PCollection. The SDK provides several windowing functions, including:

  • Fixed Time Windows
  • Sliding Time Windows
  • Per-Session Windows
  • Single Global Window

Note that each element can logically belong to more than one window, depending on the windowing function you use. Sliding time windowing, for example, creates overlapping windows wherein a single element can be assigned to multiple windows.

Fixed Time Windows

The simplest form of windowing is a fixed time window: given a timestamped PCollection, which might be continuously updating, each window might capture (for example) five minutes' worth of elements.

A fixed time window represents the time interval in the data stream that defines a bundle of data for processing. Consider a window that operates at five-minute intervals: all of the elements in your unbounded PCollection with timestamp values between 0:00:00 and 0:04:59 belong to the first window, elements with timestamp values between 0:05:00 and 0:09:59 belong to the second window, and so on.

Figure 4: Fixed time windows, 30s in size.

Sliding Time Windows

A sliding time window also uses time intervals in the data stream to define bundles of data; however, with sliding time windowing, the windows overlap. Each window might capture five minutes' worth of data, but a new window starts every ten seconds. The frequency with which sliding windows begin is called the period. Our example would therefore have a window size of five minutes and a period of ten seconds.

Because multiple windows overlap, most elements in a data set will belong to more than one window. This kind of windowing is useful for taking running averages of data; in our example, sliding time windows let you compute a running average of the past five minutes' worth of data, updated every ten seconds.

Figure 5: Sliding time windows, with a 1-minute window size and a 30s window period.
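
For example, the following sketch computes a per-key running average over the past five minutes, updated every ten seconds. It assumes a PCollection of KV<String, Double> readings whose elements carry meaningful timestamps:

Java

  PCollection<KV<String, Double>> readings = ...;
  PCollection<KV<String, Double>> runningAverages = readings
      .apply(Window.<KV<String, Double>>into(
          SlidingWindows.of(Duration.standardMinutes(5))
                        .every(Duration.standardSeconds(10))))
      // Mean.perKey computes one average per key, per (overlapping) window.
      .apply(Mean.<String, Double>perKey());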

Session Windows

A session window function defines windows around areas of concentration in the data. Session windowing is useful for data that is irregularly distributed with respect to time; for example, a data stream representing user mouse activity may have long periods of idle time interspersed with high concentrations of clicks. Session windowing groups the high concentrations of data into separate windows and filters out the idle sections of the data stream.

Note that session windowing applies on a per-key basis; that is, grouping into sessions only takes into account data that has the same key. Each key in your data collection will therefore be grouped into disjoint windows of differing sizes.

The simplest kind of session windowing specifies a minimum gap duration. All data that arrives within the gap duration of the previous element is grouped into the same window. Data that arrives after the gap duration has elapsed starts a new window.

Figure 6: Session windows, with a minimum gap duration. Note how each data key has different windows, according to its data distribution.

Single Global Window

By default, all data in a PCollection is assigned to a single global window. If your data set is of a fixed size, you can leave the global window default for your PCollection. If the elements of your PCollection all belong to a single global window, your pipeline will execute much like a batch processing job (as in MapReduce-based processing).

Other Windowing Functions

The Dataflow SDKs provide more windowing functions beyond fixed, sliding, session, and global windows, such as calendar-based windows.

Java

See the package com.google.cloud.dataflow.sdk.transforms.windowing for a complete list of the available windowing functions in the Dataflow SDK for Java.
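
For example, a sketch that assigns elements to calendar-day windows using CalendarWindows from that package might look like this:

Java

  PCollection<String> items = ...;
  PCollection<String> dailyItems = items.apply(
    Window.<String>into(CalendarWindows.days(1)));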

Setting Your PCollection's Windowing Function

You can set the windowing function for a PCollection by applying the Window transform. When you apply the Window transform, you must provide a WindowFn, which determines the windowing function (such as fixed or sliding time windows) that your PCollection will use for subsequent grouping transforms.

The Dataflow SDKs provide pre-defined WindowFns for the basic windowing functions, or you can define your own WindowFn in advanced cases.

Technically, like all transforms, Window takes an input PCollection and outputs a new PCollection with each element assigned to one or more logical, finite windows.

Setting Fixed-Time Windows

The following example code shows how to apply Window to divide a PCollection into fixed windows, each one minute in length:

Java

  PCollection<String> items = ...;
  PCollection<String> fixedWindowedItems = items.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));

Setting Sliding Time Windows

The following example code shows how to apply Window to divide a PCollection into sliding time windows. Each window is 30 minutes in length, and a new window begins every five seconds:

Java

  PCollection<String> items = ...;
  PCollection<String> slidingWindowedItems = items.apply(
    Window.<String>into(SlidingWindows.of(Duration.standardMinutes(30))
                                      .every(Duration.standardSeconds(5))));

Setting Session Windows

The following example code shows how to apply Window to divide a PCollection into session windows, where each session must be separated by a time gap of at least 10 minutes:

Java

  PCollection<String> items = ...;
  PCollection<String> sessionWindowedItems = items.apply(
    Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(10))));

Note that the sessions are per-key—each key in the collection will have its own session groupings depending on the data distribution.

Setting a Single Global Window

If your PCollection is bounded (the size is fixed), you can assign all the elements to a single global window. To do so, pass new GlobalWindows() as the WindowFn when you apply the Window transform. The following example code shows how to apply Window to assign a PCollection to a single global window:

Java

  PCollection<String> items = ...;
  PCollection<String> batchItems = items.apply(
    Window.<String>into(new GlobalWindows()));

Time Skew, Data Lag, and Late Data

In any data processing system, there is a certain amount of lag between the time a data event occurs (the "event time", determined by the timestamp on the data element itself) and the time the actual data element gets processed at any stage in your pipeline (the "processing time", determined by the clock on the system processing the element).

In a perfect system, the event time for each data element and the processing time would be equal, or at least have a consistent delta. However, in any real-world computing system, data generation and delivery are subject to any number of temporal limitations. In large or distributed systems, such as a distributed collection of web front-ends generating customer orders or log files, there are no guarantees that data events will appear in your pipeline in the same order that they were generated in various places on the web.

For example, let's say we have a PCollection that's using fixed-time windowing, with windows that are five minutes long. For each window, Dataflow must collect all the data with an event time timestamp in the given window range (between 0:00 and 4:59 in the first window, for instance). Data with timestamps outside that range (data from 5:00 or later) belong to a different window.

However, data isn't always guaranteed to arrive in a pipeline in correct time order, or to always arrive at predictable intervals. Dataflow tracks a watermark, which is the system's notion of when all data in a certain window can be expected to have arrived in the pipeline. Data that arrives with a timestamp after the watermark is considered late data.

Continuing our example, suppose we have a simple watermark that assumes approximately 30s of lag time between the data timestamps (the event time) and the time the data appears in the pipeline (the processing time). Dataflow would then close the first window at 5:30. If a data record arrives at 5:34, but with a timestamp that would put it in the 0:00-4:59 window (say, 3:38), then that record is late data.

Note: For simplicity, we've assumed that we're using a very straightforward watermark that estimates the lag time/time skew. In practice, your PCollection's data source determines the watermark, and watermarks can be more precise or complex.

Managing Time Skew and Late Data

You can allow late data by invoking the .withAllowedLateness operation when you set your PCollection's windowing strategy. The following code example demonstrates a windowing strategy that will allow late data up to two days after the end of a window.

Java

  PCollection<String> items = ...;
  PCollection<String> fixedWindowedItems = items.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
          .withAllowedLateness(Duration.standardDays(2)));

When you set .withAllowedLateness on a PCollection, the allowed lateness propagates forward to any subsequent PCollection derived from the first PCollection you applied allowed lateness to. If you want to change the allowed lateness later in your pipeline, you must do so explicitly by applying Window.withAllowedLateness() again.

You can also use Dataflow's Triggers API to help you refine the windowing strategy for a PCollection. You can use triggers to determine exactly when each individual window aggregates and reports its results, including how the window emits late elements.

Note: Dataflow's default windowing and trigger strategies discard late data. If you want to ensure that your pipeline handles instances of late data, you'll need to explicitly set .withAllowedLateness when you set your PCollection's windowing strategy and set triggers for your PCollections accordingly.
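
For example, the following sketch (using hypothetical parameters) combines .withAllowedLateness with a trigger that fires when the watermark passes the end of each one-minute window and then re-fires for each late element, accumulating each window's results across firings; see Triggers for details:

Java

  PCollection<String> items = ...;
  PCollection<String> windowedItems = items.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
          .triggering(AfterWatermark.pastEndOfWindow()
              .withLateFirings(AfterPane.elementCountAtLeast(1)))
          .withAllowedLateness(Duration.standardDays(2))
          .accumulatingFiredPanes());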

Adding Timestamps To a PCollection's Elements

You can assign new timestamps to the elements of a PCollection by applying a ParDo transform that outputs new elements with timestamps that you set. Assigning timestamps can be useful if you want to use Dataflow's windowing features, but your data set comes from a source without implicit timestamps (such as a file from TextIO).

This is a good pattern to follow when your data set includes timestamp data but the timestamps are not generated by the Dataflow data source. For example, your pipeline might read log records from an input file in which each log record includes a timestamp field; because the pipeline reads the records in from a file, the file source doesn't assign timestamps automatically. You can parse the timestamp field from each record and use a ParDo transform to attach the timestamps to each element in your PCollection.

Java

To assign timestamps, your ParDo transform needs to use a DoFn that outputs elements using ProcessContext.outputWithTimestamp (rather than the usual ProcessContext.output used to emit elements to the main output collection). The following example code shows a ParDo with a DoFn that outputs elements with new timestamps:

  PCollection<LogEntry> unstampedLogs = ...;
  PCollection<LogEntry> stampedLogs =
      unstampedLogs.apply(ParDo.of(new DoFn<LogEntry, LogEntry>() {
        @Override
        public void processElement(ProcessContext c) {
          // Extract the timestamp from the log entry we're currently processing.
          Instant logTimeStamp = extractTimeStampFromLogEntry(c.element());
          // Use outputWithTimestamp to emit the log entry with its timestamp attached.
          c.outputWithTimestamp(c.element(), logTimeStamp);
        }
      }));
