Estimating the “right” number of cluster workers (nodes) for a workload is difficult, and a single configuration for an entire pipeline often is not ideal. Cloud Dataproc helps you meet this challenge in two ways:
- User-initiated Cluster Scaling
- Cluster Autoscaling, which dynamically scales clusters up and down within min and max bounds set by the user after evaluating Apache Hadoop YARN cluster metrics found in Stackdriver Monitoring
Use Autoscaling with:
clusters that process many jobs
Use Autoscaling to:
scale up single-job clusters
Autoscaling is not recommended with/for:
HDFS: Autoscaling is not intended for scaling on-cluster HDFS. If you use Autoscaling with HDFS, make sure that the minimum number of primary workers is sufficient to handle all HDFS data. Also realize that decommissioning HDFS Datanodes can delay the removal of workers.
Idle Clusters: Autoscaling is not recommended for the purpose of scaling a cluster down to minimum size when the cluster is idle. Since creating a new cluster is as fast as resizing one, consider deleting idle clusters and recreating them instead. The following tools support this “ephemeral” model:
Use Cloud Dataproc Workflows to schedule a set of jobs on a dedicated cluster, and then delete the cluster when the jobs are finished. For more advanced orchestration, use Cloud Composer, which is based on Apache Airflow.
For clusters that process ad-hoc queries or externally scheduled workloads, use Cluster Scheduled Deletion to delete the cluster after a specified idle period or duration, or at a specific time.
Creating an Autoscaling cluster
You can create an autoscaling cluster by setting all required Autoscaling properties. Properties are specified with a "dataproc:alpha.autoscaling." prefix. Autoscaling property suffixes are listed in the Autoscaling properties table.
The following example creates an autoscaling cluster, setting the required Autoscaling properties:
gcloud beta dataproc clusters create cluster-name --properties "\ dataproc:alpha.autoscaling.enabled=true,\ dataproc:alpha.autoscaling.primary.max_workers=100,\ dataproc:alpha.autoscaling.secondary.max_workers=100,\ dataproc:alpha.autoscaling.cooldown_period=2m,\ dataproc:alpha.autoscaling.scale_up.factor=0.05,\ dataproc:alpha.autoscaling.graceful_decommission_timeout=0m"
Initial cluster size.
When you create an Autoscaling cluster, you are not required to set the initial number of primary and secondary workers. Autoscaling will start from the default cluster size (2 primary and 0 secondary workers) and scale up as necessary. However, setting an initial number of primary workers can help performance. For example, if you are certain that you need a large cluster, you can set the cluster to an initial large size to avoid waiting multiple Autoscaling periods for the cluster to reach the scaled-up size.
How Autoscaling works
Autoscaling checks cluster Hadoop YARN metrics as each "cool down" period elapses to determine whether to scale the cluster, and, if so, the magnitude of the update.
On each evaluation, Autoscaling examines pending and available cluster memory averaged over the last
cooldown_periodin order to determine the exact change needed to the number of workers:
exact Δworkers = avg(pending memory - available memory) / memory per worker
pending memoryis a signal that the cluster has tasks queued but not yet executed, and may need to be scaled up to better handle its workload.
available memoryis a signal that the cluster has extra bandwidth, and may need to be scaled down to conserve resources.
- See Autoscaling with Hadoop and Spark for additional information on these Apache Hadoop YARN metrics.
Given the exact change needed to the number of workers, Autoscaling uses a
scale_down.factorto calculate the actual change to the number of workers:
actual Δworkers = exact Δworkers * scale_factor
A scale_factor of 1.0 means Autoscaling will scale so that pending/available memory is 0 (perfect utilization).
Once the actual change to the number of workers is calculated, a
min_worker_fractionacts as a threshold to determine if Autoscaling will scale the cluster. A small
min_worker_fractionsignifies that Autoscaling should scale even if the
Δworkersis small. A larger
min_worker_fractionmeans that scaling should only occur when the
if (Δworkers > min_worker_fraction * cluster size) then scale
If the number of workers to scale is large enough to trigger scaling, Autoscaling uses the min/max bounds of the worker groups and the configured
secondary_worker_fractionto determine how to split the number of workers across the primary and secondary instance groups. The result of these calculations is the final Autoscaling change to the cluster for the scaling period.
Autoscaling is configured by setting properties. In the table, below:
- Standard workers (nodes) are referred to as "primary" workers
- Preemptible workers (nodes) are referred to as "secondary" workers
- Required properties have no default
- Property suffixes are listed in the table. The suffixes are added to the end
dataproc:alpha.autoscaling.prefix to form the property name (for example, the suffix for the property used to enable Autoscaling is
enabled, and the complete property name is
Required: boolean: true or false
||Minimum number of primary workers
Optional: int Bounds: [2, primary.max_workers] Default: 2
||Maximum number of primary workers
Required: int Bounds: [primary.min_workers, )
||Minimum number of secondary workers
Optional: int Bounds: [0, secondary.max_workers] Default: 0
||Maximum number of secondary workers. Since secondary VMs are terminated at least every 24 hours, they are best suited for short and non-critical workloads. Set to 0 to avoid using secondary workers.
Required: int Bounds: [secondary.max_workers, )
||Target fraction of secondary workers. The cluster may not reach this fraction if not allowed by worker-count bounds. For example, if
Optional: double Bounds: [0.0, 1.0] Default: 0.5
||Duration between scaling events. A scaling period starts after the update operation from the previous event has completed.
Required: duration (s,m,h,d) Bounds: [2m, )
||Fraction of average pending memory in the last cooldown period for which to add workers. A scale-up factor of 1.0 will result in scaling up so that there is no pending memory remaining after the update (more aggressive scaling). A scale-up factor closer to 0 will result in a smaller magnitude of scaling up (less aggressive scaling).
Required: double Bounds: [0.0, 1.0]
||Minimum scale-up threshold as a fraction of total cluster size before scaling occurs. For example, in a 20-worker cluster, a threshold of 0.1 means the autoscaler must recommend at least a 2-worker scale-up for the cluster to scale. A threshold of 0 means the autoscaler will scale up on any recommended change.
Optional: double Bounds: [0.0, 1.0] Default: 0.0
||Fraction of average pending memory in the last cooldown period for which to remove workers. A scale-down factor of 1 will result in scaling down so that there is no available memory remaining after the update (more aggressive scaling). A scale-down factor of 0 disables removing workers, which can be beneficial for autoscaling a single job.
Optional: double Bounds: [0.0, 1.0] Default: 1.0
||Minimum scale-down threshold as a fraction of total cluster size before scaling occurs. For example, in a 20-worker cluster, a threshold of 0.1 means the autoscaler must recommend at least a 2 worker scale-down for the cluster to scale. A threshold of 0 means the autoscaler will scale down on any recommended change.
Optional: double Bounds: [0.0, 1.0] Default: 0.0
||Timeout for YARN graceful decommissioning of Node Managers. Specifies the duration to wait for jobs to complete before forcefully removing workers (and potentially interrupting jobs). Only applicable to downscaling operations.
Required: duration (s,m,h,d) Bounds: [0s, 1d]
Autoscaling with Apache Hadoop and Apache Spark
The following sections discuss how Autoscaling does (or does not) interoperate with Hadoop YARN and Hadoop Mapreduce, and Apache Spark, Spark Streaming, and Spark Structured Streaming.
Hadoop YARN Metrics
Autoscaling configures Hadoop YARN to schedule jobs based on YARN memory requests, not on YARN core requests.
Autoscaling is centered around the following Hadoop YARN metrics:
Allocated memoryrefers to the total YARN memory taken up by running containers across the whole cluster. If there are 6 running containers that can use up to 1GB, there is 6GB of allocated memory.
Available memoryis YARN memory in the cluster not used by allocated containers. If there is 10GB of memory across all node managers and 6GB of allocated memory, there is 4GB of available memory. If there is available (unused) memory in the cluster, Autoscaling may remove workers from the cluster.
Pending memoryis the sum of YARN memory requests for pending containers. Pending containers are waiting for space to run in YARN. Pending memory is non-zero only if available memory is zero or too small to allocate to the next container. If there are pending containers, Autoscaling may add workers to the cluster.
You can view these metrics in Stackdriver Monitoring. YARN memory will be 0.8 * total memory on the cluster. Remaining memory is reserved for other daemons and operating system use, such as the page cache.
Autoscaling and Hadoop MapReduce
MapReduce runs each map and reduce task as a separate YARN container. When a job begins, MapReduce submits container requests for each map task, resulting in a large spike in pending YARN memory. As map tasks finish, pending memory decreases.
mapreduce.job.reduce.slowstart.completedmaps have completed (95% by default
on Cloud Dataproc), MapReduce enqueues container requests for all
reducers, resulting in another spike in pending memory.
Unless your map and reduce tasks take several minutes or longer, don't
set a high value for
dataproc:alpha.autoscaling.scale_up.factor. Adding workers to
the cluster takes at least 1.5 minutes, so make sure there is
sufficient pending work to utilize new worker for several minutes. A good starting
point is to set
dataproc:alpha.autoscaling.scale_up.factor to 0.05 (5%) or 0.1
(10%) of pending memory.
Autoscaling and Spark
Spark adds an additional layer of scheduling on top of YARN. Specifically, Spark Core’s dynamic allocation makes requests to YARN for containers to run Spark executors, then schedules Spark tasks on threads on those executors. Cloud Dataproc clusters enable dynamic allocation by default, so executors are added and removed as needed.
Spark always asks YARN for containers, but without dynamic allocation, it only asks for containers at the beginning of the job. With dynamic allocation, it will remove containers, or ask for new ones, as necessary.
Spark starts from a small number of executors – 2 on Autoscaling clusters – and
continues to double the number of executors while there are backlogged tasks.
This smooths out pending memory (fewer pending memory spikes). It is recommended that you set
dataproc:alpha.autoscaling.scale_up.factor to a large number, such as 1.0 (100%), for Spark jobs.
As a Spark default, executors with cached data do not exit
spark.dynamicAllocation.cachedExecutorIdleTimeout=0), and executors without
cached data will exit after 60 seconds
spark.dynamicAllocation.executorIdleTimeout=60). Since Cloud Dataproc
configures the external YARN-based shuffle service, executors often exit
before their shuffle data is served. Therefore, set a non-zero
cachedExecutorIdleTimeout so that executors with cached data eventually exit.
Also set a non-zero
to drain shuffle data on workers before Autoscaling removes them.
Disabling Spark dynamic allocation
If you are running separate Spark jobs that do not benefit from Spark dynamic
allocation, you can disable Spark dynamic allocation by setting
spark.dynamicAllocation.enabled=false and setting
You can still use Autoscaling to scale clusters up and down while the separate
Spark jobs run.
Autoscaling and Spark Streaming
To use Spark Streaming with Autoscaling:
Since Spark Streaming has its own version of dynamic allocation that uses streaming-specific signals to add and remove executors, set
spark.streaming.dynamicAllocation.enabled=trueand disable Spark Core’s dynamic allocation by setting
Until the issue of Spark Streaming dynamic allocation should respect spark.executor.instances is fixed, use an initialization action to remove spark.executor.instances from
Graceful decommissioning (
dataproc:alpha.autoscaling.graceful_decommission_timeout) does not apply to Spark Streaming jobs. Instead, to safely remove worker with Autoscaling, configure checkpointing for fault tolerance.
Alternatively, to use Spark Streaming without Autoscaling:
- Disable Spark Core’s dynamic allocation (
- Set the number of executors (
spark.executor.instances) for your job. See Cluster properties.
Autoscaling and Spark Structured Streaming
Autoscaling is not compatible with Spark Structured Streaming since Spark Structured Streaming currently does not support dynamic allocation (see SPARK-24815: Structured Streaming should support dynamic allocation).
Downscaling considerations and recommendations
Shuffle data: In Hadoop MapReduce, map tasks write shuffle data to local disk, which is served to reducer tasks via a server running on the Node Manager. Removing workers with shuffle data, even when tasks are not running, is problematic since doing so can set job progress back if map tasks need to be re-run. Spark also shuffles data between stage boundaries, and if it detects missing shuffle files, it re-runs the entire stage.
Recommendation: On multi-job clusters, set
dataproc:alpha.autoscaling.graceful_decommission_timeoutso that in-progress jobs have sufficient time to complete before Autoscaling removes workers. Generally, it's best to set the timeout to the duration of your longest job to make sure all jobs complete before workers are removed.
Alternate Strategy 1: Disable Autoscaling downscaling by setting
dataproc:alpha.autoscaling.scale_down.factor=0.0so that workers will not be removed regardless of available memory size. This strategy can be useful for single-job clusters where downscaling can set back job progress.
Alternate Strategy 2: Set the cluster to scale down only when the cluster is idle by setting
Cached data: Spark can cache datasets in executor memory or disk. Usually, Spark executors exit when they have no work to process, but if they have cached data, by default they do not (never) exit. Therefore, for notebooks and other applications that maintain cached data, the executors will stay alive—and Autoscaling will not scale down the cluster—after the applications are no longer needed.
Recommendation: To make sure executors with cached data exit eventually, consider setting
spark.dynamicAllocation.cachedExecutorIdleTimeoutor uncaching datasets when they are no longer needed.
Controlling Autoscaling through partitioning and parallelism
While parallelism is usually set or determined by cluster resources (for example, the number of HDFS blocks controls by the number of tasks), with Autoscaling, the converse applies: Autoscaling sets cluster resources (workers) according to job parallelism. The following are guidelines to help you set job parallelism:
- While Cloud Dataproc sets the default number of MapReduce reduce tasks based
on initial cluster size of your cluster, you can set
mapreduce.job.reducesto increase the parallelism of the reduce phase.
- Spark SQL and Dataframe parallelism is determined by
spark.sql.shuffle.partitions, which defaults to 200.
- Spark’s RDD functions default to
spark.default.parallelism, which is related to the number of executors when the job starts. However, all RDD functions that create shuffles take a parameter for the number of partitions, which overrides
You should ensure your data is evenly partitioned. If there is significant key skew, one or more tasks may take significantly longer than other tasks, resulting in low utilization.
Autoscaling default Spark/Hadoop property settings
Autoscaling clusters have default cluster property values that help avoid job failure when primary workers are removed or secondary workers are preempted. You can override default values when you create a cluster with Autoscaling (see Cluster Properties.
Defaults to increase the maximum number of retries for tasks, application masters, and stages:
yarn:yarn.resourcemanager.am.max-attempts=10 mapred:mapreduce.map.maxattempts=10 mapred:mapreduce.reduce.maxattempts=10 spark:spark.task.maxFailures=10 spark:spark.stage.maxConsecutiveAttempts=10
Defaults to reset retry counters (useful for long-running Spark Streaming jobs):
Default to have Spark’s slow-start dynamic allocation mechanism start from a small size:
Autoscaling metrics and logs
The following resources and tools can help you monitor Autoscaling operations and their effect on your cluster and its jobs.
Use Stackdriver Monitoring to:
- view the metrics used by Autoscaling
- view the number of Node Managers in your cluster
- understand why Autoscaling did or did not scale your cluster
Use Stackdriver logging to view logs from the Cloud Dataproc Autoscaler.
1) Find logs for your cluster.
3) Expand the log messages to view the
status field. The logs are in JSON, a
machine readable format.
4) Expand the log message to see scaling recommendations, metrics used for scaling decisions, the original cluster size, and the new target cluster size.
Frequently Asked Questions (FAQs)
Can Autoscaling be enabled on High Availability clusters and Single Node clusters?
Can you modify Autoscaling properties after cluster creation or on a running cluster?
No. Currently, Autoscaling properties cannot be modified after a cluster has been created, nor can they be added to running clusters.
Can you manually resize an Autoscaling cluster?
Yes. You may decide to manually resize a cluster as a stop-gap measure if Autoscaling is making poor decisions or you want to adjust workload configuration settings. However, these changes will only have a temporary effect, and Autoscaling will eventually scale back the cluster.
Instead of manually resizing a cluster, consider:
Deleting and recreating the cluster with a better workload configuration.
What image versions support autoscaling? What API versions?
Autoscaling is supported on versions 1.2.22+ and 1.3.0+. Autoscaling is not currently supported on versions 1.0 and 1.1. (see the Cloud Dataproc Version List). . Autoscaling is currently in Alpha release, but can be enabled through the Cloud Dataproc v1 or v1beta2 APIs.
How is Cloud Dataproc different from Cloud Dataflow autoscaling?
Autoscaling has many settable properties. Is there a simpler way to create Autoscaling clusters?
gcloud beta dataproc clusters create-from-file --file=cluster.yaml $ cat cluster.yaml projectId: PROJECT clusterName: NAME config: gceClusterConfig: zoneUri: us-central1-a softwareConfig: properties: dataproc:alpha.autoscaling.enabled: 'true' dataproc:alpha.autoscaling.primary.max_workers: '100' dataproc:alpha.autoscaling.secondary.max_workers: '100' dataproc:alpha.autoscaling.cooldown_period: '2m' dataproc:alpha.autoscaling.scale_up.factor: '0.05' dataproc:alpha.autoscaling.graceful_decommission_timeout: '0m'