横向自动扩缩

横向自动扩缩使 Dataflow 能够为作业选择适当数量的工作器实例,并根据需要添加或移除工作器。Dataflow 根据工作器的平均 CPU 利用率和流水线的并行性进行扩缩。流水线的并行性是指在任何给定时间以最有效率的方式处理数据所需的估算线程数。

批处理和流处理流水线都支持横向自动扩缩。

批量自动扩缩

默认情况下,所有批处理流水线都会启用横向自动扩缩功能。Dataflow 会根据流水线每个阶段的预估总工作量自动选择工作器数量。此估算值取决于输入大小和当前吞吐量。Dataflow 每 30 秒会根据执行进度重新评估一次工作量。随着预估总工作量的增减,Dataflow 会动态地增加或减少工作器数量。

工作器数量与工作量成亚线性关系。例如,有两倍工作量的作业具备的工作器不足两倍。

如果出现以下任何情况,Dataflow 会维持或减少工作器数量,以节省空闲资源:

  • 一般工作器 CPU 使用率低于 5%。
  • 并行性会由于不可并行的工作而受到限制,例如由压缩文件导致的不可拆分数据或不拆分的 I/O 模块。
  • 并行化程度是固定的,例如,在 Cloud Storage 中写入现有文件时。

如需设置工作器数量的上限,请设置 --maxNumWorkers 流水线选项。对于批量作业而言,此标志是可选的。默认值为 1000

流式自动扩缩功能

对于流式作业,横向自动扩缩功能允许 Dataflow 适应性地更改工作器数量,以应对负载和资源利用率的变化。

默认情况下,系统会为使用 Streaming Engine 的流式作业启用横向自动扩缩功能。如需为不使用 Streaming Engine 的流处理作业启用横向自动扩缩功能,请在启动流水线时设置以下流水线选项

Java

--autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=MAX_WORKERS

MAX_WORKERS 替换为工作器实例数上限。

Python

--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=MAX_WORKERS

MAX_WORKERS 替换为工作器实例数上限。

Go

--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=MAX_WORKERS

MAX_WORKERS 替换为工作器实例数上限。

停用横向自动扩缩

如需停用横向自动扩缩功能,请在运行作业时设置以下流水线选项

Java

--autoscalingAlgorithm=NONE

如果您停用横向自动扩缩功能,则 Dataflow 会根据 --numWorkers 选项设置工作器数量。

Python

--autoscaling_algorithm=NONE

如果您停用横向自动扩缩功能,则 Dataflow 会根据 --num_workers 选项设置工作器数量。

Go

--autoscaling_algorithm=NONE

如果您停用横向自动扩缩功能,则 Dataflow 会根据 --num_workers 选项设置工作器数量。

自定义来源

如果您要创建自定义数据源,则可以通过实现向横向自动扩缩算法提供更多信息的方法来提高性能:

Java

有界限来源

  • BoundedSource 子类中,实现 getEstimatedSizeBytes 方法。Dataflow 服务会使用 getEstimatedSizeBytes 计算要用于您的流水线的初始工作器数量。
  • BoundedReader 子类中,实现 getFractionConsumed 方法。Dataflow 服务会使用 getFractionConsumed 跟踪读取进度,并对读取期间使用的正确数量的工作器进行收敛。

无界限来源

该来源必须在出现输入积压情况时向 Dataflow 服务发送通知。积压输入量用于估算尚未经来源处理的输入量(以字节为单位)。如要就输入积压的事宜向该服务发送通知,请在您的 UnboundedReader 类中实现以下任一方法。

  • getSplitBacklogBytes() - 当前来源分组的积压输入量。该服务会汇总所有分组的积压输入量。
  • getTotalBacklogBytes() - 所有分组中的全局积压输入量。在某些情况下,无法计算每个分组的积压输入量,而是只能计算所有分组的积压输入量。 只需提供第一个分组(分组 ID 为“0”)的总积压输入量。

Apache Beam 代码库包含多个实现 UnboundedReader 类的自定义来源示例

Python

有界限来源

  • BoundedSource 子类中,实现 estimate_size 方法。Dataflow 服务会使用 estimate_size 计算要用于您的流水线的初始工作器数量。
  • RangeTracker 子类中,实现 fraction_consumed 方法。Dataflow 服务会使用 fraction_consumed 跟踪读取进度,并对读取期间使用的正确数量的工作器进行收敛。

Go

有界限来源

  • RangeTracker 中,实现 GetProgress() 方法。Dataflow 服务会使用 GetProgress 跟踪读取进度,并对读取期间使用的正确数量的工作器进行收敛。

限制

  • 在运行 Dataflow Prime 的作业中,横向自动扩缩功能在纵向自动扩缩期间以及之后 10 分钟内停用。如需了解详情,请参阅对横向自动扩缩的影响
  • 对于不使用 Dataflow Shuffle 的流水线,Dataflow 可能无法有效地缩减工作器,因为工作器可能已重排了存储在本地磁盘中的数据。
  • 流式自动扩缩功能不支持 PeriodicImpulse 转换。如果流水线使用 PeriodicImpulse,则 Dataflow 工作器不会按预期纵向缩容。

后续步骤