Dataproc 增强的灵活性模式

Dataproc 增强的灵活性模式 (EFM) 可管理 shuffle 数据,以最大限度地减少因从正在运行的集群中移除节点而导致的作业进度延迟。EFM 将 Spark Shuffle 数据写入主要工作器。 在缩减阶段,工作器会从这些远程节点拉取。

由于 EFM 不会在辅助工作器上存储中间 Shuffle 数据,因此它非常适合在使用抢占式虚拟机或仅自动扩缩辅助工作器组的集群中使用。

限制

  • 不支持 AppMaster 重定位的 Apache Hadoop YARN 作业可能会在增强型灵活模式下失败(请参阅何时等待 AppMaster 完成)。
  • 不建议使用增强的灵活性模式:
    • 在只有主要工作器的集群上。
  • 不支持增强的灵活性模式:
    • 启用主要工作器自动扩缩后。在大多数情况下,主要工作器将继续存储未自动迁移的 shuffle 数据。降低主工作器组的规模会抵消 EFM 优势。
    • 当 Spark 作业在启用了安全停用的集群上运行时。安全停用和 EFM 可以交叉使用,因为 YARN 安全停用机制会确保停用节点直到所有相关的应用完成为止。

使用增强的灵活性模式

增强型灵活模式按执行引擎配置,并且必须在创建集群期间进行配置。Spark EFM 实现使用 dataproc:efm.spark.shuffle=primary-worker 集群属性 进行配置。

示例:针对 Spark 创建具有 primary-worker shuffle 的集群:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25

Apache Spark 示例

  1. 使用 EFM 集群上的 Spark 示例 jar 对公开的莎士比亚文字运行 WordCount 作业。
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

为主工作器重排配置本地 SSD

主工作器和 HDFS 重排实现将中间重排数据写入虚拟机挂接的磁盘,并受益于本地 SSD 提供的更高吞吐量和 IOPS。为便于资源分配,请在配置主要工作器机器时,将目标定为大约每 4 个 vCPU 有 1 个本地 SSD 分区。

如需挂接本地 SSD,请将 --num-worker-local-ssds 标志传递给 gcloud dataproc clusters create 命令。

辅助工作器比率

由于辅助工作器将其 shuffle 数据写入主要工作器,因此您的集群必须包含足够数量的主要工作器以及足够的 CPU、内存和磁盘资源,以适应作业的 shuffle 负载。对于自动扩缩集群,为防止主要群组扩缩及其导致的不必要行为,请将主要工作器组的自动扩缩政策中的 minInstances 设置为 maxInstances 值。

如果您的辅助工作器与主要工作器之比较高(例如,10:1),请监控主要工作器的 CPU 利用率、网络和磁盘用量,以确定它们是否过载。为此,请按以下说明操作:

  1. 转到 Google Cloud 控制台中的虚拟机实例页面。

  2. 点击主要工作器左侧的复选框。

  3. 点击“监控”标签,查看主工作器的 CPU 利用率、磁盘 IOPS、网络字节数以及其他指标。

如果主要工作器过载,请考虑手动扩容主要工作器

调整主要工作器组的大小

主要工作器组可以安全地纵向扩容,但缩减主要工作器组会对作业进度产生负面影响。缩小主要工作器组的操作应使用安全停用(通过设置 --graceful-decommission-timeout 标志来启用)。

自动扩缩集群:使用自动扩缩政策的 EFM 集群停用了主要工作器扩缩功能。如需调整自动扩缩集群上的主要工作器组的大小,请执行以下操作:

  1. 停用自动扩缩。

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. 扩缩主要组。

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. 重新启用自动扩缩:

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

监控主要工作器磁盘的使用情况

主要工作器必须具有足够的磁盘空间来存储集群的 shuffle 数据。您可以通过 remaining HDFS capacity 指标间接监控此情况。本地磁盘填满时,HDFS 将不再提供空间,剩余容量也会减少。

