调整流处理流水线的横向自动扩缩

在具有大量输入数据的流处理流水线中,通常需要在费用和延迟时间之间做出取舍。为了保持较短的延迟时间,Dataflow 必须随着流量的增加而添加工作器。另一个因素是流水线应以多快的速度进行纵向扩容或纵向缩容来应对输入数据速率的变化。

Dataflow 自动扩缩器具有适合许多工作负载的默认设置。但是,您可能希望针对特定场景调整此行为。例如,为了降低费用可能可以接受较长的平均延迟时间,或者您可能希望 Dataflow 更快地纵向扩容以应对流量高峰。

如需优化横向自动扩缩,您可以调整以下参数:

设置自动扩缩范围

创建新的流处理作业时,您可以设置初始工作器数量和工作器数量上限。为此,请指定以下流水线选项

Java

  • --numWorkers:流水线开始运行时可用的初始工作器数量
  • --maxNumWorkers:流水线可用的工作器数量上限

Python

  • --num_workers:流水线开始运行时可用的初始工作器数量
  • --max_num_workers:流水线可用的工作器数量上限

Go

  • --num_workers:流水线开始运行时可用的初始工作器数量
  • --max_num_workers:流水线可用的工作器数量上限

对于使用 Streaming Engine 的流处理作业,--maxNumWorkers 是可选标志。默认值为 100。对于不使用 Streaming Engine 的流处理作业,启用横向自动扩缩后,--maxNumWorkers 是必需标志。

--maxNumWorkers 的起始值还决定了为作业分配的永久性磁盘的数量。流水线在部署时使用的是固定永久性磁盘池(相当于 --maxNumWorkers 个磁盘)。在流处理期间,系统会重新分配永久性磁盘,使每个工作器都会挂接相同数量的磁盘。

如果您设置了 --maxNumWorkers,请确保该值为流水线提供足够的磁盘。设置初始值时,请考虑未来的增长。如需了解永久性磁盘性能,请参阅配置永久性磁盘和虚拟机。Dataflow 会对永久性磁盘用量计费,并且具有 Compute Engine 配额(包括永久性磁盘配额)。

默认情况下,对于使用 Streaming Engine 的流处理作业,工作器数量下限为 1;对于不使用 Streaming Engine 的作业,工作器数量下限为 (maxNumWorkers/15),向上取整。

更新自动扩缩范围

对于使用 Streaming Engine 的作业,您可以调整工作器数量下限和上限,而无需停止或替换作业。如需调整这些值,请使用运行中作业更新。更新以下作业选项:

  • --min-num-workers:工作器数量下限。
  • --max-num-workers:工作器数量上限。

gcloud

使用 gcloud dataflow jobs update-options 命令:

gcloud dataflow jobs update-options \
  --region=REGION \
  --min-num-workers=MINIMUM_WORKERS \
  --max-num-workers=MAXIMUM_WORKERS \
  JOB_ID

替换以下内容:

  • REGION:作业的区域级端点的区域 ID
  • MINIMUM_WORKERS:Compute Engine 实例数下限
  • MAXIMUM_WORKERS:Compute Engine 实例数上限
  • JOB_ID:要更新的作业的 ID

您还可以分别更新 --min-num-workers--max-num-workers

REST

使用 projects.locations.jobs.update 方法:

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.max_num_workers,runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": MINIMUM_WORKERS,
    "max_num_workers": MAXIMUM_WORKERS
  }
}

替换以下内容:

  • PROJECT_ID:Dataflow 作业的 Google Cloud 项目 ID
  • REGION:作业的区域级端点的区域 ID
  • JOB_ID:要更新的作业的 ID
  • MINIMUM_WORKERS:Compute Engine 实例数下限
  • MAXIMUM_WORKERS:Compute Engine 实例数上限

您还可以分别更新 min_num_workersmax_num_workers。在 updateMask 查询参数中指定要更新的参数,并在请求正文的 runtimeUpdatableParams 字段中添加更新后的值。以下示例会更新 min_num_workers

PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": 5
  }
}

对于不使用 Streaming Engine 的作业,您可以使用更新后的 maxNumWorkers替换现有作业

如果您更新了未使用 Streaming Engine 的流处理作业,则默认情况下,更新后的作业停用了横向自动扩缩。如需启用自动扩缩,请为更新后的作业指定 --autoscalingAlgorithm--maxNumWorkers

设置工作器利用率提示

Dataflow 使用平均 CPU 利用率作为何时应用横向自动扩缩的信号。默认情况下,Dataflow 将目标 CPU 利用率设置为 0.8。如果利用率超出此范围,则 Dataflow 可能会添加或移除工作器。

为了更好地控制自动扩缩行为,您可以将目标 CPU 利用率设置为 [0.1, 0.9] 范围内的值。

  • 如果您希望实现较短的峰值延迟时间,请设置较低的 CPU 利用率值。较低的值可让 Dataflow 更主动地横向扩容以应对不断增长的工作器利用率,并更保守地纵向缩容以提高稳定性。如果流水线在稳定状态下运行,较低的值还可以提供更多余量,这通常可以缩短尾延迟时间。(尾延迟时间用于衡量新记录处理之前的最长等待时间。)

  • 如果您希望在流量高峰时节省资源并降低费用,请设置较高的值。较高的值可防止纵向扩容过度,但延迟时间较长。

如需在运行非模板作业时配置利用率提示,请设置 worker_utilization_hint 服务选项。对于模板作业,请改为更新利用率提示,因为服务选项不受支持。

以下示例展示了如何使用 worker_utilization_hint

Java

--dataflowServiceOptions=worker_utilization_hint=TARGET_UTILIZATION

TARGET_UTILIZATION 替换为 [0.1, 0.9] 范围内的值。

Python

--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION

TARGET_UTILIZATION 替换为 [0.1, 0.9] 范围内的值。

Go

--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION

TARGET_UTILIZATION 替换为 [0.1, 0.9] 范围内的值。

对于新流水线,我们建议您使用默认设置在实际负载下进行测试。然后评估应用于流水线的自动扩缩行为,并根据需要进行调整。

利用率提示只是 Dataflow 在决定是否扩缩工作器时使用的一个因素。积压和可用键等其他因素可能会替换提示值。此外,提示并非严格目标。自动扩缩器会尝试将 CPU 利用率保持在提示值的范围内,但聚合利用率指标可能会更高或更低。如需了解详情,请参阅流式自动扩缩启发法

更新利用率提示

如需在作业运行时更新利用率提示,请按照以下所述执行运行中更新

gcloud

使用 gcloud dataflow jobs update-options 命令:

gcloud dataflow jobs update-options \
  --region=REGION \
  --worker-utilization-hint=TARGET_UTILIZATION \
  JOB_ID

替换以下内容:

  • REGION:作业的区域级端点的区域 ID
  • JOB_ID:要更新的作业的 ID
  • TARGET_UTILIZATION:[0.1, 0.9] 范围内的值

如需将利用率提示重置为默认值,请使用以下 gcloud 命令:

gcloud dataflow jobs update-options \
  --unset-worker-utilization-hint \
  --region=REGION \
  --project=PROJECT_ID \
  JOB_ID

REST

使用 projects.locations.jobs.update 方法:

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.worker_utilization_hint
{
  "runtime_updatable_params": {
    "worker_utilization_hint": TARGET_UTILIZATION
  }
}

请替换以下内容:

  • PROJECT_ID:Dataflow 作业的 Google Cloud 项目 ID。
  • REGION:作业的区域端点的区域 ID。
  • JOB_ID:要更新的作业的 ID。
  • TARGET_UTILIZATION:[0.1, 0.9] 范围内的值

流式自动扩缩启发法

