Spark 作业调节提示

以下部分提供了相关提示,可帮助您微调 Dataproc Spark 应用。

使用临时集群

当您使用 Dataproc“临时”集群模型, 一个专用集群,在该作业完成后,您可以删除集群。 借助暂时性模型,您可以将存储和计算分开处理,将作业输入和输出数据保存在 Cloud Storage 或 BigQuery 中,仅将集群用于计算和临时数据存储。

永久性集群问题

使用短时单任务集群可避免使用共享且长时间运行的“永久性”集群时出现以下陷阱和潜在问题:

  • 单点故障:共享集群错误状态可能会导致所有作业失败, 阻止整个数据流水线。调查错误并从中恢复可能需要数小时。由于临时集群仅保留临时的集群内状态,因此当发生错误时,可以快速删除并重新创建这些集群。
  • 在 HDFS、MySQL 或本地文件系统中难以维护和迁移集群状态
  • 对 SLO 产生负面影响的作业之间的资源争用
  • 由内存压力导致的服务守护程序无响应
  • 日志和临时文件的累积可能会超出磁盘容量
  • 由于集群可用区缺货,扩容失败
  • 不支持过时的集群映像版本

临时集群的优势

从积极的方面来看,借助临时集群,您可以执行以下操作:

  • 为具有不同配置的不同作业配置不同的 IAM 权限 Dataproc 虚拟机服务账号
  • 针对每个作业优化集群的硬件和软件配置,并根据需要更改集群配置。
  • 升级新集群中的映像版本以获取最新的安全性 补丁、bug 修复和优化
  • 在隔离的单作业集群上更快地排查问题。
  • 只需为临时集群的运行时间付费,而无需为共享集群上作业之间的空闲时间付费,从而节省费用。

使用 Spark SQL

Spark SQL DataFrame API 对 RDD API 进行了重大优化。如果您与使用 RDD 的代码进行交互,请考虑先将数据读取为 DataFrame,然后再在代码中传递 RDD。在 Java 或 Scala 代码中,请考虑将 Spark SQL Dataset API 用作 RDD 和 DataFrame 的超集。

使用 Apache Spark 3

Dataproc 2.0 会安装 Spark 3,其中包含以下功能和性能改进:

  • GPU 支持
  • 能够读取二进制文件
  • 性能改进
  • 动态分区修剪
  • 自适应查询执行,可实时优化 Spark 作业

使用动态分配

Apache Spark 包含动态分配功能, 集群中工作器的 Spark 执行器数量。借助此功能,即使集群扩容,作业也能使用整个 Dataproc 集群。Dataproc 上默认启用此功能 (spark.dynamicAllocation.enabled 设置为 true)。请参阅 Spark 动态分配

使用 Dataproc 自动扩缩功能

Dataproc 自动扩缩功能会在集群中动态添加 Dataproc 工作器以及从集群中动态移除 Dataproc 工作器,以帮助确保 Spark 作业具有快速完成所需的资源。

最佳实践是将自动扩缩政策配置为仅扩缩辅助工作器

使用 Dataproc 增强的灵活性模式

如果工作器在向 reducer 提交 shuffle 数据之前被抢占或移除,则包含可抢占式虚拟机或自动扩缩政策的集群可能会收到 FetchFailed 异常。此异常可能会导致任务重试和更长的作业时间 完成时间。

建议:使用 Dataproc 增强的灵活性模式,该模式不会将中间 shuffle 数据存储到辅助工作器上,因此可以安全地抢占或缩减辅助工作器。

配置分区和洗牌

Spark 会将数据存储在集群上的临时分区中。如果您的应用对 DataFrame 进行分组或联接,则会重排数据 根据分组和低层级配置复制到新的分区中。

数据分区可显著影响应用性能:太少的分区会限制作业并行性和集群资源利用率;太多的分区会因为需要进行额外的分区处理和重排而降低作业速度。

配置分区

以下属性用于控制分区的数量和大小:

  • spark.sql.files.maxPartitionBytes: 从 Cloud Storage 中读取数据默认值为 128MB 足以处理大多数处理小于 100 TB 的应用。

  • spark.sql.shuffle.partitions:执行重排后的分区数量。默认值为 200,这适用于 vCPU 总数少于 100 个的集群。建议:请将此属性设置为集群中的 vCPU 数量的 3 倍。

  • spark.default.parallelism:在 执行需要重排的 RDD 转换,例如 joinreduceByKeyparallelize。默认值为集群中的 vCPU 总数。在 Spark 作业中使用 RDD 时,您可以将此数量设置为 vCPU 数量的 3 倍

限制文件数量

当 Spark 读取大量小文件时,性能会下降。 以较大的文件大小存储数据,例如, 介于 256MB–512MB 之间。 同样,限制输出文件的数量(要强制重排,请参阅 避免不必要的重排)。

配置自适应查询执行 (Spark 3)

自适应查询执行 (在 Dataproc 映像版本 2.0 中默认启用) 有助于提升 Spark 作业性能,包括:

虽然默认配置设置适用于大多数用例,但将 spark.sql.adaptive.advisoryPartitionSizeInBytes 设置为 spark.sqlfiles.maxPartitionBytes(默认值为 128 MB)会很有帮助。

避免不必要的重排

Spark 允许用户手动触发重排以使用 repartition 函数重新平衡其数据。重排的费用很高,因此应谨慎重排数据。适当设置分区配置应该足以让 Spark 自动对您的数据进行分区。

例外情况:将按列分区的数据写入 Cloud Storage 时,对特定列进行重新分区可避免写入许多小文件,从而缩短写入时间。

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

在 Parquet 或 Avro 中存储数据

Spark SQL 默认在 Snappy 压缩的 Parquet 文件中读取和写入数据。Parquet 采用高效的列式格式 一种文件格式,使得 Spark 能够只读取执行 应用。在处理各种事务时 大型数据集。其他列格式(例如 Apache ORC)的性能也很出色。

对于非列式数据,Apache Avro 提供了高效的二进制行文件格式。虽然通常比 Parquet 慢, Avro 的性能优于 CSV 或 JSON 等基于文本的格式。

优化磁盘大小

永久性磁盘的吞吐量会随着磁盘大小而扩缩,这可能会影响 Spark 作业的性能,因为作业会将元数据和重排数据写入磁盘。使用标准永久性磁盘时,每个工作器的磁盘大小应至少为 1 TB(请参阅永久性磁盘大小的性能)。

如需在 Google Cloud 控制台中监控工作器磁盘吞吐量,请执行以下操作:

  1. 集群页面上,点击集群名称。
  2. 点击“虚拟机实例”标签页。
  3. 点击任意工作器名称。
  4. 点击“监控”标签页,然后向下滚动到“磁盘吞吐量”以查看工作器吞吐量。

磁盘注意事项

临时 Dataproc 集群,此类集群 永久性存储的优势可以使用本地 SSD。 本地 SSD 以物理方式挂接到集群,并且比永久性磁盘提供更高的吞吐量(请参阅性能表)。本地 SSD 的可用固定大小为 375 GB,但您可以添加多个 SSD 以提高性能。

集群关停后,本地 SSD 不会保留数据。如果您需要永久性磁盘 您可以使用 SSD 永久性磁盘 提供更高的吞吐量 比标准永久性磁盘要大如果分区大小小于 8 KB,则 SSD 永久性磁盘也是一个不错的选择(不过,请避免使用小型分区)。

将 GPU 挂接到集群

Spark 3 支持 GPU。将 GPU 与 RAPIDS 初始化操作搭配使用,以使用 RAPIDS SQL 加速器加快 Spark 作业的速度。GPU 驱动程序初始化操作,用于配置包含 GPU 的集群。

常见的作业故障和修复方案

内存不足

示例:

  • “执行器丢失”
  • “java.lang.OutOfMemoryError:已超出 GC 开销限制”
  • “由于超出内存限制,YARN 已终止容器”

可能的修复方案:

  • 如果使用 PySpark,请引发 spark.executor.memoryOverhead 和更低的 spark.executor.memory
  • 使用高内存 机器类型
  • 使用较小的分区

重排提取失败

示例:

  • “FetchFailedException”(Spark 错误)
  • “未能连接到...”(Spark 错误)
  • “无法提取”(MapReduce 错误)

通常由提前移除仍具有 shuffle 功能的 worker 导致 提供数据

可能的原因和解决方法:

  • 抢占式工作器虚拟机被收回,或非抢占式工作器虚拟机被回收 由自动扩缩器移除解决方案:使用增强的灵活性模式,让辅助工作器能够安全地被抢占或可扩缩。
  • 执行器或映射器因 OutOfMemory 错误而崩溃。解决方法: 增加执行器或映射器的内存
  • Spark 分区混洗服务可能过载。解决方案:减少作业分区的数量。

YARN 节点运行状况不佳

示例(来自 YARN 日志):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

通常与用于随机播放数据的磁盘空间不足有关。诊断依据 查看日志文件:

  • 打开项目的 集群 页面,然后点击该集群的名称。
  • 点击“查看日志”。
  • 按“hadoop-yarn-nodemanager”过滤日志。
  • 搜索“UNHEALTHY”。

可能的修复方法:

  • 用户缓存存储在由 yarn.nodemanager.local-dirs 属性(位于 yarn-site.xml file。此文件位于 /etc/hadoop/conf/yarn-site.xml。您可以查看 /hadoop/yarn/nm-local-dir 路径中的可用空间,并通过删除 /hadoop/yarn/nm-local-dir/usercache 用户缓存文件夹来释放空间。
  • 如果日志报告“UNHEALTHY”重新创建集群 具有较大的磁盘空间 提高吞吐量上限

作业因驱动程序内存不足而失败

在集群模式下运行作业时,如果主节点的内存大小明显大于工作器节点的内存大小,则作业会失败。

驱动程序日志的示例:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

可能的修复方法:

  • spark:spark.driver.memory 设置为小于 yarn:yarn.scheduler.maximum-allocation-mb
  • 为主节点和工作器节点使用同一机器类型。

了解详情