Spark 作业调节提示

以下部分提供了一些提示,可帮助您优化 Dataproc Spark 应用。

使用临时集群

使用 Dataproc“临时”集群模型时,您需要为每个作业创建一个专用集群,并在作业完成后删除该集群。借助暂时性模型,您可以将存储和计算分开处理,将作业输入和输出数据保存在 Cloud Storage 或 BigQuery 中,仅将集群用于计算和临时数据存储。

永久性集群误区

使用短时单任务集群可避免使用共享且长时间运行的“永久性”集群时出现以下陷阱和潜在问题:

  • 单点故障:共享集群错误状态可能会导致所有作业都失败,从而阻塞整个数据流水线。调查错误并从中恢复可能需要数小时。由于临时集群仅保留临时的集群内状态,因此当发生错误时,可以快速删除并重新创建这些集群。
  • 难以在 HDFS、MySQL 或本地文件系统中维护和迁移集群状态
  • 作业之间的资源争用会对 SLO 产生负面影响
  • 内存压力导致的服务守护程序无响应
  • 日志和临时文件堆积,可能会超出磁盘容量
  • 由于集群可用区缺货,扩容失败
  • 不支持过时的集群映像版本

临时集群的优势

从积极的方面来看,借助临时集群,您可以执行以下操作:

  • 使用不同的 Dataproc VM 服务账号为不同的作业配置不同的 IAM 权限。
  • 针对每个作业优化集群的硬件和软件配置,并根据需要更改集群配置。
  • 升级新集群中的映像版本,以获取最新的安全补丁、bug 修复和优化。
  • 在隔离的单作业集群上更快地排查问题。
  • 只需为临时集群的运行时间付费,而无需为共享集群上作业之间的空闲时间付费,从而节省费用。

使用 Spark SQL

Spark SQL DataFrame API 是对 RDD API 的重大优化。如果您与使用 RDD 的代码进行交互,请考虑先将数据读取为 DataFrame,然后再在代码中传递 RDD。在 Java 或 Scala 代码中,不妨将 Spark SQL Dataset API 用作 RDD 和 DataFrame 的超集。

使用 Apache Spark 3

Dataproc 2.0 会安装 Spark 3,其中包含以下功能和性能改进:

  • GPU 支持
  • 能够读取二进制文件
  • 性能改进
  • 动态分区剪裁
  • 自适应查询执行,可实时优化 Spark 作业

使用动态分配

Apache Spark 包含动态分配功能,用于扩缩集群中工作器上的 Spark 执行程序数量。借助此功能,即使集群扩容,作业也能使用整个 Dataproc 集群。此功能在 Dataproc 上默认处于启用状态(spark.dynamicAllocation.enabled 设置为 true)。如需了解详情,请参阅 Spark 动态分配

使用 Dataproc 自动扩缩功能

Dataproc 自动扩缩功能会在集群中动态添加 Dataproc 工作器以及从集群中动态移除 Dataproc 工作器,以帮助确保 Spark 作业具有快速完成所需的资源。

最佳实践是将自动扩缩政策配置为仅扩缩辅助工作器

使用 Dataproc 增强的灵活性模式

如果工作器在向 reducer 提交 shuffle 数据之前被抢占或移除,则包含可抢占式虚拟机或自动扩缩政策的集群可能会收到 FetchFailed 异常。此异常可能会导致任务重试,并延长作业完成时间。

建议:使用 Dataproc 增强的灵活性模式,该模式不会将中间 shuffle 数据存储到辅助工作器上,因此可以安全地抢占或缩减辅助工作器。

配置分区和洗牌

Spark 会将数据存储在集群上的临时分区中。如果您的应用对 DataFrame 进行分组或联接,则会根据分组和低级配置将数据随机分到新的分区中。

数据分区可显著影响应用性能:太少的分区会限制作业并行性和集群资源利用率;太多的分区会因为需要进行额外的分区处理和重排而降低作业速度。

配置分区

以下属性用于控制分区的数量和大小:

  • spark.sql.files.maxPartitionBytes:从 Cloud Storage 读取数据时分区的大小上限。默认值为 128 MB,对于处理数据量不超过 100 TB 的大多数应用来说,这个值已经足够大了。

  • spark.sql.shuffle.partitions:执行重排后的分区数量。默认值为 200,这适用于 vCPU 总数少于 100 个的集群。建议:请将此属性设置为集群中的 vCPU 数量的 3 倍。

  • spark.default.parallelism:执行需要重排的 RDD 转换(例如 joinreduceByKeyparallelize)后返回的分区数量。默认值为集群中的 vCPU 总数。在 Spark 作业中使用 RDD 时,您可以将此数量设置为 vCPU 数量的 3 倍

限制文件数量

当 Spark 读取大量小文件时,性能会有所下降。以较大的文件大小存储数据,例如,文件大小在 256MB-512MB 范围内。同样,请限制输出文件的数量(如需强制进行随机播放,请参阅避免不必要的随机播放)。

配置自适应查询执行 (Spark 3)

自适应查询执行(在 Dataproc 映像版本 2.0 中默认处于启用状态)可提升 Spark 作业性能,包括:

