动态线程扩缩

动态线程扩缩是 Dataflow 的纵向扩缩功能套件的一部分。它通过调整每个 Dataflow 工作器运行的并行任务数量(也称为捆绑包)对 Dataflow 的横向自动扩缩功能进行补充。目标是提高 Dataflow 流水线的整体效率。

当 Dataflow 运行流水线时,系统会在多个 Compute Engine 虚拟机 (VM) 之间进行分布式处理,也称为工作器。线程是在较大的进程中运行的单个可执行任务。Dataflow 会在每个工作器上启动多个线程。

启用动态线程扩缩后,Dataflow 服务会自动选择要在每个 Dataflow 工作器上运行的适当数量的线程。由于每个线程运行一个任务,因此增加线程数可以在工作器上并行运行更多任务。将此功能与横向自动扩缩功能搭配使用时,流水线使用的线程总数保持不变,但使用的工作器更少。

动态线程扩缩使用算法,根据流水线执行期间生成的资源利用率信号来确定每个工作器所需的线程数。如需了解详情,请参阅本页面中的工作原理部分。

优势

动态线程扩缩具有以下潜在优势。

  • 通过提高每个工作器的 CPU 和内存利用率,可让 Dataflow 工作器更高效地处理数据。
  • 通过调整可用于在流水线执行期间并行运行任务的工作器线程数来改进并行处理。
  • 减少处理大型数据集所需的工作器数量,从而降低费用。

支持和限制

  • 动态线程扩缩适用于使用 Java、Python 和 Go SDK 的流水线。
  • Dataflow 作业必须使用 Runner v2
  • 仅支持批处理流水线。
  • CPU 密集型或内存密集型流水线可能无法从动态线程扩缩获益。
  • 动态线程扩缩不会缩短 Dataflow 作业完成所需的时间。

运作方式

动态线程扩缩使用自动调节原则来动态扩容或缩容 Dataflow 工作器池中每个工作器上的线程数。每个工作器上的线程数会独立扩缩。每个线程运行一个任务。增加线程数可以在工作器上并行运行更多任务。当任务完成且不再需要线程时,线程数会缩容。算法可确定每个工作器所需的线程数。

当同时满足以下两个条件时,工作器上的线程数会扩容:

  • 工作器上的内存利用率低于 50%。
  • 工作器上的 CPU 利用率低于 65%。

当满足以下条件时,工作器上的线程数会缩容:

  • 工作器的内存利用率超过 70%。

如需查看作业的内存和 CPU 利用率,请使用 Dataflow 网页界面的作业指标标签页。

为确保建议有效,Dataflow 会等待资源利用率达到稳定状态,然后再将建议发送到工作器。例如,内存和 CPU 利用率可能在扩缩范围内,但由于资源利用率仍在增加,因此 Dataflow 不会发送建议。当资源利用率稳定后,Dataflow 会发送建议。

如果发生内存不足 (OOM) 错误,则线程扩缩会自动停用,并且流水线会在没有动态线程扩缩的情况下运行。

启用动态线程扩缩

如需启用动态线程扩缩,请使用以下 Dataflow 服务选项

Java

--dataflowServiceOptions=enable_dynamic_thread_scaling

Python

--dataflow_service_options=enable_dynamic_thread_scaling

Go

--dataflow_service_options=enable_dynamic_thread_scaling

启用动态线程扩缩后,您还可以设置执行期间可用于流水线的初始工作器数量和最大数量。如需了解详情,请参阅流水线选项

验证动态线程扩缩是否已启用

启用动态线程扩缩后,工作器日志文件中会显示以下消息:

Enabling thread vertical scaling feature in worker.

如需查看工作器日志文件,请在 Logs Explorer 中使用查询窗格日志名称过滤日志。在过滤条件中使用以下日志名称:

projects/PROJECT_ID/logs/dataflow.googleapis.com%2Fharness

您可以在工作器日志文件中查看建议的线程数。以下消息包含建议的线程数:

worker_thread_scaling_report_response { recommended_thread_count: NUMBER }

如果资源利用率不在扩缩范围内,则显示的值等于工作器上的 vCPU 数量。

您还可以使用 Google Cloud 控制台查看是否已启用动态线程扩缩。如果已启用,则在 Dataflow 作业信息面板上的流水线选项部分的 dataflowServiceOptions 行中会显示 enable_dynamic_thread_scaling

问题排查

本部分介绍了如何排查与动态线程扩缩相关的常见问题。

启用动态线程扩缩后性能下降

在以下情况下,增加线程数可能会导致性能问题:

  • 当多个进程尝试使用同一资源时,一个进程可以使用该资源,而其他进程必须等待。这种情况称为资源争用。发生资源争用时,流水线性能可能会下降。
  • 发生内存不足错误时,动态线程扩缩会停用。在某些情况下,内存不足错误可能会导致流水线失败。

验证线程数是否已增加。如需了解如何验证建议的线程数,请参阅本页面上的验证线程扩缩是否已启用

如果线程扩缩已启用,则如需解决此问题,请在运行流水线时不要添加动态线程扩缩服务选项。

统一工作器...已启用和已停用

启用动态线程扩缩后,您的作业可能会失败,并显示以下错误:

The workflow could not be created. Causes: (ID): Unified worker misconfigured by user and was both enabled and disabled.

当 Runner v2 明确被停用时,会发生此错误。

如需解决此问题,请启用 Runner v2。如需了解详情,请参阅“使用 Dataflow Runner V2”页面中的启用 Dataflow Runner v2 部分。

升级 SDK

启用动态线程扩缩后,您的作业可能会失败,并显示以下错误:

Java

Dataflow Runner v2 requires the Apache Beam Java SDK version 2.29.0 or higher. Please upgrade your SDK and resubmit your job.

Python

Dataflow Runner v2 requires the Apache Beam SDK, version 2.21.0 or higher. Please upgrade your SDK and resubmit your job.

当 Runner v2 因 SDK 版本不支持而无法启用时,会发生此错误。

如需要解决此问题,请使用支持 Runner v2 的 SDK 版本。

无法启用线程纵向扩缩功能

启用动态线程扩缩后,您的作业可能会失败,并显示以下错误:

The workflow could not be created. Causes: (ID): Thread vertical scaling feature can not be enabled while number_of_worker_harness_threads is specified.

当流水线使用 numberOfWorkerHarnessThreadsnumber_of_worker_harness_threads 流水线选项明确设置每个工作器的线程数时,会发生此错误。

如需解决此问题,请从流水线中移除 numberOfWorkerHarnessThreadsnumber_of_worker_harness_threads 流水线选项。