Spark 作业调节提示

以下部分提供了一些提示,可帮助您微调 Dataproc Spark 应用。

使用临时集群

使用 Dataproc“临时”集群模型时,您可以为每个作业创建一个专用集群,并在作业完成后删除集群。借助临时模型,您可以单独处理存储和计算,将作业输入和输出数据保存在 Cloud Storage 或 BigQuery 中,仅将集群用于计算和临时数据存储。

永久性集群陷阱

使用临时单作业集群可避免与使用长时间运行的“永久性”共享集群相关的以下陷阱和潜在问题:

  • 单点故障:共享集群错误状态可能会导致所有作业失败,进而阻止整个数据流水线。调查错误并从错误中恢复可能需要数小时。由于临时集群仅保留临时集群内状态,因此在发生错误时,您可以快速删除并重新创建这些临时集群。
  • 难以维护和迁移 HDFS、MySQL 或本地文件系统中的集群状态
  • 会对 SLO 产生负面影响的作业之间的资源争用
  • 内存压力导致的服务守护程序无响应
  • 可能会超出磁盘容量的日志和临时文件累积
  • 由于集群可用区缺货而引起的扩容失败
  • 缺乏对过时的集群映像版本的支持。

临时集群优势

从积极的方面来看,临时集群可让您执行以下操作:

  • 使用不同的 Dataproc 虚拟机服务帐号为不同的作业配置不同的 IAM 权限。
  • 为每个作业优化集群的硬件和软件配置,并根据需要更改集群配置。
  • 升级新集群中的映像版本,以获取最新的安全补丁、bug 修复和优化。
  • 在隔离的单作业集群上更快地排查问题。
  • 您只需为临时集群运行时间付费,而不是为共享集群上作业之间的空闲时间付费,从而节省费用。

使用 Spark SQL

Spark SQL DataFrame API 对 RDD API 进行了重大优化。如果您与使用 RDD 的代码交互,请考虑先以 DataFrame 形式读取数据,然后再在代码中传递 RDD。在 Java 或 Scala 代码中,请考虑使用 Spark SQL Dataset API 作为 RDD 和 DataFrame 的超集。

使用 Apache Spark 3

Dataproc 2.0 可以安装 Spark 3,其中包含以下功能和性能改进:

  • GPU 支持
  • 能够读取二进制文件
  • 性能改进
  • 动态分区删减
  • 自适应查询执行,可实时优化 Spark 作业

使用动态分配

Apache Spark 包含动态分配功能,该功能可扩缩集群内工作器上的 Spark 执行程序的数量。即使集群纵向扩容,此功能也允许作业使用完整的 Dataproc 集群。默认情况下,Dataproc 上启用此功能(spark.dynamicAllocation.enabled 设置为 true)。如需了解详情,请参阅 Spark 动态分配

使用 Dataproc 自动扩缩功能

Dataproc 自动扩缩功能会在集群中动态添加 Dataproc 工作器以及从集群中动态移除 Dataproc 工作器,以帮助确保 Spark 作业具有快速完成所需的资源。

最佳做法是将自动扩缩政策配置为仅扩缩辅助工作器

使用 Dataproc 增强的灵活模式

如果工作器在完成向缩减器处理 shuffle 数据之前被抢占或移除,具有抢占式虚拟机或自动扩缩政策的集群可能会收到 FetchFailed 异常。此异常可能会导致任务重试和更长的作业完成时间。

建议:使用 Dataproc 增强型灵活模式,该模式不会在辅助工作器上存储中间 Shuffle 数据,以便辅助工作器可以被安全抢占或缩减。

配置分区和重排

Spark 将数据存储在集群上的临时分区中。如果您的应用对 DataFrame 进行分组或联接,它会根据分组和低级别配置将数据重排到新分区。

数据分区可显著影响应用性能:太少的分区会限制作业并行性和集群资源利用率;太多的分区会因为需要进行额外的分区处理和重排而降低作业速度。

配置分区

以下属性用于控制分区的数量和大小:

  • spark.sql.files.maxPartitionBytes:从 Cloud Storage 读取数据时的分区大小上限。默认值为 128 MB,这对大多数处理小于 100 TB 的应用来说已经足够大。

  • spark.sql.shuffle.partitions:执行重排后的分区数量。默认值为 200,这适用于 vCPU 总数少于 100 个的集群。建议:请将此属性设置为集群中的 vCPU 数量的 3 倍。

  • spark.default.parallelism:执行需要 shuffle 的 RDD 转换后返回的分区数,例如 joinreduceByKeyparallelize。默认值是集群中的 vCPU 总数。在 Spark 作业中使用 RDD 时,您可以将此数量设置为 vCPU 的 3 倍

限制文件数量

如果 Spark 读取大量小文件,会造成性能损失。以较大的文件大小存储数据,例如 256 MB-512 MB 范围内的文件大小。同样,限制输出文件的数量(如需强制 shuffle,请参阅避免不必要的 shuffle)。

配置自适应查询执行 (Spark 3)

自适应查询执行(在 Dataproc 映像版本 2.0 中默认启用)可提高 Spark 作业性能,包括:

