自动扩缩集群

什么是自动扩缩?

估算工作负载的“集群”工作器(节点)的“适当”数量非常困难,整个流水线的单个集群大小通常不是理想之选。用户启动的集群扩缩功能部分解决了此问题,但需要监控集群利用率和人工干预。

Dataproc AutoscalingPolicies API 提供自动管理集群资源的机制,还启用了集群自动扩缩功能。Autoscaling Policy 是可重复使用的配置,描述了应如何扩缩使用该自动扩缩政策的集群。该政策定义了扩缩边界、频率和积极性,以在整个集群生命周期内提供对集群资源的精细控制。

何时使用自动扩缩

使用自动扩缩:

在将数据储存在外部服务(如 Cloud StorageBigQuery)的集群上

在处理许多作业的集群上

以扩缩单个作业集群

对 Spark 批处理作业使用增强的灵活性模式

自动扩缩功能建议使用/用于:

  • HDFS:自动扩缩功能不适合对集群 HDFS 调整规模。如果对 HDFS 使用自动扩缩功能,请确保主要工作器数量的下限足以处理所有 HDFS 数据。此外,还要认识到停用 HDFS 数据节点可能会延迟移除工作器。

  • YARN 节点标签:自动扩缩因 YARN-9088 而不支持 YARN 节点标签和属性 dataproc:am.primary_only。当使用节点标签时,YARN 错误地报告集群指标。

  • Spark 结构化流:自动扩缩不支持 Spark 结构化流(请参阅自动扩缩和 Spark 结构化流)。

  • 闲置集群:建议不要在集群处于闲置状态时使用自动扩缩将集群缩减到大小下限。由于创建新集群的速度与调整集群大小的速度一样快,请考虑删除闲置集群,然后重新创建。以下工具支持此“临时”模型:

    使用 Dataproc 工作流在专用集群上安排一组作业,然后在作业完成之后删除该集群。如需更高级的编排,请使用以 Apache Airflow 为基础开发的 Cloud Composer

    对于处理临时查询或外部安排的工作负载的集群,请使用集群计划删除以在指定的空闲时间段或特定的时间段后删除集群。

启用自动扩缩

如需在集群上启用自动扩缩,请执行以下操作:

  1. 创建自动扩缩政策

  2. 采用以下任一方式:

    1. 创建自动扩缩集群,或
    2. 在现有集群上启用自动扩缩

创建自动扩缩政策

gcloud 命令

您可以使用 gcloud dataproc autoscaling-policies import 命令创建自动扩缩政策。它会读取定义了自动扩缩政策的本地 YAML 文件。文件的格式和内容应与 autoscalingPolicies REST API 定义的配置对象和字段相匹配。

以下 YAML 示例定义了指定所有必填字段的政策。它还为主要工作器和辅助(抢占式)工作器提供 maxInstances 值,并指定了 4 分钟的 cooldownPeriod(默认为 2 分钟)。

workerConfig:
  maxInstances: 100
secondaryWorkerConfig:
  maxInstances: 50
basicAlgorithm:
  cooldownPeriod: 2m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h

以下 YAML 示例指定了所有可选和必填的自动扩缩政策字段。

workerConfig:
  minInstances: 2
  maxInstances: 100
  weight: 1
secondaryWorkerConfig:
  minInstances: 0
  maxInstances: 100
  weight: 1
basicAlgorithm:
  cooldownPeriod: 2m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    scaleUpMinWorkerFraction: 0.0
    scaleDownMinWorkerFraction: 0.0
    gracefulDecommissionTimeout: 1h

从本地终端或在 Cloud Shell 中运行以下 gcloud 命令以创建自动扩缩政策。提供政策的名称。此名称将成为政策 id,您可以在后续的 gcloud 命令中使用该名称来引用该政策。使用 --source 标志指定要导入的自动扩缩政策 YAML 文件的本地文件路径和文件名。

gcloud dataproc autoscaling-policies import policy-name \
    --source=filepath/filename.yaml \
    --region=region

REST API

通过将 AutcalealingPolicy 定义为 autcalealingPolicies.create 请求的一部分来创建自动扩缩政策。

控制台

