Google Cloud Platform

After Lambda: Exactly-once processing in Cloud Dataflow, Part 3 (sources and sinks)

The series concludes with a description of how exactly-once processing in Cloud Dataflow is supported by sources and sinks.

In the previous post in this series, we expanded on how Cloud Dataflow’s streaming shuffle supports exactly-once deliveries. We explained how a variety of clever optimizations allow Cloud Dataflow to efficiently filter duplicates out of shuffle deliveries. In this final installment, we’ll explore the remaining two pieces of the puzzle: sources and sinks.

Exactly once in sources

Apache Beam, the unified programming model for creating streaming- or batch-processing data pipelines, provides a Source API for reading data into a Cloud Dataflow pipeline. (At the time of this writing, the Beam community is considering building an even better API.) Cloud Dataflow might retry reads from a source if processing fails, and needs to ensure that every unique record produced by a source is processed exactly once.

For most sources Cloud Dataflow handles this process transparently; such sources are deterministic. For example, consider a source that reads data out of files. The records in a file will always be in a deterministic order and at deterministic byte locations, no matter how many times the file is read. The filename and byte location uniquely identify each record, so the service can automatically generate unique ids for each record. Another source that provides similar determinism guarantees is Apache Kafka; each Kafka topic is divided into partitions, and records in a partition always have a deterministic order. Such deterministic sources will work seamlessly in Cloud Dataflow with no duplicates.

However, not all sources are so simple. For example, one common source for Cloud Dataflow pipelines is Google Cloud Pub/Sub. Cloud Pub/Sub is a non-deterministic source: multiple subscribers can pull from a Cloud Pub/Sub topic but which subscribers receive a given message is unpredictable. If processing fails, Cloud Pub/Sub will redeliver messages but the messages might be delivered to different workers than those that processed them originally and in a different order. This non-deterministic behavior means that Cloud Dataflow needs assistance for detecting duplicates because there's no way for the service to deterministically assign record ids that will be stable upon retry. (We’ll dive into a more detailed use case involving Cloud Pub/Sub later.)

Since Cloud Dataflow cannot automatically assign record ids, non-deterministic sources are required to tell it what the record ids should be. Beam’s Source API provides the UnboundedReader.getCurrentRecordId method. If a source provides unique ids per record and tells Cloud Dataflow that it requires deduplication (Using the requiresDedupping override), then records with the same id will be filtered out.

Exactly once in sinks

At some point, every pipeline needs to output data to the outside world, and a sink is simply a transform that does exactly that. Keep in mind that delivering data externally is a side effect, and we've already mentioned that Cloud Dataflow does not guarantee exactly-once application of side effects. So, how can a sink guarantee that outputs are delivered exactly once?

The simplest answer is that a number of built-in sinks are provided as part of the Beam SDK. These sinks are carefully architected to ensure that they do not produce duplicates, even if executed multiple times. Whenever possible, pipeline authors are encouraged to use one of these built-in sinks.

However, sometimes the built-ins are insufficient and you need to write your own. The best approach is to ensure that your side-effect operation is idempotent and therefore robust in the face of replay. However, often some component of a side-effect DoFn is non-deterministic and thus might change on replay. For example, in a windowed aggregation, the set of records in the window can also be non-deterministic!

Specifically, the window might attempt to fire with element e0, e1, e2, but the worker crashes before committing the window processing (but not before those elements are sent as a side effect). When the worker restarts the window will fire again, but now a late element e3 shows up. Since this element shows up before the window is committed, it’s not counted as late data, so the DoFn is called again with elements e0, e1, e2, e3. These are then sent to the side-effect operation. Idempotency does not help here, as different logical record sets were sent each time.

There are other ways non-determinism can be introduced. The standard way to address this risk is to rely on the fact that Cloud Dataflow currently guarantees that only one version of a DoFn's output can make it past a shuffle boundary. (Note that these determinism boundaries may become more explicit in the Beam model at some point. Other Beam runners vary in their ability to handle non-deterministic user code.)

A simple way of using this guarantee is via the built-in Reshuffle transform. The following pattern ensures that the side-effect operation always receives a deterministic record to output.

  c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
 .apply(GroupByKey.<..>.create())
 .apply(new PrepareOutputData())
 .apply(Reshuffle.<..>of())
 .apply(WriteToSideEffect());

Use cases

To illustrate, let’s examine some of Cloud Dataflow’s built-in sources and sinks to see how they implement the above patterns.

 Example source: Cloud Pub/Sub

Cloud Pub/Sub is a fully-managed, scalable, reliable and low-latency system for delivering messages from publishers to subscribers. Publishers publish data on named topics, subscribers create named subscriptions to pull data from these topics. Multiple subscriptions can be created for a single topic, in which case each subscription receives a full copy of all data published on the topic from the time of the subscription’s creation.

Cloud Pub/Sub guarantees that records will continue to deliver until they're acknowledged; however a record might be delivered multiple times. Cloud Pub/Sub is intended for distributed use, so many publishing processes can publish to the same topic and many subscribing processes can pull from the same subscription. Once a record has been pulled, the subscriber must acknowledge it within a certain amount of time or that pull expires and Cloud Pub/Sub will redeliver that record to one of the subscribing processes.

