Dataflow commits every record to storage exactly once, which ensures that data is not dropped or duplicated. Though records might be processed multiple times, Dataflow ensures that the results of processing a record are committed to storage only once. This page explains how Dataflow implements exactly-once processing while also ensuring low latency.
Exactly-once processing
This section explains how Dataflow implements exactly-once processing for streaming jobs, including how Dataflow manages complexities like non-deterministic processing, late data, and custom code.
Dataflow streaming shuffle
Streaming Dataflow jobs run on many different workers in parallel by assigning work ranges to each worker. Though assignments might change over time in response to worker failures, autoscaling, or other events, after each GroupByKey transform, all records with the same key are processed on the same worker. The GroupByKey transform is often used by composite transformations, such as Count and FileIO. To ensure that records for a given key end up on the same worker, Dataflow workers shuffle data between themselves by using remote procedure calls (RPCs).
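For example, a per-key aggregation such as the following sketch relies on GroupByKey internally, so Dataflow must shuffle every record for a given key to the same worker before the count can be produced. The `events` collection is an assumed input, not something defined on this page:

import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// `events` is an assumed PCollection<KV<String, Long>>, keyed by, say, user ID.
// Count.perKey() expands to a GroupByKey, which is where the streaming shuffle
// described in this section happens.
PCollection<KV<String, Long>> perKeyCounts = events.apply(Count.perKey());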
To guarantee that records aren't lost during shuffle, Dataflow uses upstream backup. With upstream backup, the worker that sends the records retries RPCs until it receives positive acknowledgment that the record has been received. The side effects of processing the record are committed to persistent storage downstream. If the worker that sends the records becomes unavailable, Dataflow continues to retry RPCs, which ensures that every record is delivered at least once.
Because these retries might create duplicates, every message is tagged with a unique ID. Each receiver stores a catalog of all IDs that have already been seen and processed. When a record is received, Dataflow looks up its ID in the catalog. If the ID is found, the record has already been received and committed, and it's dropped as a duplicate. To ensure that the record IDs are stable, every output from step to step is checkpointed to storage. As a result, if the same message is sent multiple times because of repeated RPC calls, the message is committed to storage only once. To keep latency low during this process, Dataflow uses Bloom filters and garbage collection.
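Conceptually, the receiving side behaves like the following sketch. This is an illustration only, not Dataflow's internal implementation; in Dataflow the ID catalog and the committed output live in persistent storage rather than in memory:

import java.util.HashSet;
import java.util.Set;

// Conceptual sketch of a receiver that commits each record at most once by
// tracking the IDs it has already committed.
class DedupingReceiver {
  private final Set<String> committedIds = new HashSet<>(); // stand-in for the stored ID catalog

  void onRecord(String recordId, String payload) {
    if (committedIds.contains(recordId)) {
      ack(recordId); // duplicate delivery from an RPC retry; drop it but still acknowledge
      return;
    }
    commitToStorage(recordId, payload); // checkpoint the output durably
    committedIds.add(recordId);
    ack(recordId); // the sender stops retrying once the acknowledgment arrives
  }

  private void commitToStorage(String recordId, String payload) { /* durable write */ }
  private void ack(String recordId) { /* positive acknowledgment back to the sender */ }
}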
Non-deterministic sources
Dataflow uses the Apache Beam SDK to read data into pipelines. If processing fails, Dataflow might retry reads from a source. In that situation, Dataflow needs to ensure that every unique record produced by a source is recorded exactly once. For deterministic sources, such as Pub/Sub Lite or Kafka, records are read based on a recorded offset, which removes the need for this step.
Because Dataflow can't automatically assign record IDs, non-deterministic sources must tell Dataflow what the record IDs are in order to avoid duplication. When a source provides unique IDs for each record, the connector uses a shuffle in the pipeline to remove duplicates. Records with the same ID are filtered out. For an example of how Dataflow implements exactly-once processing when using Pub/Sub as a source, see the Efficient deduplication section in the Streaming with Pub/Sub page.
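As a minimal sketch of that approach with the Beam Java SDK, you can point the Pub/Sub connector at a message attribute that carries your own unique record ID. The subscription path and the "record_id" attribute name below are placeholders, and `pipeline` is an assumed Pipeline object:

import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.values.PCollection;

// Read from Pub/Sub and let the connector deduplicate on the unique ID stored
// in the "record_id" attribute of each message.
PCollection<String> messages =
    pipeline.apply(
        PubsubIO.readStrings()
            .fromSubscription("projects/my-project/subscriptions/my-subscription")
            .withIdAttribute("record_id"));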
When you execute custom DoFns as part of your pipeline, Dataflow does not guarantee that this code is run only once per record. To guarantee at-least-once processing in case of worker failures, Dataflow might run a given record through a transform multiple times, or it might run the same record simultaneously on multiple workers. If you include code in your pipeline that does things like contact an outside service, the actions might be run more than once for a given record.
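For example, a hypothetical DoFn like the following, which notifies an external service for each record, might issue the same call more than once if the record is retried or processed speculatively on another worker. The class name and the commented-out call are illustrative only:

import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical transform: the external call is a side effect that Dataflow
// might execute more than once per record, even though the element's output
// is committed downstream only once.
class NotifyExternalServiceFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(@Element String record, OutputReceiver<String> out) {
    // notifyWebhook(record);  // may run more than once for the same record
    out.output(record);
  }
}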
To make non-deterministic processing effectively deterministic, use checkpointing. With checkpointing, each output from a transform is checkpointed to stable storage with its unique ID before it's delivered to the next stage. Retries in Dataflow's shuffle delivery relay the checkpointed output. Although your code might run multiple times, Dataflow ensures that the output from only one of those runs is stored. Dataflow uses a consistent store that prevents duplicates from being written to stable storage.
Late data
In streaming pipelines, records sometimes arrive late. Apache Beam lets you configure the threshold for dropping late data and how to trigger on late data. For more information, see Watermarks and late data in the Apache Beam documentation.
Records are assigned to windows. Records that arrive in time for their window to be processed are processed exactly once. Records that arrive after the late-data threshold for their window are dropped.
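A minimal sketch of this configuration in the Beam Java SDK might look like the following, assuming an input PCollection<String> named `input`; the one-minute window and ten-minute allowed lateness are arbitrary values chosen for illustration:

import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Fixed one-minute windows. Elements up to ten minutes late still cause a late
// firing; anything later than that is dropped.
PCollection<String> windowed =
    input.apply(
        Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withLateFirings(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.standardMinutes(10))
            .accumulatingFiredPanes());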
Logging
Log output from processing indicates that the processing occurred but does not indicate whether the data was committed. Therefore, log files might indicate that data was processed multiple times even though the results of the processed data are committed to persistent storage only once. In addition, logs don't always reflect processed and committed data. Logs might be dropped due to throttling or lost due to other logging service issues.
Exactly-once output delivery
The Apache Beam SDK includes built-in sinks that are designed to ensure that they do not produce duplicates. Whenever possible, use one of these built-in sinks.
If you need to write your own sink, the best approach is to make your function object idempotent so that it can be retried as often as necessary without causing unintended side effects. However, some component of the transform that implements the sink is often non-deterministic and might change if it's retried.
For example, in a windowed aggregation, the set of records in the window might be non-deterministic. Specifically, the window might attempt to fire with elements e0, e1, e2. The worker might crash before committing the window processing but not before those elements are sent as a side effect. When the worker restarts, the window fires again, and a late element e3 arrives. Because this element arrives before the window is committed, it's not counted as late data, so the DoFn is called again with elements e0, e1, e2, e3. These elements are then sent to the side-effect operation. Idempotency does not help in this scenario, because different logical record sets are sent each time.
To address non-determinism in Dataflow, use the built-in Reshuffle transform. When Dataflow shuffles data, it writes the data durably, so any non-deterministically generated elements are stable if operations are retried after the shuffle occurs. Using the Reshuffle transform guarantees that only one version of a DoFn's output can make it past a shuffle boundary.
The following pattern ensures that the side-effect operation always receives a deterministic record to output:
c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
 .apply(GroupByKey.<..>create())   // window firing determines the record set
 .apply(new PrepareOutputData())   // prepare the (possibly non-deterministic) output
 .apply(Reshuffle.<..>of())        // checkpoint so the output is stable on retry
 .apply(WriteToSideEffect());      // the side effect now sees a deterministic record
To ensure that the Dataflow runner knows that elements must be stable before executing a DoFn, add the RequiresStableInput annotation to the DoFn.
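For example, a hypothetical side-effecting DoFn can be annotated as follows; the class name and the commented-out external call are illustrative, not part of the Dataflow API:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// The RequiresStableInput annotation tells the runner to checkpoint the input
// so that every retry of this method sees exactly the same element.
class WriteToSideEffectFn extends DoFn<KV<String, String>, Void> {
  @RequiresStableInput
  @ProcessElement
  public void processElement(@Element KV<String, String> record) {
    // writeToExternalSystem(record);  // hypothetical non-idempotent side effect
  }
}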
Ensuring low latency
For exactly-once processing to be viable, I/O must be reduced, in particular by avoiding I/O on every record. To achieve this goal, Dataflow uses Bloom filters and garbage collection.
Bloom filters
Bloom filters are compact data structures that allow for quick set-membership checks. In Dataflow, each worker keeps a Bloom filter of every ID it sees. When a new record ID arrives, the worker looks up the ID in the filter. If the filter returns false, then this record is not a duplicate, and the worker does not look up the ID in stable storage.
Dataflow keeps a set of rolling Bloom filters bucketed by time. When a record arrives, Dataflow picks the appropriate filter to check based on the system timestamp. This step prevents the Bloom filters from saturating as filters get garbage-collected, and also bounds the amount of data that needs to be scanned at startup.
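As a rough conceptual sketch, not Dataflow's actual implementation, the fast path looks like the following; the Guava BloomFilter stands in for the in-memory filter, and a HashSet stands in for the ID catalog held in stable storage:

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;

// Conceptual sketch: a Bloom filter acts as a fast negative cache in front of
// the (slow) catalog of committed record IDs.
class RecordIdDeduplicator {
  private final BloomFilter<String> maybeSeen =
      BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 10_000_000, 0.01);
  private final Set<String> committedIds = new HashSet<>(); // stand-in for stable storage

  boolean isDuplicate(String recordId) {
    // Bloom filters have no false negatives: "not seen" means the ID is new,
    // so the lookup in stable storage can be skipped entirely.
    if (maybeSeen.mightContain(recordId) && committedIds.contains(recordId)) {
      return true; // confirmed duplicate
    }
    // New ID, or a Bloom-filter false positive that the catalog disproved.
    maybeSeen.put(recordId);
    committedIds.add(recordId);
    return false;
  }
}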
Garbage collection
To avoid filling storage with record IDs, Dataflow uses garbage collection to remove old records. Dataflow uses the system timestamp to calculate a garbage-collection watermark.
This watermark is based on the amount of physical time spent waiting in a given stage. Therefore, it also provides information about what parts of the pipeline are slow. This metadata is the basis for the system lag metric shown in the Dataflow monitoring interface.
If a record arrives with a timestamp older than the watermark, and if IDs for this time have already been garbage collected, the record is ignored. Because the low watermark that triggers garbage collection doesn't advance until record deliveries are acknowledged, these late-arriving records are duplicates.
Learn more
- Streaming with Pub/Sub
- Streaming Engine: Execution Model for Highly Scalable, Low-Latency Data Processing
- Learn more about the Apache Beam execution model
- After Lambda: Exactly once processing in Dataflow, Part 1
- After Lambda: Exactly once processing in Dataflow, Part 2 (Ensuring low latency)
- After Lambda: Exactly once processing in Dataflow, Part 3 (sources and sinks)