With Horizontal Autoscaling enabled, the Dataflow service automatically chooses the appropriate number of worker instances required to run your job. The Dataflow service might also dynamically reallocate more or fewer workers during runtime to account for the characteristics of your job. Certain parts of your pipeline might be computationally heavier than others, and the Dataflow service might automatically spin up additional workers during these phases of your job and shut them down when they're no longer needed.
Configure autoscaling
Use the following options to configure autoscaling.
Java
Horizontal Autoscaling is enabled by default on streaming jobs that use Streaming Engine and on all batch jobs. You can disable Horizontal Autoscaling by specifying the flag --autoscalingAlgorithm=NONE when you run your pipeline. When you disable Horizontal Autoscaling, the Dataflow service sets the number of workers based on the --numWorkers option, which defaults to 3.
With Horizontal Autoscaling enabled, the Dataflow service does not allow user control of the exact number of worker instances allocated to your job. You can still cap the number of workers by specifying the --maxNumWorkers option when you run your pipeline.
For batch jobs, the --maxNumWorkers flag is optional. The default is 1000. For streaming jobs using Streaming Engine, the --maxNumWorkers flag is optional. The default is 100. For streaming jobs not using Streaming Engine, the --maxNumWorkers flag is required.
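For example, here is a minimal sketch of how these flags might be passed to a Java pipeline through PipelineOptionsFactory. The project, region, and worker counts are placeholder values, and in practice the flags usually arrive as command-line arguments rather than hard-coded strings:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class FixedWorkersExample {
  public static void main(String[] args) {
    // Disable Horizontal Autoscaling and pin the worker count with --numWorkers.
    // With autoscaling left enabled, you would instead keep --maxNumWorkers as a cap.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(
                "--runner=DataflowRunner",
                "--project=my-project",        // placeholder
                "--region=us-central1",        // placeholder
                "--autoscalingAlgorithm=NONE",
                "--numWorkers=5")              // defaults to 3 if omitted
            .withValidation()
            .as(DataflowPipelineOptions.class);

    Pipeline pipeline = Pipeline.create(options);
    // ... add transforms ...
    pipeline.run();
  }
}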
Python
Horizontal Autoscaling is enabled by default on streaming jobs that use Streaming Engine and on all batch Dataflow jobs created using the Apache Beam SDK for Python version 0.5.1 or later. You can disable Horizontal Autoscaling by explicitly specifying the flag --autoscaling_algorithm=NONE when you run your pipeline. When you disable Horizontal Autoscaling, the Dataflow service sets the number of workers based on the --num_workers option, which defaults to 3.
With Horizontal Autoscaling enabled, the Dataflow service doesn't let you control the exact number of worker instances allocated to your job. You can still cap the number of workers by specifying the --max_num_workers option when you run your pipeline.
For batch jobs, the --max_num_workers flag is optional. The default is 1000. For streaming jobs using Streaming Engine, the --max_num_workers flag is optional. The default is 100. For streaming jobs not using Streaming Engine, the --max_num_workers flag is required.
Go
Horizontal Autoscaling is enabled by default on streaming jobs that use Streaming Engine and on all batch Dataflow jobs created using the Apache Beam Go SDK. You can disable Horizontal Autoscaling by explicitly specifying the flag --autoscaling_algorithm=NONE when you run your pipeline. When you disable Horizontal Autoscaling, the Dataflow service sets the number of workers based on the --num_workers option, which defaults to 3.
With Horizontal Autoscaling enabled, the Dataflow service doesn't let you control the exact number of worker instances allocated to your job. You can still cap the number of workers by specifying the --max_num_workers option when you run your pipeline.
For batch jobs, the --max_num_workers flag is optional. The default is 1000. For streaming jobs using Streaming Engine, the --max_num_workers flag is optional. The default is 100. For streaming jobs not using Streaming Engine, the --max_num_workers flag is required.
Dataflow scales based on the parallelism of a pipeline. The parallelism of a pipeline is an estimate of the number of threads needed to most efficiently process data at any given time.
Batch autoscaling
For batch pipelines, Dataflow automatically chooses the number of workers based on the estimated total amount of work in each stage of your pipeline, which is dependent on both the input size and the current throughput. Dataflow re-evaluates the amount of work according to the progress of the execution every 30 seconds and dynamically scales up or down the number of workers as the estimated total amount of work increases or decreases.
To save idle resources, Dataflow either maintains or decreases the number of workers if any of the following conditions occur:
- Average worker CPU usage is lower than 5%.
- Parallelism is limited because of unparallelizable work, such as unsplittable data (for example, compressed files) or data processed by I/O modules that don't split.
- The degree of parallelism is fixed, such as when writing to existing files at a Cloud Storage destination.
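The actual batch scaling algorithm is internal to the Dataflow service, but as a purely hypothetical illustration of the kind of calculation described above, a worker target derived from estimated remaining work and observed per-worker throughput might look like the following. All names and the completion-time knob are invented for the example:

// Hypothetical illustration only: Dataflow's real batch autoscaler is internal to the
// service. This sketch just shows how an estimate of remaining work and observed
// per-worker throughput could be combined into a worker target, re-evaluated periodically.
public final class BatchScalingSketch {

  /**
   * @param estimatedRemainingBytes estimate of work left across the stage
   * @param bytesPerWorkerPerSecond observed per-worker throughput
   * @param targetCompletionSeconds how quickly the remaining work should finish
   *        (an invented tuning knob for this example)
   * @param maxNumWorkers the cap set with --maxNumWorkers
   */
  static int targetWorkers(
      long estimatedRemainingBytes,
      double bytesPerWorkerPerSecond,
      double targetCompletionSeconds,
      int maxNumWorkers) {
    if (bytesPerWorkerPerSecond <= 0) {
      return 1; // no throughput signal yet; keep a single worker
    }
    double needed =
        estimatedRemainingBytes / (bytesPerWorkerPerSecond * targetCompletionSeconds);
    int workers = (int) Math.ceil(needed);
    return Math.max(1, Math.min(workers, maxNumWorkers));
  }

  public static void main(String[] args) {
    // Example: ~90 GiB left, 10 MiB/s per worker, aim to finish in ~15 minutes, cap of 100.
    System.out.println(
        targetWorkers(90L * 1024 * 1024 * 1024, 10 * 1024 * 1024, 15 * 60, 100)); // prints 11
  }
}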
If your pipeline uses a custom data source that you implemented, you can potentially improve performance by implementing a few methods that provide more information to the Dataflow service's Horizontal Autoscaling algorithm:
Java
- In your BoundedSource subclass, implement the method getEstimatedSizeBytes. The Dataflow service uses getEstimatedSizeBytes when calculating the initial number of workers to use for your pipeline.
- In your BoundedReader subclass, implement the method getFractionConsumed. The Dataflow service uses getFractionConsumed to track read progress and converge on the correct number of workers to use during a read. A sketch of both methods follows this list.
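The following is a minimal sketch of where those two methods fit, using a toy BoundedSource that emits a fixed range of longs. It is not a production-ready source: splitting is stubbed out, and each record is assumed to be 8 bytes.

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;

// Toy source that emits the longs [0, numRecords). Each record is assumed to be 8 bytes.
public class CountingBoundedSource extends BoundedSource<Long> {
  private final long numRecords;

  public CountingBoundedSource(long numRecords) {
    this.numRecords = numRecords;
  }

  // Used by the Dataflow service to pick the initial number of workers.
  @Override
  public long getEstimatedSizeBytes(PipelineOptions options) {
    return numRecords * 8L;
  }

  @Override
  public List<? extends BoundedSource<Long>> split(
      long desiredBundleSizeBytes, PipelineOptions options) {
    // Splitting is omitted for brevity; a real source would return multiple sub-sources.
    return Collections.singletonList(this);
  }

  @Override
  public BoundedSource.BoundedReader<Long> createReader(PipelineOptions options) {
    return new CountingReader(this);
  }

  @Override
  public Coder<Long> getOutputCoder() {
    return VarLongCoder.of();
  }

  private static class CountingReader extends BoundedSource.BoundedReader<Long> {
    private final CountingBoundedSource source;
    private long current = -1;

    CountingReader(CountingBoundedSource source) {
      this.source = source;
    }

    @Override
    public boolean start() throws IOException {
      return advance();
    }

    @Override
    public boolean advance() throws IOException {
      current++;
      return current < source.numRecords;
    }

    @Override
    public Long getCurrent() throws NoSuchElementException {
      if (current < 0 || current >= source.numRecords) {
        throw new NoSuchElementException();
      }
      return current;
    }

    // Used by the Dataflow service to track progress and converge on the right number of workers.
    @Override
    public Double getFractionConsumed() {
      return Math.min(1.0, (double) current / source.numRecords);
    }

    @Override
    public void close() throws IOException {}

    @Override
    public BoundedSource<Long> getCurrentSource() {
      return source;
    }
  }
}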
Python
- In your BoundedSource subclass, implement the method estimate_size. The Dataflow service uses estimate_size when calculating the initial number of workers to use for your pipeline.
- In your RangeTracker subclass, implement the method fraction_consumed. The Dataflow service uses fraction_consumed to track read progress and converge on the correct number of workers to use during a read.
Go
- In your RangeTracker, implement the method GetProgress(). The Dataflow service uses GetProgress to track read progress and converge on the correct number of workers to use during a read.
Streaming autoscaling
Streaming autoscaling allows the Dataflow service to adaptively change the number of workers used to execute your streaming pipeline in response to changes in load and resource utilization. Streaming autoscaling is offered at no charge and is designed to reduce the costs of the resources used when executing streaming pipelines.
Streaming autoscaling determines when to scale by monitoring the estimated backlog time. The estimated backlog time is calculated from both the throughput and the backlog bytes still to be processed from the input source. A pipeline is considered backlogged when the estimated backlog time stays above 15 seconds.
Scaling up: If a streaming pipeline remains backlogged with sufficient parallelism on the workers for a couple of minutes, Dataflow scales up. Dataflow targets clearing the backlog in approximately 150 seconds after scaling up, given the current throughput per worker.
Scaling down: If a streaming pipeline backlog is lower than 10 seconds and workers are utilizing, on average, less than 75% of the CPUs for a period of a couple of minutes, Dataflow scales down. After scaling down, workers utilize, on average, 75% of their CPUs. In streaming jobs that don't use Streaming Engine, the 75% CPU utilization sometimes can't be achieved because of disk distribution (each worker must have the same number of persistent disks), and a lower CPU utilization is used. For example, a job set to use a maximum of 100 workers with 1 disk per worker can be scaled down to 50 workers with 2 disks per worker. For this job, a 75% CPU utilization is not achievable, because the next scale-down step from 100 workers is 50 workers, which is fewer than the required 75 workers. Consequently, Dataflow does not scale down this job, resulting in a CPU utilization lower than 75%.
No scaling: If there is no backlog but CPU usage is 75% or greater, the pipeline does not scale down. If there is backlog but the pipeline does not have enough parallelism to use additional workers, the pipeline does not scale up.
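To make those thresholds concrete, here is a hypothetical sketch of the decision logic described above. The signal names and the sustained-measurement inputs are assumptions for illustration; the real logic is internal to the Dataflow service:

// Hypothetical illustration of the scale-up / scale-down conditions described above.
// The signals here (backlog seconds, average CPU, available parallelism) are stand-ins.
public final class StreamingScalingSketch {

  enum Decision { SCALE_UP, SCALE_DOWN, HOLD }

  static double estimatedBacklogSeconds(long backlogBytes, double throughputBytesPerSecond) {
    // Estimated backlog time is derived from backlog bytes and current throughput.
    return throughputBytesPerSecond > 0
        ? backlogBytes / throughputBytesPerSecond
        : Double.MAX_VALUE;
  }

  static Decision decide(
      double backlogSecondsSustained,   // backlog time sustained over a couple of minutes
      double averageCpuUtilization,     // 0.0 - 1.0, averaged over the same period
      boolean hasSpareParallelism) {    // whether additional workers could actually be used
    if (backlogSecondsSustained > 15 && hasSpareParallelism) {
      return Decision.SCALE_UP;   // backlogged for minutes with room to parallelize
    }
    if (backlogSecondsSustained < 10 && averageCpuUtilization < 0.75) {
      return Decision.SCALE_DOWN; // little backlog and workers are underutilized
    }
    return Decision.HOLD;         // e.g. no backlog but CPU >= 75%, or no spare parallelism
  }

  public static void main(String[] args) {
    System.out.println(estimatedBacklogSeconds(600_000_000L, 20_000_000.0)); // 30.0 seconds
    System.out.println(decide(30.0, 0.9, true));  // SCALE_UP
    System.out.println(decide(5.0, 0.5, true));   // SCALE_DOWN
    System.out.println(decide(0.0, 0.8, true));   // HOLD
  }
}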
Predictive scaling: Streaming Engine also uses a predictive Horizontal Autoscaling technique based on timer backlog. In the Dataflow model, the unbounded data in a streaming pipeline is divided into windows grouped by timestamps. At the end of a window, timers fire for each key being processed in that window. The firing of a timer indicates that the window is expired for a given key. Streaming Engine can measure the timer backlog, which means it can predict how many timers will fire at the end of a window. Using the timer backlog as a signal lets Dataflow "see the future" by estimating the amount of processing that will need to happen when future timers fire. Based on the estimated future load, Dataflow autoscales in advance to meet expected demand.
Without Horizontal Autoscaling, you choose a fixed number of workers by specifying numWorkers or num_workers to run your pipeline. As the input workload varies over time, this number can become either too high or too low. Provisioning too many workers results in unnecessary extra cost, and provisioning too few workers results in higher latency for processed data. When you enable Horizontal Autoscaling, resources are used only as they are needed.
The objective of Horizontal Autoscaling streaming pipelines is to minimize backlog while maximizing worker utilization and throughput and to quickly react to spikes in load. By enabling Horizontal Autoscaling, you don't have to choose between provisioning for peak load and fresh results. Workers are added as CPU utilization and backlog increase and are removed as these metrics come down. This way, you're paying only for what you need, and the job is processed as efficiently as possible.
In some cases, Horizontal Autoscaling uses the following factors in autoscaling decisions. If these factors are used for your job, you can see that information in the Autoscaling metrics tab.
- Key-based throttling uses the number of processing keys received by the job to calculate the cap for user workers, because each key can be processed by only one worker at a time.
- Dampening downscale slows the scale-down steps when it detects that unstable autoscaling decisions have occurred.
Java
Custom unbounded sources
If your pipeline uses a custom unbounded source, the source must inform the Dataflow service about backlog. Backlog is an estimate of the input in bytes that has not yet been processed by the source. To inform the service about backlog, implement either of the following methods in your UnboundedReader class.
- getSplitBacklogBytes() - Backlog for the current split of the source. The service aggregates backlog across all the splits.
- getTotalBacklogBytes() - The global backlog across all the splits. In some cases the backlog is not available for each split and can only be calculated across all the splits. Only the first split (split ID '0') needs to provide the total backlog.
The Apache Beam repository contains several examples of custom sources that implement the UnboundedReader class.
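As a minimal sketch of reporting backlog, the reader below wraps an in-memory queue that stands in for an external input system and overrides getSplitBacklogBytes. It is not a complete source: checkpointing and watermark tracking are stubbed out, and a real reader would query its input system for the bytes remaining behind its current position.

import java.io.IOException;
import java.util.Deque;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.joda.time.Instant;

// Toy reader over an in-memory queue of records. The part that matters for autoscaling
// is getSplitBacklogBytes(), which reports how many unprocessed bytes remain behind
// this split; everything else is minimal stubbing.
public class QueueBackedUnboundedReader extends UnboundedSource.UnboundedReader<byte[]> {

  private final UnboundedSource<byte[], ?> source; // the source that created this reader
  private final Deque<byte[]> pending;             // stand-in for the external input system
  private byte[] current;

  public QueueBackedUnboundedReader(UnboundedSource<byte[], ?> source, Deque<byte[]> pending) {
    this.source = source;
    this.pending = pending;
  }

  @Override
  public boolean start() throws IOException {
    return advance();
  }

  @Override
  public boolean advance() throws IOException {
    current = pending.poll();
    return current != null;
  }

  @Override
  public byte[] getCurrent() throws NoSuchElementException {
    if (current == null) {
      throw new NoSuchElementException();
    }
    return current;
  }

  @Override
  public Instant getCurrentTimestamp() throws NoSuchElementException {
    return Instant.now(); // a real reader would use the record's event time
  }

  @Override
  public Instant getWatermark() {
    return Instant.now(); // simplified; a real reader tracks event-time progress
  }

  @Override
  public CheckpointMark getCheckpointMark() {
    // No real checkpointing in this sketch.
    return new CheckpointMark() {
      @Override
      public void finalizeCheckpoint() throws IOException {}
    };
  }

  @Override
  public UnboundedSource<byte[], ?> getCurrentSource() {
    return source;
  }

  // This is the signal Dataflow uses for streaming autoscaling: an estimate, in bytes,
  // of the input behind this split that has not yet been read.
  @Override
  public long getSplitBacklogBytes() {
    long bytes = 0;
    for (byte[] record : pending) {
      bytes += record.length;
    }
    return bytes;
  }

  @Override
  public void close() throws IOException {}
}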
Enable streaming autoscaling
For streaming jobs using Streaming Engine, Horizontal Autoscaling is enabled by default.
To enable Horizontal Autoscaling for jobs not using Streaming Engine, set the following execution parameters when you start your pipeline:
--autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=N
For streaming jobs not using Streaming Engine, the minimum number of workers is 1/15th of the --maxNumWorkers value, rounded up.
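If you prefer to set these options in code rather than on the command line, the Java Dataflow runner's options interface exposes typed setters for the same flags. This is a sketch assuming the DataflowPipelineOptions interface and its AutoscalingAlgorithmType enum from the Dataflow runner; the value 15 is only an example:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class EnableStreamingAutoscaling {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);

    // Equivalent to --autoscalingAlgorithm=THROUGHPUT_BASED --maxNumWorkers=15.
    options.setStreaming(true);
    options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
    options.setMaxNumWorkers(15); // without Streaming Engine, the minimum is ceil(15/15) = 1 worker

    Pipeline pipeline = Pipeline.create(options);
    // ... add transforms ...
    pipeline.run();
  }
}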
Streaming pipelines are deployed with a fixed pool of Persistent Disks, equal in number to --maxNumWorkers. Take this into account when you specify --maxNumWorkers, and ensure this value is a sufficient number of disks for your pipeline.
When you update a streaming job that is not using Streaming Engine, the updated job by default has Horizontal Autoscaling disabled. If you want to keep autoscaling enabled, specify --autoscalingAlgorithm and --maxNumWorkers for the updated job.
Usage and pricing
Compute Engine usage is based on the average number of workers, while Persistent Disk usage is based on the exact value of --maxNumWorkers. Persistent Disks are redistributed such that each worker gets an equal number of attached disks.
For example, if you set --maxNumWorkers=15, you pay for between 1 and 15 Compute Engine instances and exactly 15 Persistent Disks.
Python
Enable streaming autoscaling
To enable Horizontal Autoscaling, set the following execution parameters when you start your pipeline:
--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=N
For streaming jobs not using Streaming Engine, the minimum number of workers is 1/15th of the --max_num_workers value, rounded up.
Streaming pipelines are deployed with a fixed pool of Persistent Disks, equal in number to --max_num_workers. Take this into account when you specify --max_num_workers, and ensure this value is a sufficient number of disks for your pipeline.
When you update a streaming job that is not using Streaming Engine, the updated job by default has Horizontal Autoscaling disabled. If you want to keep autoscaling enabled, specify --autoscaling_algorithm and --max_num_workers for the updated job.
Usage and pricing
Compute Engine usage is based on the average number of workers, while Persistent Disk usage is based on the exact value of --max_num_workers. Persistent Disks are redistributed such that each worker gets an equal number of attached disks.
For example, if you set --max_num_workers=15, you pay for between 1 and 15 Compute Engine instances and exactly 15 Persistent Disks.
Go
Enable streaming autoscaling
To enable Horizontal Autoscaling, set the following execution parameters when you start your pipeline:
--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=N
The minimum number of workers is 1/15th of the --max_num_workers value, rounded up.
Streaming pipelines are deployed with a fixed pool of Persistent Disks, equal in number to --max_num_workers. Take this into account when you specify --max_num_workers, and ensure this value is a sufficient number of disks for your pipeline.
When you update a streaming job that is not using Streaming Engine, the updated job by default has Horizontal Autoscaling disabled. If you want to keep autoscaling enabled, specify --autoscaling_algorithm and --max_num_workers for the updated job.
Usage and pricing
Compute Engine usage is based on the average number of workers, while Persistent Disk usage is based on the exact value of --max_num_workers. Persistent Disks are redistributed such that each worker gets an equal number of attached disks.
For example, if you set --max_num_workers=15, you pay for between 1 and 15 Compute Engine instances and exactly 15 Persistent Disks.
Manually scale a streaming pipeline
You can manually scale the number of workers running your streaming pipeline by using Dataflow's Update feature.
Java
To scale your streaming pipeline during execution, ensure that you set the following execution parameters when you start your pipeline:
- Set --maxNumWorkers equal to the maximum number of workers you want available to your pipeline.
- Set --numWorkers equal to the initial number of workers you want your pipeline to use when it starts running.
For jobs that use Streaming Engine, you can update the minimum and maximum workers without stopping or replacing the job. For more information, see in-flight job option updates.
Update the following job options:
- --min-num-workers: the minimum number of workers.
- --max-num-workers: the maximum number of workers.
For jobs that don't use Streaming Engine, you can replace the existing job. See Launch a replacement job.
Your pipeline's maximum scaling range depends on the number of Persistent Disks deployed when the pipeline starts. The Dataflow service deploys one Persistent Disk per worker at the maximum number of workers. Deploying extra Persistent Disks by setting --maxNumWorkers to a higher value than --numWorkers provides some benefits to your pipeline. Specifically, it allows you the flexibility to scale your pipeline to a larger number of workers after startup, and might provide improved performance.
However, your pipeline might also incur additional cost for the extra
Persistent Disks. Take note of the cost and quota implications of the additional
Persistent Disk resources when planning your streaming pipeline and setting the
scaling range.
Python
To scale your streaming pipeline during execution, ensure that you set the following execution parameters when you start your pipeline:
- Set --max_num_workers equal to the maximum number of workers you want available to your pipeline.
- Set --num_workers equal to the initial number of workers you want your pipeline to use when it starts running.
For jobs that use Streaming Engine, you can update the minimum and maximum workers without stopping or replacing the job. For more information, see in-flight job option updates.
Update the following job options:
- --min-num-workers: the minimum number of workers.
- --max-num-workers: the maximum number of workers.
For jobs that don't use Streaming Engine, you can replace the existing job. See Launch a replacement job.
Your pipeline's maximum scaling range depends on the number of Persistent Disks deployed when the pipeline starts. The Dataflow service deploys one Persistent Disk per worker at the maximum number of workers. Deploying extra Persistent Disks by setting --max_num_workers to a higher value than --num_workers provides some benefits to your pipeline. Specifically, it allows you the flexibility to scale your pipeline to a larger number of workers after startup and might provide improved performance.
However, your pipeline might also incur additional cost for the extra
Persistent Disks. Take note of the cost and quota implications of the additional
Persistent Disk resources when planning your streaming pipeline and setting the
scaling range.
Go
To scale your streaming pipeline during execution, ensure that you set the following execution parameters when you start your pipeline:
- Set --max_num_workers equal to the maximum number of workers you want available to your pipeline.
- Set --num_workers equal to the initial number of workers you want your pipeline to use when it starts running.
For jobs that use Streaming Engine, you can update the minimum and maximum workers without stopping or replacing the job. For more information, see in-flight job option updates.
Update the following job options:
- --min-num-workers: the minimum number of workers.
- --max-num-workers: the maximum number of workers.
For jobs that don't use Streaming Engine, you can replace the existing job. See Launch a replacement job.
Your pipeline's maximum scaling range depends on the number of Persistent Disks deployed when the pipeline starts. The Dataflow service deploys one Persistent Disk per worker at the maximum number of workers. Deploying extra Persistent Disks by setting --max_num_workers to a higher value than --num_workers provides some benefits to your pipeline. Specifically, it allows you the flexibility to scale your pipeline to a larger number of workers after startup, and might provide improved performance.
However, your pipeline might also incur additional cost for the extra
Persistent Disks. Take note of the cost and quota implications of the additional
Persistent Disk resources when planning your streaming pipeline and setting the
scaling range.
Metrics
To find the current autoscale limits for a job, query the following Cloud Monitoring metrics:
- job/max_worker_instances_limit: Maximum number of workers.
- job/min_worker_instances_limit: Minimum number of workers.
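For example, here is a sketch of reading one of these metrics with the Cloud Monitoring Java client. It assumes the metrics are exposed with the dataflow.googleapis.com/job/ prefix and filtered by the job_name resource label; PROJECT_ID and JOB_NAME are placeholders:

import com.google.cloud.monitoring.v3.MetricServiceClient;
import com.google.monitoring.v3.ListTimeSeriesRequest;
import com.google.monitoring.v3.ProjectName;
import com.google.monitoring.v3.TimeInterval;
import com.google.monitoring.v3.TimeSeries;
import com.google.protobuf.util.Timestamps;

public class AutoscaleLimitsQuery {
  public static void main(String[] args) throws Exception {
    String projectId = "PROJECT_ID"; // placeholder
    // Assumption: the metric type uses the dataflow.googleapis.com/job/ prefix and the
    // Dataflow monitored resource exposes a job_name label.
    String filter =
        "metric.type=\"dataflow.googleapis.com/job/max_worker_instances_limit\""
            + " AND resource.labels.job_name=\"JOB_NAME\""; // placeholder

    long nowMillis = System.currentTimeMillis();
    TimeInterval interval =
        TimeInterval.newBuilder()
            .setStartTime(Timestamps.fromMillis(nowMillis - 10 * 60 * 1000)) // last 10 minutes
            .setEndTime(Timestamps.fromMillis(nowMillis))
            .build();

    try (MetricServiceClient client = MetricServiceClient.create()) {
      for (TimeSeries series :
          client
              .listTimeSeries(
                  ProjectName.of(projectId),
                  filter,
                  interval,
                  ListTimeSeriesRequest.TimeSeriesView.FULL)
              .iterateAll()) {
        // Print the reported maximum-worker limit points for the job.
        series.getPointsList().forEach(p -> System.out.println(p.getValue().getInt64Value()));
      }
    }
  }
}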