Dataflow supports exactly-once processing of records. This page explains how Dataflow implements exactly-once processing while also ensuring low latency.
Overview
Batch pipelines always use exactly-once processing. Streaming pipelines use exactly-once processing by default, but can also use at-least-once processing.
Exactly-once processing provides guarantees about the results of processing records, including the results from each pipeline stage. Specifically, for each record that arrives at the pipeline from a source, or arrives at a stage from a previous stage, Dataflow ensures the following:
- The record is processed and not lost.
- Any results of processing that stay within the pipeline are reflected at most once.
In other words, records are processed at least once, and the results are committed exactly once.
Exactly-once processing ensures that results are accurate, with no duplicate records in the output. Dataflow is optimized to minimize latency while maintaining exactly-once semantics. However, exactly-once processing still incurs cost to perform deduplication. For use cases that can tolerate duplicate records, you can often reduce cost and improve latency by enabling at-least-once mode. For more information about choosing between exactly-once and at-least-once streaming, see Set the pipeline streaming mode.
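As a minimal sketch, assuming a Java pipeline launched with the Dataflow runner, at-least-once mode is selected through a Dataflow service option passed as a pipeline argument. The option name shown here is an assumption; confirm it on the Set the pipeline streaming mode page. Omitting the option leaves the default, exactly-once mode.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Illustrative launch arguments for a streaming pipeline.
String[] args = {
    "--runner=DataflowRunner",
    "--streaming=true",
    "--dataflowServiceOptions=streaming_mode_at_least_once" // assumed option name
};
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);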
Late data
Exactly-once processing ensures the accuracy of the pipeline: If the pipeline processes a record, then Dataflow ensures that the record is reflected in the output, and that the record is not duplicated.
In a streaming pipeline, however, exactly-once processing cannot guarantee that results are complete, because records might arrive late. For example, suppose your pipeline performs an aggregation over a time window, such as Count. With exactly-once processing, the result is accurate for the records that arrive within the window in a timely fashion, but late records might be dropped.
Generally, there is no way to guarantee completeness in a streaming pipeline, because in theory records can arrive arbitrarily late. In the limiting case you would need to wait forever to produce a result. More practically, Apache Beam lets you configure the threshold for dropping late data and when to emit aggregated results. For more information, see Watermarks and late data in the Apache Beam documentation.
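For instance, the following sketch shows how a Beam Java pipeline might bound late data for a windowed Count. The input collection named events, the window size, the lateness threshold, and the trigger are illustrative choices, not recommendations.
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// `events` is an illustrative keyed PCollection<KV<String, String>> produced
// earlier in the pipeline.
PCollection<KV<String, Long>> counts = events
    .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(1)))
        // Emit an on-time result when the watermark passes the end of the window,
        // then an updated result for each late element that still arrives.
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterPane.elementCountAtLeast(1)))
        // Records arriving more than 10 minutes behind the watermark are dropped.
        .withAllowedLateness(Duration.standardMinutes(10))
        .accumulatingFiredPanes())
    .apply(Count.perKey());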
Side effects
Side effects are not guaranteed to have exactly-once semantics. Importantly, this includes writing output to an external store, unless the sink also implements exactly-once semantics.
Specifically, Dataflow does not guarantee that each record goes through each transform exactly one time. Due to retries or worker failures, Dataflow might send a record through a transform multiple times, or even simultaneously on multiple workers.
As part of exactly-once processing, Dataflow deduplicates the outputs. However, if the code in a transform has side effects, those effects might occur multiple times. For example, if a transform makes a remote service call, that call might be made multiple times for the same record. Side effects can even lead to data loss in some situations. For example, suppose that a transform reads a file to produce output, and then immediately deletes the file without waiting for the output to be committed. If an error occurs when committing the result, Dataflow retries the transform, but now the transform can't read the deleted file.
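A hedged illustration, using a hypothetical InventoryClient that is not part of any real API: the remote call below is a side effect, so a retried or concurrently processed record can trigger it more than once even though the record's output is committed only once.
import org.apache.beam.sdk.transforms.DoFn;

class UpdateInventoryFn extends DoFn<String, String> {
  private transient InventoryClient client; // hypothetical external client

  @Setup
  public void setup() {
    client = InventoryClient.create(); // hypothetical
  }

  @ProcessElement
  public void processElement(@Element String record, OutputReceiver<String> out) {
    // Side effect: if Dataflow retries this record, the remote call can run
    // again, so the call should be safe to repeat (idempotent).
    client.recordUpdate(record);
    out.output(record);
  }
}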
Logging
Log output from processing indicates that the processing occurred but does not indicate whether the data was committed. Therefore, log files might indicate that data was processed multiple times even though the results of the processed data are committed to persistent storage only once. In addition, logs don't always reflect processed and committed data. Logs might be dropped due to throttling or lost due to other logging service issues.
Exactly-once streaming
This section explains how Dataflow implements exactly-once processing for streaming jobs, including how Dataflow manages complexities like non-deterministic processing, late data, and custom code.
Dataflow streaming shuffle
Streaming Dataflow jobs run on many different workers in parallel by assigning work ranges to each worker. Though assignments might change over time in response to worker failures, autoscaling, or other events, after each GroupByKey transform, all records with the same key are processed on the same worker. The GroupByKey transform is often used by composite transformations, such as Count, FileIO, and so on. To ensure that records for a given key end up on the same worker, Dataflow workers shuffle data between themselves by using remote procedure calls (RPCs).
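As a brief sketch, assuming a keyed input collection named keyedEvents, an explicit GroupByKey looks like the following; this transform is the point at which Dataflow shuffles records so that all values for one key arrive on one worker.
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// `keyedEvents` is an illustrative PCollection<KV<String, Long>>.
PCollection<KV<String, Iterable<Long>>> grouped =
    keyedEvents.apply(GroupByKey.<String, Long>create());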
To guarantee that records aren't lost during shuffle, Dataflow uses upstream backup. With upstream backup, the worker that sends the records retries RPCs until it receives positive acknowledgment that the record has been received. The side effects of processing the record are committed to persistent storage downstream. If the worker that sends the records becomes unavailable, Dataflow continues to retry RPCs, which ensures that every record is delivered at least once.
Because these retries might create duplicates, every message is tagged with a unique ID. Each receiver stores a catalog of all IDs that have already been seen and processed. When a record is received, Dataflow looks up its ID in the catalog. If the ID is found, the record has already been received and committed, and it's dropped as a duplicate. To ensure that the record IDs are stable, every output from step to step is checkpointed to storage. As a result, if the same message is sent multiple times due to repeated RPC calls, the message is only committed to storage once.
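The following is a simplified conceptual sketch of that deduplication step, not the actual Dataflow implementation; in the service, the catalog of committed IDs lives in persistent storage rather than in memory.
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class DedupReceiver {
  // Stand-in for the durable catalog of record IDs that have been committed.
  private final Set<String> committedIds = ConcurrentHashMap.newKeySet();

  /** Returns true if the record was committed, false if it was a duplicate. */
  boolean receive(String recordId, String payload) {
    if (!committedIds.add(recordId)) {
      return false; // ID already seen and committed; drop the duplicate.
    }
    commit(recordId, payload);
    return true;
  }

  private void commit(String recordId, String payload) {
    // Placeholder: durably commit the output together with its record ID.
  }
}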
Ensuring low latency
For exactly-once processing to be viable, I/O has to be reduced, in particular by preventing I/O on every record. To accomplish this goal, Dataflow uses Bloom filters and garbage collection.
Bloom filters
Bloom filters are compact data structures that allow for quick set-membership checks. In Dataflow, each worker keeps a Bloom filter of every ID it sees. When a new record ID arrives, the worker looks up the ID in the filter. If the filter returns false, then this record is not a duplicate, and the worker does not look up the ID in stable storage.
Dataflow keeps a set of rolling Bloom filters bucketed by time. When a record arrives, Dataflow picks the appropriate filter to check based on the system timestamp. This step prevents the Bloom filters from saturating as filters get garbage-collected, and also bounds the amount of data that needs to be scanned at startup.
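As a rough sketch using Guava's BloomFilter, where the variables and sizing parameters are illustrative and Dataflow's real filters are bucketed by time as described above:
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.nio.charset.StandardCharsets;

// A Bloom filter answers "definitely not seen" or "possibly seen", so the
// expensive lookup in stable storage is needed only for possible matches.
BloomFilter<String> seenIds =
    BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 1_000_000, 0.01);

String recordId = "example-record-id"; // illustrative
if (!seenIds.mightContain(recordId)) {
  // Definitely not a duplicate: skip the stable-storage lookup.
  seenIds.put(recordId);
} else {
  // Possibly a duplicate: fall back to the exact check in stable storage.
}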
Garbage collection
To avoid filling storage with record IDs, Dataflow uses garbage collection to remove old records. Dataflow uses the system timestamp to calculate a garbage-collection watermark.
This watermark is based on the amount of physical time spent waiting in a given stage. Therefore, it also provides information about what parts of the pipeline are slow. This metadata is the basis for the system lag metric shown in the Dataflow monitoring interface.
If a record arrives with a timestamp older than the watermark, and if IDs for this time have already been garbage collected, the record is ignored. Because the low watermark that triggers garbage collection doesn't advance until record deliveries are acknowledged, these late-arriving records are duplicates.
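A minimal conceptual sketch of that check, not Dataflow internals:
import java.time.Instant;

class GcWatermarkCheck {
  // A record whose system timestamp falls behind the garbage-collection
  // watermark has had its ID removed already. Because the watermark only
  // advances after deliveries are acknowledged, such a record must be a
  // duplicate and can safely be dropped.
  static boolean dropAsDuplicate(Instant recordSystemTime, Instant gcWatermark) {
    return recordSystemTime.isBefore(gcWatermark);
  }
}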
Non-deterministic sources
Dataflow uses the Apache Beam SDK to read data into pipelines. If processing fails, Dataflow might retry reads from a source. In that situation, Dataflow needs to ensure that every unique record produced by a source is recorded exactly once. For deterministic sources, such as Pub/Sub Lite or Kafka, records are read based on a recorded offset, which removes the need for this step.
Because Dataflow can't automatically assign record IDs, non-deterministic sources must tell Dataflow what the record IDs are in order to avoid duplication. When a source provides unique IDs for each record, the connector uses a shuffle in the pipeline to remove duplicates. Records with the same ID are filtered out. For an example of how Dataflow implements exactly-once processing when using Pub/Sub as a source, see the Exactly-once processing section in the Streaming with Pub/Sub page.
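For example, the Pub/Sub connector in the Beam Java SDK can take a message attribute as the record ID. A hedged sketch follows, where the subscription path and attribute name are illustrative and pipeline is assumed to have been created earlier.
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.values.PCollection;

// Messages that carry the same value in the "uniqueId" attribute are treated
// as duplicates and removed by the connector's deduplication shuffle.
PCollection<String> messages = pipeline.apply(
    PubsubIO.readStrings()
        .fromSubscription("projects/my-project/subscriptions/my-subscription")
        .withIdAttribute("uniqueId"));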
Non-deterministic processing
When you execute custom DoFns as part of your pipeline, Dataflow does not guarantee that this code is run only once per record. To guarantee at-least-once processing in case of worker failures, Dataflow might run a given record through a transform multiple times, or it might run the same record simultaneously on multiple workers. If you include code in your pipeline that does things like contact an outside service, the actions might be run more than once for a given record.
To make non-deterministic processing effectively deterministic, use checkpointing. When you use checkpointing, each output from a transform is checkpointed to stable storage with its unique ID before it's delivered to the next stage. Retries in the shuffle delivery of Dataflow relay the output that has been checkpointed. Though your code might run multiple times, Dataflow ensures that output from only one of those runs is stored. Dataflow uses a consistent store that prevents duplicates from being written to stable storage.
Exactly-once output delivery
The Apache Beam SDK includes built-in sinks that are designed to ensure that they do not produce duplicates. Whenever possible, use one of these built-in sinks.
If you need to write your own sink, the best approach is to make your function object idempotent so that it can be retried as often as necessary without causing unintended side effects. Nevertheless, often some component of the transform that implements the functionality of the sink is non-deterministic and might change if it's retried.
For example, in a windowed aggregation, the set of records in the window might be non-deterministic. Specifically, the window might attempt to fire with elements e0, e1, e2. The worker might crash before committing the window processing but not before those elements are sent as a side effect. When the worker restarts, the window fires again, and a late element e3 arrives. Because this element arrives before the window is committed, it's not counted as late data, so the DoFn is called again with elements e0, e1, e2, e3. These elements are then sent to the side-effect operation. Idempotency does not help in this scenario, because different logical record sets are sent each time.
To address non-determinacy in Dataflow, use the built-in Reshuffle transform. When Dataflow shuffles data, Dataflow writes the data durably so that any non-deterministically generated elements are stable if operations are retried after the shuffle occurs. Using the Reshuffle transform guarantees that only one version of a DoFn's output can make it past a shuffle boundary.
The following pattern ensures that the side-effect operation always receives a deterministic record to output:
c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(GroupByKey.<..>create())
.apply(new PrepareOutputData())
.apply(Reshuffle.<..>of())
.apply(WriteToSideEffect());
To ensure that the Dataflow runner knows that elements must be stable before executing a DoFn, add the RequiresStableInput annotation to the DoFn.
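A brief sketch of that annotation, where WriteToSideEffectFn and writeToExternalService are illustrative names rather than real APIs:
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class WriteToSideEffectFn extends DoFn<KV<String, String>, Void> {
  @RequiresStableInput
  @ProcessElement
  public void processElement(@Element KV<String, String> element) {
    // With @RequiresStableInput, the input element is checkpointed before this
    // method runs, so any retry sees exactly the same element.
    writeToExternalService(element);
  }

  private void writeToExternalService(KV<String, String> element) {
    // Placeholder for the side-effecting write.
  }
}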
Learn more
- Set the pipeline streaming mode
- Streaming with Pub/Sub
- Streaming Engine: Execution Model for Highly Scalable, Low-Latency Data Processing
- Learn more about the Apache Beam execution model
- After Lambda: Exactly-once processing in Dataflow, Part 1
- After Lambda: Exactly-once processing in Dataflow, Part 2 (Ensuring low latency)
- After Lambda: Exactly-once processing in Dataflow, Part 3 (sources and sinks)