如需创建自动扩缩政策,请使用 Cloud Console 从 Dataproc 自动扩缩政策页面选择“创建政策”。在创建政策页面上,您可以选择政策建议面板,以填充特定作业类型或扩缩目标的自动扩缩政策字段。

创建自动扩缩集群

创建自动扩缩政策后,创建将使用自动扩缩政策的集群。该集群必须与自动扩缩政策位于同一区域中。

gcloud 命令

在本地终端或在 Cloud Shell 中运行以下 gcloud 命令以创建自动扩缩集群。为集群提供名称,并使用 --autoscaling-policy 标志指定 policy id(您在创建政策时指定的政策名称)或政策 resource URI (resource name)(请参阅自动扩缩政策 idname 字段)。

gcloud dataproc clusters create cluster-name \
    --autoscaling-policy=policy id or resource URI \
    --region=region

REST API

可以通过将 AutoscalingConfig 添加到 clusters.create 请求中来创建自动扩缩集群。

控制台

您可以从 Dataproc 上“设置集群”面板的“自动扩缩政策”部分选择要应用到新集群的现有自动扩缩政策创建集群页面。

在现有集群上启用自动扩缩

创建自动扩缩政策后,您可以在位于同一区域中的现有集群上启用该政策。

gcloud 命令

从本地终端或在 Cloud Shell 中运行以下 gcloud 命令,以在现有集群上启用自动扩缩政策。提供集群名称,并使用 --autoscaling-policy 标志指定 policy id(您在创建政策时指定的政策名称)或政策 resource URI (resource name)(请参阅自动扩缩政策 idname 字段)。

gcloud dataproc clusters update cluster-name \
    --autoscaling-policy=policy id or resource URI \
    --region=region

REST API

如需在现有集群上启用自动扩缩政策,请在 clusters.patch 请求的 updateMask 中设置政策的 AutoscalingConfig.policyUri

控制台

目前,Google Cloud Console 不支持在现有集群上启用自动扩缩政策。

多集群政策使用情况

  • 自动扩缩政策定义了可应用于多个集群的扩缩行为。当多个集群共享类似工作负载或运行具有类似资源使用模式的作业时,自动扩缩政策非常适用。

  • 您可以更新多个集群所使用的政策。更新会立即影响所有使用该政策的集群的自动扩缩行为(请参阅 autcalealingPolicies.update)。如果您不想将政策更新应用于使用该政策的集群,请先停用集群上的自动扩缩,然后再更新政策。

gcloud 命令

在本地终端或在 Cloud Shell 中运行以下 gcloud 命令,以停用集群的自动扩缩。

gcloud dataproc clusters update cluster-name --disable-autoscaling \
    --region=region

REST API

要停用集群上的自动扩缩,请将 AutoscalingConfig.policyUri 设置为空字符串,并在 clusters.patch 请求中设置 update_mask=config.autoscaling_config.policy_uri

控制台

目前,Google Cloud Console 不支持停用集群上的自动扩缩。

自动扩缩的工作原理