While these characteristics make Cloud Pub/Sub highly scalable, they also make it a challenging source for a system like Cloud Dataflow. It’s impossible to know which record will be delivered to which worker, and in which order. What’s more, in the case of failure, redelivery might send the records to different workers in different orders!

Cloud Pub/Sub provides a stable message id with each message, and this id will be the same upon redelivery. The Cloud Dataflow Pub/Sub source will default to using this id for removing duplicates from Cloud Pub/Sub. (The records are shuffled based on a hash of the id, so that redeliveries are always processed on the same worker.) In some cases however, this is not quite enough. The user’s publishing process might retry publishes, and as a result introduce duplicates into Cloud Pub/Sub. From that service’s perspective these are unique records, so will get unique record ids. Cloud Dataflow’s Pub/Sub source allows the user to provide their own record ids as a custom attribute. As long as the publisher sends the same id when retrying, Cloud Dataflow will be able to detect these duplicates.

Beam (and therefore Cloud Dataflow) provides a reference source implementation for Cloud Pub/Sub. However, keep in mind that this is not what Cloud Dataflow uses but rather an implementation used only by non-Cloud Dataflow runners (such as Apache Spark, Apache Flink, and the DirectRunner). For a variety of reasons, Cloud Dataflow handles Cloud Pub/Sub internally and does not use the public Cloud Pub/Sub source.

Example sink: Files

Beam’s file sinks (TextIO, AvroIO, and any other sink that implements FileBasedSink) can be used by the streaming runner to continuously output records to files. An example use case is:

  c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
 apply(TextIO.writeStrings().to(new MyNamePolicy()).withNumShards(10));

The above snippet will write 10 new files each minute, containing data from that window. MyNamePolicy is a user-written function that determines output filenames based on the shard and the window. Triggers can also be used, in which case each trigger pane will be output as a new file.

This process is implemented using a variant on the above pattern. Files are written out to temporary locations, and these temporary filenames are sent to a subsequent transform through a GroupByKey. After the GroupByKey is a finalize transform, that atomically moves the temporary files into their final location. The following pseudocode provides a sketch of how a consistent streaming file sink is implemented in Beam. (For more details, see FileBasedSink and WriteFiles in the Beam codebase.)

  c
  // Tag each record with a random shard id.
  .apply(“AttachShard”, WithKeys.of(new RandomShardingKey(getNumShards())))
  // Group all records with the same shard.
  .apply(“GroupByShard”, GroupByKey.<..>())
  // For each window, write per-shard elements to a temporary file. This is the 
  // non-deterministic side effect. If this DoFn is executed multiple times, it will
  // simply write multiple temporary files; only one of these will pass on through to
  // the Finalize stage.
  .apply(“WriteTempFile”, ParDo.of(new DoFn<..> {
    @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) {
       // Write the contents of c.element() to a temporary file.
       // User-provided name policy used to generate a final filename.
      c.output(new FileResult()).
    }
  }))
  // Group the list of files onto a singleton key.
  .apply(“AttachSingletonKey”, WithKeys.<..>of((Void)null))
  .apply("FinalizeGroupByKey", GroupByKey.<..>create())
  // Finalize the files by atomically renaming them. This operation is idempotent. Once this
  // DoFn has executed once for a given FileResult, the temporary file is gone, so any further 
  // executions will have no effect. 
  .apply(“Finalize”, ParDo.of(new DoFn<..>, Void> {
    @ProcessElement
     public void processElement(ProcessContext c)  {
       for (FileResult result : c.element()) { 
         rename(result.getTemporaryFileName(), result.getFinalFilename());
       }
}}));

Example sink: Google BigQuery

BigQuery is a fully-managed, cloud-native data warehouse on GCP. Beam provides a BigQuery sink, and BigQuery provides a streaming insert API that supports extremely low-latency inserts. This streaming insert API allows inserts to be tagged with a unique identifier, and BigQuery will attempt to filter duplicate inserts with the same id. (See the Google BigQuery docs for limitations in BigQuery’s deduplication.) To use this capability, the BigQuery sink must generate statistically-unique ids for each record. It does this using the java.util.UUID package, which generates statistically-unique 128-bit ids.

Generating a random UUID is a non-deterministic operation, so we must add a reshuffle before we insert into BigQuery. Once that is done, any retries by Cloud Dataflow will always use the same UUID that was shuffled. Duplicate attempts to insert into BigQuery will always have the same insert id, so BigQuery is able to filter them. The following pseudocode illustrates how the BigQuery sink is implemented.

  // Apply a unique identifier to each record
c
 .apply(new DoFn<> {
  @ProcessElement
  public void processElement(ProcessContext context) {
   String uniqueId = UUID.randomUUID().toString();
   context.output(KV.of(ThreadLocalRandom.current().nextInt(0, 50),
                                     new RecordWithId(context.element(), uniqueId)));
 }
})
// Reshuffle the data so that the applied identifiers are stable and will not change.
.apply(Reshuffle.of<Integer, RecordWithId>of())
// Stream records into BigQuery with unique ids for deduplication.
.apply(ParDo.of(new DoFn<..> {
   @ProcessElement
   public void processElement(ProcessContext context) {
     insertIntoBigQuery(context.element().record(), context.element.id());
   }
 });

Next steps

We hope this series has been educational about the meaning and importance of exactly-once guarantees and how Cloud Dataflow implements them for customers. If you’re not already a Cloud Dataflow user, consider doing our free trial.