本文档介绍了适用于 Spark 自动扩缩的 Dataproc Serverless。在提交 Spark 工作负载时,Dataproc Serverless for Spark 可以动态扩缩工作负载资源(例如执行程序的数量),以便高效地运行工作负载。Dataproc Serverless 自动扩缩是默认行为,它使用 Spark 动态资源分配来确定是否、如何以及何时扩缩工作负载。
Dataproc Serverless 自动扩缩 V2
Dataproc Serverless 自动扩缩版本 2 (V2) 在默认版本 1 (V1) 的基础上新增了一些功能并进行了一些改进,可帮助您管理 Dataproc Serverless 工作负载、提升工作负载性能并节省费用:
- 异步节点缩减:自动扩缩 V2 将 V1 的同步缩减替换为异步缩减。借助异步缩减功能,Dataproc Serverless 可以缩减工作负载资源,而无需等待所有节点完成 shuffle 迁移。这意味着,缩减速度缓慢的长尾节点不会阻止上缩。
- 智能缩减节点选择:自动伸缩 V2 使用智能算法取代了 V1 的随机节点选择,该算法可确定首先缩减的最佳节点。此算法会考虑节点的随机数据大小和空闲时间等因素。
- 可配置的 Spark 有序停用和 shuffle 迁移行为:借助自动扩缩 V2,您可以使用标准 Spark 属性来配置 Spark 有序停用和 shuffle 迁移。此功能可帮助您保持与自定义 Spark 属性的迁移兼容性。
Dataproc Serverless 自动扩缩功能
功能 | Dataproc Serverless 自动扩缩 V1 | Dataproc Serverless 自动扩缩 V2 |
节点缩减 | 同步 | 异步 |
缩减节点的选择 | 随机 | 智能 |
Spark 优雅停用和 shuffle 迁移 | 不可配置 | 可配置 |
Spark 动态分配属性
下表列出了 Spark 动态分配属性,您可以在提交批处理工作负载时设置这些属性以控制自动扩缩(请参阅如何设置 Spark 属性)。
属性 | 说明 | 默认 |
---|---|---|
spark.dataproc.scaling.version |
Dataproc Serverless Spark 自动扩缩版本。指定版本 1 或 2 (请参阅 Dataproc Serverless 自动扩缩 V2)。 |
1 |
spark.dynamicAllocation.enabled |
是否使用动态资源分配,该功能会根据工作负载扩缩和缩减执行程序的数量。
将该值设置为 false 会停用工作负载的自动扩缩功能。默认值:true 。 |
true |
spark.dynamicAllocation.initialExecutors |
分配给工作负载的初始执行程序数量。在工作负载启动后,自动扩缩功能可能会更改活跃执行程序的数量。最小值为 2 ;最大值为 500 。 |
2 |
spark.dynamicAllocation.minExecutors |
工作负载缩减到的最小执行程序数量。
最小值为 2 。 |
2 |
spark.dynamicAllocation.maxExecutors |
可将工作负载扩容到的执行程序数量上限。
最大值为 2000 。 |
1000 |
spark.dynamicAllocation.executorAllocationRatio |
自定义 Spark 工作负载的扩容。接受的值介于 0 和 1 之间。值为 1.0 时,可提供最大扩容能力,并有助于实现最大并行度。值 0.5 会将扩容能力和并行性设置为最大值的一半。 |
0.3 |
spark.reducer.fetchMigratedShuffle.enabled |
设置为 true 后,如果从因 Spark 动态分配而已弃用的执行器提取失败,则启用从 Spark 驱动程序提取 Shuffle 输出位置。这会减少由从已停用执行器迁移到活跃执行器的 Shuffle 分块引起的 ExecutorDeadException 错误,并减少由 FetchFailedException 错误引起的阶段重试(请参阅由 ExecutorDeadException 引起的 FetchFailedException)。
此属性适用于 Dataproc Serverless Spark 运行时版本
1.1.12 及更高版本,以及 2.0.20 及更高版本。 |
false |
Spark 动态分配指标
Spark 批处理工作负载会生成与 Spark 动态资源分配相关的以下指标(如需详细了解 Spark 指标,请参阅监控和插桩)。
指标 | 说明 |
---|---|
maximum-needed |
在当前负载下满足所有正在运行和待处理任务所需的执行器数量上限。 |
running |
正在执行任务的正在运行的执行程序的数量。 |
Spark 动态分配问题和解决方案
由 ExecutorDeadException 导致的 FetchFailedException
原因:当 Spark 动态分配缩减执行器时,Shuffle 文件会迁移到活跃的执行器。不过,由于执行器上的 Spark Reducer 任务会在 Reducer 任务启动时从 Spark 驱动程序设置的位置提取 Shuffle 输出,因此如果迁移了 Shuffle 文件,Reducer 可能会继续尝试从已停用的执行器提取 Shuffle 输出,从而导致
ExecutorDeadException
和FetchFailedException
错误。解决方案:在运行 Dataproc Serverless for Spark 批处理工作负载时,将
spark.reducer.fetchMigratedShuffle.enabled
设置为true
,以启用 shuffle 位置重新提取(请参阅设置 Spark 批处理工作负载属性)。启用此属性后,如果从已弃用的执行器提取失败,则缩减器任务会从驱动程序重新提取 Shuffle 输出位置。