When you submit your Spark workload, Dataproc Serverless for Spark can dynamically scale workload resources, such as the number of executors, to run your workload efficiently. Dataproc Serverless autoscaling is the default behavior, and uses Spark dynamic resource allocation to determine whether, how, and when to scale your workload.
Dataproc Serverless autoscaling V2
Dataproc Serverless autoscaling version 2 (V2) adds features and improvements to the default version 1 (V1) to help you manage Dataproc Serverless workloads, improve workload performance, and save costs:
- Asynchronous node downscaling: Autoscaling V2 replaces V1's synchronous downscaling with asynchronous downscaling. Using asynchronous downscaling, Dataproc Serverless downscales workload resources without waiting for all nodes to finish shuffle migration. This means that long-tail nodes that scale down slowly won't block upscaling.
- Intelligent node selection for downscaling: Autoscaling V2 replaces V1's random node selection with an intelligent algorithm that identifies the best nodes to scale down first. This algorithm considers factors such as the node's shuffle data size and idle time.
- Configurable Spark graceful decommissioning and shuffle migration behavior: Autoscaling V2 lets you use standard Spark properties to configure Spark graceful decommissioning and shuffle migration. This feature can help you keep decommissioning and shuffle migration compatible with your customized Spark properties (see the sketch after this list).
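For example, a workload can opt in to autoscaling V2 and tune decommissioning behavior through batch properties. The following minimal Python sketch shows such a property map; the decommissioning property choices and values are assumptions based on open source Spark (availability depends on the runtime's Spark version), and the map is passed as the batch's runtime properties at submission time (a full submission sketch follows the properties table below).

```python
# Sketch: batch properties that select autoscaling V2 and tune standard
# open source Spark graceful decommissioning / shuffle migration behavior.
# The decommissioning properties and values below are illustrative
# assumptions (Spark 3.1+), not Dataproc Serverless defaults.
autoscaling_v2_properties = {
    "spark.dataproc.scaling.version": "2",         # opt in to autoscaling V2
    "spark.decommission.enabled": "true",          # graceful decommissioning
    "spark.storage.decommission.enabled": "true",  # migrate block data
    "spark.storage.decommission.shuffleBlocks.enabled": "true",  # migrate shuffle blocks
}
```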
Dataproc Serverless autoscaling features
Feature | Dataproc Serverless Autoscaling V1 | Dataproc Serverless Autoscaling V2 |
---|---|---|
Node downscaling | Synchronous | Asynchronous |
Node selection for downscaling | Random | Intelligent |
Spark graceful decommissioning and shuffle migration | Not configurable | Configurable |
Spark dynamic allocation properties
The following table lists the Spark dynamic allocation properties that you can set when you submit a batch workload to control autoscaling (see how to set Spark properties). A submission sketch follows the table.
Property | Description | Default |
---|---|---|
spark.dataproc.scaling.version | The Dataproc Serverless Spark autoscaling version. Specify version 1 or 2 (see Dataproc Serverless autoscaling V2). | 1 |
spark.dynamicAllocation.enabled | Whether to use dynamic resource allocation, which scales the number of executors up and down based on the workload. Setting the value to false disables autoscaling for the workload. | true |
spark.dynamicAllocation.initialExecutors | The initial number of executors allocated to the workload. After the workload starts, autoscaling may change the number of active executors. Minimum value is 2; maximum value is 500. | 2 |
spark.dynamicAllocation.minExecutors | The minimum number of executors to scale the workload down to. Minimum value is 2. | 2 |
spark.dynamicAllocation.maxExecutors | The maximum number of executors to scale the workload up to. Maximum value is 2000. | 1000 |
spark.dynamicAllocation.executorAllocationRatio | Customizes scaling up of the Spark workload. Accepts a value from 0 to 1. A value of 1.0 provides maximum scale-up capability and helps achieve maximum parallelism. A value of 0.5 sets scale-up capability and parallelism at one-half the maximum value. | 0.3 |
spark.reducer.fetchMigratedShuffle.enabled | When set to true, enables fetching the shuffle output location from the Spark driver after a fetch fails from an executor that was decommissioned due to Spark dynamic allocation. This reduces ExecutorDeadException errors caused by shuffle block migration from decommissioned executors to live executors, and reduces stage retries caused by FetchFailedException errors (see FetchFailedException caused by ExecutorDeadException). This property is available in Dataproc Serverless Spark runtime versions 1.1.12 and later and 2.0.20 and later. | false |
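As a concrete illustration of setting these properties at submission time, the following sketch submits a PySpark batch with the google-cloud-dataproc Python client. The project ID, region, Cloud Storage path, batch ID, and property values are placeholder assumptions; adjust them for your environment.

```python
# Sketch: submit a Dataproc Serverless batch with Spark dynamic allocation
# properties, using the google-cloud-dataproc client library.
# pip install google-cloud-dataproc
from google.cloud import dataproc_v1

project_id = "my-project"   # placeholder
region = "us-central1"      # placeholder

client = dataproc_v1.BatchControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

batch = dataproc_v1.Batch(
    pyspark_batch=dataproc_v1.PySparkBatch(
        main_python_file_uri="gs://my-bucket/my_job.py"  # placeholder
    ),
    runtime_config=dataproc_v1.RuntimeConfig(
        properties={
            # Example values only; see the table above for ranges and defaults.
            "spark.dynamicAllocation.initialExecutors": "2",
            "spark.dynamicAllocation.minExecutors": "2",
            "spark.dynamicAllocation.maxExecutors": "100",
            "spark.dynamicAllocation.executorAllocationRatio": "0.5",
        }
    ),
)

operation = client.create_batch(
    parent=f"projects/{project_id}/locations/{region}",
    batch=batch,
    batch_id="dynamic-allocation-example",  # placeholder
)
result = operation.result()  # blocks until the batch finishes
print(result.state)
```

Property values are passed as strings in runtime_config.properties; the same key-value pairs can also be supplied with the gcloud CLI --properties flag or in the Google Cloud console.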
Spark dynamic allocation metrics
Spark batch workloads generate the following metrics related to Spark dynamic resource allocation (for more information about Spark metrics, see Monitoring and Instrumentation). A simplified calculation sketch follows the table.
Metric | Description |
---|---|
maximum-needed | The maximum number of executors needed under the current load to satisfy all running and pending tasks. |
running | The number of running executors executing tasks. |
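To connect these metrics to the allocation properties above, the following sketch estimates the maximum-needed value the way open source Spark's ExecutorAllocationManager derives it. This is a simplified model under assumed defaults (4 executor cores, 1 CPU per task); the Dataproc Serverless runtime's exact accounting may differ.

```python
import math

def max_needed_executors(pending_tasks: int, running_tasks: int,
                         executor_cores: int = 4, task_cpus: int = 1,
                         allocation_ratio: float = 0.3) -> int:
    """Simplified estimate of the maximum-needed metric, modeled on open
    source Spark's ExecutorAllocationManager (assumed core counts)."""
    tasks_per_executor = executor_cores // task_cpus
    return math.ceil(
        (pending_tasks + running_tasks) * allocation_ratio / tasks_per_executor
    )

# Example: 900 pending + 100 running tasks with the default
# spark.dynamicAllocation.executorAllocationRatio of 0.3.
print(max_needed_executors(pending_tasks=900, running_tasks=100))  # 75
```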
Spark dynamic allocation issues and solutions
FetchFailedException caused by ExecutorDeadException
Cause: When Spark dynamic allocation scales down an executor, its shuffle files are migrated to live executors. However, the Spark reducer task on an executor fetches shuffle output from the location that the Spark driver set when the reducer task started. If a shuffle file is migrated, the reducer can continue trying to fetch shuffle output from a decommissioned executor, causing ExecutorDeadException and FetchFailedException errors.

Solution: Enable shuffle location refetching by setting spark.reducer.fetchMigratedShuffle.enabled to true when you run your Dataproc Serverless for Spark batch workload (see Set Spark batch workload properties). When this property is enabled, the reducer task refetches the shuffle output location from the driver after a fetch from a decommissioned executor fails.
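A minimal sketch of the fix, reusing the submission pattern from the example after the properties table (only the property map changes; the variable name is illustrative):

```python
# Sketch: pass this map as RuntimeConfig(properties=...) when creating the
# batch, as in the earlier submission example. Requires Dataproc Serverless
# Spark runtime 1.1.12+ or 2.0.20+.
shuffle_refetch_properties = {
    # Refetch the shuffle output location from the driver after a failed
    # fetch from a decommissioned executor.
    "spark.reducer.fetchMigratedShuffle.enabled": "true",
}
```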