
How Qubit deduplicates streaming data at scale with Google Cloud Platform

June 26, 2017
Jibran Saithi

Lead Architect, Qubit

At Qubit, we enable over 400 million personalized experiences per month, and to power those experiences we process data from websites, mobile apps and other customer touchpoints. Managing and processing this data in near-real time is a significant challenge; various stages of the pipeline process events at over 500K messages per second, all of which need to be validated, enriched and routed with an overall latency of a few seconds end to end.

Since Qubit retrieves data from multiple client environments (mobile, server-to-server, web), we end up being exposed quite dramatically to the challenges of real-world data collection. One of these, in particular, is duplicated data.

Duplicates may occur for numerous reasons. One of the most common is client-side retries, which can occur when mobile or web clients re-send events because they cannot confirm that the previous request succeeded. Exactly-once message delivery — hard in the best of times — is even harder on the open internet. As a consequence, in a number of cases (and particularly over flaky internet conditions), data records may end up being received by our data processing pipeline multiple times.

Note that this problem differs from the exactly-once processing challenge in that even if you process each duplicate message exactly once, you still have a problem. Most real-time data pipelines are — very reductively — counting things, so when messages are duplicated, those counts go awry. For example, the total amount of products purchased would be incorrect if computed naively across duplicated event data, because of double (or even multiple!) counting. In short, duplicated events can throw off calculations and make inferences drawn from the data inaccurate, or possibly even completely wrong.

Toward scoped deduplication

Our first approach to this problem was to implement a global service: messages would be deduplicated upon ingest to ensure that client-side retries and other such global duplicates are filtered out. As we thought more about the problem, however, we realized that if we were to allow what we call scoped deduplication (deduplication specific to each application), we could also give application developers an easy solution for the related challenge of safely replaying messages, for example, for disaster recovery.

Our ideas rapidly crystallized into the following goals:

  • Provide a single, performant duplicate tagger that detects messages sent multiple times to our APIs
  • Make it easy for developers to add granular, application-specific deduplication to their own systems (for when duplicates may be generated for reasons other than multiple sends from clients)

One efficient general solution for deduplication is to use probabilistic data structures (such as Bloom filters or Cuckoo filters) to record events. These data structures can then be queried to determine whether an event has been seen before, with minimal memory requirements. Unfortunately, for our purposes we required deduplication to be exact, not approximate — not least because a false positive would mean ignoring valid data — so instead we explored how to store full message ids and make them efficiently available for lookups.
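
To make the trade-off concrete, here is a minimal sketch of the approach we decided against, using Guava's BloomFilter purely for illustration (it is not part of our pipeline): an approximate filter answers membership queries in very little memory, but a small fraction of never-seen ids will be reported as "already seen", which in a dedupe pipeline means silently dropping valid events.

```java
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.nio.charset.StandardCharsets;

public class BloomFilterTradeoff {
  public static void main(String[] args) {
    // Expect ~1M ids with a 1% false-positive rate: only a few MB of memory.
    BloomFilter<String> seen =
        BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 1_000_000, 0.01);

    seen.put("message-id-1");

    // Usually false for a brand-new id, but roughly 1% of new ids will come
    // back true, and those events would be wrongly discarded as duplicates.
    boolean maybeDuplicate = seen.mightContain("message-id-2");
    System.out.println("mightContain(new id) = " + maybeDuplicate);
  }
}
```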

Choosing a storage system

An initial iteration of the dedupe system was written to use Redis. However, with the substantially longer retention requirements and larger dataset that scoped ids introduce, deploying and managing a Redis installation at this scale would have been too expensive in time and resources.

As happy users of Google Cloud Bigtable, we found it a natural choice for storing a large dataset that requires low-latency access even under relatively large, spiky throughput and high-availability demands.

Designing a schema

A simple and obvious Cloud Bigtable schema — and one that we evaluated first — is:

Row Key          | Column Family:Column Qualifier | Column Value
hash(message id) | “data”:application identifier  | optional application metadata

This is an efficient representation for the data: checkpoint cells are created as and when records are actually processed by applications. Hashing the id ensures that keys are well distributed across tablet servers, which helps avoid hotspots regardless of the nature of the original id. Additionally, engineers often like to store small amounts of metadata — such as the time when the record was processed — and we allow this metadata to be stored alongside the checkpoint data.
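
As a rough illustration of this fat schema, a checkpoint write might look like the sketch below, expressed against the HBase client API that the Cloud Bigtable client exposes. The SHA-256 choice and the helper names are ours for this sketch, not necessarily what runs in production.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class FatSchemaCheckpoint {

  private static final byte[] DATA_CF = Bytes.toBytes("data");

  // Hashing the message id yields a fixed-length, well-distributed rowkey,
  // regardless of what the original id looks like.
  static byte[] rowKey(String messageId) throws Exception {
    return MessageDigest.getInstance("SHA-256")
        .digest(messageId.getBytes(StandardCharsets.UTF_8));
  }

  // One cell per application checkpoint, all stored in the single row that
  // belongs to this message id.
  static Put checkpoint(String messageId, String applicationId, byte[] metadata)
      throws Exception {
    return new Put(rowKey(messageId))
        .addColumn(DATA_CF, Bytes.toBytes(applicationId), metadata);
  }
}
```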

Since the row grows horizontally (a new cell is created for each application checkpoint), this is what we call a fat table. In such tables, there are only as many rows as there are ids, but a large number of cells (in this case, across a single column family). However, after many years of working with Apache HBase (an open source implementation of the Bigtable design), we've learned that skinny tables, which are taller than they are wide, are usually much easier to manage in a high-performance way as new use cases emerge.

So, a skinny transposition of the schema looks something like this:

Row Key                                         | Column Family:Column Qualifier | Column Value
hash(application identifier) + hash(message id) | “data”:“metadata”              | optional application metadata

We generate a fixed-length rowkey — something we try to do with most of our schemas — which is a concatenation of the hash of the application identifier (typically its name) with the hash of the message id. By using hashes of the app name and the ids, we get a fixed-length rowkey that allows efficient scanning of sub-ranges. In this case, we have as many rows as there are checkpoints, in contrast to the earlier schema, where the cardinality of ids set the upper bound on the number of rows.
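
A minimal sketch of that rowkey construction, assuming SHA-256 as the hash function (the actual hash used is an implementation detail):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class SkinnyRowKey {

  static byte[] sha256(String value) {
    try {
      return MessageDigest.getInstance("SHA-256")
          .digest(value.getBytes(StandardCharsets.UTF_8));
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e);
    }
  }

  // hash(application identifier) + hash(message id): fixed length, and all of
  // an application's checkpoints share a common prefix, so they occupy one
  // contiguous key range that can be scanned without touching other apps.
  static byte[] rowKey(String applicationId, String messageId) {
    byte[] appHash = sha256(applicationId);
    byte[] idHash = sha256(messageId);
    byte[] key = new byte[appHash.length + idHash.length];
    System.arraycopy(appHash, 0, key, 0, appHash.length);
    System.arraycopy(idHash, 0, key, appHash.length, idHash.length);
    return key;
  }
}
```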

An immediately obvious drawback is that this schema requires more storage, as each rowkey also stores a hash of the application name. This isn’t a total loss, though, for a couple of reasons.

First, the identifier is a common, repeated key prefix across millions or billions of rows, which allows for efficient compression. More importantly, the overhead is more than compensated for by the fact that these rowkeys allow efficient range queries, particularly on a per-application basis. In practice, this identifier often also includes an internal client-tracking code, so that we generate a unique hash for each application and client combination.

These ranged scans are useful because they allow bounded scans (i.e., without a full table scan) to efficiently read all the previous ids an application has already processed. This approach is useful for large batch jobs — in disaster-recovery use cases, for example — that can download the full list of processed message ids instead of having to make individual point lookups.
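
Expressed with the HBase client API, such a bounded scan might look like the following sketch; the table name is hypothetical, and sha256 refers to the helper in the previous sketch.

```java
import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class ProcessedIdScan {

  // Reads every checkpoint a single application has written by scanning only
  // the key range that shares the application's hash prefix.
  static void scanProcessedIds(Connection connection, String applicationId)
      throws IOException {
    byte[] appPrefix = SkinnyRowKey.sha256(applicationId);
    Scan scan = new Scan().setRowPrefixFilter(appPrefix);

    try (Table table = connection.getTable(TableName.valueOf("dedupe-checkpoints"));
         ResultScanner scanner = table.getScanner(scan)) {
      for (Result result : scanner) {
        // The suffix of each rowkey is the hashed message id; hand it (and any
        // metadata cells) to the batch job.
        byte[] rowKey = result.getRow();
      }
    }
  }
}
```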

Application users are also allowed to supply custom metadata (such as timestamps) that can be stored alongside the entry to record when, for example, the record was processed.

Optimizing checkpointing

A common use of the system is that application authors will, upon receiving a message, check whether it's a duplicate and, if not, process it (typically executing a different code path when the message is indeed a duplicate).

A naive get followed by a put to insert the checkpoint is both relatively slow and error-prone: there's an obvious race condition if the state is checked concurrently. Fortunately, Cloud Bigtable supports conditional mutations (changes are applied if a condition is met and rejected otherwise). We use this feature to checkpoint only if a checkpoint cell has not previously been created, rejecting the checkpoint otherwise — all done atomically. Users can now be told whether the message has been seen before, and if it hasn't, they can be sure that only a single worker was allowed to checkpoint it.
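
A sketch of that atomic checkpoint using checkAndPut from the HBase API, which the Cloud Bigtable HBase client supports; the table name and the "metadata" qualifier are illustrative.

```java
import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class AtomicCheckpoint {

  private static final byte[] DATA_CF = Bytes.toBytes("data");
  private static final byte[] METADATA_QUALIFIER = Bytes.toBytes("metadata");

  // Returns true if this call created the checkpoint (first time the message
  // was seen), false if a checkpoint cell already existed (duplicate).
  static boolean checkpointIfAbsent(Connection connection, byte[] rowKey, byte[] metadata)
      throws IOException {
    Put put = new Put(rowKey).addColumn(DATA_CF, METADATA_QUALIFIER, metadata);

    try (Table table = connection.getTable(TableName.valueOf("dedupe-checkpoints"))) {
      // A null expected value means "apply the Put only if the cell does not
      // exist yet"; the check and the write happen atomically, so only one
      // concurrent worker can win.
      return table.checkAndPut(rowKey, DATA_CF, METADATA_QUALIFIER, null, put);
    }
  }
}
```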

Data processing with Apache Beam and Google Cloud Dataflow

When writing stream-processing pipelines, our standard framework of choice is Apache Beam, the programming model for building streaming (or batch) data processing pipelines. We're big fans of Beam’s data watermarks, which let programmers reason about, and deal with, late-arriving data (among other things). In addition, running our pipelines on Google Cloud Dataflow lets us focus on programming without having to worry about deploying and maintaining instances running our code (a hallmark of Google Cloud Platform overall).

While we bundle our dedupe system as a library (to make it easy to integrate into existing code in multiple environments), the primary dedupe via Cloud Dataflow is a trivial Beam DoFn.

(Protip: use AbstractCloudBigtableTableDoFn to help take care of the Cloud Bigtable connection lifecycle.)
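
Putting the pieces together, such a DoFn might look like the sketch below. It assumes String message ids, a hypothetical table name and application identifier, and the same checkAndPut checkpoint as above; only the AbstractCloudBigtableTableDoFn wiring mirrors the protip.

```java
import com.google.cloud.bigtable.beam.AbstractCloudBigtableTableDoFn;
import com.google.cloud.bigtable.beam.CloudBigtableConfiguration;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Emits only never-before-seen message ids; duplicates are dropped (or could
// be routed to a separate output in a real pipeline).
public class DedupeFn extends AbstractCloudBigtableTableDoFn<String, String> {

  private static final byte[] DATA_CF = Bytes.toBytes("data");
  private static final byte[] METADATA_QUALIFIER = Bytes.toBytes("metadata");
  private static final String APPLICATION_ID = "example-consumer";

  public DedupeFn(CloudBigtableConfiguration config) {
    super(config); // the base class manages the Bigtable connection lifecycle
  }

  @ProcessElement
  public void processElement(ProcessContext context) throws IOException {
    String messageId = context.element();
    byte[] rowKey = Bytes.add(sha256(APPLICATION_ID), sha256(messageId));
    Put put = new Put(rowKey).addColumn(DATA_CF, METADATA_QUALIFIER, Bytes.toBytes(""));

    try (Table table = getConnection().getTable(TableName.valueOf("dedupe-checkpoints"))) {
      // Atomically checkpoint; only the first worker to see this id succeeds.
      if (table.checkAndPut(rowKey, DATA_CF, METADATA_QUALIFIER, null, put)) {
        context.output(messageId);
      }
    }
  }

  private static byte[] sha256(String value) {
    try {
      return MessageDigest.getInstance("SHA-256")
          .digest(value.getBytes(StandardCharsets.UTF_8));
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e);
    }
  }
}
```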

Looking ahead, as state-management APIs in Beam are formalized, we intend to optimize this process further by detecting duplicated messages in our core flows via state held in the Cloud Dataflow service.

Next steps

With a relatively small amount of work (and a lot of complexity managed for us by GCP), we were able to deploy a system that lets us dedupe messages at scale, with no self-managed components, and that scales easily with traffic. The data this system produces as a side effect is also worth analyzing, because it often points to fundamental changes in upstream systems that are worth investigating.

In a future post, we’ll describe the analytic systems we built at Qubit to analyze this data at scale.