默认情况下,当主要工作器的本地磁盘使用量超过 90% 时,该节点将在 YARN 节点界面中标记为“UNHEALTHY”。如果您遇到磁盘容量问题,可以从 HDFS 中删除未使用的数据或纵向扩容主要工作器池。

请注意,中间 Shuffle 数据通常在作业结束时才会被清理。将主要工作器 shuffle 与 Spark 搭配使用时,此过程在作业完成后可能需要花费长达 30 分钟的时间。

高级配置

分区和并行性

提交 MapReduce 或 Spark 作业时,请配置适当的分区级别。确定重排阶段的输入和输出分区数量涉及到在不同性能特征之间的权衡取舍。最好尝试使用适合您工作形状的值。

输入分区

MapReduce 和 Spark 输入分区由输入数据集决定。从 Cloud Storage 读取文件时,每个任务会处理大约一个“块大小”的数据。

  • 对于 Spark SQL 作业,分区大小上限由 spark.sql.files.maxPartitionBytes 控制。请考虑将其增加至 1 GB:spark.sql.files.maxPartitionBytes=1073741824

  • 对于 MapReduce 作业和 Spark RDD,分区大小通常由 fs.gs.block.size 控制,默认值为 128 MB。请考虑将其增加至 1 GB。您还可以设置 InputFormat 特定属性,例如 mapreduce.input.fileinputformat.split.minsizemapreduce.input.fileinputformat.split.maxsize

    • 对于 MapReduce 作业:--properties fs.gs.block.size=1073741824
    • 对于 Spark RDD:--properties spark.hadoop.fs.gs.block.size=1073741824

输出分区

后续阶段的任务数量由多个属性控制。对于处理超过 1 TB 数据的大型作业,请考虑使每个分区至少有 1 GB。

  • 对于 MapReduce 作业,输出分区数量由 mapreduce.job.reduces 控制。

  • 对于 Spark SQL,输出分区数量由 spark.sql.shuffle.partitions 控制。

  • 对于使用 RDD API 的 Spark 作业,您可以指定输出分区的数量或设置 spark.default.parallelism

主工作器重排的重排调节

最重要的属性是 --properties yarn:spark.shuffle.io.serverThreads=<num-threads>。请注意,这是集群级 YARN 属性,因为 Spark 重排服务器作为节点管理器的一部分运行。默认值为机器核心数的两倍 (2x)。例如,n1-highmem-8 上有 16 个线程。如果“重排读取受阻时间”大于 1 秒,并且主要工作器未达到网络、CPU 或磁盘限制,请考虑增加重排服务器线程数。

在较大的机器类型上,请考虑增加 spark.shuffle.io.numConnectionsPerPeer(默认值为 1)。(例如,将其设置为每个主机对 5 个连接。)

增加重试次数

应用主实例、任务和阶段允许的最大尝试次数可通过设置以下属性进行配置:

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

由于在使用多个抢占式虚拟机或自动扩缩而不安全停用的集群中,应用主实例和任务会更频繁地终止,因此增加这些集群中的上述属性的值会有所帮助(请注意,不支持将 EFM 与 Spark 搭配使用并安全停用)。

EFM 集群上的 YARN 安全停用

YARN 安全停用可用于快速移除节点,而对运行的应用产生最小的影响。对于自动扩缩集群,可以在挂接到 EFM 集群的 AutoscalingPolicy 中设置安全停用超时

针对安全停用的 EFM 增强功能

  1. 由于中间数据存储在分布式文件系统中,因此在这些节点上运行的所有容器完成运行后,可以立即从 EFM 集群中移除节点。相比之下,在应用完成之前,系统不会移除标准 Dataproc 集群上的节点。

  2. 节点移除不会等待在节点上运行的应用主实例结束运行。 应用主容器被终止后,将在另一个未停用的节点上重新安排。作业进度不会丢失:新应用主实例通过读取作业历史记录快速恢复之前的应用主实例的状态。