虽然默认配置设置适用于大多数用例,但将 spark.sql.adaptive.advisoryPartitionSizeInBytes 设置为 spark.sqlfiles.maxPartitionBytes(默认值为 128 MB)会很有帮助。

避免不必要的重排

Spark 允许用户手动触发重排以使用 repartition 函数重新平衡其数据。重排的费用很高,因此应谨慎重排数据。适当设置分区配置应该足以让 Spark 自动对您的数据进行分区。

例外情况:将按列分区的数据写入 Cloud Storage 时,对特定列进行重新分区可避免写入许多小文件,从而缩短写入时间。

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

在 Parquet 或 Avro 中存储数据

Spark SQL 默认在 Snappy 压缩的 Parquet 文件中读取和写入数据。Parquet 采用高效的列式文件格式,使 Spark 能够仅读取执行应用所需的数据。在处理大型数据集时,这是一个重要优势。其他列格式(例如 Apache ORC)的性能也很出色。

对于非列式数据,Apache Avro 提供了高效的二进制行文件格式。虽然通常比 Parquet 慢,但 Avro 的性能优于基于文本的格式(例如 CSV 或 JSON)。

优化磁盘大小

永久性磁盘的吞吐量会随着磁盘大小而扩缩,这可能会影响 Spark 作业的性能,因为作业会将元数据和重排数据写入磁盘。使用标准永久性磁盘时,每个工作器的磁盘大小应至少为 1 TB(请参阅永久性磁盘大小的性能)。

如需在 Google Cloud 控制台中监控工作器磁盘吞吐量,请执行以下操作:

  1. 集群页面上,点击集群名称。
  2. 点击“虚拟机实例”标签页。
  3. 点击任意工作器名称。
  4. 点击“监控”标签页,然后向下滚动到“磁盘吞吐量”以查看工作器吞吐量。

磁盘注意事项

临时 Dataproc 集群无法受益于永久性存储空间,可以使用本地 SSD。本地 SSD 以物理方式挂接到集群,并且比永久性磁盘提供更高的吞吐量(请参阅性能表)。本地 SSD 的大小固定为 375 GB,但您可以添加多个 SSD 以提高性能。

集群关停后,本地 SSD 不会保留数据。如果您需要使用永久性存储,则可以使用 SSD 永久性磁盘,它们为其大小提供比标准永久性磁盘更高的吞吐量。如果分区大小小于 8 KB,则 SSD 永久性磁盘也是一个不错的选择(不过,请避免使用小型分区)。

将 GPU 挂接到集群

Spark 3 支持 GPU。将 GPU 与 RAPIDS 初始化操作搭配使用,以使用 RAPIDS SQL 加速器加快 Spark 作业的速度。GPU 驱动程序初始化操作,用于配置包含 GPU 的集群。

常见的作业故障和修复方案

内存不足

示例:

  • “执行器丢失”
  • “java.lang.OutOfMemoryError:已超出 GC 开销限制”
  • “由于超出内存限制,YARN 已终止容器”

可能的修复方案:

  • 如果使用 PySpark,请提高 spark.executor.memoryOverhead 并降低 spark.executor.memory
  • 使用高内存机器类型。
  • 使用较小的分区

重排提取失败

示例:

  • “FetchFailedException”(Spark 错误)
  • “Failed to connect to…”(Spark error)
  • “无法提取”(MapReduce 错误)

通常由过早移除仍有要分发的 Shuffle 数据的工作器所致。

可能的原因和解决方法:

  • 自动扩缩器回收了抢占式工作器虚拟机,或移除了非抢占式工作器虚拟机。解决方案:使用增强的灵活性模式,让辅助工作器能够安全地被抢占或可伸缩。
  • 执行器或映射器因 OutOfMemory 错误而崩溃。解决方案:增加执行器或映射程序的内存。
  • Spark 分区混洗服务可能过载。解决方案:减少作业分区数量。

YARN 节点运行状况不佳

示例(来自 YARN 日志):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

通常与用于随机播放数据的磁盘空间不足有关。通过查看日志文件进行诊断:

  • 在 Google Cloud 控制台中打开项目的 集群页面,然后点击集群的名称。
  • 点击“查看日志”。
  • hadoop-yarn-nodemanager 过滤日志。
  • 搜索“UNHEALTHY”。

可能的修复方法:

  • 用户缓存存储在 yarn-site.xml file 中的 yarn.nodemanager.local-dirs 属性指定的目录中。此文件位于 /etc/hadoop/conf/yarn-site.xml。您可以查看 /hadoop/yarn/nm-local-dir 路径中的可用空间,并通过删除 /hadoop/yarn/nm-local-dir/usercache 用户缓存文件夹来释放空间。
  • 如果日志报告“UNHEALTHY”状态,请使用更大的磁盘空间重新创建集群,这将提高吞吐量上限

作业因驱动程序内存不足而失败

在集群模式下运行作业时,如果主节点的内存大小明显大于工作器节点的内存大小,则作业会失败。

驱动程序日志中的示例:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

可能的修复方法:

  • spark:spark.driver.memory 设置为小于 yarn:yarn.scheduler.maximum-allocation-mb
  • 为主节点和工作器节点使用相同的机器类型。

了解详情