Read from Pub/Sub to Dataflow

This page describes best practices for reading from Pub/Sub in Dataflow.

Apache Beam provides a reference implementation of the Pub/Sub I/O connector for use by non-Dataflow runners. However, the Dataflow runner uses its own custom implementation of the connector. This implementation takes advantage of Google Cloud-internal APIs and services to offer low-latency watermarks, high watermark accuracy, and efficient deduplication for exactly-once message processing. The connector is available for Java, Python, and Go.

Exactly-once processing

Pub/Sub decouples event publishers from event consumers. The application publishes messages to a topic, and Pub/Sub asynchronously delivers the messages to subscribers.

Pub/Sub assigns a unique message ID to each message that is successfully published to a topic. By default, Pub/Sub performs at-least-once message delivery. To achieve at-least-once semantics, Pub/Sub retries delivery if it doesn't receive an acknowledgment from the subscriber within the acknowledgment deadline. Retries can result in a message being delivered more than once. For example, redelivery can occur if the subscriber acknowledges after the deadline, or if the acknowledgment is lost due to transient network issues.

If you run your Dataflow pipeline using exactly-once streaming mode, Dataflow deduplicates messages to achieve exactly-once semantics. If your pipeline can tolerate some duplicate records, then consider using at-least-once streaming mode instead. This mode can significantly lower latency and the total cost of your pipeline. The tradeoff is that some messages might be processed twice. For more information, see Choose which streaming mode to use.
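
For example, the following sketch shows how you might select at-least-once mode when you construct the pipeline options in the Java SDK. It assumes that the streaming_mode_at_least_once Dataflow service option selects the mode; verify the option name against the current Dataflow documentation.

```java
import java.util.Arrays;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class StreamingModeOptionsExample {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    options.setStreaming(true);

    // Assumption: at-least-once streaming mode is enabled through a Dataflow
    // service option named streaming_mode_at_least_once; check the current
    // Dataflow documentation for the exact option name.
    options.setDataflowServiceOptions(Arrays.asList("streaming_mode_at_least_once"));

    // Build and run the pipeline with these options.
  }
}
```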

Deduplicate by message attribute

By default, Dataflow deduplicates based on message ID. However, an application might send the same record twice as two distinct Pub/Sub messages. For example, the original source data might contain duplicate records, or the application might incorrectly publish the same message twice. The latter can happen when the publisher retries because an acknowledgment was dropped due to network issues or other interruptions. In these situations, the duplicate messages have different message IDs.

Depending on your scenario, your data might contain a unique field that can be used for deduplication. For example, records might contain a unique transaction ID. You can configure the Pub/Sub I/O connector to deduplicate messages based on the value of a message attribute, instead of using the Pub/Sub message ID. As long as the publisher sets this attribute consistently during retries, Dataflow can detect the duplicates. For deduplication to work, the duplicate messages must be published to Pub/Sub within 10 minutes of each other.
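
For example, the following sketch shows a Java SDK read that deduplicates on a hypothetical transactionId attribute; the subscription name and attribute name are placeholders.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class DeduplicateByAttributeExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Deduplicate on the value of a message attribute instead of the message ID.
    // "transactionId" is a placeholder; the publisher must set it consistently on retries.
    PCollection<String> messages =
        pipeline.apply(
            "ReadFromPubSub",
            PubsubIO.readStrings()
                .fromSubscription("projects/my-project/subscriptions/my-subscription")
                .withIdAttribute("transactionId"));

    // Apply downstream transforms to the messages, then run the pipeline.
    pipeline.run();
  }
}
```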

For more information about using ID attributes, see the SDK reference documentation for the Pub/Sub I/O connector.

Subscriptions

When you configure your pipeline, you specify either a Pub/Sub topic or a Pub/Sub subscription to read from. If you specify a subscription, don't use the same Pub/Sub subscription for multiple pipelines. If two pipelines read from a single subscription, each pipeline receives part of the data in a nondeterministic manner, which might cause duplicate messages, watermark lag, and inefficient autoscaling. Instead, create a separate subscription for each pipeline.

If you specify a topic, the connector creates a new temporary subscription. This subscription is unique per pipeline.
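
For illustration, the following sketch shows both read configurations in the Java SDK; the project, topic, and subscription names are placeholders.

```java
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;

public class SubscriptionVersusTopicExample {

  // Recommended: each pipeline reads from its own dedicated subscription.
  static PubsubIO.Read<String> readFromSubscription() {
    return PubsubIO.readStrings()
        .fromSubscription("projects/my-project/subscriptions/pipeline-a-subscription");
  }

  // If you specify a topic instead, the connector creates a temporary
  // subscription that is unique to this pipeline.
  static PubsubIO.Read<String> readFromTopic() {
    return PubsubIO.readStrings().fromTopic("projects/my-project/topics/my-topic");
  }
}
```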

Timestamps and watermarks

All Pub/Sub messages have a timestamp, which represents the time when Pub/Sub receives the message. Your data might also have an event timestamp, which is the time when the record was generated by the source.

You can configure the connector to read the event timestamp from an attribute on the Pub/Sub message. In that case, the connector uses the event timestamp for watermarking. Otherwise, the connector uses the Pub/Sub message timestamp.
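
For example, the following sketch shows a Java SDK read that takes the event timestamp from a hypothetical ts attribute. The attribute value must be a timestamp that the connector can parse, such as milliseconds since the Unix epoch or an RFC 3339 string.

```java
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;

public class EventTimestampExample {

  // Use the value of the "ts" attribute (a placeholder name) as the event
  // timestamp. The connector then uses it for watermarking instead of the
  // Pub/Sub message timestamp.
  static PubsubIO.Read<String> readWithEventTimestamps() {
    return PubsubIO.readStrings()
        .fromSubscription("projects/my-project/subscriptions/my-subscription")
        .withTimestampAttribute("ts");
  }
}
```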

For more information about using event timestamps, see the SDK reference documentation for the Pub/Sub I/O connector.

The Pub/Sub connector has access to a private Pub/Sub API that provides the age of the oldest unacknowledged message in a subscription. This API provides that information with lower latency than Cloud Monitoring, which lets Dataflow advance pipeline watermarks and emit windowed computation results with low latency.

If you configure the connector to use event timestamps, then Dataflow creates a second Pub/Sub subscription. It uses this subscription to inspect the event times of messages that are still in the backlog, which lets Dataflow estimate the event-time backlog accurately. For more information, see the Stack Overflow page that covers how Dataflow computes Pub/Sub watermarks.

Pub/Sub Seek

Pub/Sub Seek lets users replay previously acknowledged messages. You can use Pub/Sub Seek with Dataflow to reprocess messages in a pipeline.

However, using Pub/Sub Seek in a running pipeline isn't recommended. Seeking backwards in a running pipeline can lead to duplicate messages or to messages being dropped. It also invalidates Dataflow's watermark logic and conflicts with pipeline state that already incorporates the processed data.

To reprocess messages by using Pub/Sub Seek, the following workflow is recommended:

  1. Create a snapshot of the subscription.
  2. Create a new subscription for the Pub/Sub topic. The new subscription inherits the snapshot.
  3. Drain or cancel the current Dataflow job.
  4. Resubmit the pipeline using the new subscription.

For more information, see Message reprocessing with Pub/Sub Snapshot and Seek.

Unsupported Pub/Sub features

The following Pub/Sub features aren't supported in the Dataflow runner's implementation of the Pub/Sub I/O connector.

Exponential backoff

When you create a Pub/Sub subscription, you can configure it to use an exponential backoff retry policy. However, exponential backoff does not work with Dataflow. Instead, create the subscription with the Retry immediately retry policy.

Exponential backoff is triggered by a negative acknowledgment or when the acknowledgment deadline expires. However, Dataflow doesn't send negative acknowledgments when pipeline code fails. Instead, Dataflow retries message processing indefinitely while continually extending the acknowledgment deadline for the message.
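
If you create subscriptions programmatically, the following sketch shows one way to do this with the Pub/Sub admin client for Java. It leaves the retry policy unset, which keeps immediate redelivery; the resource names are placeholders.

```java
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.TopicName;

public class CreateSubscriptionExample {
  public static void main(String[] args) throws Exception {
    try (SubscriptionAdminClient admin = SubscriptionAdminClient.create()) {
      admin.createSubscription(
          Subscription.newBuilder()
              .setName(
                  ProjectSubscriptionName.format("my-project", "dataflow-input-subscription"))
              .setTopic(TopicName.format("my-project", "my-topic"))
              .setAckDeadlineSeconds(60)
              // No retry policy is set, so Pub/Sub redelivers unacknowledged
              // messages immediately instead of applying exponential backoff.
              .build());
    }
  }
}
```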

Dead-letter topics

Don't use Pub/Sub dead-letter topics with Dataflow, for the following reasons:

  • Dataflow sends negative acknowledgments for various internal reasons (for example, if a worker is shutting down). As a result, messages might be delivered to the dead-letter topic even when no failures occur in the pipeline code.

  • Dataflow might acknowledge messages before the pipeline fully processes the data. Specifically, Dataflow acknowledges messages after they are successfully processed by the first fused stage and side effects of that processing have been written to persistent storage. If the pipeline has multiple fused stages and failures occur at any point after the first stage, the messages are already acknowledged and don't go to the dead-letter topic.

Instead, implement the dead-letter pattern explicitly in the pipeline. Some I/O sinks have built-in support for dead-letter queues, and the Dataflow documentation includes examples that implement dead-letter patterns.
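
As an illustration, the following sketch shows one common way to implement the pattern in the Java SDK: catch failures in a DoFn, emit the failed elements to a secondary output, and write them to a separate sink such as another Pub/Sub topic. The names are placeholders.

```java
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class DeadLetterPatternExample {

  static final TupleTag<String> PARSED = new TupleTag<String>() {};
  static final TupleTag<String> FAILED = new TupleTag<String>() {};

  // Route elements that fail processing to a secondary output instead of
  // relying on a Pub/Sub dead-letter topic.
  static PCollectionTuple process(PCollection<String> messages) {
    return messages.apply(
        "ParseMessages",
        ParDo.of(
                new DoFn<String, String>() {
                  @ProcessElement
                  public void processElement(@Element String message, MultiOutputReceiver out) {
                    try {
                      out.get(PARSED).output(parse(message));
                    } catch (Exception e) {
                      // Send the original message to the dead-letter output.
                      out.get(FAILED).output(message);
                    }
                  }
                })
            .withOutputTags(PARSED, TupleTagList.of(FAILED)));
  }

  // Write the failed elements to a separate Pub/Sub topic (a placeholder name).
  static void writeDeadLetters(PCollectionTuple results) {
    results
        .get(FAILED)
        .apply(
            "WriteDeadLetters",
            PubsubIO.writeStrings().to("projects/my-project/topics/dead-letter-topic"));
  }

  static String parse(String message) {
    // Placeholder for application-specific parsing or validation logic.
    return message;
  }
}
```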

Pub/Sub exactly-once delivery

Because Dataflow has its own mechanisms for exactly-once processing, it's not recommended to use Pub/Sub exactly-once delivery with Dataflow. Enabling Pub/Sub exactly-once delivery reduces pipeline performance, because it limits the number of messages that are available for parallel processing.

Pub/Sub message ordering

Message ordering is a feature in Pub/Sub that lets a subscriber receive messages in the order they were published.

It's not recommended to use message ordering with Dataflow, for the following reasons:

  • The Pub/Sub I/O connector might not preserve message ordering.
  • Apache Beam doesn't define strict guidelines regarding the order in which elements are processed. Therefore, ordering might not be preserved in downstream transforms.
  • Using Pub/Sub message ordering with Dataflow can increase latency and decrease performance.

What's next