横向自动扩缩使 Dataflow 能够为作业选择适当数量的工作器实例,并根据需要添加或移除工作器。Dataflow 根据工作器的平均 CPU 利用率和流水线的并行性进行扩缩。流水线的并行性是指在任何给定时间以最有效率的方式处理数据所需的估算线程数。
批处理和流处理流水线都支持横向自动扩缩。
批量自动扩缩
默认情况下,系统会对所有批量流水线启用横向自动扩缩功能。Dataflow 会根据流水线每个阶段的预估总工作量自动选择工作器数量。此估算值取决于输入大小和当前吞吐量。Dataflow 会每 30 秒根据执行进度重新评估工作量。随着预估总工作量的增减,Dataflow 会动态增加或减少工作器数量。
工作器数量与工作量成亚线性关系。例如,有两倍工作量的作业具备的工作器不足两倍。
如果出现以下任何情况,为了节省空闲资源,Dataflow 会维持或减少工作器数量:
- 一般工作器 CPU 使用率低于 5%。
- 并行性会由于不可并行的工作而受到限制,例如压缩文件或不拆分的 I/O 模块导致的不可拆分数据。
- 并行度是固定的,例如在写入 Cloud Storage 中的现有文件时。
如需设置工作器数量上限,请设置 --maxNumWorkers
流水线选项。默认值为 2,000
。
如需设置工作器数量下限,请设置 --min_num_workers
服务选项。这些标志是可选的。
流式自动扩缩功能
对于流式作业,横向自动扩缩功能可让 Dataflow 以自适应方式更改工作器数量,以应对负载和资源利用率的变化。
默认情况下,使用 Streaming Engine 的流式作业会启用横向自动扩缩功能。如需为不使用 Streaming Engine 的流式作业启用横向自动扩缩功能,请在启动流水线时设置以下流水线选项:
Java
--autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=MAX_WORKERS
将 MAX_WORKERS 替换为工作器实例数量上限。
Python
--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=MAX_WORKERS
将 MAX_WORKERS 替换为工作器实例数量上限。
Go
--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=MAX_WORKERS
将 MAX_WORKERS 替换为工作器实例数量上限。
如需设置工作器数量下限,请设置 --min_num_workers
服务选项。设置此值后,横向自动扩缩不会缩减到低于指定工作器数量的水平。此标志不是必需的。
停用横向自动扩缩
如需停用横向自动扩缩功能,请在运行作业时设置以下流水线选项。
Java
--autoscalingAlgorithm=NONE
如果您停用横向自动扩缩功能,Dataflow 会根据 --numWorkers
选项设置工作器数量。
Python
--autoscaling_algorithm=NONE
如果您停用横向自动扩缩功能,Dataflow 会根据 --num_workers
选项设置工作器数量。
Go
--autoscaling_algorithm=NONE
如果您停用横向自动扩缩功能,Dataflow 会根据 --num_workers
选项设置工作器数量。
自定义来源
如果您创建自定义数据源,则可以通过实现向横向自动扩缩算法提供更多信息的方法来提高性能:
Java
有界来源
- 在
BoundedSource
子类中,实现getEstimatedSizeBytes
方法。Dataflow 服务会使用getEstimatedSizeBytes
计算要用于您的流水线的初始工作器数量。 - 在
BoundedReader
子类中,实现getFractionConsumed
方法。Dataflow 服务会使用getFractionConsumed
跟踪读取进度,并对读取期间使用的正确数量的工作器进行收敛。
无界限来源
来源必须在出现输入积压情况时向 Dataflow 服务发送通知。积压输入量用于估算尚未经来源处理的输入量(以字节为单位)。如要就输入积压的事宜向该服务发送通知,请在您的 UnboundedReader
类中实现以下任一方法。
getSplitBacklogBytes()
- 当前来源分组的积压输入量。该服务会汇总所有分组的积压输入量。getTotalBacklogBytes()
- 所有分组中的全局积压输入量。在某些情况下,无法计算每个分组的积压输入量,而是只能计算所有分组的积压输入量。 只需提供第一个分组(分组 ID 为“0”)的总积压输入量。
Apache Beam 代码库包含多个实现 UnboundedReader
类的自定义来源示例。
Python
有界来源
- 在
BoundedSource
子类中,实现estimate_size
方法。Dataflow 服务会使用estimate_size
计算要用于您的流水线的初始工作器数量。 - 在
RangeTracker
子类中,实现fraction_consumed
方法。Dataflow 服务会使用fraction_consumed
跟踪读取进度,并对读取期间使用的正确数量的工作器进行收敛。
Go
有界来源
- 在
RangeTracker
中,实现GetProgress()
方法。Dataflow 服务会使用GetProgress
跟踪读取进度,并对读取期间使用的正确数量的工作器进行收敛。
限制
- 在运行 Dataflow Prime 的作业中,横向自动扩缩功能在纵向自动扩缩期间以及之后 10 分钟内停用。如需了解详情,请参阅对横向自动扩缩的影响。
- 对于不使用 Dataflow Shuffle 的流水线,Dataflow 可能无法有效地缩减工作器,因为工作器可能已重排了存储在本地磁盘中的数据。
- 流式自动扩缩功能不支持 PeriodicImpulse 转换。如果流水线使用
PeriodicImpulse
,则 Dataflow 工作器不会按预期纵向缩容。