Horizontal Autoscaling

With Horizontal Autoscaling enabled, the Dataflow service automatically chooses the appropriate number of worker instances required to run your job. The Dataflow service might also dynamically reallocate more or fewer workers during runtime to account for the characteristics of your job. Certain parts of your pipeline might be computationally heavier than others, and the Dataflow service might automatically spin up additional workers during these phases of your job and shut them down when they're no longer needed.

Configure autoscaling

Use the following options to configure autoscaling.

Java

Horizontal Autoscaling is enabled by default on streaming jobs that use Streaming Engine and on all batch jobs. You can disable Horizontal Autoscaling by specifying the flag --autoscalingAlgorithm=NONE when you run your pipeline. When you disable Horizontal Autoscaling, the Dataflow service sets the number of workers based on the --numWorkers option, which defaults to 3.

With Horizontal Autoscaling enabled, the Dataflow service doesn't let you control the exact number of worker instances allocated to your job. You can still cap the number of workers by specifying the --maxNumWorkers option when you run your pipeline.

For batch jobs, the --maxNumWorkers flag is optional. The default is 1000. For streaming jobs using Streaming Engine, the --maxNumWorkers flag is optional. The default is 100. For streaming jobs not using Streaming Engine, the --maxNumWorkers flag is required.
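For example, a Java launch might pass these flags on the command line and read them through the standard Dataflow options interface. The following is a minimal sketch under that assumption; the class name AutoscalingOptionsExample is a placeholder, and the programmatic setter is shown only as an alternative to the equivalent flag.

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class AutoscalingOptionsExample {
  public static void main(String[] args) {
    // Flags such as --runner=DataflowRunner, --autoscalingAlgorithm=NONE,
    // --numWorkers=5, and --maxNumWorkers=20 are parsed into Dataflow
    // pipeline options here.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);

    // The worker cap can also be set programmatically before launch.
    options.setMaxNumWorkers(20);

    Pipeline pipeline = Pipeline.create(options);
    // ... add transforms here ...
    pipeline.run();
  }
}

Running this class with --autoscalingAlgorithm=NONE and --numWorkers=5 fixes the job at five workers; without that flag, Horizontal Autoscaling stays on and is capped by the --maxNumWorkers value (or the setter shown above).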

Python

Horizontal Autoscaling is enabled by default on streaming jobs that use Streaming Engine and on all batch Dataflow jobs created using the Apache Beam SDK for Python version 0.5.1 or later. You can disable Horizontal Autoscaling by explicitly specifying the flag --autoscaling_algorithm=NONE when you run your pipeline. When you disable Horizontal Autoscaling, the Dataflow service sets the number of workers based on the --num_workers option, which defaults to 3.

With Horizontal Autoscaling enabled, the Dataflow service doesn't let you control the exact number of worker instances allocated to your job. You can still cap the number of workers by specifying the --max_num_workers option when you run your pipeline.

For batch jobs, the --max_num_workers flag is optional. The default is 1000. For streaming jobs using Streaming Engine, the --max_num_workers flag is optional. The default is 100. For streaming jobs not using Streaming Engine, the --max_num_workers flag is required.

Go

Horizontal Autoscaling is enabled by default on streaming jobs that use Streaming Engine and on all batch Dataflow jobs created using the Apache Beam Go SDK. You can disable Horizontal Autoscaling by explicitly specifying the flag --autoscaling_algorithm=NONE when you run your pipeline. When you disable Horizontal Autoscaling, the Dataflow service sets the number of workers based on the --num_workers option, which defaults to 3.

With Horizontal Autoscaling enabled, the Dataflow service doesn't let you control the exact number of worker instances allocated to your job. You can still cap the number of workers by specifying the --max_num_workers option when you run your pipeline.

For batch jobs, the --max_num_workers flag is optional. The default is 1000. For streaming jobs using Streaming Engine, the --max_num_workers flag is optional. The default is 100. For streaming jobs not using Streaming Engine, the --max_num_workers flag is required.

Dataflow scales based on the parallelism of a pipeline. The parallelism of a pipeline is an estimate of the number of threads needed to most efficiently process data at any given time.

Batch autoscaling

For batch pipelines, Dataflow automatically chooses the number of workers based on the estimated total amount of work in each stage of your pipeline, which depends on both the input size and the current throughput. Dataflow re-evaluates the amount of work every 30 seconds, based on the progress of the execution, and dynamically scales the number of workers up or down as the estimated total amount of work increases or decreases.

To save idle resources, Dataflow either maintains or decreases the number of workers if any of the following conditions occur:

  • Average worker CPU usage is lower than 5%.
  • Parallelism is limited because of unparallelizable work, such as unsplittable data (for example, compressed files) or data processed by I/O modules that don't split.
  • The degree of parallelism is fixed, for example when writing to existing files at a Cloud Storage destination.

If your pipeline uses a custom data source that you implemented, you can potentially improve performance by implementing a few methods that provide more information to the Dataflow service's Horizontal Autoscaling algorithm:

Java

  • In your BoundedSource subclass, implement the method getEstimatedSizeBytes. The Dataflow service uses getEstimatedSizeBytes when calculating the initial number of workers to use for your pipeline.
  • In your BoundedReader subclass, implement the method getFractionConsumed. The Dataflow service uses getFractionConsumed to track read progress and converge on the correct number of workers to use during a read.
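For illustration, the following is a minimal sketch of a toy bounded source over a range of longs, showing where the two methods fit. The class names RangeSource and RangeReader and the fixed 8-bytes-per-element size estimate are assumptions made for this example, not part of the Dataflow API.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;

/** A toy bounded source that emits the longs in [start, end). */
public class RangeSource extends BoundedSource<Long> {
  private final long start;
  private final long end;

  public RangeSource(long start, long end) {
    this.start = start;
    this.end = end;
  }

  // Dataflow uses this estimate to choose the initial number of workers.
  @Override
  public long getEstimatedSizeBytes(PipelineOptions options) {
    return (end - start) * 8; // Assume 8 bytes per element.
  }

  @Override
  public List<RangeSource> split(long desiredBundleSizeBytes, PipelineOptions options) {
    long elementsPerBundle = Math.max(1, desiredBundleSizeBytes / 8);
    List<RangeSource> splits = new ArrayList<>();
    for (long s = start; s < end; s += elementsPerBundle) {
      splits.add(new RangeSource(s, Math.min(end, s + elementsPerBundle)));
    }
    return splits;
  }

  @Override
  public BoundedReader<Long> createReader(PipelineOptions options) {
    return new RangeReader(this);
  }

  @Override
  public Coder<Long> getOutputCoder() {
    return VarLongCoder.of();
  }

  private static class RangeReader extends BoundedReader<Long> {
    private final RangeSource source;
    private long current;

    RangeReader(RangeSource source) {
      this.source = source;
      this.current = source.start - 1;
    }

    @Override
    public boolean start() {
      return advance();
    }

    @Override
    public boolean advance() {
      current++;
      return current < source.end;
    }

    @Override
    public Long getCurrent() {
      return current;
    }

    // Dataflow uses this fraction to track read progress and converge on the
    // right number of workers during the read.
    @Override
    public Double getFractionConsumed() {
      if (current < source.start) {
        return 0.0;
      }
      return Math.min(1.0, (current - source.start + 1) / (double) (source.end - source.start));
    }

    @Override
    public void close() {}

    @Override
    public BoundedSource<Long> getCurrentSource() {
      return source;
    }
  }
}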

Python

  • In your BoundedSource subclass, implement the method estimate_size. The Dataflow service uses estimate_size when calculating the initial number of workers to use for your pipeline.
  • In your RangeTracker subclass, implement the method fraction_consumed. The Dataflow service uses fraction_consumed to track read progress and converge on the correct number of workers to use during a read.

Go

  • In your RangeTracker, implement the method GetProgress(). The Dataflow service uses GetProgress to track read progress and converge on the correct number of workers to use during a read.

Streaming autoscaling

Streaming autoscaling allows the Dataflow service to adaptively change the number of workers used to execute your streaming pipeline in response to changes in load and resource utilization. Streaming autoscaling is offered at no charge and is designed to reduce the costs of the resources used when executing streaming pipelines.

Streaming autoscaling determines when to scale by monitoring the estimated backlog time. The estimated backlog time is calculated from both the throughput and the backlog bytes still to be processed from the input source. A pipeline is considered backlogged when the estimated backlog time stays above 15 seconds.

  • Scaling up: If a streaming pipeline remains backlogged with sufficient parallelism on the workers for a couple of minutes, Dataflow scales up. Dataflow targets clearing the backlog in approximately 150 seconds after scaling up, given the current throughput per worker.

  • Scaling down: If a streaming pipeline backlog is lower than 10 seconds and workers are utilizing on average less than 75% of the CPUs for a period of a couple of minutes, Dataflow scales down. After scaling down, workers utilize, on average, 75% of their CPUs. In streaming jobs that do not use Streaming Engine, sometimes the 75% CPU utilization cannot be achieved because of disk distribution (each worker must have the same number of persistent disks), and a lower CPU utilization is used. For example, a job set to use a maximum of 100 workers with 1 disk per worker can be scaled down to 50 workers with 2 disks per worker. For this job, a 75% CPU utilization is not achievable, because the next scale-down from 100 workers is 50 workers, which is fewer than the required 75 workers. Consequently, Dataflow does not scale down this job, resulting in a lower than 75% CPU utilization.

  • No scaling: If there is no backlog but CPU usage is 75% or greater, the pipeline does not scale down. If there is a backlog but the pipeline does not have enough parallelism to use additional workers, the pipeline does not scale up.

  • Predictive scaling: Streaming Engine also uses a predictive Horizontal Autoscaling technique based on timer backlog. In the Dataflow model, the unbounded data in a streaming pipeline is divided into windows grouped by timestamps. At the end of a window, timers fire for each key being processed in that window. The firing of a timer indicates that the window has expired for a given key. Streaming Engine can measure the timer backlog, which means it can predict how many timers are going to fire at the end of a window. Using the timer backlog as a signal lets Dataflow "see the future" by estimating the amount of processing that will need to happen when future timers fire. Based on the estimated future load, Dataflow autoscales in advance to meet expected demand.

Without Horizontal Autoscaling, you choose a fixed number of workers by specifying numWorkers or num_workers to run your pipeline. As the input workload varies over time, this number can become either too high or too low. Provisioning too many workers results in unnecessary extra cost, and provisioning too few workers results in higher latency for processed data. By enabling Horizontal Autoscaling, resources are used only as they are needed.

The objective of Horizontal Autoscaling streaming pipelines is to minimize backlog while maximizing worker utilization and throughput and to quickly react to spikes in load. By enabling Horizontal Autoscaling, you don't have to choose between provisioning for peak load and fresh results. Workers are added as CPU utilization and backlog increase and are removed as these metrics come down. This way, you're paying only for what you need, and the job is processed as efficiently as possible.

In some cases, Horizontal Autoscaling uses the following factors in autoscaling decisions. If these factors are used for your job, you can see that information in the Autoscaling metrics tab.

  • Key-based throttling uses the number of processing keys received by the job to calculate a cap on the number of user workers, because each key can be processed by only one worker at a time.

  • Downscale dampening slows the scale-down steps when Dataflow detects that unstable autoscaling decisions have occurred.

Java

Custom unbounded sources

If your pipeline uses a custom unbounded source, the source must inform the Dataflow service about backlog. Backlog is an estimate of the input in bytes that has not yet been processed by the source. To inform the service about backlog, implement one of the following methods in your UnboundedReader class.

  • getSplitBacklogBytes() - Backlog for the current split of the source. The service aggregates backlog across all the splits.
  • getTotalBacklogBytes() - The global backlog across all the splits. In some cases the backlog is not available for each split and can only be calculated across all the splits. Only the first split (split ID '0') needs to provide total backlog.

The Apache Beam repository contains several examples of custom sources that implement the UnboundedReader class.
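As an additional illustration, the following is a minimal sketch of a toy unbounded source that reports a per-split backlog. The class names CounterSource, Checkpoint, and CounterReader, the single-split layout, and the constant backlog value are assumptions made to keep the example short; a real source would estimate the unread bytes in its external system.

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.joda.time.Instant;

/** A toy unbounded source that emits an ever-increasing counter. */
public class CounterSource extends UnboundedSource<Long, CounterSource.Checkpoint> {

  /** Checkpoint that records the last value emitted by a reader. */
  public static class Checkpoint implements UnboundedSource.CheckpointMark, Serializable {
    final long lastEmitted;

    Checkpoint(long lastEmitted) {
      this.lastEmitted = lastEmitted;
    }

    @Override
    public void finalizeCheckpoint() {}
  }

  @Override
  public List<CounterSource> split(int desiredNumSplits, PipelineOptions options) {
    return Collections.singletonList(this); // Single split keeps the sketch short.
  }

  @Override
  public UnboundedReader<Long> createReader(PipelineOptions options, Checkpoint checkpoint) {
    long resumeFrom = checkpoint == null ? 0L : checkpoint.lastEmitted + 1;
    return new CounterReader(this, resumeFrom);
  }

  @Override
  public Coder<Checkpoint> getCheckpointMarkCoder() {
    return SerializableCoder.of(Checkpoint.class);
  }

  @Override
  public Coder<Long> getOutputCoder() {
    return VarLongCoder.of();
  }

  private static class CounterReader extends UnboundedReader<Long> {
    private final CounterSource source;
    private long current;

    CounterReader(CounterSource source, long resumeFrom) {
      this.source = source;
      this.current = resumeFrom - 1;
    }

    @Override
    public boolean start() {
      return advance();
    }

    @Override
    public boolean advance() {
      current++;
      return true; // Pretend new data is always available.
    }

    @Override
    public Long getCurrent() {
      return current;
    }

    @Override
    public Instant getCurrentTimestamp() {
      return Instant.now();
    }

    @Override
    public Instant getWatermark() {
      return Instant.now();
    }

    @Override
    public CheckpointMark getCheckpointMark() {
      return new Checkpoint(current);
    }

    @Override
    public UnboundedSource<Long, ?> getCurrentSource() {
      return source;
    }

    @Override
    public void close() {}

    // Per-split backlog reported to the Dataflow service. A real source would
    // return an estimate of the unprocessed bytes remaining for this split.
    @Override
    public long getSplitBacklogBytes() {
      return 0L; // Placeholder: this toy source never falls behind.
    }
  }
}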

Enable streaming autoscaling

For streaming jobs using Streaming Engine, Horizontal Autoscaling is enabled by default.

To enable Horizontal Autoscaling for jobs not using Streaming Engine, set the following execution parameters when you start your pipeline:

--autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=N

For streaming jobs not using Streaming Engine, the minimum number of workers is 1/15th of the --maxNumWorkers value, rounded up.

Streaming pipelines are deployed with a fixed pool of Persistent Disks, equal in number to --maxNumWorkers. Take this into account when you specify --maxNumWorkers, and ensure that this value provides a sufficient number of disks for your pipeline.

When you update a streaming job that is not using Streaming Engine, the updated job by default has Horizontal Autoscaling disabled. If you want to keep autoscaling enabled, specify --autoscalingAlgorithm and --maxNumWorkers for the updated job.

Usage and pricing

Compute Engine usage is based on the average number of workers, while Persistent Disk usage is based on the exact value of --maxNumWorkers. Persistent Disks are redistributed such that each worker gets an equal number of attached disks.

For example, if you set --maxNumWorkers=15, you pay for between 1 and 15 Compute Engine instances and exactly 15 Persistent Disks.

Python

Enable streaming autoscaling

To enable Horizontal Autoscaling, set the following execution parameters when you start your pipeline:

--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=N

For streaming jobs not using Streaming Engine, the minimum number of workers is 1/15th of the --max_num_workers value, rounded up.

Streaming pipelines are deployed with a fixed pool of Persistent Disks, equal in number to --max_num_workers. Take this into account when you specify --max_num_workers, and ensure that this value provides a sufficient number of disks for your pipeline.

When you update a streaming job that is not using Streaming Engine, the updated job by default has Horizontal Autoscaling disabled. If you want to keep autoscaling enabled, specify --autoscaling_algorithm and --max_num_workers for the updated job.

Usage and pricing

Compute Engine usage is based on the average number of workers, while Persistent Disk usage is based on the exact value of --max_num_workers. Persistent Disks are redistributed such that each worker gets an equal number of attached disks.

For example, if you set --max_num_workers=15, you pay for between 1 and 15 Compute Engine instances and exactly 15 Persistent Disks.

Go

Enable streaming autoscaling

To enable Horizontal Autoscaling, set the following execution parameters when you start your pipeline:

--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=N

For streaming jobs not using Streaming Engine, the minimum number of workers is 1/15th of the --max_num_workers value, rounded up.

Streaming pipelines are deployed with a fixed pool of Persistent Disks, equal in number to --max_num_workers. Take this into account when you specify --max_num_workers, and ensure that this value provides a sufficient number of disks for your pipeline.

When you update a streaming job that is not using Streaming Engine, the updated job by default has Horizontal Autoscaling disabled. If you want to keep autoscaling enabled, specify --autoscaling_algorithm and --max_num_workers for the updated job.

Usage and pricing

Compute Engine usage is based on the average number of workers, while Persistent Disk usage is based on the exact value of --max_num_workers. Persistent Disks are redistributed such that each worker gets an equal number of attached disks.

For example, if you set --max_num_workers=15, you pay for between 1 and 15 Compute Engine instances and exactly 15 Persistent Disks.

Manually scale a streaming pipeline

You can manually scale the number of workers running your streaming pipeline by using Dataflow's Update feature.

Java

To scale your streaming pipeline during execution, ensure that you set the following execution parameters when you start your pipeline:

  • Set --maxNumWorkers equal to the maximum number of workers you want available to your pipeline.
  • Set --numWorkers equal to the initial number of workers you want your pipeline to use when it starts running.

For jobs that use Streaming Engine, you can update the minimum and maximum workers without stopping or replacing the job. For more information, see in-flight job option updates.

Update the following job options:

  • --min-num-workers: the minimum number of workers.
  • --max-num-workers: the maximum number of workers.
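For example, these options might be updated with the Google Cloud CLI. This sketch assumes the gcloud dataflow jobs update-options command is available in your gcloud version; JOB_ID and the region value are placeholders:

gcloud dataflow jobs update-options \
    --region=us-central1 \
    --min-num-workers=5 \
    --max-num-workers=20 \
    JOB_ID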

For jobs that don't use Streaming Engine, you can replace the existing job. See Launch a replacement job.

Your pipeline's maximum scaling range depends on the number of Persistent Disks deployed when the pipeline starts. The Dataflow service deploys one Persistent Disk per worker at the maximum number of workers. Deploying extra Persistent Disks by setting --maxNumWorkers to a higher value than --numWorkers provides some benefits to your pipeline. Specifically, it allows you the flexibility to scale your pipeline to a larger number of workers after startup, and might provide improved performance. However, your pipeline might also incur additional cost for the extra Persistent Disks. Take note of the cost and quota implications of the additional Persistent Disk resources when planning your streaming pipeline and setting the scaling range.

Python

To scale your streaming pipeline during execution, ensure that you set the following execution parameters when you start your pipeline:

  • Set --max_num_workers equal to the maximum number of workers you want available to your pipeline.
  • Set --num_workers equal to the initial number of workers you want your pipeline to use when it starts running.

For jobs that use Streaming Engine, you can update the minimum and maximum workers without stopping or replacing the job. For more information, see in-flight job option updates.

Update the following job options:

  • --min-num-workers: the minimum number of workers.
  • --max-num-workers: the maximum number of workers.
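For example, these options might be updated with the Google Cloud CLI. This sketch assumes the gcloud dataflow jobs update-options command is available in your gcloud version; JOB_ID and the region value are placeholders:

gcloud dataflow jobs update-options \
    --region=us-central1 \
    --min-num-workers=5 \
    --max-num-workers=20 \
    JOB_ID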

For jobs that don't use Streaming Engine, you can replace the existing job. See Launch a replacement job.

Your pipeline's maximum scaling range depends on the number of Persistent Disks deployed when the pipeline starts. The Dataflow service deploys one Persistent Disk per worker at the maximum number of workers. Deploying extra Persistent Disks by setting --max_num_workers to a higher value than --num_workers provides some benefits to your pipeline. Specifically, it allows you the flexibility to scale your pipeline to a larger number of workers after startup, and might provide improved performance. However, your pipeline might also incur additional cost for the extra Persistent Disks. Take note of the cost and quota implications of the additional Persistent Disk resources when planning your streaming pipeline and setting the scaling range.

Go

To scale your streaming pipeline during execution, ensure that you set the following execution parameters when you start your pipeline:

  • Set --max_num_workers equal to the maximum number of workers you want available to your pipeline.
  • Set --num_workers equal to the initial number of workers you want your pipeline to use when it starts running.

For jobs that use Streaming Engine, you can update the minimum and maximum workers without stopping or replacing the job. For more information, see in-flight job option updates.

Update the following job options:

  • --min-num-workers: the minimum number of workers.
  • --max-num-workers: the maximum number of workers.
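For example, these options might be updated with the Google Cloud CLI. This sketch assumes the gcloud dataflow jobs update-options command is available in your gcloud version; JOB_ID and the region value are placeholders:

gcloud dataflow jobs update-options \
    --region=us-central1 \
    --min-num-workers=5 \
    --max-num-workers=20 \
    JOB_ID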

For jobs that don't use Streaming Engine, you can replace the existing job. See Launch a replacement job.

Your pipeline's maximum scaling range depends on the number of Persistent Disks deployed when the pipeline starts. The Dataflow service deploys one Persistent Disk per worker at the maximum number of workers. Deploying extra Persistent Disks by setting --max_num_workers to a higher value than --num_workers provides some benefits to your pipeline. Specifically, it allows you the flexibility to scale your pipeline to a larger number of workers after startup, and might provide improved performance. However, your pipeline might also incur additional cost for the extra Persistent Disks. Take note of the cost and quota implications of the additional Persistent Disk resources when planning your streaming pipeline and setting the scaling range.

Metrics

To find the current autoscale limits for a job, query the following Cloud Monitoring metrics:

  • job/max_worker_instances_limit: Maximum number of workers.
  • job/min_worker_instances_limit: Minimum number of workers.
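For illustration, a Cloud Monitoring filter for the maximum-worker limit might look like the following. The dataflow.googleapis.com/ metric prefix and the dataflow_job resource type are assumptions about how these metrics are exposed; confirm the exact names in Metrics Explorer.

metric.type = "dataflow.googleapis.com/job/max_worker_instances_limit"
resource.type = "dataflow_job"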

Next steps