对于流处理流水线,横向自动扩缩的目标是在最大限度地提高工作器的利用率和吞吐量的同时最大限度地减少积压,并快速应对负载高峰。

Dataflow 在自动扩缩时会考虑几个因素,包括:

  • 积压。预计的积压时间是根据吞吐量和仍要从输入源处理的积压字节数计算得出的。当预计积压时间超过 15 秒时,流水线被视为“积压”。

  • 目标 CPU 利用率。平均 CPU 利用率的默认目标为 0.8。您可以替换此值

  • 可用键。键是 Dataflow 中并行处理的基本单元。

在某些情况下,Dataflow 会在做出自动扩缩决策时使用以下因素。如果您的作业使用这些因素,您可以在自动扩缩指标标签页中查看该信息。

  • 基于键的限制使用作业收到的处理键数量来计算用户工作器的数量上限,因为每个键一次只能由一个工作器处理。

  • 纵向缩容抑制。如果 Dataflow 检测到发生了不稳定的自动扩缩决策,则会减慢纵向缩容速度以提高稳定性。

  • 基于 CPU 的纵向扩容使用高 CPU 利用率作为纵向扩容标准。

  • 对于不使用 Streaming Engine 的流处理作业,扩缩可能会受限于永久性磁盘的数量。如需了解详情,请参阅设置自动扩缩范围

纵向扩容。如果流处理流水线在工作器上保持积压长达几分钟,并且具有足够的并行性,则 Dataflow 会纵向扩容。根据每个工作器的当前吞吐量,Dataflow 会尝试在纵向扩容后大约 150 秒内清除积压。如果存在积压,但工作器没有足够的并行性来处理其他工作器,则流水线不会纵向扩容。(如果扩缩工作器的数量,使其超出可用于并行处理的键的数量,则不能帮助您更快地处理积压。)

纵向缩容。当自动扩缩器做出纵向缩容决策时,积压是优先级最高的因素。自动扩缩器力求积压不超过 15 秒。如果积压低于 10 秒,且平均工作器利用率低于 CPU 利用率目标,则 Dataflow 会纵向缩容。只要积压可以接受,自动扩缩器就会尝试将 CPU 利用率保持在接近目标 CPU 利用率。但是,如果利用率已经足够接近目标,则自动扩缩器可能会保持工作器数量不变,因为每个纵向缩容步骤都会产生费用。

Streaming Engine 还会使用基于计时器积压的预测性自动扩缩技术。流式流水线中的无界限数据划分为按时间戳分组的时间段。在某时间段结束时,系统会对该时间段内处理的每个键触发计时器。触发计时器表示该时间段对于给定键已过期。Streaming Engine 可以衡量计时器积压,并预测时间段结束时将触发的计时器数量。Dataflow 利用计时器积压作为信号,估算未来的计时器触发时必须发生的处理量。Dataflow 会根据估算的未来负载提前进行自动扩缩,以满足预期需求。

指标

如需查看作业的当前自动扩缩限制,请查询以下指标:

  • job/max_worker_instances_limit:工作器数量上限。
  • job/min_worker_instances_limit:工作器数量下限。

如需获取有关工作器利用率的信息,请查询以下指标:

  • job/aggregated_worker_utilization:汇总的工作器利用率。
  • job/worker_utilization_hint:当前工作器利用率提示。

如需深入了解自动扩缩器的行为,请查询以下指标:

  • job.worker_utilization_hint_is_actively_used:指示自动扩缩器是否正在主动使用工作器利用率提示。在此指标进行采样时,如果其他因素替换了提示,则值为 false
  • job/horizontal_worker_scaling:描述自动扩缩器所做的决策。此指标包含以下标签:
    • direction:指定自动扩缩器是纵向扩容、纵向缩容还是不执行任何操作。
    • rationale:指定自动扩缩器决策的理由。

如需了解详情,请参阅 Cloud Monitoring 指标。这些指标也会显示在自动扩缩监控图表中。

后续步骤