提交 Spark 工作负载时,Dataproc Serverless for Spark 可以动态扩缩工作负载资源(例如执行程序的数量),以高效地运行工作负载。Dataproc 无服务器自动扩缩是默认行为,它使用 Spark 动态资源分配来确定是否、如何以及何时扩缩您的工作负载。
Dataproc 无服务器自动扩缩 V2
Dataproc 无服务器自动扩缩版本 2 (V2) 为默认版本 1 (V1) 添加了功能和改进,可帮助您管理 Dataproc Serverless 工作负载、提高工作负载性能并节省费用:
- 异步节点缩减:自动扩缩 V2 用异步缩减取代 V1 的同步缩减。使用异步缩减,Dataproc Serverless 缩减工作负载资源,而无需等待所有节点完成 shuffle 迁移。这意味着缓慢纵向缩容的长尾节点不会阻碍纵向扩容。
- 智能缩减节点选择:自动伸缩 V2 使用智能算法来标识要首先缩减的最佳节点,从而替换 V1 的随机节点选择。此算法会考虑节点的 shuffle 数据大小和空闲时间等因素。
- 可配置的 Spark 宽限期停用和 shuffle 迁移行为:自动扩缩 V2 允许您使用标准 Spark 属性来配置 Spark 安全停用和 shuffle 迁移。此功能可帮助您保持与自定义 Spark 属性的迁移兼容性。
Dataproc 无服务器自动扩缩功能
特征 | Dataproc 无服务器自动扩缩 V1 | Dataproc 无服务器自动扩缩 V2 |
节点缩减 | 同步 | 异步 |
用于缩减的节点选择 | 随机 | 智能 |
Spark 安全停用和 shuffle 迁移 | 不可配置 | 可配置 |
Spark 动态分配属性
下表列出了在提交批量工作负载以控制自动扩缩时您可以设置的 Spark 动态分配属性(请参阅如何设置 Spark 属性)。
属性 | 说明 | 默认 |
---|---|---|
spark.dataproc.scaling.version |
Dataproc Serverless Spark 自动扩缩版本。指定版本 1 或 2 (请参阅 Dataproc 无服务器自动扩缩 V2)。 |
1 |
spark.dynamicAllocation.enabled |
是否使用动态资源分配,该功能可根据工作负载扩缩执行程序的数量。将值设置为 false 会停用工作负载的自动扩缩功能。默认值:true 。 |
true |
spark.dynamicAllocation.initialExecutors |
分配给工作负载的初始执行器数量。工作负载启动后,自动扩缩可能会更改活跃执行程序的数量。最小值为 2 ;最大值为 500 。 |
2 |
spark.dynamicAllocation.minExecutors |
要缩减工作负载的执行器数量下限。最小值为 2 。 |
2 |
spark.dynamicAllocation.maxExecutors |
工作负载扩容到的执行程序数上限。最大值为 2000 。 |
1000 |
spark.dynamicAllocation.executorAllocationRatio |
自定义 Spark 工作负载的纵向扩容。接受的值为 0 到 1 。值为 1.0 可提供最大的纵向扩容功能,并有助于实现最大并行数量。值为 0.5 会将纵向扩容功能和并行性设置为最大值的一半。 |
0.3 |
spark.reducer.fetchMigratedShuffle.enabled |
设置为 true 时,如果从因 Spark 动态分配而停用的执行程序中提取失败,则启用从 Spark 驱动程序提取 shuffle 输出位置。这样可以减少因 shuffle 块迁移(从已停用的执行器向活跃执行程序)而引起的 ExecutorDeadException 错误,并减少由 FetchFailedException 错误导致的阶段重试次数(请参阅由 ExecutorDeadException 导致的 FetchFailedException)。
此属性适用于 Dataproc Serverless Spark 运行时版本
1.1.12 及更高版本以及 2.0.20 及更高版本。 |
false |
Spark 动态分配指标
Spark 批量工作负载会生成下面列出的与 Spark 动态资源分配相关的指标(如需详细了解 Spark 指标,请参阅监控和插桩)。
指标 | 说明 |
---|---|
maximum-needed |
当前负载下满足所有正在运行的任务和待处理任务所需的执行程序数上限。 |
running |
正在执行任务的执行程序数量。 |
Spark 动态分配问题与解决方案
由 ExecutorDeadException 导致的 FetchFailedException
原因:当 Spark 动态分配对执行器进行缩容时,重排文件会迁移到实时执行器。但是,由于执行器上的 Spark 缩减器任务会从 Spark 驱动程序启动的位置提取重排输出,因此如果迁移重排文件,则缩减器可以继续尝试从已停用的执行器提取重排输出,从而导致出现
ExecutorDeadException
和FetchFailedException
错误。解决方案:运行 Dataproc Serverless for Spark 批量工作负载时,通过将
spark.reducer.fetchMigratedShuffle.enabled
设置为true
来启用 shuffle 位置重新提取(请参阅设置 Spark 批量工作负载属性)。启用此属性后,从已停用的执行器提取操作失败后,缩减器任务会从驱动程序重新提取 shuffle 输出位置。