以下部分提供了相关提示,可帮助您微调 Dataproc Spark 应用。
使用临时集群
当您使用 Dataproc“临时”集群模型, 一个专用集群,在该作业完成后,您可以删除集群。 借助暂时性模型,您可以将存储和计算分开处理,将作业输入和输出数据保存在 Cloud Storage 或 BigQuery 中,仅将集群用于计算和临时数据存储。
永久性集群问题
使用短时单任务集群可避免使用共享且长时间运行的“永久性”集群时出现以下陷阱和潜在问题:
- 单点故障:共享集群错误状态可能会导致所有作业失败, 阻止整个数据流水线。调查错误并从中恢复可能需要数小时。由于临时集群仅保留临时的集群内状态,因此当发生错误时,可以快速删除并重新创建这些集群。
- 在 HDFS、MySQL 或本地文件系统中难以维护和迁移集群状态
- 对 SLO 产生负面影响的作业之间的资源争用
- 由内存压力导致的服务守护程序无响应
- 日志和临时文件的累积可能会超出磁盘容量
- 由于集群可用区缺货,扩容失败
- 不支持过时的集群映像版本。
临时集群的优势
从积极的方面来看,借助临时集群,您可以执行以下操作:
- 为具有不同配置的不同作业配置不同的 IAM 权限 Dataproc 虚拟机服务账号。
- 针对每个作业优化集群的硬件和软件配置,并根据需要更改集群配置。
- 升级新集群中的映像版本以获取最新的安全性 补丁、bug 修复和优化
- 在隔离的单作业集群上更快地排查问题。
- 只需为临时集群的运行时间付费,而无需为共享集群上作业之间的空闲时间付费,从而节省费用。
使用 Spark SQL
Spark SQL DataFrame API 对 RDD API 进行了重大优化。如果您与使用 RDD 的代码进行交互,请考虑先将数据读取为 DataFrame,然后再在代码中传递 RDD。在 Java 或 Scala 代码中,请考虑将 Spark SQL Dataset API 用作 RDD 和 DataFrame 的超集。
使用 Apache Spark 3
Dataproc 2.0 会安装 Spark 3,其中包含以下功能和性能改进:
- GPU 支持
- 能够读取二进制文件
- 性能改进
- 动态分区修剪
- 自适应查询执行,可实时优化 Spark 作业
使用动态分配
Apache Spark 包含动态分配功能,
集群中工作器的 Spark 执行器数量。借助此功能,即使集群扩容,作业也能使用整个 Dataproc 集群。Dataproc 上默认启用此功能
(spark.dynamicAllocation.enabled
设置为 true
)。请参阅
Spark 动态分配
。
使用 Dataproc 自动扩缩功能
Dataproc 自动扩缩功能会在集群中动态添加 Dataproc 工作器以及从集群中动态移除 Dataproc 工作器,以帮助确保 Spark 作业具有快速完成所需的资源。
最佳实践是将自动扩缩政策配置为仅扩缩辅助工作器。
使用 Dataproc 增强的灵活性模式
如果工作器在向 reducer 提交 shuffle 数据之前被抢占或移除,则包含可抢占式虚拟机或自动扩缩政策的集群可能会收到 FetchFailed 异常。此异常可能会导致任务重试和更长的作业时间 完成时间。
建议:使用 Dataproc 增强的灵活性模式,该模式不会将中间 shuffle 数据存储到辅助工作器上,因此可以安全地抢占或缩减辅助工作器。
配置分区和洗牌
Spark 会将数据存储在集群上的临时分区中。如果您的应用对 DataFrame 进行分组或联接,则会重排数据 根据分组和低层级配置复制到新的分区中。
数据分区可显著影响应用性能:太少的分区会限制作业并行性和集群资源利用率;太多的分区会因为需要进行额外的分区处理和重排而降低作业速度。
配置分区
以下属性用于控制分区的数量和大小:
spark.sql.files.maxPartitionBytes
: 从 Cloud Storage 中读取数据默认值为 128MB 足以处理大多数处理小于 100 TB 的应用。spark.sql.shuffle.partitions
:执行重排后的分区数量。默认值为 200,这适用于 vCPU 总数少于 100 个的集群。建议:请将此属性设置为集群中的 vCPU 数量的 3 倍。spark.default.parallelism
:在 执行需要重排的 RDD 转换,例如join
,reduceByKey
和parallelize
。默认值为集群中的 vCPU 总数。在 Spark 作业中使用 RDD 时,您可以将此数量设置为 vCPU 数量的 3 倍
限制文件数量
当 Spark 读取大量小文件时,性能会下降。 以较大的文件大小存储数据,例如, 介于 256MB–512MB 之间。 同样,限制输出文件的数量(要强制重排,请参阅 避免不必要的重排)。
配置自适应查询执行 (Spark 3)
自适应查询执行 (在 Dataproc 映像版本 2.0 中默认启用) 有助于提升 Spark 作业性能,包括:
虽然默认配置设置适用于大多数用例,但将 spark.sql.adaptive.advisoryPartitionSizeInBytes
设置为 spark.sqlfiles.maxPartitionBytes
(默认值为 128 MB)会很有帮助。
避免不必要的重排
Spark 允许用户手动触发重排以使用 repartition
函数重新平衡其数据。重排的费用很高,因此应谨慎重排数据。适当设置分区配置应该足以让 Spark 自动对您的数据进行分区。
例外情况:将按列分区的数据写入 Cloud Storage 时,对特定列进行重新分区可避免写入许多小文件,从而缩短写入时间。
df.repartition("col_name").write().partitionBy("col_name").save("gs://...")
在 Parquet 或 Avro 中存储数据
Spark SQL 默认在 Snappy 压缩的 Parquet 文件中读取和写入数据。Parquet 采用高效的列式格式 一种文件格式,使得 Spark 能够只读取执行 应用。在处理各种事务时 大型数据集。其他列格式(例如 Apache ORC)的性能也很出色。
对于非列式数据,Apache Avro 提供了高效的二进制行文件格式。虽然通常比 Parquet 慢, Avro 的性能优于 CSV 或 JSON 等基于文本的格式。
优化磁盘大小
永久性磁盘的吞吐量会随着磁盘大小而扩缩,这可能会影响 Spark 作业的性能,因为作业会将元数据和重排数据写入磁盘。使用标准永久性磁盘时,每个工作器的磁盘大小应至少为 1 TB(请参阅永久性磁盘大小的性能)。
如需在 Google Cloud 控制台中监控工作器磁盘吞吐量,请执行以下操作:
- 在集群页面上,点击集群名称。
- 点击“虚拟机实例”标签页。
- 点击任意工作器名称。
- 点击“监控”标签页,然后向下滚动到“磁盘吞吐量”以查看工作器吞吐量。
磁盘注意事项
临时 Dataproc 集群,此类集群 永久性存储的优势可以使用本地 SSD。 本地 SSD 以物理方式挂接到集群,并且比永久性磁盘提供更高的吞吐量(请参阅性能表)。本地 SSD 的可用固定大小为 375 GB,但您可以添加多个 SSD 以提高性能。
集群关停后,本地 SSD 不会保留数据。如果您需要永久性磁盘 您可以使用 SSD 永久性磁盘 提供更高的吞吐量 比标准永久性磁盘要大如果分区大小小于 8 KB,则 SSD 永久性磁盘也是一个不错的选择(不过,请避免使用小型分区)。
将 GPU 挂接到集群
Spark 3 支持 GPU。将 GPU 与 RAPIDS 初始化操作搭配使用,以使用 RAPIDS SQL 加速器加快 Spark 作业的速度。GPU 驱动程序初始化操作,用于配置包含 GPU 的集群。
常见的作业故障和修复方案
内存不足
示例:
- “执行器丢失”
- “java.lang.OutOfMemoryError:已超出 GC 开销限制”
- “由于超出内存限制,YARN 已终止容器”
可能的修复方案:
重排提取失败
示例:
- “FetchFailedException”(Spark 错误)
- “未能连接到...”(Spark 错误)
- “无法提取”(MapReduce 错误)
通常由提前移除仍具有 shuffle 功能的 worker 导致 提供数据
可能的原因和解决方法:
- 抢占式工作器虚拟机被收回,或非抢占式工作器虚拟机被回收 由自动扩缩器移除解决方案:使用增强的灵活性模式,让辅助工作器能够安全地被抢占或可扩缩。
- 执行器或映射器因 OutOfMemory 错误而崩溃。解决方法: 增加执行器或映射器的内存
- Spark 分区混洗服务可能过载。解决方案:减少作业分区的数量。
YARN 节点运行状况不佳
示例(来自 YARN 日志):
...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]
通常与用于随机播放数据的磁盘空间不足有关。诊断依据 查看日志文件:
- 打开项目的 集群 页面,然后点击该集群的名称。
- 点击“查看日志”。
- 按“
hadoop-yarn-nodemanager
”过滤日志。 - 搜索“UNHEALTHY”。
可能的修复方法:
- 用户缓存存储在由
yarn.nodemanager.local-dirs
属性(位于yarn-site.xml file
。此文件位于/etc/hadoop/conf/yarn-site.xml
。您可以查看/hadoop/yarn/nm-local-dir
路径中的可用空间,并通过删除/hadoop/yarn/nm-local-dir/usercache
用户缓存文件夹来释放空间。 - 如果日志报告“UNHEALTHY”重新创建集群 具有较大的磁盘空间 提高吞吐量上限。
作业因驱动程序内存不足而失败
在集群模式下运行作业时,如果主节点的内存大小明显大于工作器节点的内存大小,则作业会失败。
驱动程序日志的示例:
'Exception in thread "main" java.lang.IllegalArgumentException: Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'
可能的修复方法:
- 将
spark:spark.driver.memory
设置为小于yarn:yarn.scheduler.maximum-allocation-mb
。 - 为主节点和工作器节点使用同一机器类型。
了解详情
- 请参阅 Spark 性能调优