In a previous post we compared Google Cloud Dataflow’s programming model, now becoming Apache Beam (incubating), to Apache Spark’s. Today we introduce one of the advantages of running Beam pipelines on the Cloud Dataflow service: autoscaling.
Our previous experience with legacy systems at Google, like MapReduce, and with existing systems such as Spark and Hadoop shows that users spend a lot of time tuning their jobs. This includes having to guess the number of workers to use for their job/cluster. Moreover, a single configuration is often not the right answer, as the resource needs dynamically change over the lifetime of the job.
The autoscaling capability of Google Cloud Dataflow offers simple configuration (no specifying worker counts) and reduced cost (optimized worker count over time), using algorithms developed based on our experience running similar systems internally at Google.
Cloud Dataflow is the only system we are aware of where autoscaling is automatic and integrated with data-processing-specific signals.
The need for dynamic scaling
Provisioning workloads involves selecting the right number of workers (traditionally selected by the user) and data partitions (traditionally calculated based on worker count or data size). Selecting a fixed set of workers means that at any given time, a job is likely over or under provisioned. Even if you do get it right for a moment in time, changes in input — either through streaming or the next batch dataset — may render it out of tune.
With unbounded sources traditionally associated with streaming, the concern is mostly responding to variations in input rate. Because automation in this space is usually primitive to non-existent, users must often oscillate between conflicting needs: over-provision but pay more or under-provision but risk degradation of correctness, latency and throughput in their data pipelines.
Fig 1. Fixed-size provisioning
While pipelines with bounded sources (i.e., batch) don’t need to deal with real-time input rate variability, they're just as tricky. Figuring out the number of workers for a batch pipeline has traditionally been a guessing game. "Do I need 100 workers, or do 10 suffice to finish the job in a reasonable amount of time?" Moreover, bounded data is best processed in stages that each have scale limitations (e.g., map then reduce, etc.). The properties of one transformation may be more parallelizable than another, may have different data sizes and one may be I/O bound while another is CPU bound. Also, the same conceptual pipeline may be used to process datasets of varying input sizes (e.g., daily, monthly, yearly job), each with different tuning sweet spots. Using a fixed number of workers means that at any given stage, you likely have either excess resources (and increased cost) or your job is slower than it could be.
In addition to configuring the number of workers, another related problem is configuring the number of partitions for datasets, including intermediate datasets. Use too many and the per-partition overhead will dominate. Use too few, and the system won’t be able to utilize all available workers. Also, if the number of partitions is fixed (like it is in traditional systems, but not in Cloud Dataflow), the user has to either specify it, or the system has to guess the “right” number based on the cluster size, data size or profiles from previous runs of the same pipeline. We found that such guesses are often inaccurate, forcing users to specify manual overrides.
Both Cloud Dataflow and Spark have the means to automatically provision resources for a job, but they behave very differently. With Spark, users deploy a cluster and then also deploy a job on the cluster. This creates two boundaries for users to scale and makes autoscaling problematic, as we’ll see later. Cloud Dataflow, on the other hand, only deploys jobs; you can think of Google Cloud Platform as your autoscaled cluster if you like.
The workers for your job are deployed just in time to execute a job and then torn down when not needed anymore.
To automatically add workers to a job, Spark offers Dynamic Resource Allocation, which allocates available workers in a cluster to a job (see Figure 2). As an aside, this applies mostly to YARN as Mesos has similar functionality built-in. This feature is fine for a fixed set of machines — like an on-premise installation — but in the cloud scaling within a job, doesn’t reduce cost. To impact cost, the cluster needs to resize outside of Spark. Users do this themselves through manually resizing or scripting. A cloud provider’s autoscaling service could also add or remove workers, but it would be oblivious to domain-specific constraints such as limited data parallelism, or whether various stages are CPU-bound or IO-bound.
Upscaling the cluster allows more or bigger jobs to run, although it may not help existing jobs unless enough tasks/partitions were already specified, as Spark doesn’t have a way to dynamically change the number of tasks for the given stage. This means that even if we had a way to autoscale a Spark cluster, the jobs running in it wouldn’t necessarily benefit.
Cloud Dataflow, on the other hand, was designed to run in cloud, with autoscaling in mind. Cloud Dataflow autoscaling is the next generation of resource management algorithms derived from Flume and MapReduce (see also the Lessons learned section in our sorting blog post). In batch jobs, autoscaling not only dynamically adjusts the number of workers for a given job, it also adjusts the number of tasks to keep the workers busy.
In summary, with Spark, users have to configure, coordinate and monitor two limited scaling interfaces; with Cloud Dataflow there's a single interface, and the service is able to intelligently scale it for you.
Autoscaling relies on several signals to make decisions. Most of these assess how busy and/or behind workers are, including CPU utilization, throughput and the amount of work remaining (or backlog). Workers are added as CPU utilization and backlog increase and are removed as these metrics come down. For example, a workload with regular variation would resize several times as work increased (or decreased) and then maintain size through a peak or trough before repeating in the opposite direction (see Figure 3).
Leaderboard pipeline (streaming)
With that context, let’s look at a few basic examples. The Leaderboard pipeline from our programming model comparison post is a streaming pipeline in the mobile gaming domain that given the stream of game scores, continuously calculates per-hour per-team scores and also per-user score totals over all time. The ingestion rate of a pipeline could vary on any number of factors, but there are often two strong ones, particularly in gaming:
Macro lifecycle: Is your game increasing or decreasing in popularity? This can be very dramatic near a game’s launch.
- Daily fluctuation: Users are more active during the day than at night
In our example we demonstrate the daily fluctuation scenario with a difference of 4x between the low and high activity.
We've built an injector that generates streaming data for this pipeline. We took the daily load shape of a real Cloud Dataflow streaming pipeline from production, and replayed it with our injector. The rate of messages (game events) varies from about 7,000 to about 35,000 per second, (see figure 4).
We can see that as the rate of the events increases, Cloud Dataflow upscales from the initial 3 workers to 4, 16 and eventually 32 workers to handle the peak load. As the rate of events decreases, Cloud Dataflow gradually downsizes the number of workers back down to 3. This translates to significant cost savings compared with always running this pipeline provisioned to handle the peak load.
User Scores pipeline (batch)
Let’s also take a look at a simple batch example. User Scores, also featured in the model comparison post, is a pipeline calculating per-user scores over bounded set of gaming events. Here we show how Cloud Dataflow automatically chooses a different number of workers for the same pipeline based on the scenario.
We use datasets of two sizes as an example:
small (about 22GiB): gs://dataflow-samples/game/gaming_data*.csv
- large (about 1.3TiB): gs://dataflow-samples/game/large/batch*.csv
For the small dataset, Cloud Dataflow starts with 3 workers, but as it learns about the size and throughput of the job, it upscales first to 5 then to 12 workers (note that the number of workers that Cloud Dataflow chooses depends not just on the size of the data, but also the processing speed per worker):
Under the hood, Cloud Dataflow is dynamically creating new tasks/partitions as it upscales the job to keep the workers busy. It doesn't have to pre-create many tasks initially. Traditional systems, like Hadoop MapReduce, pre-create tasks based on the data size (e.g., one task for every 64MB of data). This may have worked in this case, but the mechanism breaks down when the input is small and the records are expensive to process. Cloud Dataflow handles such scenarios robustly.
The job finishes in just under 5 minutes, utilizing about 30 VM-minutes in total. The large dataset is almost 60x larger than the small dataset. Here’s how the monitoring messages look for the large job:
Indeed, for this larger dataset, Cloud Dataflow chooses a lot more workers (294) than it did for the small dataset. Still, the job finishes in about 8 minutes utilizing 1139 VM-minutes (i.e., about 40x the cost of the small dataset). As expected, the larger dataset is an order of magnitude more expensive — because of the drastic difference in input size — but, due to autoscaling, doesn’t take that much longer to finish.
These were very simple examples of what autoscaling can do. We'll show more elaborate examples in future posts.
We introduced Cloud Dataflow’s autoscaling capabilities and described how they differ from other similar systems like Spark and Hadoop. We showed how Cloud Dataflow users no longer have to worry about specifying the number of workers or partitions, and how Cloud Dataflow dynamically adjusts the number of workers over time.
The response from users has been very exciting. For example, Alex Harvey, Chief Architect at SwiftIQ told us "We love not having to guess at the number of resources we’re going to need. We’d rather focus our time on the algorithms and let Dataflow perform optimized resource scaling for us." Riju Kallivalappil, software engineer at Nest, said similarly, “I'm a fan of Dataflow's autoscaling. It's great that I don't have to worry about trying to guess the number of mappers/reducers/partitions etc. required to run a job."
We're soon going to enable autoscaling by default for all batch jobs. For now, you can enable it by specifying
--autoscalingAlgorithm=THROUGHPUT_BASED on the command line of your batch pipeline. Streaming autoscaling is currently available by invitation only as an Early Access Program.
We monitor Cloud Dataflow autoscaling behavior in production and continue to evolve autoscaling algorithms and heuristics based on gathered quality metrics on signal stability, work prediction accuracy and autoscaling decisions. We're also planning to add job classes through which you can bias the default autoscaling heuristics and express whether you'd prefer your job to be executed in a cost-effective manner or run as fast as possible. Stay tuned :-)
Posted by Eric Anderson, Product Manager and Marian Dvorsky, Software Engineer