以下部分提供了相关提示来帮助您微调 Dataproc Spark 应用。
使用临时集群
当您使用 Dataproc“临时”集群模型, 为每项作业创建一个专用集群,并且在作业完成时,您可以删除该集群。 使用临时模型,您可以分别处理存储和计算, 在 Cloud Storage 或 BigQuery 中保存作业输入和输出数据, 仅使用集群来存储计算和临时数据。
永久性集群问题
使用临时的单作业集群可避免以下误区和潜在 与使用共享的长期运行的“永久性”集群:
- 单点故障:共享集群错误状态可能会导致所有作业失败, 阻止整个数据流水线。调查错误并从错误中恢复 可能需要几个小时才能完成。由于临时集群仅保留集群内临时状态, 在发生错误时,可以快速删除并重新创建它们。
- 在 HDFS、MySQL 或本地文件系统中难以维护和迁移集群状态
- 对 SLO 产生负面影响的作业之间的资源争用
- 由内存压力导致的服务守护程序无响应
- 累积超出磁盘容量的日志和临时文件
- 由于集群可用区缺货导致的扩容失败
- 不支持过时的集群映像版本。
临时集群的优势
从积极的方面来看,临时集群可让您执行以下操作:
- 为具有不同配置的不同作业配置不同的 IAM 权限 Dataproc 虚拟机服务账号。
- 针对每个集群优化集群的硬件和软件配置 根据需要更改集群配置。
- 在新集群中升级映像版本以获取最新的安全性 补丁、bug 修复和优化
- 在隔离的单作业集群上更快地排查问题。
- 只需为临时集群运行时间付费,无需付费,从而节省费用 用于计算共享集群上作业之间的空闲时间。
使用 Spark SQL
Spark SQL DataFrame API 对 RDD API 进行了重大优化。如果 与使用 RDD 的代码交互时 在代码中传递 RDD 之前,先将数据作为 DataFrame 进行读取。 在 Java 或 Scala 代码中,请考虑将 Spark SQL Dataset API 用作 RDD 和 DataFrame 的超集。
使用 Apache Spark 3
Dataproc 2.0 安装 Spark 3,它具有以下功能和性能 改进:
- GPU 支持
- 能够读取二进制文件
- 性能改进
- 动态分区删减
- 自适应查询执行,可实时优化 Spark 作业
使用动态分配
Apache Spark 包含动态分配功能,
集群中工作器的 Spark 执行器数量。借助此功能,您可以
使用整个 Dataproc 集群的作业,即使集群
Dataproc 上默认启用此功能
(spark.dynamicAllocation.enabled
设置为 true
)。请参阅
Spark 动态分配
。
使用 Dataproc 自动扩缩功能
Dataproc 自动扩缩功能会在集群中动态添加 Dataproc 工作器以及从集群中动态移除 Dataproc 工作器,以帮助确保 Spark 作业具有快速完成所需的资源。
使用 Dataproc 增强的灵活模式
具有抢占式虚拟机或自动扩缩政策的集群可能会收到 FetchFailed 工作器在完成服务之前被抢占或移除时的异常 shuffle 数据到缩减器。此异常可能会导致任务重试和更长的作业时间 完成时间。
建议:使用 Dataproc 增强的灵活性模式, 它不会在辅助工作器上存储中间 shuffle 数据, 辅助工作器可以被安全地抢占或缩容。
配置分区和重排
Spark 将数据存储在 集群。如果您的应用对 DataFrame 进行分组或联接,则会重排数据 根据分组和低层级配置转移到新分区中。
数据分区可显著影响应用性能:太少的分区会限制作业并行性和集群资源利用率;太多的分区会因为需要进行额外的分区处理和重排而降低作业速度。
配置分区
以下属性用于控制分区的数量和大小:
spark.sql.files.maxPartitionBytes
: 从 Cloud Storage 中读取数据默认值为 128MB 足以处理大多数处理小于 100 TB 的应用。spark.sql.shuffle.partitions
:执行重排后的分区数量。默认值为 200,这适用于 vCPU 总数少于 100 个的集群。建议:请将此属性设置为集群中的 vCPU 数量的 3 倍。spark.default.parallelism
:在 执行需要重排的 RDD 转换,例如join
,reduceByKey
和parallelize
。默认值为 vCPU 的总数 集群内在 Spark 作业中使用 RDD 时,您可以设置 提高到 vCPU 数量的 3 倍
限制文件数量
当 Spark 读取大量小文件时,性能会下降。 以较大的文件大小存储数据,例如, 介于 256MB–512MB 之间。 同样,限制输出文件的数量(要强制重排,请参阅 避免不必要的重排)。
配置自适应查询执行 (Spark 3)
自适应查询执行 (在 Dataproc 映像版本 2.0 中默认启用) 有助于提升 Spark 作业性能,包括:
虽然默认配置设置适用于大多数用例,但将 spark.sql.adaptive.advisoryPartitionSizeInBytes
设置为 spark.sqlfiles.maxPartitionBytes
(默认值为 128 MB)会很有帮助。
避免不必要的重排
Spark 允许用户手动触发重排以使用 repartition
函数重新平衡其数据。重排的费用很高,因此应谨慎重排数据。适当设置分区配置应该足以让 Spark 自动对您的数据进行分区。
例外情况:将列分区数据写入到 对 Cloud Storage 而言,对特定列进行重新分区可以避免 以缩短写入时间。
df.repartition("col_name").write().partitionBy("col_name").save("gs://...")
将数据存储在 Parquet 或 Avro 中
Spark SQL 默认在 Snappy 压缩文件中读取和写入数据 Parquet 文件。Parquet 采用高效的列式格式 一种文件格式,使得 Spark 能够仅读取执行 应用。在处理各种事务时 大型数据集。其他列格式,例如 Apache ORC 也性能出色。
对于非列式数据, Apache Avro 提供一种 一种高效的二进制行文件格式虽然通常比 Parquet 慢, Avro 的性能优于 CSV 或 JSON 等基于文本的格式。
优化磁盘大小
永久性磁盘的吞吐量会随着磁盘大小而扩缩,这可能会影响 Spark 作业的性能,因为作业会将元数据和重排数据写入磁盘。使用标准永久性磁盘时,每个工作器的磁盘大小应至少为 1 TB(请参阅永久性磁盘大小的性能)。
如需监控 Cloud Monitoring 中的工作器磁盘吞吐量, Google Cloud 控制台:
- 点击 集群 页面。
- 点击“虚拟机实例”标签页。
- 点击任意工作器名称。
- 点击“监控”标签页,然后向下滚动到“磁盘吞吐量”以查看工作器吞吐量。
磁盘注意事项
临时 Dataproc 集群,此类集群 永久性存储的优势可以使用本地 SSD。 本地 SSD 以物理方式挂接到集群,并提供更高的吞吐量 (请参阅性能表)。 本地 SSD 的可用固定大小为 375 GB,但您可以添加多个 SSD 以提高性能。
集群关闭后,本地 SSD 不会保留数据。如果您需要永久性磁盘 您可以使用 SSD 永久性磁盘 提供更高的吞吐量 比标准永久性磁盘要大如果分区大小小于 8 KB,则 SSD 永久性磁盘也是一个不错的选择(不过,请避免使用小型分区)。
将 GPU 挂接到集群
Spark 3 支持 GPU。将 GPU 与 RAPIDS 初始化操作 使用 Cloud Build RAPIDS SQL 加速器。 通过 GPU 驱动程序初始化操作 来配置带有 GPU 的集群。
常见的作业故障和修复方案
内存不足
示例:
- “执行器丢失”
- “java.lang.OutOfMemoryError:已超出 GC 开销限制”
- “由于超出内存限制,YARN 已终止容器”
可能的修复方案:
重排提取失败
示例:
- "FetchFailedException"(Spark 错误)
- “未能连接到...”(Spark 错误)
- “无法提取”(MapReduce 错误)
通常由提前移除仍具有 shuffle 功能的 worker 导致 提供数据
可能的原因和解决方法:
- 抢占式工作器虚拟机被收回,或者非抢占式工作器虚拟机被回收 由自动扩缩器移除解决方案:使用增强型灵活模式 使辅助工作器能够安全地抢占式或可伸缩。
- 由于 OutOfMemory 错误,执行器或映射器崩溃。解决方法: 增加执行器或映射器的内存
- Spark shuffle 服务可能会过载。解决方法: 减少作业分区数。
YARN 节点运行状况不佳
示例(来自 YARN 日志):
...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]
通常与 shuffle 数据的磁盘空间不足有关。诊断依据 查看日志文件:
- 打开项目的 集群 页面,然后点击该集群的名称。
- 点击“查看日志”。
- 按“
hadoop-yarn-nodemanager
”过滤日志。 - 搜索“UNHEALTHY”。
可能的修复方法:
- 用户缓存存储在由
yarn.nodemanager.local-dirs
属性(位于yarn-site.xml file
。此文件位于/etc/hadoop/conf/yarn-site.xml
。您可以查看可用空间 (位于/hadoop/yarn/nm-local-dir
路径下),然后通过以下方式释放空间: 正在删除/hadoop/yarn/nm-local-dir/usercache
用户缓存文件夹。 - 如果日志报告“UNHEALTHY”重新创建集群 具有较大的磁盘空间 提高吞吐量上限。
由于驱动程序内存不足,作业失败
在集群模式下运行作业时,如果主实例的内存大小不同,则作业将失败 节点明显大于工作器节点的内存大小。
驱动程序日志的示例:
'Exception in thread "main" java.lang.IllegalArgumentException: Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'
可能的修复方法:
- 将
spark:spark.driver.memory
设置为小于yarn:yarn.scheduler.maximum-allocation-mb
。 - 为主节点和工作器节点使用同一机器类型。
了解详情
- 请参阅 Spark 性能调整