自动扩缩功能会在每个“冷却期”过后检查集群的 Hadoop YARN 指标,以确定是否扩缩集群,如果需要扩缩,则确定更新的规模。

  1. 每次评估时,自动扩缩功能都会检查在上一个 cooldown_period 内得出的平均待处理和可用集群内存,以确定需要更改的确切工作器数量:

    exact Δworkers = avg(pending memory - available memory) / memory per node manager

    • pending memory 是一个信号,表明集群已将任务加入队列但尚未执行,可能需要进行扩缩以更好地处理其工作负载。
    • available memory 是一个信号,表明集群在运行状况良好的节点上具有额外的带宽,可能需要缩减以节省资源。
    • 请参阅使用 Hadoop 和 Spark 进行自动扩缩,详细了解这些 Apache Hadoop YARN 指标。
  2. 根据需要更改的确切工作器数量,自动扩缩功能使用 scaleUpFactorscaleDownFactor 来计算工作器数量的实际变化:

    if exact Δworkers > 0:
      actual Δworkers = ROUND_UP(exact Δworkers * scaleUpFactor)
      # examples:
      # ROUND_UP(exact Δworkers=5 * scaleUpFactor=0.5) = 3
      # ROUND_UP(exact Δworkers=0.8 * scaleUpFactor=0.5) = 1
    else:
      actual Δworkers = ROUND_DOWN(exact Δworkers * scaleDownFactor)
      # examples:
      # ROUND_DOWN(exact Δworkers=-5 * scaleDownFactor=0.5) = -2
      # ROUND_DOWN(exact Δworkers=-0.8 * scaleDownFactor=0.5) = 0
      # ROUND_DOWN(exact Δworkers=-1.5 * scaleDownFactor=0.5) = 0
    
    scaleUpFactor 或 scaleDownFactor 为 1.0 意味着自动扩缩功能将执行扩缩,以使待处理/可用内存为 0(完全利用率)。

  3. 计算工作器数量的实际更改后,scaleUpMinWorkerFractionscaleDownMinWorkerFraction 将充当阈值,以确定自动扩缩是否会扩缩集群。一小部分表示即使 Δworkers 较小,自动扩缩功能也应该进行扩缩。较大部分表示仅在 Δworkers 较大时才应进行扩缩。

    if (Δworkers >  scaleUpMinWorkerFraction* cluster size) then scale up
    if (abs(Δworkers) >  scaleDownMinWorkerFraction* cluster size) then scale down

  4. 如果要扩缩的工作器数量大到足以触发扩缩操作,则自动扩缩功能会使用 minInstances maxInstances 的边界 workerConfigsecondaryWorkerConfigweight(主要工作器与辅助工作器的比率)以确定如何在主要和辅助工作器实例组之间拆分工作器数量。这些计算的结果是针对扩缩阶段的集群的最终自动扩缩更改。

自动扩缩配置建议

避免扩缩主要工作器

主要工作器运行 HDFS Datanode,而辅助工作器是计算专用工作器。HDFS Namenode 具有多个争用情况,导致 HDFS 进入损坏状态,从而导致停用过程永远卡住。请避免扩缩主要工作器以避免发生这些问题。例如:

workerConfig:
  minInstances: 10
  maxInstances: 10
secondaryWorkerConfig:
  minInstances: 0
  maxInstances: 100

您需要对集群创建命令进行一些修改:

  1. 设置 --num-workers=10 以匹配自动扩缩政策的主要工作器组大小。
  2. 设置 --secondary-worker-type=non-preemptible 以将辅助工作器配置为非抢占式工作器。(除非需要抢占式虚拟机)。
  3. 将硬件配置从主要工作器复制到辅助工作器。例如,将 --secondary-worker-boot-disk-size=1000GB 设置为与 --worker-boot-disk-size=1000GB 匹配。

对 Spark 批处理作业使用增强的灵活性模式

请参阅增强的灵活性模式。增强的灵活性模式可管理 Shuffle 数据,通过自动扩缩或抢占以最大限度地减少因从正在运行的集群中移除节点而导致的作业进度延迟。

启用 EFM 后,自动扩缩政策的安全停用超时值必须设置为 0s。自动扩缩政策只能自动扩缩辅助工作器。增强的灵活性模式还允许安全使用抢占式辅助工作器 (--secondary-worker-type=preemptible)。

选择安全停用超时

从集群中移除节点时,自动扩缩功能支持 YARN 安全停用。通过安全停用功能,应用可以在不同阶段之间完成数据重排,从而避免阻碍作业进度。自动扩缩政策中提供的⁠安全停用超时是指在移除节点之前 YARN 等待运行应用(停用开始时运行的应用)的持续时间上限。

安全停用超时应设置为大于集群处理最长作业所需时间的值。例如 1h

请考虑将花费时间超过 1 小时的作业迁移到其临时集群,以免禁用安全停用。

设置 scaleUpFactor

scaleUpFactor 控制自动扩缩程序应该以怎样的积极程度对集群进行扩容。它是介于 0.01.0 之间的数字,表示要添加节点的 YARN 待处理内存所占百分比。

例如,如果有 100 个待处理容器,每个容器请求 512MB,则待处理 YARN 内存将达到 50GB。如果 scaleUpFactor 为 0.5,则自动扩缩程序将添加足够的节点来添加 25GB 的 YARN 内存。同样,如果为 0.1,则自动扩缩程序将添加足够节点以达到 5GB。请注意,这些值对应于 YARN 内存,而不是虚拟机上实际可用的总内存。

