How to efficiently process both real-time and aggregate data with Cloud Dataflow
Iñigo San Jose Visiers
Big Data Technical Solutions Representative, Google Cloud
Cloud Pub/Sub, Cloud Dataflow, and BigQuery form a common stack for analytic workloads over event data streams. When choosing the right implementation, however, many businesses need to account for both real-time constraints and historical analysis over the whole dataset, which results in trade-offs. But it doesn’t have to be this way.
Imagine that we face a scenario where data can be conveniently divided into two categories: (1) actionable events that need to be delivered with stringent latency requirements, and (2) not-so-urgent data that can tolerate some delay. Should we opt for streaming inserts or go with load jobs? Is there a better solution? Spoiler alert: With a clever and simple pipeline design that combines these two worlds, we can meet all our requirements and provide significant cost savings.
Where can this be applied?
Before we continue, let’s examine some of the business use cases that can benefit from our approach.
Fraud detection: Potentially fraudulent activity can be flagged immediately, while all other transactions are logged for later use in deriving insights or training ML models.
Monitoring systems: Anomalies can be detected and alerted on instantly, while data collected under normal conditions can tolerate some delay. Applications range from earthquake detection to SRE dashboards.
Customer service ticketing systems: Critical issues filed by customers can be prioritized, while non-critical issues (like feature requests) can be delayed without impacting the customer experience.
Online gaming health checks: By using a representative fraction of the incoming data for quick analysis, we can check that everything is in order while preserving the rest of the data for deeper future analysis or ML projects.
In three of the scenarios above, incoming data is classified as either urgent (when low-latency delivery is needed) or non-urgent. But this approach can also be applied in other ways. For example, let’s say you need early speculative results, as in the online gaming health check use case. By sampling the incoming events, we can get an early analysis while preserving the complete dataset for deeper future analysis. In other words, this approach can easily be adapted to stream a representative sample of the data while the rest is loaded afterwards with load jobs.
Within our Cloud Pub/Sub, Cloud Dataflow, and BigQuery stack, Cloud Dataflow provides simple ways to connect to Cloud Pub/Sub and BigQuery through the built-in IO connectors of the Apache Beam SDK for Java.
In our pipeline, we read the real-time events published to a Cloud Pub/Sub topic with the PubsubIO connector. Once the data has been processed, we insert it into the BigQuery destination tables. The BigQueryIO connector provides two ways to insert our data: Load Jobs or Streaming Inserts.
With Load Jobs, elements are buffered in Cloud Storage and each batch is written to BigQuery in a single atomic update. On the other hand, with Streaming Inserts, each record will be immediately appended to the BigQuery table and available to be queried within seconds.
Choosing the right implementation
We could favor a play-it-safe design in which we stream all data directly into BigQuery. Streaming insert quotas are generous and easy to stay within, but we would be paying for every inserted row, regardless of its urgency. In some of the previous examples, the fraction of high-priority events can be very low, so most of that cost would go to data that never needed low latency. Also, DML operations such as updates are disallowed (at the partition level) while a streaming buffer is attached to the table.
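To make the trade-off concrete, the following plain-Java sketch estimates billed streaming bytes. It assumes each row is billed at a minimum size (1 KB under BigQuery’s legacy streaming insert pricing at the time of writing) and takes the price per GB as a hypothetical input, so check current pricing before plugging in real numbers. The class and method names are illustrative and not part of any library.

```java
/**
 * Back-of-the-envelope estimate of streaming insert charges.
 * All names are hypothetical; prices and minimum row sizes are inputs, not facts.
 */
final class StreamingCostEstimate {

    /** Billed bytes when each row is charged at least minBilledRowBytes. */
    static long billedBytes(long rows, long avgRowBytes, long minBilledRowBytes) {
        if (rows < 0 || avgRowBytes < 0 || minBilledRowBytes < 0) {
            throw new IllegalArgumentException("sizes and counts must be non-negative");
        }
        return rows * Math.max(avgRowBytes, minBilledRowBytes);
    }

    /** Cost of the given billed bytes at a hypothetical price per GB (10^9 bytes). */
    static double cost(long billedBytes, double pricePerGb) {
        return billedBytes / 1e9 * pricePerGb;
    }
}
```

For example, with 100 million 500-byte events per day and a 1,024-byte minimum, `billedBytes(100_000_000L, 500, 1024)` is 102.4 GB, while streaming only a 1% urgent slice, `billedBytes(1_000_000L, 500, 1024)`, is about 1.02 GB: a hundredfold reduction in billed streaming volume.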
Instead, we can leverage load jobs, which are free. However, to provide a real-time view of the data, we would need to write very frequently, which can exhaust the daily per-table load job quota and hurt query performance by fragmenting the table into an excessive number of files.
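The arithmetic behind the quota concern is simple. The plain-Java sketch below, with hypothetical names, counts load jobs per table per day for a given triggering interval and finds the shortest interval that fits a daily quota. The number of jobs per trigger is an input because BigQueryIO may issue more than one; the per-table quota (1,500 load jobs per day at the time of writing) should be checked against the current BigQuery quotas documentation.

```java
/** Hypothetical helpers for reasoning about the daily load job quota of one table. */
final class LoadJobQuota {
    static final long SECONDS_PER_DAY = 86_400;

    /** Load jobs per day if a load is triggered every triggeringSeconds. */
    static long loadJobsPerDay(long triggeringSeconds, long jobsPerTrigger) {
        if (triggeringSeconds <= 0 || jobsPerTrigger <= 0) {
            throw new IllegalArgumentException("arguments must be positive");
        }
        // Ceiling division: a partial interval at the end of the day still triggers a load.
        long triggers = (SECONDS_PER_DAY + triggeringSeconds - 1) / triggeringSeconds;
        return triggers * jobsPerTrigger;
    }

    /** Shortest triggering interval (seconds) that keeps the table within dailyQuota. */
    static long minTriggeringSeconds(long dailyQuota, long jobsPerTrigger) {
        long maxTriggers = dailyQuota / jobsPerTrigger;
        if (maxTriggers <= 0) {
            throw new IllegalArgumentException("quota too small for even one trigger");
        }
        // Smallest s with ceil(86400 / s) <= maxTriggers is ceil(86400 / maxTriggers).
        return (SECONDS_PER_DAY + maxTriggers - 1) / maxTriggers;
    }
}
```

Triggering once a minute already consumes 1,440 jobs a day for a single table, which leaves almost no headroom; this is why near-real-time visibility through load jobs alone is hard to sustain.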
An interesting solution is to combine both: use streaming inserts to send urgent events right away, and load jobs for all other events. Below, we briefly develop and explain this design choice.
We read JSON-formatted messages from Cloud Pub/Sub, each with an attribute that indicates the event’s urgency. Events with an urgency factor equal to or above the threshold are emitted to a side output and stream-ingested into a BigQuery table; all other events are written to a different table. If we need to query data from both tables, a simple UNION ALL statement will suffice (BigQuery’s standard SQL requires UNION ALL or UNION DISTINCT rather than a bare UNION).
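The routing decision itself is small. The following plain-Java sketch shows the logic a Beam DoFn could apply before emitting each element either to its main output or to the urgent side output (declared with TupleTags in Beam). The attribute name `urgency`, the integer scale, and the choice to treat missing or malformed values as non-urgent are assumptions for illustration.

```java
import java.util.Map;

/** Hypothetical router deciding which output an incoming Pub/Sub message goes to. */
final class UrgencyRouter {

    /** Stand-ins for the two outputs (TupleTags in a Beam DoFn). */
    enum Output { URGENT_STREAMING, BATCH_FILE_LOADS }

    /** Assumed name of the message attribute that carries the urgency factor. */
    static final String URGENCY_ATTRIBUTE = "urgency";

    static Output route(Map<String, String> attributes, int threshold) {
        String raw = attributes.get(URGENCY_ATTRIBUTE);
        if (raw == null) {
            return Output.BATCH_FILE_LOADS; // assumption: missing urgency means non-urgent
        }
        int urgency;
        try {
            urgency = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return Output.BATCH_FILE_LOADS; // assumption: malformed urgency means non-urgent
        }
        // Equal to or above the threshold goes to the streamed side output.
        return urgency >= threshold ? Output.URGENT_STREAMING : Output.BATCH_FILE_LOADS;
    }
}
```

Treating unparseable values as non-urgent means bad input never consumes streaming quota; a stricter pipeline might route such messages to a dead-letter output instead.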
We add a timestamp field to every element when its row is created. This way, we can retrieve the actual processing time of each event even when two events belong to the same batch and were therefore inserted simultaneously.
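A sketch of the timestamping step, assuming rows are represented as a plain `Map<String, Object>` (Beam’s `TableRow` is likewise a map of column names to values) and a hypothetical column named `processing_time`. Passing a `java.time.Clock` keeps the function deterministic in tests, and copying the input avoids mutating the element, which Beam DoFns should not do.

```java
import java.time.Clock;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that adds a processing timestamp to an outgoing row. */
final class RowBuilder {

    /** Assumed column name for the processing timestamp. */
    static final String PROCESSING_TIME_FIELD = "processing_time";

    /** Returns a copy of fields with the current time (ISO-8601, UTC) added. */
    static Map<String, Object> toRow(Map<String, Object> fields, Clock clock) {
        Map<String, Object> row = new LinkedHashMap<>(fields); // copy: never mutate the input element
        row.put(PROCESSING_TIME_FIELD, Instant.now(clock).toString());
        return row;
    }
}
```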
We redirect each output to the corresponding table according to its tag, and the changes are straightforward. Note that, because our Pub/Sub input is unbounded, BigQueryIO defaults to streaming inserts if we don’t specify a write method. For load jobs, we set Method.FILE_LOADS along with a triggering frequency, which we can tune to better suit our use case.
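To build intuition for the triggering frequency, the sketch below models processing-time batching in plain Java: a batch opens with its first element and closes one triggering interval later, and each closed batch corresponds to one load. This is a simplified model under that assumption, not BigQueryIO’s actual implementation. It also shows why rows in the same batch share an insertion time, which the per-row timestamp field compensates for.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of batching by triggering frequency. */
final class TriggeredBatcher {

    /**
     * Groups arrival times (milliseconds, sorted ascending) into batches.
     * A batch starts at its first element and accepts elements until
     * triggeringMillis have elapsed; the next element then starts a new batch.
     */
    static List<List<Long>> batches(List<Long> sortedArrivalMillis, long triggeringMillis) {
        if (triggeringMillis <= 0) {
            throw new IllegalArgumentException("triggeringMillis must be positive");
        }
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = null;
        long deadline = 0;
        for (long t : sortedArrivalMillis) {
            if (current == null || t >= deadline) {
                current = new ArrayList<>();
                result.add(current);
                deadline = t + triggeringMillis; // fires this long after the batch's first element
            }
            current.add(t);
        }
        return result;
    }
}
```

Lengthening the interval yields fewer, larger loads (and fewer jobs against the daily quota) at the cost of higher latency for the non-urgent table.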
In the alternative case where there is no explicit priority field, we can modify the example to sample the data and stream some immediate, representative results while the rest is completed afterwards. By using a random function instead of an urgency value, we can select a desired percentage of our data for real-time analysis. In some cases another sampling strategy may be preferable, and for that you would need to implement your own logic.
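A minimal sketch of the sampling variant in plain Java, with hypothetical names: each element is selected for streaming with probability `rate`. The seeded `java.util.Random` keeps the example reproducible; inside a Beam DoFn you would more likely create the generator per instance (for example in a `@Setup` method) or use `ThreadLocalRandom`. Uniform random sampling is only one strategy; stratified or key-based sampling would need custom logic.

```java
import java.util.Random;

/** Hypothetical uniform sampler choosing which elements to stream immediately. */
final class Sampler {
    private final double rate;
    private final Random random;

    Sampler(double rate, long seed) {
        if (rate < 0.0 || rate > 1.0) {
            throw new IllegalArgumentException("rate must be in [0, 1]");
        }
        this.rate = rate;
        this.random = new Random(seed);
    }

    /** True if the current element should be streamed for early analysis. */
    boolean streamNow() {
        // nextDouble() is uniform in [0, 1), so this is true with probability rate.
        return random.nextDouble() < rate;
    }
}
```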
What are the benefits of this solution?
Here are some of the advantages we’ve experienced taking this approach.
Direct control over data availability: We can decide upfront which events will be streamed into our destination table.
Easier-to-accommodate quotas: Since we split data into streamed rows and batched loads, each insert method carries only part of the traffic, which makes both the streaming and the load job quotas easier to stay within.
Lower cost: Load jobs are free of charge, so we only pay streaming insert charges for the important data that we choose to stream.
Avoiding duplicate work: We process each element once and send it to the corresponding output. BigQueryIO makes configuring each insert method trivial.
Sounds great, doesn't it?
Optimizing the pipeline further
Considering additional best practices in an upfront design phase can be the icing on the cake in terms of optimizing performance and cost:
The number of records written to the table can be huge and can drive up the amount of data scanned by queries over time. With partitioned tables, our queries can target only the days we want to analyze, reducing the analytics cost.
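With daily partitioning, a single partition can be addressed with BigQuery’s `$YYYYMMDD` partition decorator. The plain-Java sketch below derives the decorator for the UTC day containing a timestamp, which a pipeline could use to pick a destination partition; the table spec in the example is a hypothetical placeholder. On the query side, filtering on the partitioning column (or `_PARTITIONTIME` for ingestion-time partitioned tables) lets BigQuery skip partitions outside the requested days.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper mapping timestamps to daily partition decorators. */
final class DailyPartitions {
    // Daily partitions are bounded at midnight UTC, so format the date in UTC.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    /** Returns tableSpec$YYYYMMDD for the UTC day that contains timestamp. */
    static String partitionFor(String tableSpec, Instant timestamp) {
        return tableSpec + "$" + DAY.format(timestamp);
    }
}
```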
Another possible approach would be to have one table that hosts only high-urgency events and another that hosts all events regardless of urgency. In this case, we would be paying to store the urgent events twice, even if they are only a small fraction of the data. Again, we can resort to partitions and set a low TTL (partition expiration time) on the urgent-events table so that we don’t have to clean up the data manually.
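A quick estimate shows why a short partition expiration keeps the duplicated storage bounded. The sketch assumes the urgent-events table receives `urgentFraction` of a hypothetical daily volume and that each daily partition is dropped after `expirationDays`; it returns a rough upper bound that also counts the partition still being written. Without an expiration, the duplicate grows linearly with the number of days retained.

```java
/** Rough, hypothetical estimate of storage held by a duplicated urgent-events table. */
final class DuplicateStorageEstimate {

    /**
     * Upper bound on bytes retained at steady state:
     * expirationDays of retained partitions plus today's partition.
     */
    static long steadyStateBytes(long dailyBytes, double urgentFraction, int expirationDays) {
        if (dailyBytes < 0 || expirationDays < 0 || urgentFraction < 0.0 || urgentFraction > 1.0) {
            throw new IllegalArgumentException("invalid input");
        }
        return Math.round((double) dailyBytes * (expirationDays + 1) * urgentFraction);
    }
}
```

For example, 1 TB of daily events with a 1% urgent share and a 7-day expiration keeps roughly 80 GB of duplicated data, however long the pipeline runs.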
Depending on the nature of our data, we can also bring clustering into the equation. In this case, we can enforce better colocation of data with a daily query that overwrites the “closed” partition (one for which we don’t expect new data to arrive), as explained in the BigQuery documentation.
In this post, we explored a mixed streaming and batch approach in Cloud Dataflow to get the best performance out of our data pipeline, taking into consideration project needs and the latest BigQuery features. We considered many factors, such as data availability requirements, code simplicity, scalability, and cost, and determined the optimal architecture for our use case. To learn more about Cloud Dataflow and data analytics on Google Cloud, visit our website.
Acknowledgements: Guillem Xercavins and Alvaro Gomez, Big Data SMEs, and Berta Izquierdo, Big Data Team Lead, contributed to this post.