Triggers

When you collect data in a PCollection and group it into finite windows, Dataflow needs some way to determine when to emit the aggregated results of each window. Because of time skew and data lag, Dataflow uses a mechanism called triggers to determine when "enough" data has been collected in a window. When a trigger fires, Dataflow emits the aggregated results of that window; each such emission is referred to as a pane.

Dataflow's triggers system provides several different ways to determine when to emit the aggregated results of a given window, depending on your system's data processing needs. For example, a system that requires prompt or time-sensitive updates might use a strict time-based trigger that emits a window every N seconds, valuing promptness over data completeness. A system that values data completeness more than the exact timing of results might use a data-based trigger that waits for a certain number of data records to accumulate before firing.

Triggers are particularly useful for handling two kinds of conditions in your pipeline:

  • Triggers can help you handle instances of late data.
  • Triggers can help you emit early results, before all the data in a given window has arrived.

Note: You can set a trigger for an unbounded PCollection that uses a single global window for its windowing function. This can be useful when you want your pipeline to provide periodic updates on an unbounded data set: for example, a running average of all data provided up to the present time, updated every N seconds or every N elements.
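The following plain-Java simulation (not Dataflow SDK code) sketches what such a pipeline would emit in the "every N seconds" case: a single global window whose panes accumulate, so every update is the running average of everything processed so far. The class name, the parallel-list input format, and the fixed tick schedule are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation (not SDK code): one global window over unbounded input.
// Every periodMillis of processing time, an accumulating pane is emitted that holds
// the running average of all elements processed so far.
class PeriodicGlobalAverage {
    // processedAtMillis and values are parallel lists, sorted by processing time.
    static List<Double> updates(List<Long> processedAtMillis, List<Integer> values,
                                long periodMillis, long endMillis) {
        List<Double> out = new ArrayList<>();
        long sum = 0;
        int count = 0;
        int next = 0;
        for (long tick = periodMillis; tick <= endMillis; tick += periodMillis) {
            // Fold in every element processed up to and including this tick.
            while (next < values.size() && processedAtMillis.get(next) <= tick) {
                sum += values.get(next);
                count++;
                next++;
            }
            if (count > 0) {
                out.add((double) sum / count); // no update before any data has arrived
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> times = List.of(500L, 1_500L, 2_500L, 2_600L);
        List<Integer> values = List.of(10, 20, 30, 40);
        System.out.println(updates(times, values, 1_000L, 3_000L)); // [10.0, 15.0, 25.0]
    }
}
```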

Types of Triggers

Dataflow provides a number of pre-built triggers that you can set for your PCollections. There are three major kinds of triggers:

  • Time-based triggers. These triggers operate on a time reference: either event time (as indicated by the timestamp on each data element) or processing time (the time when the data element is processed at any given stage in the pipeline).
  • Data-driven triggers. These triggers operate by examining the data as it arrives in each window and firing when a data condition that you specify is met. For example, you can set a trigger to emit results from a window when that window has received a certain number of data elements.
  • Composite triggers. These triggers combine multiple time-based or data-driven triggers in some logical way. You can set a composite trigger to fire when all triggers are met (logical AND), when any trigger is met (logical OR), etc.

Time-Based Triggers

Dataflow's time-based triggers include AfterWatermark and AfterProcessingTime. These triggers take a time reference in either event time or processing time, and set a timer based on that time reference.

AfterWatermark

The AfterWatermark trigger operates on event time. The AfterWatermark trigger will emit the contents of a window after the watermark passes the end of the window, based on the timestamps attached to the data elements. The watermark is a global progress metric, Dataflow's notion of input completeness within your pipeline at any given point.

The AfterWatermark trigger fires only when the watermark passes the end of the window. It is the primary trigger that Dataflow uses to emit results when the system estimates that it has all the data in a given time-based window.
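As an illustration, here is a hypothetical plain-Java model (not SDK code) of AfterWatermark.pastEndOfWindow(): given successive watermark readings in event-time milliseconds, it reports the reading at which the trigger would fire. Treating "passes the end" as "reaches or exceeds the window end" is a boundary assumption of this sketch.

```java
import java.util.List;

// Hypothetical model of AfterWatermark.pastEndOfWindow(): returns the index of the
// watermark reading at which the trigger fires, or -1 if the watermark never passes
// the end of the window.
class WatermarkTriggerModel {
    static int firingIndex(long windowEndMillis, List<Long> watermarkReadings) {
        for (int i = 0; i < watermarkReadings.size(); i++) {
            // Modeling assumption: "passes the end" means the watermark reached the end.
            if (watermarkReadings.get(i) >= windowEndMillis) {
                return i; // fires once; later readings do not fire it again
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L; // window covering event times [0, 60 s)
        List<Long> readings = List.of(10_000L, 45_000L, 59_999L, 61_000L);
        System.out.println(firingIndex(windowEnd, readings)); // 3
    }
}
```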

AfterProcessingTime

The AfterProcessingTime trigger operates on processing time. It emits a window after a certain amount of processing time has passed since a time reference, such as the processing of the first element in the window's pane (AfterProcessingTime.pastFirstElementInPane()). The processing time is determined by the system clock, rather than by the data element's timestamp.

The AfterProcessingTime trigger is useful for triggering early results from a window, particularly a window with a large time frame such as a single global window.
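The sketch below is a hypothetical plain-Java model (not SDK code) of AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay). It shows that the timer is measured on the processing-time clock, starting from the first element of the pane. The sorted list of arrival times and the exclusion of an element arriving exactly at the firing instant are assumptions of the model.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay):
// the timer starts when the pane's first element is processed, on the system clock.
class ProcessingTimeTriggerModel {
    // arrivalMillis: processing times at which elements reach the window, sorted.
    static long firingTime(List<Long> arrivalMillis, long delayMillis) {
        if (arrivalMillis.isEmpty()) {
            throw new IllegalArgumentException("no elements, so the timer never starts");
        }
        return arrivalMillis.get(0) + delayMillis;
    }

    // Elements processed before the timer fires are emitted in the pane.
    // Modeling assumption: an element arriving exactly at the firing time is excluded.
    static List<Long> paneContents(List<Long> arrivalMillis, long delayMillis) {
        long fireAt = firingTime(arrivalMillis, delayMillis);
        List<Long> pane = new ArrayList<>();
        for (long t : arrivalMillis) {
            if (t < fireAt) {
                pane.add(t);
            }
        }
        return pane;
    }

    public static void main(String[] args) {
        List<Long> arrivals = List.of(1_000L, 20_000L, 59_000L, 61_000L, 70_000L);
        System.out.println(firingTime(arrivals, 60_000L));   // 61000
        System.out.println(paneContents(arrivals, 60_000L)); // [1000, 20000, 59000]
    }
}
```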

Data-Driven Triggers

Dataflow currently provides only one data-driven trigger, AfterPane.elementCountAtLeast. This trigger works on a straight element count; it fires after the current pane has collected at least N elements.

The AfterPane.elementCountAtLeast() trigger is a good way to cause a window to emit early results, before all the data has accumulated, especially in the case of a single global window.
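The trigger grammar at the end of this page lists AfterPane.elementCountAtLeast as a once-trigger. The hypothetical plain-Java model below (not SDK code) illustrates that: it fires when the pane reaches the threshold and then never fires again unless wrapped in something like Repeatedly.forever. The class and method names are invented for illustration.

```java
// Hypothetical model of AfterPane.elementCountAtLeast(n) as a once-trigger:
// it fires when the current pane holds at least n elements, and then it is finished.
class ElementCountTriggerModel {
    private final int threshold;
    private int countInPane = 0;
    private boolean finished = false;

    ElementCountTriggerModel(int threshold) {
        this.threshold = threshold;
    }

    // Records one element arriving in the pane; returns true if this element makes it fire.
    boolean onElement() {
        if (finished) {
            return false; // a once-trigger never fires again
        }
        countInPane++;
        if (countInPane >= threshold) {
            finished = true;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        ElementCountTriggerModel trigger = new ElementCountTriggerModel(3);
        for (int i = 1; i <= 7; i++) {
            if (trigger.onElement()) {
                System.out.println("fired on element " + i); // printed only for element 3
            }
        }
    }
}
```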

Default Trigger

The default trigger for a PCollection is event time-based, and emits the results of the window when the system's watermark (Dataflow's notion of when it "should" have all the data) passes the end of the window. The default triggering configuration emits exactly once, and late data is discarded. This is because the default windowing and trigger configuration has an allowed lateness value of 0. See Handling Late Data for information on modifying this behavior.

The watermark depends on the data source; in some cases, it is an estimate. In others, such as Pub/Sub with system-assigned timestamps, the watermark can provide an exact bound of what data the pipeline has processed.

Handling Late Data

If your pipeline needs to handle late data (data that arrives after the watermark passes the end of the window), you can apply an allowed lateness when setting your windowing and trigger configuration. This will give your trigger the opportunity to react to the late data; in the default triggering configuration it will emit new results immediately whenever late data arrives.

You set the allowed lateness by using .withAllowedLateness() when setting your window and trigger, as follows:

  PCollection<String> pc = ...;
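  // Duration is org.joda.time.Duration. A trigger also requires an accumulation mode.
  pc.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
      .triggering(AfterProcessingTime.pastFirstElementInPane()
          .plusDelayOf(Duration.standardMinutes(1)))
      .withAllowedLateness(Duration.standardMinutes(30))
      .discardingFiredPanes());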

This allowed lateness propagates to all PCollections derived as a result of applying transforms to the original PCollection. If you want to change the allowed lateness later in your pipeline, you can apply Window.withAllowedLateness() again, explicitly.
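To make the effect of allowed lateness concrete, here is a hypothetical plain-Java model (not SDK code) that classifies an arriving element as on time, late but kept, or dropped, from the window's end and the watermark at arrival. It uses java.time.Duration to stay self-contained (the Dataflow SDK uses Joda-Time), and the exact expiry boundary is an assumption of the sketch. With the default allowed lateness of 0, any element arriving after the watermark passes the window end is dropped.

```java
import java.time.Duration;

// Hypothetical model of what allowed lateness decides for an element of a window
// ending at windowEndMillis, given the watermark at the moment the element arrives.
class LatenessModel {
    enum Fate { ON_TIME, LATE_BUT_KEPT, DROPPED }

    static Fate classify(long windowEndMillis, long watermarkMillis, Duration allowedLateness) {
        if (watermarkMillis < windowEndMillis) {
            return Fate.ON_TIME; // the watermark has not passed the end of the window
        }
        // Modeling assumption: the window stays open until the watermark reaches
        // windowEnd + allowedLateness.
        if (watermarkMillis < windowEndMillis + allowedLateness.toMillis()) {
            return Fate.LATE_BUT_KEPT; // the trigger gets a chance to emit a late pane
        }
        return Fate.DROPPED;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L;              // end of the window [0, 1 min)
        long watermark = windowEnd + 120_000L; // watermark is 2 minutes past the end
        System.out.println(classify(windowEnd, watermark, Duration.ZERO));          // DROPPED
        System.out.println(classify(windowEnd, watermark, Duration.ofMinutes(30))); // LATE_BUT_KEPT
    }
}
```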

Setting a Trigger

When you set a windowing function for a PCollection by using the Window transform, you can also specify a trigger.

You set the trigger(s) for a PCollection by invoking the method .triggering() on the result of your Window.into() transform, as follows:

Java

  PCollection<String> pc = ...;
  // Duration is org.joda.time.Duration.
  pc.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
      .triggering(AfterProcessingTime.pastFirstElementInPane()
          .plusDelayOf(Duration.standardMinutes(1)))
      .discardingFiredPanes());

The preceding code sample sets a time-based trigger for a PCollection: it emits each window one minute after the first element in that window has been processed. The last line in the code sample, .discardingFiredPanes(), sets the window's accumulation mode.

Window Accumulation Modes

When you specify a trigger, you must also set the window's accumulation mode. When a trigger fires, it emits the current contents of the window as a pane. Because a trigger can fire multiple times, the accumulation mode determines whether the system accumulates the window panes as the trigger fires, or discards them.

To set a window to accumulate the panes that are produced when the trigger fires, invoke .accumulatingFiredPanes() when you set the trigger. To set a window to discard fired panes, invoke .discardingFiredPanes().

Let's look at an example that uses a PCollection with fixed-time windowing and a data-based trigger. (You might do this if, for example, each window represented a ten-minute running average, but you wanted to display the current value of the average in a UI more frequently than every ten minutes.) We'll assume the following conditions:

  • The PCollection uses 10-minute fixed-time windows.
  • The PCollection has a repeating trigger that fires every time 3 elements arrive.

The following diagram shows data events as they arrive in the PCollection and are assigned to windows:

Figure 1: Data events in a PCollection with fixed-time windowing.

Note: To keep the diagram a bit simpler, we'll assume that the events all arrive in the pipeline in order.

For simplicity, let's only consider values associated with key X.

Accumulating Mode

If our trigger is set to .accumulatingFiredPanes, the trigger emits the following values each time it fires (remember, the trigger fires every time three elements arrive):

  Key X:
    First trigger firing:  [5, 8, 3]
    Second trigger firing: [5, 8, 3, 15, 19, 23]
    Third trigger firing:  [5, 8, 3, 15, 19, 23, 9, 13, 10]

Discarding Mode

If our trigger is set to .discardingFiredPanes, the trigger emits the following values on each firing:

  Key X:
    First trigger firing:  [5, 8, 3]
    Second trigger firing: [15, 19, 23]
    Third trigger firing:  [9, 13, 10]
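The two listings above can be reproduced with a small plain-Java simulation (not SDK code) of one key in one window, with a repeating trigger that fires every three elements. The class name and the assumption that elements arrive in order are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of one key in one window, with a repeating trigger that
// fires every `every` elements, in accumulating or discarding mode.
class AccumulationModes {
    static List<List<Integer>> firings(List<Integer> arrivals, int every, boolean accumulating) {
        List<List<Integer>> emitted = new ArrayList<>();
        List<Integer> buffered = new ArrayList<>(); // window contents kept for the next pane
        int sinceLastFiring = 0;
        for (int value : arrivals) {
            buffered.add(value);
            sinceLastFiring++;
            if (sinceLastFiring == every) {             // the count trigger fires
                emitted.add(new ArrayList<>(buffered)); // emit a copy as the pane
                sinceLastFiring = 0;
                if (!accumulating) {
                    buffered.clear();                   // discarding: next pane starts empty
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> keyX = List.of(5, 8, 3, 15, 19, 23, 9, 13, 10);
        System.out.println("accumulating: " + firings(keyX, 3, true));
        System.out.println("discarding:   " + firings(keyX, 3, false));
    }
}
```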

Effects of Accumulating vs. Discarding

Now, let's add a per-key calculation to our pipeline. Every time the trigger fires, the pipeline applies a Combine.perKey that calculates the mean of all values associated with each key in the window.

Again, let's just consider key X:

With the trigger set to .accumulatingFiredPanes:

  Key X:
    First trigger firing:  [5, 8, 3]
      Average after first trigger firing: 5.333
    Second trigger firing: [5, 8, 3, 15, 19, 23]
      Average after second trigger firing: 12.167
    Third trigger firing:  [5, 8, 3, 15, 19, 23, 9, 13, 10]
      Average after third trigger firing: 11.667

With the trigger set to .discardingFiredPanes:

  Key X:
    First trigger firing:  [5, 8, 3]
      Average after first trigger firing: 5.333
    Second trigger firing: [15, 19, 23]
      Average after second trigger firing: 19
    Third trigger firing:  [9, 13, 10]
      Average after third trigger firing: 10.667

Note that the Combine.perKey that computes the mean produces different results in each case.

In general, a trigger set to .accumulatingFiredPanes always outputs all data in a given window, including any elements previously triggered. A trigger set to .discardingFiredPanes outputs incremental changes since the last time the trigger fired. Accumulating mode is most appropriate before a grouping operation that combines or otherwise updates the elements; otherwise, use discarding mode.
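The averages listed above can be checked with the following plain-Java sketch (not SDK code). It stands in for the per-key Combine by computing the mean of each emitted pane for key X, with a trigger that fires every three elements; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the mean that a per-key Combine would compute for each pane
// of key X when the trigger fires every `every` elements.
class PaneMeans {
    static List<Double> means(List<Integer> arrivals, int every, boolean accumulating) {
        List<Double> result = new ArrayList<>();
        long sum = 0;
        int count = 0;       // elements covered by the next pane
        int sinceFiring = 0; // elements since the trigger last fired
        for (int value : arrivals) {
            sum += value;
            count++;
            sinceFiring++;
            if (sinceFiring == every) {
                result.add((double) sum / count);
                sinceFiring = 0;
                if (!accumulating) {
                    sum = 0;   // discarding: the next pane sees only new elements
                    count = 0;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> keyX = List.of(5, 8, 3, 15, 19, 23, 9, 13, 10);
        System.out.println("accumulating: " + means(keyX, 3, true));
        System.out.println("discarding:   " + means(keyX, 3, false));
    }
}
```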

Trigger Continuation

When you apply an aggregating transform such as GroupByKey or Combine.perKey to a PCollection for which you've specified a trigger, keep in mind that the GroupByKey or Combine.perKey produces a new output PCollection; the trigger that you set for the input collection does not propagate onto the new output collection.

Instead, the Dataflow SDK creates a comparable trigger for the output PCollection based on the trigger that you specified for the input collection. The new trigger attempts to emit elements as fast as reasonably possible, at roughly the rate specified by the original trigger on the input PCollection. Dataflow determines the properties of the new trigger based on the parameters you provided for the input trigger:

  • AfterWatermark's continuation trigger is identical to the original trigger by default. If the AfterWatermark trigger has early or late firings specified, the early or late firings of the continuation will be the continuation of the original trigger's early or late firings.
  • AfterProcessingTime's default continuation trigger fires after the synchronized processing time for the amalgamated elements, and does not propagate any additional delays. For example, consider a trigger such as AfterProcessingTime.pastFirstElementInPane().alignedTo(Duration.standardMinutes(15)).plusDelayOf(Duration.standardHours(1)). After a GroupByKey, the trigger that Dataflow supplies for the output collection will be synchronized to the same aligned time for each key, but will not retain the one-hour delay.
  • AfterCount's (AfterPane.elementCountAtLeast's) default continuation trigger fires on every element. For example, AfterCount(n) on the input collection becomes AfterCount(1) on the output collection.

If the trigger that Dataflow generates for a PCollection output from GroupByKey or Combine.perKey does not meet your needs, explicitly set a new trigger for that collection.
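The three continuation rules above can be captured in a toy model. The following plain-Java sketch is hypothetical: the nested classes only mirror the SDK's trigger names and are not SDK types, and durations are plain minute counts.

```java
// Hypothetical toy model of the continuation rules above; these classes are not SDK types.
class ContinuationModel {
    interface Trigger {
        Trigger continuation();
    }

    static final class AfterCount implements Trigger {
        final int n;
        AfterCount(int n) { this.n = n; }
        // AfterCount(n) on the input becomes AfterCount(1) on the output.
        @Override public Trigger continuation() { return new AfterCount(1); }
        @Override public String toString() { return "AfterCount(" + n + ")"; }
    }

    static final class AfterProcessingTime implements Trigger {
        final long alignedToMinutes; // alignment period in minutes
        final long delayMinutes;     // extra delay in minutes
        AfterProcessingTime(long alignedToMinutes, long delayMinutes) {
            this.alignedToMinutes = alignedToMinutes;
            this.delayMinutes = delayMinutes;
        }
        // Keeps the alignment, so every key fires at the same synchronized time,
        // but drops the additional delay.
        @Override public Trigger continuation() {
            return new AfterProcessingTime(alignedToMinutes, 0);
        }
        @Override public String toString() {
            return "AfterProcessingTime(alignedTo=" + alignedToMinutes + "min, delay=" + delayMinutes + "min)";
        }
    }

    static final class AfterWatermark implements Trigger {
        final Trigger early; // null when no early firings are configured
        final Trigger late;  // null when no late firings are configured
        AfterWatermark(Trigger early, Trigger late) { this.early = early; this.late = late; }
        // Same trigger; any early or late firings are replaced by their own continuations.
        @Override public Trigger continuation() {
            return new AfterWatermark(early == null ? null : early.continuation(),
                                      late == null ? null : late.continuation());
        }
        @Override public String toString() {
            return "AfterWatermark(early=" + early + ", late=" + late + ")";
        }
    }

    public static void main(String[] args) {
        System.out.println(new AfterCount(100).continuation());
        System.out.println(new AfterProcessingTime(15, 60).continuation());
        System.out.println(new AfterWatermark(new AfterCount(10), null).continuation());
    }
}
```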

Combining Triggers

In Dataflow, you can combine multiple triggers into composite triggers that join their conditions logically. You can also specify a trigger to emit results repeatedly, at most once, or under other custom conditions.

Composite Trigger Types

Dataflow includes the following composite triggers:

  • You can add early firings, late firings, or both to AfterWatermark.pastEndOfWindow.
  • Repeatedly.forever specifies a trigger that executes forever. Any time the trigger's conditions are met, it causes a window to emit results, then resets and starts over. It can be useful to combine Repeatedly.forever with .orFinally to specify a condition to cause the repeating trigger to stop.
  • AfterEach.inOrder combines multiple triggers to fire in a specific sequence. Each time a trigger in the sequence emits a window, the sequence advances to the next trigger.
  • AfterFirst takes multiple triggers and emits the first time any of its argument triggers is satisfied. This is equivalent to a logical OR operation for multiple triggers.
  • AfterAll takes multiple triggers and emits when all of its argument triggers are satisfied. This is equivalent to a logical AND operation for multiple triggers.
  • orFinally can serve as a final condition to cause any trigger to fire one final time and never fire again.
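The OR and AND semantics of AfterFirst and AfterAll can be illustrated with a hypothetical plain-Java model (not SDK code) in which each once-trigger is a condition on the pane's state. It assumes conditions that, once true, stay true (an element count or elapsed time), so checking them together on the current state matches "all have been met". Repeatedly.forever, AfterEach.inOrder, and orFinally are not modeled here.

```java
import java.util.function.Predicate;

// Hypothetical model: each once-trigger is a condition on the pane's state, and
// AfterFirst / AfterAll combine conditions with logical OR / AND.
class CompositeConditions {
    static final class PaneState {
        final int elementCount;
        final long millisSinceFirstElement;

        PaneState(int elementCount, long millisSinceFirstElement) {
            this.elementCount = elementCount;
            this.millisSinceFirstElement = millisSinceFirstElement;
        }
    }

    static Predicate<PaneState> countAtLeast(int n) {
        return s -> s.elementCount >= n;
    }

    static Predicate<PaneState> delayAtLeast(long millis) {
        return s -> s.millisSinceFirstElement >= millis;
    }

    // AfterFirst: satisfied as soon as any argument is satisfied (logical OR).
    @SafeVarargs
    static Predicate<PaneState> afterFirst(Predicate<PaneState>... parts) {
        Predicate<PaneState> combined = s -> false;
        for (Predicate<PaneState> p : parts) combined = combined.or(p);
        return combined;
    }

    // AfterAll: satisfied only when every argument is satisfied (logical AND).
    @SafeVarargs
    static Predicate<PaneState> afterAll(Predicate<PaneState>... parts) {
        Predicate<PaneState> combined = s -> true;
        for (Predicate<PaneState> p : parts) combined = combined.and(p);
        return combined;
    }

    public static void main(String[] args) {
        PaneState state = new PaneState(50, 70_000L); // 50 elements, 70 s after the first
        System.out.println(afterFirst(countAtLeast(100), delayAtLeast(60_000L)).test(state)); // true
        System.out.println(afterAll(countAtLeast(100), delayAtLeast(60_000L)).test(state));   // false
    }
}
```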

Composition with AfterWatermark.pastEndOfWindow

Some of the most useful composite triggers fire a single time when the system estimates that all the data has arrived (i.e. when the watermark passes the end of the window) combined with either, or both, of the following:

  • Speculative firings that precede the watermark passing the end of the window to allow faster processing of partial results.
  • Late firings that happen after the watermark passes the end of the window, to allow for handling late-arriving data.

You can express this pattern using AfterWatermark.pastEndOfWindow. For example, the following trigger code fires under these conditions:

  • On the system's estimate that all the data has arrived (the watermark passes the end of the window)
  • Any time late data arrives, after a ten-minute delay
  • After two days, we assume no more data of interest will arrive, and the trigger stops executing

Java

  .apply(Window
      .triggering(AfterWatermark
           .pastEndOfWindow()
           .withLateFirings(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(10))))
      .withAllowedLateness(Duration.standardDays(2))
      // An accumulation mode is required; discarding is shown here as one option.
      .discardingFiredPanes());
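The late-firing schedule described above can be sketched in plain Java (not SDK code). The model assumes, as the list above describes, that a late pane fires ten minutes of processing time after its first late element, that late panes keep recurring, and that every late element arrives within the two-day allowed lateness, so none is dropped. The class name and the tie-breaking rule at the exact firing instant are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the late-firing schedule: each late pane fires 10 minutes
// (processing time) after its first late element. Arrivals are sorted processing-time
// millis, all assumed to fall within the allowed lateness.
class LateFiringSchedule {
    static final long DELAY_MILLIS = 10 * 60 * 1000L;

    static List<Long> lateFiringTimes(List<Long> lateArrivals) {
        List<Long> firings = new ArrayList<>();
        long pendingFireAt = -1; // -1 means no late pane is currently open
        for (long t : lateArrivals) {
            // Modeling assumption: an element arriving exactly at the firing time
            // belongs to the next pane.
            if (pendingFireAt >= 0 && t >= pendingFireAt) {
                firings.add(pendingFireAt); // the open pane fired before this element
                pendingFireAt = -1;
            }
            if (pendingFireAt < 0) {
                pendingFireAt = t + DELAY_MILLIS; // first element of a new late pane
            }
        }
        if (pendingFireAt >= 0) {
            firings.add(pendingFireAt);
        }
        return firings;
    }

    public static void main(String[] args) {
        // Late elements at 0, 3, 9 and 25 minutes of processing time.
        List<Long> late = List.of(0L, 180_000L, 540_000L, 1_500_000L);
        System.out.println(lateFiringTimes(late)); // [600000, 2100000]
    }
}
```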

Other Composite Triggers

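You can also build other sorts of composite triggers. The following example code shows a simple composite trigger that fires whenever the pane has at least 100 elements or one minute after the first element in the pane arrives, whichever comes first. Because it is wrapped in Repeatedly.forever, the trigger then resets and starts over.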

Java

  Repeatedly.forever(AfterFirst.of(
      AfterPane.elementCountAtLeast(100),
      AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))
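To see how this composite trigger splits a stream into panes, here is a hypothetical plain-Java simulation (not SDK code) over sorted processing-time arrivals. It returns the size of each emitted pane and assumes the final pane's timer eventually fires. The demo scales the parameters down (3 elements or 1 second instead of 100 elements or 1 minute) to keep the input short.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of
//   Repeatedly.forever(AfterFirst.of(
//       AfterPane.elementCountAtLeast(maxCount),
//       AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay)))
// over sorted processing-time arrivals (millis). Returns the size of each pane.
class CountOrTimeoutPanes {
    static List<Integer> paneSizes(List<Long> arrivals, int maxCount, long delayMillis) {
        List<Integer> sizes = new ArrayList<>();
        int inPane = 0;
        long deadline = -1; // processing time at which the open pane's timer fires
        for (long t : arrivals) {
            if (inPane > 0 && t >= deadline) {
                sizes.add(inPane); // the timer fired first: emit what we have
                inPane = 0;
            }
            if (inPane == 0) {
                deadline = t + delayMillis; // timer starts at the pane's first element
            }
            inPane++;
            if (inPane >= maxCount) {
                sizes.add(inPane); // the count condition fired first
                inPane = 0;        // Repeatedly.forever: reset and start a new pane
            }
        }
        if (inPane > 0) {
            sizes.add(inPane); // assume the last pane's timer eventually fires
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<Long> arrivals = List.of(0L, 10L, 20L, 30L, 2_000L, 5_000L);
        System.out.println(paneSizes(arrivals, 3, 1_000L)); // [3, 1, 1, 1]
    }
}
```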

Trigger Grammar

The following grammar describes the various ways that you can combine triggers into composite triggers:

TRIGGER ::=
   ONCE_TRIGGER
   Repeatedly.forever(TRIGGER)
   TRIGGER.orFinally(ONCE_TRIGGER)
   AfterEach.inOrder(TRIGGER, TRIGGER, ...)

ONCE_TRIGGER ::=
  TIME_TRIGGER
  WATERMARK_TRIGGER
  AfterPane.elementCountAtLeast(Integer)
  AfterFirst.of(ONCE_TRIGGER, ONCE_TRIGGER, ...)
  AfterAll.of(ONCE_TRIGGER, ONCE_TRIGGER, ...)

TIME_TRIGGER ::=
  AfterProcessingTime.pastFirstElementInPane()
  TIME_TRIGGER.alignedTo(Duration)
  TIME_TRIGGER.alignedTo(Duration, Instant)
  TIME_TRIGGER.plusDelayOf(Duration)
  TIME_TRIGGER.mappedBy(Instant -> Instant)

WATERMARK_TRIGGER ::=
  AfterWatermark.pastEndOfWindow()
  WATERMARK_TRIGGER.withEarlyFirings(ONCE_TRIGGER)
  WATERMARK_TRIGGER.withLateFirings(ONCE_TRIGGER)

Default = Repeatedly.forever(AfterWatermark.pastEndOfWindow())
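One way to read the grammar is as a type system: ONCE_TRIGGER is a subtype of TRIGGER, and some positions accept only the subtype. The following hypothetical Java encoding (not SDK classes) covers a subset of the productions (mappedBy and the two-argument alignedTo are omitted) and only renders a trigger back to its textual form. Durations are passed as strings to avoid a Joda-Time dependency. Because the arguments of afterFirst and afterAll are typed as Once, an invalid composition such as afterFirst(repeatedlyForever(...)) does not compile.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical encoding of the trigger grammar as Java types (not SDK classes).
class TriggerGrammar {
    interface Trigger { String render(); }     // TRIGGER

    interface Once extends Trigger { }         // ONCE_TRIGGER (also a TRIGGER)

    // TIME_TRIGGER ::= pastFirstElementInPane() | .alignedTo(Duration) | .plusDelayOf(Duration)
    interface TimeTrigger extends Once {
        default TimeTrigger alignedTo(String period) {
            return () -> render() + ".alignedTo(" + period + ")";
        }
        default TimeTrigger plusDelayOf(String delay) {
            return () -> render() + ".plusDelayOf(" + delay + ")";
        }
    }

    // WATERMARK_TRIGGER ::= pastEndOfWindow() | .withEarlyFirings(ONCE) | .withLateFirings(ONCE)
    interface WatermarkTrigger extends Once {
        default WatermarkTrigger withEarlyFirings(Once early) {
            return () -> render() + ".withEarlyFirings(" + early.render() + ")";
        }
        default WatermarkTrigger withLateFirings(Once late) {
            return () -> render() + ".withLateFirings(" + late.render() + ")";
        }
    }

    private static String join(Trigger... parts) {
        return Arrays.stream(parts).map(Trigger::render).collect(Collectors.joining(", "));
    }

    // ONCE_TRIGGER productions
    static TimeTrigger pastFirstElementInPane() {
        return () -> "AfterProcessingTime.pastFirstElementInPane()";
    }
    static WatermarkTrigger pastEndOfWindow() {
        return () -> "AfterWatermark.pastEndOfWindow()";
    }
    static Once elementCountAtLeast(int n) {
        return () -> "AfterPane.elementCountAtLeast(" + n + ")";
    }
    static Once afterFirst(Once... parts) {
        return () -> "AfterFirst.of(" + join(parts) + ")";
    }
    static Once afterAll(Once... parts) {
        return () -> "AfterAll.of(" + join(parts) + ")";
    }

    // TRIGGER productions
    static Trigger repeatedlyForever(Trigger t) {
        return () -> "Repeatedly.forever(" + t.render() + ")";
    }
    static Trigger orFinally(Trigger t, Once until) {
        return () -> t.render() + ".orFinally(" + until.render() + ")";
    }
    static Trigger afterEachInOrder(Trigger... parts) {
        return () -> "AfterEach.inOrder(" + join(parts) + ")";
    }

    static final Trigger DEFAULT = repeatedlyForever(pastEndOfWindow());

    public static void main(String[] args) {
        Trigger t = repeatedlyForever(afterFirst(
                elementCountAtLeast(100),
                pastFirstElementInPane().plusDelayOf("Duration.standardMinutes(1)")));
        System.out.println(t.render());
        System.out.println(DEFAULT.render()); // Repeatedly.forever(AfterWatermark.pastEndOfWindow())
    }
}
```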