虽然默认配置设置适用于大多数用例,但将 spark.sql.adaptive.advisoryPartitionSizeInBytes 设置为 spark.sqlfiles.maxPartitionBytes(默认值为 128 MB)会很有帮助。

避免不必要的重排

Spark 允许用户手动触发重排以使用 repartition 函数重新平衡其数据。重排的费用很高,因此应谨慎重排数据。适当设置分区配置应该足以让 Spark 自动对您的数据进行分区。

例外情况:将列分区数据写入 Cloud Storage 时,对特定列进行重新分区可避免写入许多小文件,从而缩短写入时间。

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

将数据存储在 Parquet 或 Avro 中

Spark SQL 默认在 Snappy 压缩的 Parquet 文件中读取和写入数据。Parquet 采用高效的列式文件格式,使 Spark 仅读取执行应用所需的数据。在处理大型数据集时,这是一个重要优势。其他列式格式(例如 Apache ORC)也效果良好。

对于非列式数据,Apache Avro 提供高效的二进制行文件格式。虽然通常比 Parquet 慢,但 Avro 的性能优于基于文本的格式(例如 CSV 或 JSON)。

优化磁盘大小

永久性磁盘的吞吐量会随着磁盘大小而扩缩,这可能会影响 Spark 作业的性能,因为作业会将元数据和重排数据写入磁盘。使用标准永久性磁盘时,每个工作器的磁盘大小应至少为 1 TB(请参阅永久性磁盘大小的性能)。

如需在 Google Cloud 控制台中监控工作器磁盘吞吐量,请执行以下操作:

  1. 点击集群页面上的集群名称。
  2. 点击“虚拟机实例”标签页。
  3. 点击任意工作器名称。
  4. 点击“监控”标签页,然后向下滚动到“磁盘吞吐量”以查看工作器吞吐量。

磁盘注意事项

临时 Dataproc 集群(无法受益于永久性存储)可以使用本地 SSD。本地 SSD 以物理方式挂接到集群,与永久性磁盘相比,它提供更高的吞吐量(请参阅性能表)。本地 SSD 具有 375 GB 的固定大小,但您可以添加多个 SSD 以提高性能。

集群关闭后,本地 SSD 不会保留数据。如果需要永久性存储空间,您可以使用 SSD 永久性磁盘,与标准永久性磁盘相比,这种磁盘可针对磁盘大小提供更高的吞吐量。如果分区大小小于 8 KB,则 SSD 永久性磁盘也是一个不错的选择(不过,请避免使用小型分区)。

将 GPU 挂接到您的集群

Spark 3 支持 GPU。通过将 GPU 与 RAPIDS 初始化操作搭配使用,通过 RAPIDS SQL Accelerator 加速 Spark 作业。GPU 驱动程序初始化操作,用于配置具有 GPU 的集群。

常见的作业故障和修复方案

内存不足

示例:

  • “执行器丢失”
  • “java.lang.OutOfMemoryError:已超出 GC 开销限制”
  • “由于超出内存限制,YARN 已终止容器”

可能的修复方案:

  • 如果使用 PySpark,请提高 spark.executor.memoryOverhead,降低 spark.executor.memory
  • 使用高内存机器类型。
  • 使用较小的分区

重排提取失败

示例:

  • “FetchFailedException”(Spark 错误)
  • “无法连接...”(Spark 错误)
  • “无法提取”(MapReduce 错误)

这通常是由于提前移除仍具有 Shuffle 数据的工作器所致。

可能的原因和解决方法:

  • 抢占式工作器虚拟机被收回,或非抢占式工作器虚拟机已被自动扩缩器移除。解决方案:使用增强型灵活模式可使辅助工作器安全地抢占或可伸缩。
  • 由于 OutOfMemory 错误,执行器或映射器崩溃。解决方案:增加执行器或映射器的内存。
  • Spark Shuffle 服务可能会过载。解决方案:减少作业分区的数量。

YARN 节点运行状况不佳

示例(来自 YARN 日志):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

通常与 shuffle 数据的磁盘空间不足有关。通过查看日志文件进行诊断:

  • 在 Google Cloud 控制台中打开项目的集群页面,然后点击集群的名称。
  • 点击“查看日志”。
  • hadoop-yarn-nodemanager 过滤日志。
  • 搜索“UNHEALTHY”。

可能的解决方法:

  • 用户缓存存储在 yarn-site.xml file 中的 yarn.nodemanager.local-dirs 属性指定的目录中。此文件位于 /etc/hadoop/conf/yarn-site.xml。您可以查看 /hadoop/yarn/nm-local-dir 路径中的可用空间,并删除 /hadoop/yarn/nm-local-dir/usercache 用户缓存文件夹以释放空间。
  • 如果日志报告“UNHEALTHY”状态,请使用更大的磁盘空间重新创建集群,这将提高吞吐量上限

由于驱动程序内存不足,作业失败

在集群模式下运行作业时,如果主节点的内存大小明显大于工作器节点的内存大小,则作业将失败。

驱动程序日志中的示例:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

可能的解决方法:

  • spark:spark.driver.memory 设置为小于 yarn:yarn.scheduler.maximum-allocation-mb
  • 为主节点和工作器节点使用相同的机器类型。

如需深入了解