对于 MapReduce 作业和启用了动态分配功能的 Spark 作业,建议您首先使用 0.05。对于具有固定数量的执行程序的 Spark 作业和 Tez 作业,请使用 1.0

设置 scaleDownFactor

scaleDownFactor 控制自动扩缩程序应该以怎样的积极程度对集群进行缩减。它是介于 0.01.0 之间的数字,表示要移除节点的 YARN 可用内存所占百分比。

对于大多数多作业集群,请将此值保留为 1.0。由于安全停用,缩减操作的速度比扩容要慢得多。设置 scaleDownFactor=1.0 可配置积极缩减,从而最大限度地减少缩减操作的数量,以达到适当的集群大小。

将此值设置为 0.0 可避免一直缩减集群(例如,在临时集群上)。

设置 scaleUpMinWorkerFractionscaleDownMinWorkerFraction

scaleUpMinWorkerFractionscaleDownMinWorkerFraction 默认为 0.0,表示自动扩缩程序将选择扩容或缩减集群时的阈值。scaleUpMinWorkerFraction 控制发出更新请求所需的集群大小增加的最低百分比。例如,如果自动扩缩程序希望向 100 个节点集群添加 5 个工作器,则 scaleUpMinWorkerFraction 需要小于或等于 0.05 (5%) 才能实际发出更新请求。如果值为 0.1,则自动扩缩程序不会扩缩集群。

同样,scaleDownMinWorkerFraction 表示为了发出缩减请求,必须在集群中移除的当前节点比例的阈值。如果 scaleDownMinFraction 为 0.05,在包含 100 个节点的集群中,除非至少需要移除 5 个节点,否则自动扩缩程序不会发出更新请求。

默认值为 0.0,表示没有阈值。设置阈值对于大型集群(超过 100 个节点)非常有用,可以避免进行较小的不必要的扩缩操作。

选择冷却期

cooldownPeriod 的最小值和默认值均为两分钟。如果在政策中设置了较短的 cooldownPeriod,工作负载更改会更快地影响集群大小,但集群可能会进行不必要的扩缩。推荐做法是在使用较短的 cooldownPeriod 时将政策的 scale_upscale_downmin_worker_fractions 设置为非零值。这可确保仅在内存利用率的变化足以保证集群更新时,才可以扩充或缩减集群的规模。

工作器计数边界和群组权重

每个工作器组都具有 minInstancesmaxInstances,用于为每个组的大小配置硬性限制。

每个组还有一个名为 weight 的参数,用于在两个组之间配置目标平衡。请注意,此参数只是一个提示,如果某个组的大小达到最小值或最大值,则只能在其他组中添加或移除节点。因此,weight 通常始终保留为默认的 1

自动扩缩指标和日志

以下资源和工具可帮助您监控自动扩缩操作及其对集群和它的作业产生的影响。

Cloud Monitoring

使用 Cloud Monitoring 来执行下列操作:

  • 查看自动扩缩功能使用的指标
  • 查看集群中的节点管理器数量
  • 了解为何自动扩缩功能已经扩缩或未扩缩您的集群 autoscaling-stackdriver1 autoscaling-stackdriver2 autoscaling-stackdriver3

Cloud Logging

使用 Cloud Logging 通过 Cloud Dataproc 自动扩缩程序查看日志。

1) 查找集群的日志。

autoscaling-logs-for-cluster

2) 选择 dataproc.googleapis.com/autoscaler

autoscaling-log-file

3) 展开日志消息以查看 status 字段。日志采用机器可读的 JSON 格式。

autoscaling-three-logs autoscaling-update-operation

4) 展开日志消息以查看扩缩建议、用于制订扩缩决策的指标、原始集群大小和新目标集群的大小。

autoscaling-recommendation-message

背景:使用 Apache Hadoop 和 Apache Spark 进行自动扩缩

以下部分介绍了自动扩缩如何与 Hadoop YARN 和 Hadoop Mapreduce、与 Apache Spark、Spark Streaming 和 Spark 结构化流进行(或不进行)互操作。

Hadoop YARN 指标

