Autoscaling clusters

Estimating the “right” number of cluster workers (nodes) for a workload is difficult, and a single configuration for an entire pipeline is often not ideal. Cloud Dataproc helps you meet this challenge in two ways:

  1. User-initiated Cluster Scaling
  2. Cluster Autoscaling, which evaluates Apache Hadoop YARN cluster metrics (viewable in Stackdriver Monitoring) and dynamically scales clusters up and down within minimum and maximum bounds set by the user

Autoscaling recommendations

Use Autoscaling with:

  • clusters that store data in external services, such as Cloud Storage or BigQuery
  • clusters that process many jobs

Use Autoscaling to:

  • scale up single-job clusters

Autoscaling is not recommended with/for:

  • HDFS: Autoscaling is not intended for scaling on-cluster HDFS. If you use Autoscaling with HDFS, make sure that the minimum number of primary workers is sufficient to handle all HDFS data. Also realize that decommissioning HDFS Datanodes can delay the removal of workers.

  • Spark Structured Streaming: Autoscaling does not support Spark Structured Streaming (see Autoscaling and Spark Structured Streaming).

  • Idle Clusters: Autoscaling is not recommended for the purpose of scaling a cluster down to minimum size when the cluster is idle. Since creating a new cluster is as fast as resizing one, consider deleting idle clusters and recreating them instead. The following tools support this “ephemeral” model:

    Use Cloud Dataproc Workflows to schedule a set of jobs on a dedicated cluster, and then delete the cluster when the jobs are finished. For more advanced orchestration, use Cloud Composer, which is based on Apache Airflow.

    For clusters that process ad-hoc queries or externally scheduled workloads, use Cluster Scheduled Deletion to delete the cluster after a specified idle period or duration, or at a specific time.
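
    For example, the following is a sketch of the scheduled-deletion approach, assuming the beta --max-idle flag of the Cluster Scheduled Deletion feature; the cluster name and the 30-minute idle period are illustrative values:

    gcloud beta dataproc clusters create cluster-name --max-idle=30m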

Creating an Autoscaling cluster

You can create an autoscaling cluster by setting all required Autoscaling properties. Properties are specified with a "dataproc:alpha.autoscaling." prefix. Autoscaling property suffixes are listed in the Autoscaling properties table.

The following example creates an autoscaling cluster, setting the required Autoscaling properties:

gcloud beta dataproc clusters create cluster-name --properties "\
dataproc:alpha.autoscaling.enabled=true,\
dataproc:alpha.autoscaling.primary.max_workers=100,\
dataproc:alpha.autoscaling.secondary.max_workers=100,\
dataproc:alpha.autoscaling.cooldown_period=1h,\
dataproc:alpha.autoscaling.scale_up.factor=0.05,\
dataproc:alpha.autoscaling.graceful_decommission_timeout=1h"

Initial cluster size

When you create an Autoscaling cluster, you are not required to set the initial number of primary and secondary workers. Autoscaling will start from the default cluster size (2 primary and 0 secondary workers) and scale up as necessary. However, setting an initial number of primary workers can help performance. For example, if you are certain that you need a large cluster, you can set the cluster to an initial large size to avoid waiting multiple Autoscaling periods for the cluster to reach the scaled-up size.
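
For example, the following variant of the earlier create command sketches this approach; the --num-workers flag sets the initial number of primary workers, and 20 is an illustrative value:

gcloud beta dataproc clusters create cluster-name --num-workers=20 --properties "\
dataproc:alpha.autoscaling.enabled=true,\
dataproc:alpha.autoscaling.primary.max_workers=100,\
dataproc:alpha.autoscaling.secondary.max_workers=100,\
dataproc:alpha.autoscaling.cooldown_period=1h,\
dataproc:alpha.autoscaling.scale_up.factor=0.05,\
dataproc:alpha.autoscaling.graceful_decommission_timeout=1h"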

How Autoscaling works

Autoscaling checks cluster Hadoop YARN metrics as each cooldown period elapses to determine whether to scale the cluster and, if so, the magnitude of the update.

  1. On each evaluation, Autoscaling examines pending and available cluster memory averaged over the last cooldown_period in order to determine the exact change needed to the number of workers:

    exact Δworkers = avg(pending memory - available memory) / memory per worker

    • pending memory is a signal that the cluster has tasks queued but not yet executed, and may need to be scaled up to better handle its workload.
    • available memory is a signal that the cluster has extra bandwidth, and may need to be scaled down to conserve resources.
    • See Autoscaling with Hadoop and Spark for additional information on these Apache Hadoop YARN metrics.
  2. Given the exact change needed to the number of workers, Autoscaling uses a scale_up.factor or scale_down.factor to calculate the actual change to the number of workers:

    actual Δworkers = exact Δworkers * scale_factor

    A scale_factor of 1.0 means Autoscaling will scale so that pending/available memory is 0 (perfect utilization).

  3. Once the actual change to the number of workers is calculated, a min_worker_fraction acts as a threshold to determine if Autoscaling will scale the cluster. A small min_worker_fraction signifies that Autoscaling should scale even if the Δworkers is small. A larger min_worker_fraction means that scaling should only occur when the Δworkers is large.

    if (Δworkers > min_worker_fraction * cluster size) then scale

  4. If the number of workers to scale is large enough to trigger scaling, Autoscaling uses the min/max bounds of the worker groups and the configured secondary_worker_fraction to determine how to split the number of workers across the primary and secondary instance groups. The result of these calculations is the final Autoscaling change to the cluster for the scaling period.
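
The following worked example uses illustrative numbers to show one evaluation. Suppose each worker provides 12GB of YARN memory, the cluster has 20 workers, average pending memory over the last cooldown period is 300GB, average available memory is 12GB, scale_up.factor is 0.5, and scale_up.min_worker_fraction is 0.0:

    exact Δworkers = avg(300GB - 12GB) / 12GB per worker = 24 workers
    actual Δworkers = 24 workers * 0.5 (scale_up.factor) = 12 workers
    threshold check: 12 workers > 0.0 * 20 workers, so the cluster scales up by 12 workers

Autoscaling then splits those 12 additional workers across the primary and secondary instance groups according to secondary_worker_fraction and the configured min/max bounds.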

Autoscaling properties

Autoscaling is configured by setting properties. In the table below:

  • Standard workers (nodes) are referred to as "primary" workers
  • Preemptible workers (nodes) are referred to as "secondary" workers
  • Required properties have no default
  • Property suffixes are listed in the table. The suffixes are added to the end of the dataproc:alpha.autoscaling. prefix to form the property name (for example, the suffix for the property used to enable Autoscaling is enabled, and the complete property name is dataproc:alpha.autoscaling.enabled).
Property suffix and description

enabled
  Enables autoscaling.
  Required: boolean: true or false

primary.min_workers
  Minimum number of primary workers.
  Optional: int. Bounds: [2, primary.max_workers]. Default: 2

primary.max_workers
  Maximum number of primary workers.
  Required: int. Bounds: [primary.min_workers, )

secondary.min_workers
  Minimum number of secondary workers.
  Optional: int. Bounds: [0, secondary.max_workers]. Default: 0

secondary.max_workers
  Maximum number of secondary workers. Since secondary VMs are terminated at least every 24 hours, they are best suited for short and non-critical workloads. Set to 0 to avoid using secondary workers.
  Required: int. Bounds: [secondary.min_workers, )

secondary_worker_fraction
  Target fraction of secondary workers. The cluster may not reach this fraction if it is not allowed by the worker-count bounds. For example, if secondary.max_workers=0, only primary workers will be added. The cluster can also be out of balance when created.
  Optional: double. Bounds: [0.0, 1.0]. Default: 0.5

cooldown_period
  Duration between scaling events. A scaling period starts after the update operation from the previous event has completed.
  Required: duration (s, m, h, d). Bounds: [10m, )

scale_up.factor
  Fraction of average pending memory in the last cooldown period for which to add workers. A scale-up factor of 1.0 will result in scaling up so that there is no pending memory remaining after the update (more aggressive scaling). A scale-up factor closer to 0 will result in a smaller magnitude of scaling up (less aggressive scaling).
  Required: double. Bounds: [0.0, 1.0]

scale_up.min_worker_fraction
  Minimum scale-up threshold as a fraction of total cluster size before scaling occurs. For example, in a 20-worker cluster, a threshold of 0.1 means the autoscaler must recommend at least a 2-worker scale-up for the cluster to scale. A threshold of 0 means the autoscaler will scale up on any recommended change.
  Optional: double. Bounds: [0.0, 1.0]. Default: 0.0

scale_down.factor
  Fraction of average available memory in the last cooldown period for which to remove workers. A scale-down factor of 1.0 will result in scaling down so that there is no available memory remaining after the update (more aggressive scaling). A scale-down factor of 0 disables removing workers, which can be beneficial for autoscaling a single job.
  Optional: double. Bounds: [0.0, 1.0]. Default: 1.0

scale_down.min_worker_fraction
  Minimum scale-down threshold as a fraction of total cluster size before scaling occurs. For example, in a 20-worker cluster, a threshold of 0.1 means the autoscaler must recommend at least a 2-worker scale-down for the cluster to scale. A threshold of 0 means the autoscaler will scale down on any recommended change.
  Optional: double. Bounds: [0.0, 1.0]. Default: 0.0

graceful_decommission_timeout
  Timeout for YARN graceful decommissioning of Node Managers. Specifies the duration to wait for jobs to complete before forcefully removing workers (and potentially interrupting jobs). Applies only to downscaling operations.
  Required: duration (s, m, h, d). Bounds: [0s, 1d]
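
For example, the following create command sets the required properties plus several optional properties from this table (all values are illustrative); each property name is the dataproc:alpha.autoscaling. prefix followed by a suffix from the table:

gcloud beta dataproc clusters create cluster-name --properties "\
dataproc:alpha.autoscaling.enabled=true,\
dataproc:alpha.autoscaling.primary.min_workers=2,\
dataproc:alpha.autoscaling.primary.max_workers=50,\
dataproc:alpha.autoscaling.secondary.max_workers=50,\
dataproc:alpha.autoscaling.secondary_worker_fraction=0.5,\
dataproc:alpha.autoscaling.cooldown_period=30m,\
dataproc:alpha.autoscaling.scale_up.factor=0.05,\
dataproc:alpha.autoscaling.scale_up.min_worker_fraction=0.0,\
dataproc:alpha.autoscaling.scale_down.factor=1.0,\
dataproc:alpha.autoscaling.graceful_decommission_timeout=1h"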

Autoscaling with Apache Hadoop and Apache Spark

The following sections discuss how Autoscaling does (or does not) interoperate with Hadoop YARN, Hadoop MapReduce, Apache Spark, Spark Streaming, and Spark Structured Streaming.

Hadoop YARN Metrics

Autoscaling configures Hadoop YARN to schedule jobs based on YARN memory requests, not on YARN core requests.

Autoscaling is centered around the following Hadoop YARN metrics:

  1. Allocated memory refers to the total YARN memory taken up by running containers across the whole cluster. If there are 6 running containers that can use up to 1GB, there is 6GB of allocated memory.

  2. Available memory is YARN memory in the cluster not used by allocated containers. If there is 10GB of memory across all node managers and 6GB of allocated memory, there is 4GB of available memory. If there is available (unused) memory in the cluster, Autoscaling may remove workers from the cluster.

  3. Pending memory is the sum of YARN memory requests for pending containers. Pending containers are waiting for space to run in YARN. Pending memory is non-zero only if available memory is zero or too small to allocate to the next container. If there are pending containers, Autoscaling may add workers to the cluster.

You can view these metrics in Stackdriver Monitoring. YARN memory will be 0.8 * total memory on the cluster. Remaining memory is reserved for other daemons and operating system use, such as the page cache.
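
As an illustrative calculation: a worker with 15GB of RAM contributes roughly 0.8 * 15GB = 12GB of YARN memory, so a 10-worker cluster of such machines exposes about 120GB of YARN memory to containers.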

Autoscaling and Hadoop MapReduce

MapReduce runs each map and reduce task as a separate YARN container. When a job begins, MapReduce submits container requests for each map task, resulting in a large spike in pending YARN memory. As map tasks finish, pending memory decreases.

When the fraction of map tasks set by mapreduce.job.reduce.slowstart.completedmaps has completed (95% by default on Cloud Dataproc), MapReduce enqueues container requests for all reducers, resulting in another spike in pending memory.

Unless your map and reduce tasks take several minutes or longer, don't set a high value for dataproc:alpha.autoscaling.scale_up.factor. Adding workers to the cluster takes at least 1.5 minutes, so make sure there is sufficient pending work to utilize the new workers for several minutes. A good starting point is to set dataproc:alpha.autoscaling.scale_up.factor to 0.05 (5%) or 0.1 (10%) of pending memory.

Autoscaling and Spark

Spark adds an additional layer of scheduling on top of YARN. Specifically, Spark Core’s dynamic allocation makes requests to YARN for containers to run Spark executors, then schedules Spark tasks on threads on those executors. Cloud Dataproc clusters enable dynamic allocation by default, so executors are added and removed as needed.

Spark always asks YARN for containers, but without dynamic allocation, it only asks for containers at the beginning of the job. With dynamic allocation, it will remove containers, or ask for new ones, as necessary.

Spark starts from a small number of executors – 2 on Autoscaling clusters – and continues to double the number of executors while there are backlogged tasks. This smooths out pending memory (fewer pending memory spikes). It is recommended that you set dataproc:alpha.autoscaling.scale_up.factor to a large number, such as 1.0 (100%), for Spark jobs.

As a Spark default, executors with cached data do not exit (spark.dynamicAllocation.cachedExecutorIdleTimeout=0), and executors without cached data will exit after 60 seconds (spark.dynamicAllocation.executorIdleTimeout=60). Since Cloud Dataproc configures the external YARN-based shuffle service, executors often exit before their shuffle data is served. Therefore, set a non-zero cachedExecutorIdleTimeout so that executors with cached data eventually exit. Also set a non-zero dataproc:alpha.autoscaling.graceful_decommission_timeout to drain shuffle data on workers before Autoscaling removes them.
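
For example, the following create command sketches these recommendations for a Spark-oriented cluster; the 30-minute cachedExecutorIdleTimeout and the other values are illustrative, not prescriptive:

gcloud beta dataproc clusters create cluster-name --properties "\
dataproc:alpha.autoscaling.enabled=true,\
dataproc:alpha.autoscaling.primary.max_workers=100,\
dataproc:alpha.autoscaling.secondary.max_workers=100,\
dataproc:alpha.autoscaling.cooldown_period=10m,\
dataproc:alpha.autoscaling.scale_up.factor=1.0,\
dataproc:alpha.autoscaling.graceful_decommission_timeout=1h,\
spark:spark.dynamicAllocation.cachedExecutorIdleTimeout=30m"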

Disabling Spark dynamic allocation

If you are running separate Spark jobs that do not benefit from Spark dynamic allocation, you can disable Spark dynamic allocation by setting spark.dynamicAllocation.enabled=false and setting spark.executor.instances. You can still use Autoscaling to scale clusters up and down while the separate Spark jobs run.
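
For example, a job submitted along these lines (the class name, jar path, and executor count are placeholders) runs with a fixed number of executors while Autoscaling continues to manage the cluster:

gcloud dataproc jobs submit spark --cluster cluster-name \
    --class org.example.MySparkJob \
    --jars gs://my-bucket/my-spark-job.jar \
    --properties spark.dynamicAllocation.enabled=false,spark.executor.instances=20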

Autoscaling and Spark Streaming

To use Spark Streaming with Autoscaling:

  1. Since Spark Streaming has its own version of dynamic allocation that uses streaming-specific signals to add and remove executors, set spark.streaming.dynamicAllocation.enabled=true and disable Spark Core’s dynamic allocation by setting spark.dynamicAllocation.enabled=false (see the example after this list).

  2. Until the issue "Spark Streaming dynamic allocation should respect spark.executor.instances" is fixed, use an initialization action to remove spark.executor.instances from /etc/spark/conf/spark-defaults.conf.

  3. Graceful decommissioning (dataproc:alpha.autoscaling.graceful_decommission_timeout) does not apply to Spark Streaming jobs. Instead, to safely remove workers with Autoscaling, configure checkpointing for fault tolerance.
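
For example, a streaming job configured as described in step 1 might be submitted as follows (the class name and jar path are placeholders):

gcloud dataproc jobs submit spark --cluster cluster-name \
    --class org.example.MyStreamingJob \
    --jars gs://my-bucket/my-streaming-job.jar \
    --properties spark.streaming.dynamicAllocation.enabled=true,spark.dynamicAllocation.enabled=false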

Alternatively, to use Spark Streaming without Autoscaling:

  1. Disable Spark Core’s dynamic allocation (spark.dynamicAllocation.enabled=false), and
  2. Set the number of executors (spark.executor.instances) for your job. See Cluster properties.

Autoscaling and Spark Structured Streaming

Autoscaling is not compatible with Spark Structured Streaming since Spark Structured Streaming currently does not support dynamic allocation.

Downscaling considerations and recommendations

  • Shuffle data: In Hadoop MapReduce, map tasks write shuffle data to local disk, which is served to reducer tasks via a server running on the Node Manager. Removing workers with shuffle data, even when tasks are not running, is problematic since doing so can set job progress back if map tasks need to be re-run. Spark also shuffles data between stage boundaries, and if it detects missing shuffle files, it re-runs the entire stage.

    Recommendation: On multi-job clusters, set dataproc:alpha.autoscaling.graceful_decommission_timeout so that in-progress jobs have sufficient time to complete before Autoscaling removes workers. Generally, it's best to set the timeout to the duration of your longest job to make sure all jobs complete before workers are removed.

    Alternate Strategy 1: Disable Autoscaling downscaling by setting dataproc:alpha.autoscaling.scale_down.factor=0.0 so that workers will not be removed regardless of available memory size. This strategy can be useful for single-job clusters where downscaling can set back job progress.

    Alternate Strategy 2: Set the cluster to scale down only when the cluster is idle by setting dataproc:alpha.autoscaling.scale_down.factor=1.0 and dataproc:alpha.autoscaling.scale_down.min_worker_fraction=1.0 (see the example after this list).

  • Cached data: Spark can cache datasets in executor memory or on disk. Spark executors usually exit when they have no work to process, but executors with cached data, by default, never exit. As a result, for notebooks and other applications that maintain cached data, the executors stay alive, and Autoscaling will not scale down the cluster, even after the applications are no longer needed.

    Recommendation: To make sure executors with cached data exit eventually, consider setting spark.dynamicAllocation.cachedExecutorIdleTimeout or uncaching datasets when they are no longer needed.
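
For example, Alternate Strategy 2 above could be expressed at cluster creation as follows (the required properties are shown with illustrative values):

gcloud beta dataproc clusters create cluster-name --properties "\
dataproc:alpha.autoscaling.enabled=true,\
dataproc:alpha.autoscaling.primary.max_workers=100,\
dataproc:alpha.autoscaling.secondary.max_workers=100,\
dataproc:alpha.autoscaling.cooldown_period=10m,\
dataproc:alpha.autoscaling.scale_up.factor=0.05,\
dataproc:alpha.autoscaling.scale_down.factor=1.0,\
dataproc:alpha.autoscaling.scale_down.min_worker_fraction=1.0,\
dataproc:alpha.autoscaling.graceful_decommission_timeout=1h"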

Controlling Autoscaling through partitioning and parallelism

While parallelism is usually set or determined by cluster resources (for example, the number of HDFS blocks controls the number of tasks), with Autoscaling the converse applies: Autoscaling sets cluster resources (workers) according to job parallelism. The following guidelines can help you set job parallelism:

  • While Cloud Dataproc sets the default number of MapReduce reduce tasks based on the initial size of your cluster, you can set mapreduce.job.reduces to increase the parallelism of the reduce phase.
  • Spark SQL and Dataframe parallelism is determined by spark.sql.shuffle.partitions, which defaults to 200.
  • Spark’s RDD functions default to spark.default.parallelism, which is related to the number of executors when the job starts. However, all RDD functions that create shuffles take a parameter for the number of partitions, which overrides spark.default.parallelism.
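
For example, these parallelism settings can be passed per job (the class names, jar paths, and partition counts below are placeholders):

gcloud dataproc jobs submit spark --cluster cluster-name \
    --class org.example.MySparkSqlJob \
    --jars gs://my-bucket/my-spark-sql-job.jar \
    --properties spark.sql.shuffle.partitions=1000

gcloud dataproc jobs submit hadoop --cluster cluster-name \
    --jar gs://my-bucket/my-mapreduce-job.jar \
    --properties mapreduce.job.reduces=100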

You should ensure your data is evenly partitioned. If there is significant key skew, one or more tasks may take significantly longer than other tasks, resulting in low utilization.

Autoscaling default Spark/Hadoop property settings

Autoscaling clusters have default cluster property values that help avoid job failure when primary workers are removed or secondary workers are preempted. You can override these default values when you create a cluster with Autoscaling (see Cluster properties).

Defaults to increase the maximum number of retries for tasks, application masters, and stages:

yarn:yarn.resourcemanager.am.max-attempts=10
mapred:mapreduce.map.maxattempts=10
mapred:mapreduce.reduce.maxattempts=10
spark:spark.task.maxFailures=10
spark:spark.stage.maxConsecutiveAttempts=10

Defaults to reset retry counters (useful for long-running Spark Streaming jobs):

spark:spark.yarn.am.attemptFailuresValidityInterval=1h
spark:spark.yarn.executor.failuresValidityInterval=1h

Default to have Spark’s slow-start dynamic allocation mechanism start from a small size:

spark:spark.executor.instances=2
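
For example, to override one of these defaults when creating an Autoscaling cluster (spark.task.maxFailures=20 is an illustrative override; the other flags match the earlier create example):

gcloud beta dataproc clusters create cluster-name --properties "\
dataproc:alpha.autoscaling.enabled=true,\
dataproc:alpha.autoscaling.primary.max_workers=100,\
dataproc:alpha.autoscaling.secondary.max_workers=100,\
dataproc:alpha.autoscaling.cooldown_period=1h,\
dataproc:alpha.autoscaling.scale_up.factor=0.05,\
dataproc:alpha.autoscaling.graceful_decommission_timeout=1h,\
spark:spark.task.maxFailures=20"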

Autoscaling metrics and logs

The following resources and tools can help you monitor Autoscaling operations and their effect on your cluster and its jobs.

Stackdriver Monitoring

Use Stackdriver Monitoring to:

  • view the metrics used by Autoscaling
  • view the number of Node Managers in your cluster
  • understand why Autoscaling did or did not scale your cluster

Stackdriver Audit Logging

Use Cloud Audit Logging to see how Autoscaling resized your cluster.

Frequently Asked Questions (FAQs)

Can Autoscaling be enabled on High Availability clusters and Single Node clusters?

Autoscaling can be enabled on High Availability clusters, but not on Single Node clusters (Single Node clusters do not support resizing).

Can you modify Autoscaling properties after cluster creation or on a running cluster?

No. Currently, Autoscaling properties cannot be modified after a cluster has been created, nor can they be added to running clusters.

Can you manually resize an Autoscaling cluster?

Yes. You may decide to manually resize a cluster as a stop-gap measure if Autoscaling is making poor decisions or you want to adjust workload configuration settings. However, these changes will only have a temporary effect, and Autoscaling will eventually scale back the cluster.

Instead of manually resizing a cluster, consider:

  • Deleting and recreating the cluster with a better workload configuration.
  • Getting Cloud Dataproc help.

What image versions support autoscaling? What API versions?

Autoscaling is supported on image versions 1.2.22+ and 1.3.0+; it is not currently supported on versions 1.0 and 1.1 (see the Cloud Dataproc Version List). Autoscaling is currently an Alpha release and can be enabled through the Cloud Dataproc v1 or v1beta2 APIs.

How is Cloud Dataproc different from Cloud Dataflow autoscaling?

See Comparing Cloud Dataflow autoscaling to Spark and Hadoop.

Autoscaling has many settable properties. Is there a simpler way to create Autoscaling clusters?

You can create clusters from YAML files. The file format directly matches the REST API (you must encode Autoscaling properties as cluster property strings).

Example:

gcloud beta dataproc clusters create-from-file --file=cluster.yaml

where cluster.yaml contains:
projectId: PROJECT
clusterName: NAME
config:
  gceClusterConfig:
    zoneUri: us-central1-a
  softwareConfig:
    properties:
      dataproc:alpha.autoscaling.enabled: 'true'
      dataproc:alpha.autoscaling.primary.max_workers: '100'
      dataproc:alpha.autoscaling.secondary.max_workers: '100'
      dataproc:alpha.autoscaling.cooldown_period: '1h'
      dataproc:alpha.autoscaling.scale_up.factor: '0.05'
      dataproc:alpha.autoscaling.graceful_decommission_timeout: '1h'