Spark 作业调节提示

以下部分提供了相关提示来帮助您微调 Dataproc Spark 应用。

使用临时集群

当您使用 Dataproc“临时”集群模型, 为每项作业创建一个专用集群,并且在作业完成时,您可以删除该集群。 使用临时模型,您可以分别处理存储和计算, 在 Cloud Storage 或 BigQuery 中保存作业输入和输出数据, 仅使用集群来存储计算和临时数据。

永久性集群问题

使用临时的单作业集群可避免以下误区和潜在 与使用共享的长期运行的“永久性”集群:

  • 单点故障:共享集群错误状态可能会导致所有作业失败, 阻止整个数据流水线。调查错误并从错误中恢复 可能需要几个小时才能完成。由于临时集群仅保留集群内临时状态, 在发生错误时,可以快速删除并重新创建它们。
  • 在 HDFS、MySQL 或本地文件系统中难以维护和迁移集群状态
  • 对 SLO 产生负面影响的作业之间的资源争用
  • 由内存压力导致的服务守护程序无响应
  • 累积超出磁盘容量的日志和临时文件
  • 由于集群可用区缺货导致的扩容失败
  • 不支持过时的集群映像版本

临时集群的优势

从积极的方面来看,临时集群可让您执行以下操作:

  • 为具有不同配置的不同作业配置不同的 IAM 权限 Dataproc 虚拟机服务账号
  • 针对每个集群优化集群的硬件和软件配置 根据需要更改集群配置。
  • 在新集群中升级映像版本以获取最新的安全性 补丁、bug 修复和优化
  • 在隔离的单作业集群上更快地排查问题。
  • 只需为临时集群运行时间付费,无需付费,从而节省费用 用于计算共享集群上作业之间的空闲时间。

使用 Spark SQL

Spark SQL DataFrame API 对 RDD API 进行了重大优化。如果 与使用 RDD 的代码交互时 在代码中传递 RDD 之前,先将数据作为 DataFrame 进行读取。 在 Java 或 Scala 代码中,请考虑将 Spark SQL Dataset API 用作 RDD 和 DataFrame 的超集。

使用 Apache Spark 3

Dataproc 2.0 安装 Spark 3,它具有以下功能和性能 改进:

  • GPU 支持
  • 能够读取二进制文件
  • 性能改进
  • 动态分区删减
  • 自适应查询执行,可实时优化 Spark 作业

使用动态分配

Apache Spark 包含动态分配功能, 集群中工作器的 Spark 执行器数量。借助此功能,您可以 使用整个 Dataproc 集群的作业,即使集群 Dataproc 上默认启用此功能 (spark.dynamicAllocation.enabled 设置为 true)。请参阅 Spark 动态分配

使用 Dataproc 自动扩缩功能

Dataproc 自动扩缩功能会在集群中动态添加 Dataproc 工作器以及从集群中动态移除 Dataproc 工作器,以帮助确保 Spark 作业具有快速完成所需的资源。

这是一种最佳做法 将自动扩缩政策配置为仅扩缩辅助工作器

使用 Dataproc 增强的灵活模式

具有抢占式虚拟机或自动扩缩政策的集群可能会收到 FetchFailed 工作器在完成服务之前被抢占或移除时的异常 shuffle 数据到缩减器。此异常可能会导致任务重试和更长的作业时间 完成时间。

建议:使用 Dataproc 增强的灵活性模式, 它不会在辅助工作器上存储中间 shuffle 数据, 辅助工作器可以被安全地抢占或缩容。

配置分区和重排

Spark 将数据存储在 集群。如果您的应用对 DataFrame 进行分组或联接,则会重排数据 根据分组和低层级配置转移到新分区中。

数据分区可显著影响应用性能:太少的分区会限制作业并行性和集群资源利用率;太多的分区会因为需要进行额外的分区处理和重排而降低作业速度。

配置分区

以下属性用于控制分区的数量和大小:

  • spark.sql.files.maxPartitionBytes: 从 Cloud Storage 中读取数据默认值为 128MB 足以处理大多数处理小于 100 TB 的应用。

  • spark.sql.shuffle.partitions:执行重排后的分区数量。默认值为 200,这适用于 vCPU 总数少于 100 个的集群。建议:请将此属性设置为集群中的 vCPU 数量的 3 倍。

  • spark.default.parallelism:在 执行需要重排的 RDD 转换,例如 joinreduceByKeyparallelize。默认值为 vCPU 的总数 集群内在 Spark 作业中使用 RDD 时,您可以设置 提高到 vCPU 数量的 3 倍

限制文件数量

当 Spark 读取大量小文件时,性能会下降。 以较大的文件大小存储数据,例如, 介于 256MB–512MB 之间。 同样,限制输出文件的数量(要强制重排,请参阅 避免不必要的重排)。

配置自适应查询执行 (Spark 3)

自适应查询执行 (在 Dataproc 映像版本 2.0 中默认启用) 有助于提升 Spark 作业性能,包括:

虽然默认配置设置适用于大多数用例,但将 spark.sql.adaptive.advisoryPartitionSizeInBytes 设置为 spark.sqlfiles.maxPartitionBytes(默认值为 128 MB)会很有帮助。

避免不必要的重排

Spark 允许用户手动触发重排以使用 repartition 函数重新平衡其数据。重排的费用很高,因此应谨慎重排数据。适当设置分区配置应该足以让 Spark 自动对您的数据进行分区。

例外情况:将列分区数据写入到 对 Cloud Storage 而言,对特定列进行重新分区可以避免 以缩短写入时间。

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

将数据存储在 Parquet 或 Avro 中

Spark SQL 默认在 Snappy 压缩文件中读取和写入数据 Parquet 文件。Parquet 采用高效的列式格式 一种文件格式,使得 Spark 能够仅读取执行 应用。在处理各种事务时 大型数据集。其他列格式,例如 Apache ORC 也性能出色。

对于非列式数据, Apache Avro 提供一种 一种高效的二进制行文件格式虽然通常比 Parquet 慢, Avro 的性能优于 CSV 或 JSON 等基于文本的格式。

优化磁盘大小

永久性磁盘的吞吐量会随着磁盘大小而扩缩,这可能会影响 Spark 作业的性能,因为作业会将元数据和重排数据写入磁盘。使用标准永久性磁盘时,每个工作器的磁盘大小应至少为 1 TB(请参阅永久性磁盘大小的性能)。

如需监控 Cloud Monitoring 中的工作器磁盘吞吐量, Google Cloud 控制台:

  1. 点击 集群 页面。
  2. 点击“虚拟机实例”标签页。
  3. 点击任意工作器名称。
  4. 点击“监控”标签页,然后向下滚动到“磁盘吞吐量”以查看工作器吞吐量。

磁盘注意事项

临时 Dataproc 集群,此类集群 永久性存储的优势可以使用本地 SSD。 本地 SSD 以物理方式挂接到集群,并提供更高的吞吐量 (请参阅性能表)。 本地 SSD 的可用固定大小为 375 GB,但您可以添加多个 SSD 以提高性能。

集群关闭后,本地 SSD 不会保留数据。如果您需要永久性磁盘 您可以使用 SSD 永久性磁盘 提供更高的吞吐量 比标准永久性磁盘要大如果分区大小小于 8 KB,则 SSD 永久性磁盘也是一个不错的选择(不过,请避免使用小型分区)。

将 GPU 挂接到集群

Spark 3 支持 GPU。将 GPU 与 RAPIDS 初始化操作 使用 Cloud Build RAPIDS SQL 加速器。 通过 GPU 驱动程序初始化操作 来配置带有 GPU 的集群。

常见的作业故障和修复方案

内存不足

示例:

  • “执行器丢失”
  • “java.lang.OutOfMemoryError:已超出 GC 开销限制”
  • “由于超出内存限制,YARN 已终止容器”

可能的修复方案:

  • 如果使用 PySpark,请引发 spark.executor.memoryOverhead 和更低的 spark.executor.memory
  • 使用高内存 机器类型
  • 使用较小的分区

重排提取失败

示例:

  • "FetchFailedException"(Spark 错误)
  • “未能连接到...”(Spark 错误)
  • “无法提取”(MapReduce 错误)

通常由提前移除仍具有 shuffle 功能的 worker 导致 提供数据

可能的原因和解决方法:

  • 抢占式工作器虚拟机被收回,或者非抢占式工作器虚拟机被回收 由自动扩缩器移除解决方案:使用增强型灵活模式 使辅助工作器能够安全地抢占式或可伸缩。
  • 由于 OutOfMemory 错误,执行器或映射器崩溃。解决方法: 增加执行器或映射器的内存
  • Spark shuffle 服务可能会过载。解决方法: 减少作业分区数。

YARN 节点运行状况不佳

示例(来自 YARN 日志):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

通常与 shuffle 数据的磁盘空间不足有关。诊断依据 查看日志文件:

  • 打开项目的 集群 页面,然后点击该集群的名称。
  • 点击“查看日志”。
  • 按“hadoop-yarn-nodemanager”过滤日志。
  • 搜索“UNHEALTHY”。

可能的修复方法:

  • 用户缓存存储在由 yarn.nodemanager.local-dirs 属性(位于 yarn-site.xml file。此文件位于 /etc/hadoop/conf/yarn-site.xml。您可以查看可用空间 (位于 /hadoop/yarn/nm-local-dir 路径下),然后通过以下方式释放空间: 正在删除 /hadoop/yarn/nm-local-dir/usercache 用户缓存文件夹。
  • 如果日志报告“UNHEALTHY”重新创建集群 具有较大的磁盘空间 提高吞吐量上限

由于驱动程序内存不足,作业失败

在集群模式下运行作业时,如果主实例的内存大小不同,则作业将失败 节点明显大于工作器节点的内存大小。

驱动程序日志的示例:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

可能的修复方法:

  • spark:spark.driver.memory 设置为小于 yarn:yarn.scheduler.maximum-allocation-mb
  • 为主节点和工作器节点使用同一机器类型。

了解详情