自动扩缩功能会配置 Hadoop YARN 以根据 YARN 内存请求而非 YARN 核心请求安排作业。

自动扩缩功能以下列 Hadoop YARN 指标为中心:

  1. Allocated memory 是指在整个集群中运行容器所占用的 YARN 内存总量。如果有 6 个最多可使用 1GB 的正在运行的容器,则表示已分配 6GB 内存。

  2. Available memory 是未被分配的容器使用的集群中的 YARN 内存。如果所有节点管理器有 10GB 内存,已分配的内存为 6GB,则有 4GB 的可用内存。如果集群中有可用(未使用)内存,则自动扩缩功能可能会从集群中移除工作器。

  3. Pending memory 是待处理容器的 YARN 内存请求总和。 待处理容器正在等待在 YARN 中运行空间。仅当可用内存为零或太小而无法分配给下一个容器时,待处理内存才不为零。如果存在待处理容器,则自动扩缩功能可能会将工作器添加到集群中。

您可以在 Cloud Monitoring 中查看这些指标。 默认情况下,YARN 内存将为 0.8 * 集群的总内存,而剩余的内存保留给其他守护进程和操作系统使用,例如页面缓存。您可以使用“yarn.nodemanager.resource.memory-mb” YARN 配置设置替换默认值(请参阅 Apache Hadoop YARN、HDFS、Spark 和相关属性)。

自动扩缩和 Hadoop MapReduce

MapReduce 作为单独的 YARN 容器运行每个映射和缩减任务。当作业开始时,MapReduce 会为每个映射任务提交容器请求,从而导致待处理的 YARN 内存出现大量峰值。随着映射任务的完成,待处理内存会减少。

mapreduce.job.reduce.slowstart.completedmaps 完成时(Dataproc 的默认设置为 95%),MapReduce 会为所有缩减器将容器请求加入队列,从而导致待处理内存再次增加。

除非您的映射任务和缩减任务需要花费几分钟或更长的时间,否则请勿为自动扩缩 scaleUpFactor 设置较高的值。将工作器添加到集群至少需要 1.5 分钟,因此请确保有足够的待处理工作能够使用新工作器几分钟。良好的起点是将 scaleUpFactor 设置为待处理内存的 0.05 (5%) 或 0.1 (10%)。

自动扩缩和 Spark

Spark 在 YARN 上额外添加了一层计划。具体来说,Spark Core 的动态分配会向 YARN 请求容器运行 Spark 执行程序,然后在这些执行程序的线程上安排 Spark 任务。Dataproc 集群默认启用动态分配,因此系统会根据需要添加和移除执行程序。

Spark 始终向 YARN 请求容器,但不进行动态分配,它仅在作业开始时请求容器。如果进行动态分配,则 Spark 会根据需要移除容器或请求新的容器。

Spark 从少量执行程序开始,在自动扩缩集群上执行 2 次,并且在执行积压任务时继续将执行程序的数量加倍。这将使待处理的内存(较少的待处理内存峰值)减少。对于 Spark 作业,建议将自动扩缩 scaleUpFactor 设置为较大的数字,例如 1.0 (100%)。

停用 Spark 动态分配

如果您运行的独立 Spark 作业无法从 Spark 动态分配中受益,则可以通过设置 spark.dynamicAllocation.enabled=falsespark.executor.instances 停用 Spark 动态分配。当各个 Spark 作业运行时,您仍然可以使用自动扩缩来扩缩集群。

已缓存数据的 Spark 作业

当不再需要数据集时设置 spark.dynamicAllocation.cachedExecutorIdleTimeout 或释放缓存的数据集。默认情况下,Spark 不会移除已缓存数据的执行程序,这将阻止集群缩减。

自动扩缩和 Spark Streaming

  1. 由于 Spark Streaming 有自己的动态分配版本,该版本使用流式传输专用信号来添加和移除执行程序,因此请通过设置 spark.dynamicAllocation.enabled=false 来设置 spark.streaming.dynamicAllocation.enabled=true 并停用 Spark Core 的动态分配。

  2. 请勿对 Spark Streaming 作业使用安全停用(自动扩缩 gracefulDecommissionTimeout)。而是改为通过自动扩缩安全移除工作器,请配置检查点以实现容错。

