When collecting the data in a
PCollection and grouping that data in finite windows,
Dataflow needs some way to determine when to emit the aggregated results of each window. Given time skew and data lag, Dataflow uses a mechanism
called triggers to determine when "enough" data has been collected in a window, after which
it emits the aggregated results of that window, each of which is referred to as a pane.
Dataflow's triggers system provides several different ways to determine when to emit the aggregated results of a given window, depending on your system's data processing needs. For example, a system that requires prompt or time-sensitive updates might use a strict time-based trigger that emits a window every N seconds, valuing promptness over data completeness; a system that values data completeness more than the exact timing of results might use a data-based trigger that waits for a certain number of data records to accumulate before closing a window.
Triggers are particularly useful for handling two kinds of conditions in your pipeline:
- Triggers can help you handle instances of late data.
- Triggers can help you emit early results, before all the data in a given window has arrived.
Note: You can set a trigger for an unbounded PCollection that
uses a single global window for its windowing function. This can be useful when you want your
pipeline to provide periodic updates on an unbounded data set; for example, a running average
of all data provided to the present time, updated every N seconds or every N elements.
Types of Triggers
Dataflow provides a number of pre-built triggers that you can set for your
PCollections. There are three major kinds of triggers:
- Time-based triggers. These triggers operate on a time reference--either event time (as indicated by the timestamp on each data element) or the processing time (the time when the data element is processed at any given stage in the pipeline).
- Data-driven triggers. These triggers operate by examining the data as it arrives in each window and firing when a data condition that you specify is met. For example, you can set a trigger to emit results from a window when that window has received a certain number of data elements.
- Composite triggers. These triggers combine multiple time-based or data-driven triggers in some logical way. You can set a composite trigger to fire when all triggers are met (logical AND), when any trigger is met (logical OR), etc.
Dataflow's time-based triggers include AfterWatermark and
AfterProcessingTime. These triggers take a time reference in either event time or
processing time, and set a timer based on that time reference.
The AfterWatermark trigger operates on event time. The
AfterWatermark trigger will emit the contents of a window after the watermark passes
the end of the window, based on the timestamps attached to the data elements. The watermark is a
global progress metric, Dataflow's notion of input completeness within your pipeline at any given
point. The AfterWatermark trigger only fires when the watermark passes the end of
the window; it is the primary trigger that Dataflow uses to emit results when the system
estimates that it has all the data in a given time-based window.
The AfterProcessingTime trigger operates on processing time. The
AfterProcessingTime trigger emits a window after a certain amount of processing time
has passed since the time reference, such as the start of a window. The processing time is
determined by the system clock, rather than the data element's timestamp.
The AfterProcessingTime trigger is useful for triggering early results from a
window, particularly a window with a large time frame such as a single global window.
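The timer behavior described above can be sketched in plain Java. This is a toy model, not Dataflow SDK code: the class and method names are hypothetical, and it assumes a repeating trigger whose timer starts at the processing time of the first element in each pane and that an element arriving exactly at the deadline belongs to the next pane.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Toy model of a repeating "first element in pane + delay" processing-time trigger. */
final class ProcessingTimeTriggerSketch {

    /**
     * Returns the processing-time instants at which panes would fire.
     * The arrivals list must be sorted in ascending order.
     */
    static List<Instant> firingTimes(List<Instant> arrivals, Duration delay) {
        List<Instant> firings = new ArrayList<>();
        Instant deadline = null; // timer for the current pane; null while the pane is empty
        for (Instant arrival : arrivals) {
            if (deadline != null && !arrival.isBefore(deadline)) {
                // The timer expired before this element arrived, so the pane already fired.
                firings.add(deadline);
                deadline = null;
            }
            if (deadline == null) {
                // This element is the first in a new pane and starts a new timer.
                deadline = arrival.plus(delay);
            }
        }
        if (deadline != null) {
            firings.add(deadline); // the last pane fires when its timer expires
        }
        return firings;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
        List<Instant> arrivals = List.of(
            t0, t0.plusSeconds(20), t0.plusSeconds(50), t0.plusSeconds(70), t0.plusSeconds(200));
        System.out.println(firingTimes(arrivals, Duration.ofMinutes(1)));
    }
}
```

Note that the firing times depend only on when elements are processed, not on the timestamps the elements carry.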
Dataflow currently provides only one data-driven trigger,
AfterPane.elementCountAtLeast. This trigger works on a straight element count; it
fires after the current pane has collected at least N elements.
AfterPane.elementCountAtLeast() is a good way to cause a window to emit early
results, before all the data has accumulated, especially in the case of a single global
window.
The default trigger for a
PCollection is event time-based, and emits the results of
the window when the system's watermark (Dataflow's notion of when it "should" have all the
data) passes the end of the window. The default triggering configuration emits exactly once,
and late data is discarded. This is because the default windowing and trigger configuration
has an allowed lateness value of 0. See Handling Late Data for
information on modifying this behavior.
The watermark depends on the data source; in some cases, it is an estimate. In others, such as Pub/Sub with system-assigned timestamps, the watermark can provide an exact bound of what data the pipeline has processed.
Handling Late Data
If your pipeline needs to handle late data (data that arrives after the watermark passes the end of the window), you can apply an allowed lateness when setting your windowing and trigger configuration. This will give your trigger the opportunity to react to the late data; in the default triggering configuration it will emit new results immediately whenever late data arrives.
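As a rough illustration of the rule described above, the following plain-Java sketch classifies an arriving element by comparing the current watermark with the end of the element's window and the allowed lateness. It is a simplified model rather than SDK code: the names are hypothetical, and the exact boundary handling (for example, treating a watermark equal to the window end as "passed") is an assumption.

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified model of how allowed lateness affects an arriving element. */
final class LatenessSketch {

    enum Disposition { ON_TIME, LATE_ACCEPTED, DROPPED }

    static Disposition classify(Instant windowEnd, Instant watermark, Duration allowedLateness) {
        if (watermark.isBefore(windowEnd)) {
            // The watermark has not yet passed the end of the window.
            return Disposition.ON_TIME;
        }
        // Late data is accepted only until the watermark reaches windowEnd + allowedLateness.
        Instant cutoff = windowEnd.plus(allowedLateness);
        return watermark.isBefore(cutoff) ? Disposition.LATE_ACCEPTED : Disposition.DROPPED;
    }

    public static void main(String[] args) {
        Instant windowEnd = Instant.parse("2024-01-01T12:01:00Z");
        Instant watermark = Instant.parse("2024-01-01T12:10:00Z");
        // With the default allowed lateness of 0, late data is dropped.
        System.out.println(classify(windowEnd, watermark, Duration.ZERO));
        // With 30 minutes of allowed lateness, the same element is accepted.
        System.out.println(classify(windowEnd, watermark, Duration.ofMinutes(30)));
    }
}
```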
You set the allowed lateness by using
.withAllowedLateness() when setting your
window and trigger, as follows:
    PCollection<String> pc = ...;
    pc.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(1)))
        .withAllowedLateness(Duration.standardMinutes(30)));
This allowed lateness propagates to all
PCollections derived as a result of applying
transforms to the original
PCollection. If you want to change the allowed lateness
later in your pipeline, you can apply .withAllowedLateness() again.
Setting a Trigger
When you set a windowing function for a
PCollection by using the Window
transform, you can also specify a trigger.
You set the trigger(s) for a
PCollection by invoking the method
.triggering() on the result of your
Window.into() transform, as follows:
    PCollection<String> pc = ...;
    pc.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(1)))
        .discardingFiredPanes());
The preceding code sample sets a trigger for a
PCollection; the trigger is
time-based, and emits each window one minute after the first element in that window has been
processed. The last line in the code sample,
.discardingFiredPanes(), is the
window's accumulation mode.
Window Accumulation Modes
When you specify a trigger, you must also set the window's accumulation mode. When a trigger fires, it emits the current contents of the window as a pane. Since a trigger can fire multiple times, the accumulation mode determines whether the system accumulates the window panes as the trigger fires, or discards them.
To set a window to accumulate the panes that are produced when the trigger fires, invoke
.accumulatingFiredPanes() when you set the trigger. To set a window to discard fired
panes, invoke .discardingFiredPanes().
Let's look at an example that uses a
PCollection with fixed-time windowing and a
data-based trigger. (This is something you'd do if, for example, each window represented a
ten-minute running average, but you wanted to display the current value of the average in a UI
more frequently than every ten minutes.) We'll assume the following conditions:
- The PCollection uses 10-minute fixed-time windows.
- The PCollection has a repeating trigger that fires every time 3 elements arrive.
The following diagram shows data events as they arrive in the
PCollection and are
assigned to windows*:
Note: To keep the diagram a bit simpler, we'll assume that the events all arrive in the pipeline in order.
For simplicity, let's only consider values associated with key X.
If our trigger is set to
.accumulatingFiredPanes, the trigger emits the following
values each time it fires (remember, the trigger fires every time three elements arrive):
    Key X:
      First trigger firing:  [5, 8, 3]
      Second trigger firing: [5, 8, 3, 15, 19, 23]
      Third trigger firing:  [5, 8, 3, 15, 19, 23, 9, 13, 10]
If our trigger is set to
.discardingFiredPanes, the trigger emits the following
values on each firing:
    Key X:
      First trigger firing:  [5, 8, 3]
      Second trigger firing: [15, 19, 23]
      Third trigger firing:  [9, 13, 10]
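The two pane sequences above can be reproduced with a small plain-Java simulation. This is an illustrative sketch only; the class and method names are hypothetical and it does not use the Dataflow SDK. It assumes a single key, a single window, in-order arrival, and a repeating trigger that fires every `triggerCount` elements.

```java
import java.util.ArrayList;
import java.util.List;

/** Simulates pane contents for a repeating element-count trigger on one key and one window. */
final class AccumulationModeSketch {

    static List<List<Integer>> panes(List<Integer> values, int triggerCount, boolean accumulating) {
        List<List<Integer>> fired = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        int sinceLastFiring = 0;
        for (int value : values) {
            buffer.add(value);
            sinceLastFiring++;
            if (sinceLastFiring == triggerCount) {
                fired.add(new ArrayList<>(buffer)); // emit a copy of the pane
                sinceLastFiring = 0;
                if (!accumulating) {
                    buffer.clear(); // discarding mode: forget elements that were already emitted
                }
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Integer> keyX = List.of(5, 8, 3, 15, 19, 23, 9, 13, 10);
        System.out.println("accumulating: " + panes(keyX, 3, true));
        System.out.println("discarding:   " + panes(keyX, 3, false));
    }
}
```

The only difference between the two modes in this model is whether the buffer is cleared after each firing.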
Effects of Accumulating vs. Discarding
Now, let's add a per-key calculation to our pipeline. Every time the trigger fires, the pipeline
applies a Combine.perKey that calculates a mean average for all values associated
with each key in the window.
Again, let's just consider key X:
With the trigger set to .accumulatingFiredPanes:
    Key X:
      First trigger firing:  [5, 8, 3]
      Average after first trigger firing:  5.3
      Second trigger firing: [5, 8, 3, 15, 19, 23]
      Average after second trigger firing: 12.167
      Third trigger firing:  [5, 8, 3, 15, 19, 23, 9, 13, 10]
      Average after third trigger firing:  11.667
With the trigger set to .discardingFiredPanes:
    Key X:
      First trigger firing:  [5, 8, 3]
      Average after first trigger firing:  5.3
      Second trigger firing: [15, 19, 23]
      Average after second trigger firing: 19
      Third trigger firing:  [9, 13, 10]
      Average after third trigger firing:  10.667
Note that the
Combine.perKey that computes the mean average produces different
results in each case.
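To check the arithmetic, the per-pane means can be recomputed with a few lines of plain Java. This sketch stands in for the mean that a Combine.perKey would compute for key X; the class name and the hard-coded panes are for illustration only and are not part of the Dataflow SDK.

```java
import java.util.List;

/** Recomputes the mean of each fired pane for key X in both accumulation modes. */
final class PaneMeanSketch {

    static double mean(List<Integer> pane) {
        return pane.stream().mapToInt(Integer::intValue).average().orElse(Double.NaN);
    }

    public static void main(String[] args) {
        List<List<Integer>> accumulating = List.of(
            List.of(5, 8, 3),
            List.of(5, 8, 3, 15, 19, 23),
            List.of(5, 8, 3, 15, 19, 23, 9, 13, 10));
        List<List<Integer>> discarding = List.of(
            List.of(5, 8, 3),
            List.of(15, 19, 23),
            List.of(9, 13, 10));
        // In accumulating mode each mean covers everything seen so far in the window.
        accumulating.forEach(pane -> System.out.println("accumulating mean: " + mean(pane)));
        // In discarding mode each mean covers only the elements since the previous firing.
        discarding.forEach(pane -> System.out.println("discarding mean:   " + mean(pane)));
    }
}
```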
In general, a trigger set to
.accumulatingFiredPanes always outputs all data in
a given window, including any elements previously triggered. A trigger set to
.discardingFiredPanes outputs incremental changes since the last time the trigger
fired. Accumulating mode is most appropriate before a grouping operation that combines or
otherwise updates the elements; otherwise, use discarding mode.
When you apply an aggregating transform such as
Combine.perKey to a
PCollection for which
you've specified a trigger, keep in mind that the
Combine.perKey produces a new output
PCollection; the trigger that you
set for the input collection does not propagate onto the new output collection.
Instead, the Dataflow SDK creates a comparable trigger for the output PCollection
based on the trigger that you specified for the input collection. The new trigger attempts to emit
elements as fast as reasonably possible, at roughly the rate specified by the original trigger on
the input PCollection. Dataflow determines the properties of the new trigger based
on the parameters you provided for the input trigger:
- AfterWatermark's continuation trigger is identical to the original trigger by default. If the AfterWatermark trigger has early or late firings specified, the early or late firings of the continuation will be the continuation of the original trigger's early or late firings.
- AfterProcessingTime's default continuation trigger fires after the synchronized processing time for the amalgamated elements, and does not propagate any additional delays. For example, consider a trigger such as AfterProcessingTime.pastFirstElementInPane().alignedTo(15 min).plusDelayOf(1 hour). After a GroupByKey, the trigger Dataflow supplies for the output collection will be synchronized to the same aligned time for each key, but will not retain the 1 hour delay.
- AfterCount's default continuation trigger fires on every element. For example, AfterCount(n) on the input collection becomes AfterCount(1) on the output collection.
If you feel the trigger that Dataflow generates for a
PCollection output from
Combine.perKey is not sufficient, you should explicitly
set a new trigger for that collection.
In Dataflow, you can logically combine multiple triggers to form composite triggers. You can also specify a trigger to emit results repeatedly, at most once, or under other custom conditions.
Composite Trigger Types
Dataflow includes the following composite triggers:
- You can add additional early firings or late firings to AfterWatermark.pastEndOfWindow.
- Repeatedly.forever specifies a trigger that executes forever. Any time the trigger's conditions are met, it causes a window to emit results, then resets and starts over. It can be useful to combine Repeatedly.forever with .orFinally to specify a condition to cause the repeating trigger to stop.
- AfterEach.inOrder combines multiple triggers to fire in a specific sequence. Each time a trigger in the sequence emits a window, the sequence advances to the next trigger.
- AfterFirst takes multiple triggers and emits the first time any of its argument triggers is satisfied. This is equivalent to a logical OR operation for multiple triggers.
- AfterAll takes multiple triggers and emits when all of its argument triggers are satisfied. This is equivalent to a logical AND operation for multiple triggers.
- orFinally can serve as a final condition to cause any trigger to fire one final time and never fire again.
Composition with AfterWatermark.pastEndOfWindow
Some of the most useful composite triggers fire a single time when the system estimates that all the data has arrived (i.e. when the watermark passes the end of the window) combined with either, or both, of the following:
- Speculative firings that precede the watermark passing the end of the window to allow faster processing of partial results.
- Late firings that happen after the watermark passes the end of the window, to allow for handling late-arriving data.
You can express this pattern using
AfterWatermark.pastEndOfWindow. For example, the
following trigger code fires under these conditions:
- On the system's estimate that all the data has arrived (the watermark passes the end of the window)
- Any time late data arrives, after a ten-minute delay
- After two days, we assume no more data of interest will arrive, and the trigger stops executing
    .apply(Window
        .triggering(AfterWatermark
            .pastEndOfWindow()
            .withLateFirings(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(10))))
        .withAllowedLateness(Duration.standardDays(2)));
Other Composite Triggers
You can also build other sorts of composite triggers. The following example code shows a simple composite trigger that fires whenever the pane has at least 100 elements, or after a minute.
    Repeatedly.forever(AfterFirst.of(
        AfterPane.elementCountAtLeast(100),
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))
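One way to picture the OR and AND semantics of AfterFirst and AfterAll is to model each trigger as a predicate over the state of the current pane. The plain-Java sketch below does that for the firing condition only. Everything in it (PaneState, the helper names) is hypothetical and is not the SDK's trigger implementation; real triggers are stateful, and the repetition added by Repeatedly.forever corresponds here to evaluating the predicate again against a fresh pane after each firing.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.function.Predicate;

/** Models simple triggers as predicates over a snapshot of the current pane. */
final class CompositeTriggerSketch {

    /** Hypothetical snapshot: how many elements the pane holds and how long ago the first one arrived. */
    static final class PaneState {
        final int elementCount;
        final Duration sinceFirstElement;

        PaneState(int elementCount, Duration sinceFirstElement) {
            this.elementCount = elementCount;
            this.sinceFirstElement = sinceFirstElement;
        }
    }

    static Predicate<PaneState> elementCountAtLeast(int n) {
        return state -> state.elementCount >= n;
    }

    static Predicate<PaneState> delayPastFirstElement(Duration delay) {
        return state -> state.elementCount > 0 && state.sinceFirstElement.compareTo(delay) >= 0;
    }

    /** Logical OR: satisfied as soon as any argument is satisfied. */
    @SafeVarargs
    static Predicate<PaneState> afterFirst(Predicate<PaneState>... triggers) {
        return state -> Arrays.stream(triggers).anyMatch(t -> t.test(state));
    }

    /** Logical AND: satisfied only when every argument is satisfied. */
    @SafeVarargs
    static Predicate<PaneState> afterAll(Predicate<PaneState>... triggers) {
        return state -> Arrays.stream(triggers).allMatch(t -> t.test(state));
    }

    public static void main(String[] args) {
        Predicate<PaneState> countOrMinute = afterFirst(
            elementCountAtLeast(100), delayPastFirstElement(Duration.ofMinutes(1)));
        System.out.println(countOrMinute.test(new PaneState(100, Duration.ofSeconds(5))));
        System.out.println(countOrMinute.test(new PaneState(3, Duration.ofSeconds(61))));
        System.out.println(countOrMinute.test(new PaneState(3, Duration.ofSeconds(5))));
    }
}
```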
The following grammar describes the various ways that you can combine triggers into composite triggers:
    TRIGGER ::=
      ONCE_TRIGGER
      Repeatedly.forever(TRIGGER)
      TRIGGER.orFinally(ONCE_TRIGGER)
      AfterEach.inOrder(TRIGGER, TRIGGER, ...)

    ONCE_TRIGGER ::=
      TIME_TRIGGER
      WATERMARK_TRIGGER
      AfterPane.elementCountAtLeast(Integer)
      AfterFirst.of(ONCE_TRIGGER, ONCE_TRIGGER, ...)
      AfterAll.of(ONCE_TRIGGER, ONCE_TRIGGER, ...)

    TIME_TRIGGER ::=
      AfterProcessingTime.pastFirstElementInPane()
      TIME_TRIGGER.alignedTo(Duration)
      TIME_TRIGGER.alignedTo(Duration, Instant)
      TIME_TRIGGER.plusDelayOf(Duration)
      TIME_TRIGGER.mappedBy(Instant -> Instant)

    WATERMARK_TRIGGER ::=
      AfterWatermark.pastEndOfWindow()
      WATERMARK_TRIGGER.withEarlyFirings(ONCE_TRIGGER)
      WATERMARK_TRIGGER.withLateFirings(ONCE_TRIGGER)

    Default = Repeatedly.forever(AfterWatermark.pastEndOfWindow())