Dataproc Serverless for Spark autoscaling

When you submit your Spark workload, Dataproc Serverless for Spark can dynamically scale workload resources, such as the number of executors, to run your workload efficiently. Dataproc Serverless autoscaling is the default behavior, and uses Spark dynamic resource allocation to determine whether, how, and when to scale your workload.

Dataproc Serverless autoscaling V2

Dataproc Serverless autoscaling version 2 (V2) adds features and improvements to the default version 1 (V1) to help you manage Dataproc Serverless workloads, improve workload performance, and save costs:

  • Asynchronous node downscaling: Autoscaling V2 replaces V1's synchronous downscaling with asynchronous downscaling. Using asynchronous downscaling, Dataproc Serverless downscales workload resources without waiting for all nodes to finish shuffle migration. This means that long-tail nodes that scale down slowly won't block upscaling.
  • Intelligent scale-down node selection: Autoscaling V2 replaces V1's random node selection with an intelligent algorithm that identifies the best nodes to scale down first. This algorithm considers factors such as the node's shuffle data size and idle time.
  • Configurable Spark graceful decommissioning and shuffle migration behavior: Autoscaling V2 lets you use standard Spark properties to configure Spark graceful decommissioning and shuffle migration. This feature can help you keep migration behavior compatible with your customized Spark properties.
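
To make this concrete, the sketch below assembles a set of Spark properties that opts a batch workload into autoscaling V2 and configures standard Spark graceful-decommissioning behavior. The build_v2_properties helper is hypothetical, and the decommissioning values shown are illustrative examples rather than recommendations; the property names themselves are standard Spark and Dataproc properties.

```python
# Sketch: a hypothetical helper that assembles Spark properties to opt a
# Dataproc Serverless batch workload into autoscaling V2 and tune the
# standard Spark graceful-decommissioning behavior that V2 exposes.
# The helper name and the chosen values are illustrative assumptions.

def build_v2_properties(migrate_shuffle: bool = True) -> dict[str, str]:
    props = {
        # Select autoscaling V2 (the default is "1").
        "spark.dataproc.scaling.version": "2",
        # Standard Spark decommissioning properties (Spark 3.1+).
        "spark.decommission.enabled": "true",
        "spark.storage.decommission.enabled": "true",
    }
    if migrate_shuffle:
        # Migrate shuffle blocks off executors before they are removed.
        props["spark.storage.decommission.shuffleBlocks.enabled"] = "true"
    return props


# Example: render as a comma-separated properties string for submission.
print(",".join(f"{k}={v}" for k, v in sorted(build_v2_properties().items())))
```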

Dataproc Serverless autoscaling features

| Feature | Dataproc Serverless autoscaling V1 | Dataproc Serverless autoscaling V2 |
| --- | --- | --- |
| Node downscaling | Synchronous | Asynchronous |
| Node selection for downscaling | Random | Intelligent |
| Spark graceful decommissioning and shuffle migration | Not configurable | Configurable |

Spark dynamic allocation properties

The following table lists Spark Dynamic Allocation properties that you can set when you submit a batch workload to control autoscaling (see how to set Spark properties).

| Property | Description | Default |
| --- | --- | --- |
| spark.dataproc.scaling.version | The Dataproc Serverless Spark autoscaling version. Specify version 1 or 2 (see Dataproc Serverless autoscaling V2). | 1 |
| spark.dynamicAllocation.enabled | Whether to use dynamic resource allocation, which scales the number of executors up and down based on the workload. Setting the value to false disables autoscaling for the workload. | true |
| spark.dynamicAllocation.initialExecutors | The initial number of executors allocated to the workload. After the workload starts, autoscaling may change the number of active executors. Minimum value is 2; maximum value is 500. | 2 |
| spark.dynamicAllocation.minExecutors | The minimum number of executors to scale the workload down to. Minimum value is 2. | 2 |
| spark.dynamicAllocation.maxExecutors | The maximum number of executors to scale the workload up to. Maximum value is 2000. | 1000 |
| spark.dynamicAllocation.executorAllocationRatio | Customizes the scale-up behavior of the Spark workload. Accepts a value from 0 to 1. A value of 1.0 provides maximum scale-up capability and helps achieve maximum parallelism; a value of 0.5 sets scale-up capability and parallelism at one-half the maximum. | 0.3 |
| spark.reducer.fetchMigratedShuffle.enabled | When set to true, enables fetching the shuffle output location from the Spark driver after a fetch fails from an executor that was decommissioned due to Spark dynamic allocation. This reduces ExecutorDeadException errors caused by shuffle block migration from decommissioned executors to live executors, and reduces stage retries caused by FetchFailedException errors (see FetchFailedException caused by ExecutorDeadException). This property is available in Dataproc Serverless Spark runtime versions 1.1.12 and later and 2.0.20 and later. | false |
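
The numeric limits in the table can be checked client-side before you submit a workload. The sketch below assumes a hypothetical validate_dynamic_allocation helper; the bounds it enforces are the documented ones.

```python
# Sketch: validate dynamic-allocation properties against the documented
# limits before submitting a batch workload. The helper name is an
# illustrative assumption; the numeric bounds come from the table above.

def validate_dynamic_allocation(props: dict[str, str]) -> None:
    initial = int(props.get("spark.dynamicAllocation.initialExecutors", "2"))
    minimum = int(props.get("spark.dynamicAllocation.minExecutors", "2"))
    maximum = int(props.get("spark.dynamicAllocation.maxExecutors", "1000"))
    ratio = float(props.get("spark.dynamicAllocation.executorAllocationRatio", "0.3"))

    if not 2 <= initial <= 500:
        raise ValueError("initialExecutors must be between 2 and 500")
    if minimum < 2:
        raise ValueError("minExecutors must be at least 2")
    if maximum > 2000:
        raise ValueError("maxExecutors must not exceed 2000")
    if not 0 <= ratio <= 1:
        raise ValueError("executorAllocationRatio must be between 0 and 1")
    if not minimum <= initial <= maximum:
        raise ValueError("initialExecutors must lie between minExecutors and maxExecutors")


# Passes: 500 is within the allowed maxExecutors range.
validate_dynamic_allocation({"spark.dynamicAllocation.maxExecutors": "500"})
```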

Spark dynamic allocation metrics

Spark batch workloads generate the following metrics related to Spark dynamic resource allocation (for additional information on Spark metrics, see Monitoring and Instrumentation).

| Metric | Description |
| --- | --- |
| maximum-needed | The maximum number of executors needed under the current load to satisfy all running and pending tasks. |
| running | The number of executors currently executing tasks. |
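
To make the maximum-needed metric concrete, the sketch below reproduces a simplified form of the calculation that open-source Spark's ExecutorAllocationManager performs: the count of running and pending tasks is scaled by spark.dynamicAllocation.executorAllocationRatio and divided by the number of task slots per executor. The function name and the simplification are assumptions; this is not the exact Dataproc Serverless implementation.

```python
import math

# Simplified form of Spark's maximum-needed executor calculation
# (see ExecutorAllocationManager in open-source Spark). The function
# name and the simplification are illustrative assumptions.
def max_executors_needed(running_tasks: int, pending_tasks: int,
                         tasks_per_executor: int,
                         allocation_ratio: float = 0.3) -> int:
    total = running_tasks + pending_tasks
    return math.ceil(total * allocation_ratio / tasks_per_executor)


# With 100 outstanding tasks, 4 task slots per executor, and the default
# ratio of 0.3, about 8 executors are requested instead of 25.
print(max_executors_needed(60, 40, tasks_per_executor=4))
```

This shows why the default ratio of 0.3 trades some parallelism for lower cost: raising the ratio toward 1.0 requests proportionally more executors for the same task backlog.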

Spark dynamic allocation issues and solutions

  • FetchFailedException caused by ExecutorDeadException

    Cause: When Spark dynamic allocation scales down an executor, its shuffle files are migrated to live executors. However, the Spark reducer task on an executor fetches shuffle output from the location set by the Spark driver when the reducer task started. If a shuffle file is migrated, the reducer can continue to attempt to fetch shuffle output from the decommissioned executor, causing ExecutorDeadException and FetchFailedException errors.

    Solution: Enable shuffle location refetching by setting the spark.reducer.fetchMigratedShuffle.enabled property to true when you run your Dataproc Serverless for Spark batch workload (see Set Spark batch workload properties). When this property is enabled, the reducer task refetches the shuffle output location from the driver after a fetch from a decommissioned executor fails.
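
    As a sketch, the workaround amounts to adding one property when you submit the batch. The helper below is hypothetical, and the runtime-version check reflects the availability note above (runtimes 1.1.12+ and 2.0.20+); the property itself is the documented fix.

```python
# Sketch: add the shuffle-refetch workaround to a batch workload's
# properties. The helper and the version check are illustrative;
# spark.reducer.fetchMigratedShuffle.enabled is the documented property.

def with_shuffle_refetch(props: dict[str, str],
                         runtime_version: str) -> dict[str, str]:
    major, minor, patch = (int(p) for p in runtime_version.split("."))
    # Available in Dataproc Serverless Spark runtimes 1.1.12+ and 2.0.20+.
    supported = (major, minor, patch) >= (2, 0, 20) or \
                ((major, minor) == (1, 1) and patch >= 12)
    if not supported:
        raise ValueError(
            f"runtime {runtime_version} predates fetchMigratedShuffle support")
    return {**props, "spark.reducer.fetchMigratedShuffle.enabled": "true"}


print(with_shuffle_refetch({}, "2.0.20"))
```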