Autoscaling clusters

What is autoscaling?

Estimating the "right" number of cluster workers (nodes) for a workload is difficult, and a single cluster size for an entire pipeline is often not ideal. User-initiated Cluster Scaling partially addresses this challenge, but requires monitoring cluster utilization and manual intervention.

The Cloud Dataproc AutoscalingPolicies API provides a mechanism for automating cluster resource management and enables cluster autoscaling. An Autoscaling Policy is a reusable configuration that describes how clusters using the autoscaling policy should scale. It defines scaling boundaries, frequency, and aggressiveness to provide fine-grained control over cluster resources throughout cluster lifetime.

When to use autoscaling

Use autoscaling:

on clusters that store data in external services, such as Cloud Storage or BigQuery

on clusters that process many jobs

to scale up single-job clusters

Autoscaling is not recommended with/for:

  • HDFS: Autoscaling is not intended for scaling on-cluster HDFS. If you use autoscaling with HDFS, make sure that the minimum number of primary workers is sufficient to handle all HDFS data. Also realize that decommissioning HDFS Datanodes can delay the removal of workers.

  • YARN Node Labels: Autoscaling does not support YARN Node Labels, nor the property dataproc:am.primary_only due to YARN-9088. YARN incorrectly reports cluster metrics when node labels are used.

  • Spark Structured Streaming: Autoscaling does not support Spark Structured Streaming (see Autoscaling and Spark Structured Streaming).

  • Idle Clusters: Autoscaling is not recommended for the purpose of scaling a cluster down to minimum size when the cluster is idle. Since creating a new cluster is as fast as resizing one, consider deleting idle clusters and recreating them instead. The following tools support this "ephemeral" model:

    Use Cloud Dataproc Workflows to schedule a set of jobs on a dedicated cluster, and then delete the cluster when the jobs are finished. For more advanced orchestration, use Cloud Composer, which is based on Apache Airflow.

    For clusters that process ad-hoc queries or externally scheduled workloads, use Cluster Scheduled Deletion to delete the cluster after a specified idle period or duration, or at a specific time.

Enabling autoscaling

To enable autoscaling on a cluster:

  1. Create an autoscaling policy.

  2. Either:

    1. Create an autoscaling cluster, or
    2. Enable autoscaling on an existing cluster.

Create an autoscaling policy

gcloud command

You can use the gcloud beta dataproc autoscaling-policies import command to create an autoscaling policy. It reads a local YAML file that defines an autoscaling policy. The format and content of the file should match config objects and fields defined by the autoscalingPolicies REST API.

The following YAML example defines a policy that specifies all required fields. It also provides maxInstances values for both primary and secondary (preemptible) workers, and specifies a 4-minute cooldownPeriod (the default is 2 minutes).

workerConfig:
  maxInstances: 100
secondaryWorkerConfig:
  maxInstances: 50
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h

Here is another YAML example that specifies all optional and required autoscaling policy fields.

workerConfig:
  minInstances: 2
  maxInstances: 100
  weight: 1
secondaryWorkerConfig:
  minInstances: 0
  maxInstances: 100
  weight: 1
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    scaleUpMinWorkerFraction: 0.0
    scaleDownMinWorkerFraction: 0.0
    gracefulDecommissionTimeout: 1h

Run the following gcloud command from a local terminal or in Cloud Shell to create the autoscaling policy. Provide a name for the policy. This name will become the policy id, which you can use in later gcloud commands to reference the policy. Use the --source flag to specify the local file path and file name of the autoscaling policy YAML file to import.

gcloud beta dataproc autoscaling-policies import policy-name \
    --source=filepath/filename.yaml
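
To confirm that the policy was created, you can list your policies or describe the new one. These companion commands are part of the same gcloud beta dataproc autoscaling-policies command group (verify the exact surface against your gcloud version):

gcloud beta dataproc autoscaling-policies list
gcloud beta dataproc autoscaling-policies describe policy-name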

REST API

Create an autoscaling policy by defining an AutoscalingPolicy as part of an autoscalingPolicies.create request.

Console

Currently, creating an autoscaling policy is not supported in the Google Cloud Platform Console.

Create an autoscaling cluster

After creating an autoscaling policy, create a cluster that will use the autoscaling policy. The cluster must be in the same region as the autoscaling policy.

gcloud command

Run the following gcloud command from a local terminal or in Cloud Shell to create an autoscaling cluster. Provide a name for the cluster, and use the --autoscaling-policy flag to specify the policy id (the name of the policy you specified when you created the policy) or the policy resource URI (resource name) (see the AutoscalingPolicy id and name fields).

gcloud beta dataproc clusters create cluster-name \
    --autoscaling-policy=policy id or resource URI

REST API

Create an autoscaling cluster by including an AutoscalingConfig as part of a clusters.create request.

Console

Currently, creating an autoscaling cluster is not supported in the Google Cloud Platform Console.

Enable autoscaling on an existing cluster

After creating an autoscaling policy, you can enable the policy on an existing cluster in the same region.

gcloud command

Run the following gcloud command from a local terminal or in Cloud Shell to enable an autoscaling policy on an existing cluster. Provide the cluster name, and use the --autoscaling-policy flag to specify the policy id (the name of the policy you specified when you created the policy) or the policy resource URI (resource name) (see the AutoscalingPolicy id and name fields).

gcloud beta dataproc clusters update cluster-name \
    --autoscaling-policy=policy id or resource URI

REST API

To enable an autoscaling policy on an existing cluster, set AutoscalingConfig.policyUri to the policy and include config.autoscaling_config.policy_uri in the updateMask of a clusters.patch request.

Console

Currently, enabling an autoscaling policy on an existing cluster is not supported in the Google Cloud Platform Console.

How autoscaling works

Autoscaling checks cluster Hadoop YARN metrics as each "cooldown" period elapses to determine whether to scale the cluster, and, if so, the magnitude of the update.

  1. On each evaluation, autoscaling examines pending and available cluster memory averaged over the last cooldown_period in order to determine the exact change needed to the number of workers:

    exact Δworkers = avg(pending memory - available memory) / memory per worker

    • pending memory is a signal that the cluster has tasks queued but not yet executed and may need to be scaled up to better handle its workload.
    • available memory is a signal that the cluster has extra bandwidth and may need to be scaled down to conserve resources.
    • See Autoscaling with Hadoop and Spark for additional information on these Apache Hadoop YARN metrics.
  2. Given the exact change needed to the number of workers, autoscaling uses a scaleUpFactor or scaleDownFactor to calculate the actual change to the number of workers:

    if exact Δworkers > 0:
      actual Δworkers = ROUND_UP(exact Δworkers * scaleUpFactor)
      # examples:
      # ROUND_UP(exact Δworkers=5 * scaleUpFactor=0.5) = 3
      # ROUND_UP(exact Δworkers=0.8 * scaleUpFactor=0.5) = 1
    else:
      actual Δworkers = ROUND_DOWN(exact Δworkers * scaleDownFactor)
      # examples:
      # ROUND_DOWN(exact Δworkers=-5 * scaleDownFactor=0.5) = -2
      # ROUND_DOWN(exact Δworkers=-0.8 * scaleDownFactor=0.5) = 0
      # ROUND_DOWN(exact Δworkers=-1.5 * scaleDownFactor=0.5) = 0
    
    A scaleUpFactor or scaleDownFactor of 1.0 means autoscaling will scale so that pending/available memory is 0 (perfect utilization).

  3. Once the actual change to the number of workers is calculated, the scaleUpMinWorkerFraction or scaleDownMinWorkerFraction acts as a threshold to determine whether autoscaling will scale the cluster. A small fraction signifies that autoscaling should scale even if the Δworkers is small. A larger fraction means that scaling should only occur when the Δworkers is large.

    if (Δworkers > scaleUpMinWorkerFraction * cluster size) then scale up
    or
    if (abs(Δworkers) > scaleDownMinWorkerFraction * cluster size) then scale down

  4. If the number of workers to scale is large enough to trigger scaling, autoscaling uses the minInstances and maxInstances bounds of workerConfig and secondaryWorkerConfig, along with weight (the ratio of primary to secondary workers), to determine how to split the number of workers across the primary and secondary worker instance groups. The result of these calculations is the final autoscaling change to the cluster for the scaling period. A worked example follows.
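
As a worked example (the numbers are hypothetical, not defaults), consider a 10-worker cluster with 4GB of YARN memory per worker that averaged 20GB of pending memory and 0GB of available memory over the last cooldown period, with scaleUpFactor=0.5 and scaleUpMinWorkerFraction=0.0:

exact Δworkers  = avg(20GB - 0GB) / 4GB per worker = 5
actual Δworkers = ROUND_UP(5 * scaleUpFactor=0.5) = 3
3 > scaleUpMinWorkerFraction=0.0 * 10 workers, so autoscaling scales up
result: add 3 workers, split across the primary and secondary worker groups
        according to weight and bounded by minInstances/maxInstances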

Multi-cluster policy usage

  • An autoscaling policy defines scaling behavior that can be applied to multiple clusters. An autoscaling policy is best applied across multiple clusters when the clusters will share similar workloads or run jobs with similar resource usage patterns.

  • You can update a policy that is being used by multiple clusters. The updates immediately affect the autoscaling behavior for all clusters that use the policy (see autoscalingPolicies.update). If you do not want a policy update to apply to a cluster that is using the policy, disable autoscaling on the cluster before updating the policy.
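
For example, assuming the import command overwrites an existing policy with the same id (prompting you for confirmation), one way to update a policy is to edit its YAML file and re-import it under the same policy-name:

gcloud beta dataproc autoscaling-policies import policy-name \
    --source=filepath/filename.yaml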

gcloud command

Run the following gcloud command from a local terminal or in Cloud Shell to disable autoscaling on a cluster.

gcloud beta dataproc clusters update cluster-name --disable-autoscaling

REST API

To disable autoscaling on a cluster, set AutoscalingConfig.policyUri to the empty string and set update_mask=config.autoscaling_config.policy_uri in a clusters.patch request.

Console

Currently, disabling autoscaling on a cluster is not supported in the Google Cloud Platform Console.

Autoscaling recommendations

  • Cooldown period. The minimum and default cooldownPeriod is two minutes. If a policy uses a short cooldownPeriod, workload changes will more quickly affect cluster size, but clusters may unnecessarily scale up and down. The recommended practice is to set a policy's scaleUpMinWorkerFraction and scaleDownMinWorkerFraction to a non-zero value when using a short cooldownPeriod. This ensures that the cluster only scales up or down when the change in memory utilization is sufficient to warrant a cluster update (see the example policy fragment after this list).

  • Scaling down. MapReduce and Spark write intermediate shuffle data to local disk. Removing workers that hold shuffle data will set job progress back, since map tasks must be re-run to reproduce the shuffle data, and Spark re-runs entire stages if it detects that shuffle files are missing.

  • Spark jobs with cached data. Set spark.dynamicAllocation.cachedExecutorIdleTimeout or uncache datasets when they are no longer needed. By default, Spark does not remove executors that have cached data.
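
The following policy fragment is a sketch of the cooldown recommendation above: it pairs a short cooldownPeriod with non-zero worker fractions so that only a sufficiently large Δworkers triggers scaling. The values are illustrative, not prescriptive:

basicAlgorithm:
  cooldownPeriod: 2m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    scaleUpMinWorkerFraction: 0.1
    scaleDownMinWorkerFraction: 0.1
    gracefulDecommissionTimeout: 1h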

Autoscaling with Apache Hadoop and Apache Spark

The following sections discuss how autoscaling does (or does not) interoperate with Hadoop YARN and Hadoop Mapreduce, and Apache Spark, Spark Streaming, and Spark Structured Streaming.

Hadoop YARN Metrics

Autoscaling configures Hadoop YARN to schedule jobs based on YARN memory requests, not on YARN core requests.

Autoscaling is centered around the following Hadoop YARN metrics:

  1. Allocated memory refers to the total YARN memory taken up by running containers across the whole cluster. If there are 6 running containers that can each use up to 1GB, there is 6GB of allocated memory.

  2. Available memory is YARN memory in the cluster not used by allocated containers. If there is 10GB of memory across all node managers and 6GB of allocated memory, there is 4GB of available memory. If there is available (unused) memory in the cluster, autoscaling may remove workers from the cluster.

  3. Pending memory is the sum of YARN memory requests for pending containers. Pending containers are waiting for space to run in YARN. Pending memory is non-zero only if available memory is zero or too small to allocate to the next container. If there are pending containers, autoscaling may add workers to the cluster.

You can view these metrics in Stackdriver Monitoring. YARN memory will be 0.8 * total memory on the cluster. Remaining memory is reserved for other daemons and operating system use, such as the page cache.

Autoscaling and Hadoop MapReduce

MapReduce runs each map and reduce task as a separate YARN container. When a job begins, MapReduce submits container requests for each map task, resulting in a large spike in pending YARN memory. As map tasks finish, pending memory decreases.

When the fraction of map tasks specified by mapreduce.job.reduce.slowstart.completedmaps has completed (95% by default on Cloud Dataproc), MapReduce enqueues container requests for all reducers, resulting in another spike in pending memory.

Unless your map and reduce tasks take several minutes or longer, don't set a high value for the autoscaling scaleUpFactor. Adding workers to the cluster takes at least 1.5 minutes, so make sure there is sufficient pending work to utilize the new workers for several minutes. A good starting point is to set scaleUpFactor to 0.05 (5%) or 0.1 (10%) of pending memory.
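
For example, a policy aimed at MapReduce jobs with short tasks might use a conservative scale-up factor (values are illustrative):

basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h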

Autoscaling and Spark

Spark adds an additional layer of scheduling on top of YARN. Specifically, Spark Core’s dynamic allocation makes requests to YARN for containers to run Spark executors, then schedules Spark tasks on threads on those executors. Cloud Dataproc clusters enable dynamic allocation by default, so executors are added and removed as needed.

Spark always asks YARN for containers, but without dynamic allocation, it only asks for containers at the beginning of the job. With dynamic allocation, it will remove containers, or ask for new ones, as necessary.

Spark starts from a small number of executors – 2 on autoscaling clusters – and continues to double the number of executors while there are backlogged tasks. This smooths out pending memory (fewer pending memory spikes). It is recommended that you set the autoscaling scaleUpFactor to a large number, such as 1.0 (100%), for Spark jobs.
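
In contrast, a policy intended mainly for Spark jobs with dynamic allocation might scale up aggressively (values are illustrative):

basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 1.0
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h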

Disabling Spark dynamic allocation

If you are running separate Spark jobs that do not benefit from Spark dynamic allocation, you can disable Spark dynamic allocation by setting spark.dynamicAllocation.enabled=false and setting spark.executor.instances. You can still use autoscaling to scale clusters up and down while the separate Spark jobs run.
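
As a sketch, these properties might be set when submitting the job; the main class, jar path, and executor count are placeholders:

gcloud dataproc jobs submit spark \
    --cluster=cluster-name \
    --class=org.example.MyJob \
    --jars=gs://my-bucket/my-job.jar \
    --properties=spark.dynamicAllocation.enabled=false,spark.executor.instances=10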

Autoscaling and Spark Streaming

  1. Since Spark Streaming has its own version of dynamic allocation that uses streaming-specific signals to add and remove executors, set spark.streaming.dynamicAllocation.enabled=true and disable Spark Core’s dynamic allocation by setting spark.dynamicAllocation.enabled=false.

  2. Don't use Graceful decommissioning (autoscaling gracefulDecommissionTimeout) with Spark Streaming jobs. Instead, to safely remove workers with autoscaling, configure checkpointing for fault tolerance.
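
As a sketch of this configuration (Spark Streaming with autoscaling), the job might be submitted with the following properties; the main class and jar path are placeholders:

gcloud dataproc jobs submit spark \
    --cluster=cluster-name \
    --class=org.example.MyStreamingJob \
    --jars=gs://my-bucket/my-streaming-job.jar \
    --properties=spark.streaming.dynamicAllocation.enabled=true,spark.dynamicAllocation.enabled=false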

Alternatively, to use Spark Streaming without autoscaling:

  1. Disable Spark Core’s dynamic allocation (spark.dynamicAllocation.enabled=false), and
  2. Set the number of executors (spark.executor.instances) for your job. See Cluster properties.

Autoscaling and Spark Structured Streaming

Autoscaling is not compatible with Spark Structured Streaming since Spark Structured Streaming currently does not support dynamic allocation (see SPARK-24815: Structured Streaming should support dynamic allocation).

Controlling autoscaling through partitioning and parallelism

While parallelism is usually set or determined by cluster resources (for example, the number of HDFS blocks controls the number of tasks), with autoscaling the converse applies: autoscaling sets cluster resources (workers) according to job parallelism. The following are guidelines to help you set job parallelism:

  • While Cloud Dataproc sets the default number of MapReduce reduce tasks based on the initial size of your cluster, you can set mapreduce.job.reduces to increase the parallelism of the reduce phase.
  • Spark SQL and Dataframe parallelism is determined by spark.sql.shuffle.partitions, which defaults to 200.
  • Spark’s RDD functions default to spark.default.parallelism, which is related to the number of executors when the job starts. However, all RDD functions that create shuffles take a parameter for the number of partitions, which overrides spark.default.parallelism.

You should ensure your data is evenly partitioned. If there is significant key skew, one or more tasks may take significantly longer than other tasks, resulting in low utilization.
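
As an illustration, shuffle parallelism can be raised at job submission so that autoscaling sees enough parallel work to justify adding workers; the partition counts, main class, and jar path are illustrative:

gcloud dataproc jobs submit spark \
    --cluster=cluster-name \
    --class=org.example.MyJob \
    --jars=gs://my-bucket/my-job.jar \
    --properties=spark.sql.shuffle.partitions=1000,spark.default.parallelism=1000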

Autoscaling default Spark/Hadoop property settings

Autoscaling clusters have default cluster property values that help avoid job failure when primary workers are removed or secondary workers are preempted. You can override these default values when you create a cluster with autoscaling (see Cluster Properties).

Defaults to increase the maximum number of retries for tasks, application masters, and stages:

yarn:yarn.resourcemanager.am.max-attempts=10
mapred:mapreduce.map.maxattempts=10
mapred:mapreduce.reduce.maxattempts=10
spark:spark.task.maxFailures=10
spark:spark.stage.maxConsecutiveAttempts=10

Defaults to reset retry counters (useful for long-running Spark Streaming jobs):

spark:spark.yarn.am.attemptFailuresValidityInterval=1h
spark:spark.yarn.executor.failuresValidityInterval=1h

Default to have Spark’s slow-start dynamic allocation mechanism start from a small size:

spark:spark.executor.instances=2
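
You can override these defaults with the --properties flag when creating the cluster, using the prefix:property=value syntax; the override value below is illustrative:

gcloud beta dataproc clusters create cluster-name \
    --autoscaling-policy=policy-name \
    --properties=spark:spark.task.maxFailures=20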

Autoscaling metrics and logs

The following resources and tools can help you monitor autoscaling operations and their effect on your cluster and its jobs.

Stackdriver Monitoring

Use Stackdriver Monitoring to:

  • view the metrics used by autoscaling
  • view the number of Node Managers in your cluster
  • understand why autoscaling did or did not scale your cluster

Stackdriver Logging

Use Stackdriver logging to view logs from the Cloud Dataproc Autoscaler.

1) Find logs for your cluster.

2) Select dataproc.googleapis.com/autoscaler.

3) Expand the log messages to view the status field. The logs are in JSON, a machine-readable format.

4) Expand the log message to see scaling recommendations, metrics used for scaling decisions, the original cluster size, and the new target cluster size.
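
As an alternative to the Logs Viewer, the same autoscaler logs can be read from the command line. This is a hedged sketch that assumes the logs are written under the dataproc.googleapis.com%2Fautoscaler log name for the cloud_dataproc_cluster monitored resource; adjust the filter for your project and cluster:

gcloud logging read \
    'resource.type="cloud_dataproc_cluster" AND resource.labels.cluster_name="cluster-name" AND logName:"dataproc.googleapis.com%2Fautoscaler"' \
    --limit=10 --format=json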

Frequently Asked Questions (FAQs)

Can autoscaling be enabled on High Availability clusters and Single Node clusters?

Autoscaling can be enabled on High Availability clusters, but not on Single Node clusters (Single Node clusters do not support resizing).

Can you manually resize an autoscaling cluster?

Yes. You may decide to manually resize a cluster as a stop-gap measure when tuning an autoscaling policy. However, these changes will only have a temporary effect, and Autoscaling will eventually scale back the cluster.

Instead of manually resizing an autoscaling cluster, consider:

Updating the autoscaling policy. Any changes made to the autoscaling policy will affect all clusters that are currently using the policy (see Multi-cluster policy usage).

Detaching the policy and manually scaling the cluster to the preferred size.

Getting Cloud Dataproc support.
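
For example, to detach the policy and then set the cluster to a fixed size (the worker count is illustrative):

gcloud beta dataproc clusters update cluster-name --disable-autoscaling
gcloud dataproc clusters update cluster-name --num-workers=10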

What image versions support autoscaling? What API versions?

Autoscaling is supported through the v1beta2 API on cluster image versions 1.0.99+, 1.1.90+, 1.2.22+, 1.3.0+, and 1.4.0+ (see the Cloud Dataproc Version List) and through gcloud beta dataproc autoscaling-policies commands.

How is Cloud Dataproc different from Cloud Dataflow autoscaling?

See Comparing Cloud Dataflow autoscaling to Spark and Hadoop.