或者,要在不使用自动扩缩的情况下使用 Spark Streaming:

  1. 停用 Spark Core 的动态分配 (spark.dynamicAllocation.enabled=false),以及
  2. 设置作业的执行程序数量 (spark.executor.instances)。请参阅集群属性

自动扩缩和 Spark 结构化流

自动扩缩与 Spark 结构化流不兼容,因为 Spark 结构化流目前不支持动态分配(请参阅 SPARK-24815:结构化流应支持动态分配)。

通过分区和并行性来控制自动扩缩

虽然并行性通常由集群资源设置或确定(例如,HDFS 块数由任务数量控制),但在使用自动扩缩时反过来同样适用:即自动扩缩根据作业并行性设置工作器数量。以下指南可帮助您设置作业并行性:

  • 虽然 Dataproc 根据集群的初始集群大小设置 MapReduce 缩减任务的默认数量,但您可以设置 mapreduce.job.reduces 来提高缩减阶段的并行度。
  • Spark SQL 和 Dataframe 并行度由 spark.sql.shuffle.partitions 确定,默认值为 200。
  • Spark 的 RDD 函数默认为 spark.default.parallelism,该值设置为作业启动时工作器节点上的核心数。但是,创建重排操作的所有 RDD 函数都采用分区数量的参数,该参数会替换 spark.default.parallelism

您应该确保数据均匀分区。如果存在明显的关键偏差,则可能有一个或多个任务花费的时间明显长于其他任务,从而导致利用率低。

自动扩缩默认的 Spark/Hadoop 属性设置

自动扩缩集群具有默认集群属性值,有助于在移除主要工作器或抢占辅助工作器时避免失败。创建具有自动扩缩功能的集群时,您可以替换这些默认值(请参阅集群属性)。

默认增加任务、应用主实例和阶段的最大重试次数

yarn:yarn.resourcemanager.am.max-attempts=10
mapred:mapreduce.map.maxattempts=10
mapred:mapreduce.reduce.maxattempts=10
spark:spark.task.maxFailures=10
spark:spark.stage.maxConsecutiveAttempts=10

默认重置重试计数器(适用于长时间运行的 Spark Streaming 作业)

spark:spark.yarn.am.attemptFailuresValidityInterval=1h
spark:spark.yarn.executor.failuresValidityInterval=1h

默认让 Spark 的慢启动动态分配机制从较小的规模开始

spark:spark.executor.instances=2

常见问题解答 (FAQ)

可以在高可用性集群和单节点集群上启用自动扩缩吗?

您可以在高可用性集群上启用自动扩缩,但不能在单节点集群上启用(单节点集群不支持调整大小)。

可以手动调整自动扩缩集群的大小吗?

可以。在调整自动扩缩政策时,您可以决定手动调整集群的大小作为临时方案。但是,这些更改只会产生临时影响,而自动扩缩最终会缩减集群。

不考虑手动调整自动扩缩集群的大小,而是考虑:

更新自动扩缩政策。对自动扩缩政策所做的任何更改都会影响当前使用该政策的所有集群(请参阅多集群政策使用情况)。

分离政策并手动将集群扩缩为首选大小。

获取 Dataproc 支持

Dataproc 与 Dataflow 自动扩缩有何区别?

请参阅将 Cloud Dataflow 自动扩缩功能与 Spark 和 Hadoop 进行比较

Dataproc 的开发团队能否将集群状态从 ERROR 重置回 RUNNING

一般来说不可以。这样做需要手动操作来验证是否只需重置集群的状态,通常情况下,如果不执行其他手动步骤(例如重启 HDFS Namenode),则可能无法重置集群。

如果 Dataproc 无法确定操作失败后的集群状态,它会将集群的状态设置为 ERRORERROR 中的集群将不再自动扩缩或运行作业。出现该消息的常见原因包括:

  1. 从 Compute Engine API 返回的错误,通常发生在 Compute Engine 服务中断期间。

  2. HDFS 因 HDFS 停用出错而进入损坏状态。

  3. Dataproc Control API 错误,例如“任务租用过期”

我们正在努力改进 Dataproc 的弹性以解决这些问题。

现在,请删除状态为 ERROR 的集群,然后重新创建这些集群。