Dynamic Thread Scaling

Dynamic Thread Scaling は、Dataflow の垂直スケーリング機能のスイートの一部です。Dataflow の水平自動スケーリング機能を並列タスクの数の調整により補完するもので、各 Dataflow ワーカーが実行する バンドルとしても知られています。これは、Dataflow パイプライン全体の効率性を高めることを目標としています。

Dataflow がパイプラインを実行すると、処理は複数の Compute Engine 仮想マシン(VM)に分散されます。これらの仮想マシンはワーカーとしても知られています。スレッドは、より大きなプロセス内で実行される単一の実行可能タスクです。Dataflow は各ワーカーで複数のスレッドを起動します。

Dynamic Thread Scaling を有効にすると、Dataflow サービスは各 Dataflow ワーカー上で実行する適切な数のスレッドを自動的に選択します。各スレッドはタスクを実行するため、スレッド数を増やすと、ワーカーでより多くのタスクを同時に実行できます。水平自動スケーリング機能でこの機能を使用する場合、パイプラインで使用されるスレッドの合計数は変わりませんが、使用されるワーカー数は少なくなります。

Dynamic Thread Scaling では、パイプラインの実行中に生成されたリソース使用率のシグナルに基づいて、各ワーカーが必要とするスレッド数を決定するアルゴリズムを使用します。詳細については、このページの仕組みをご覧ください。

利点

Dynamic Thread Scaling には、次のような潜在的な利点があります。

  • ワーカーごとの CPU とメモリの使用率を改善することで、Dataflow ワーカーがデータをより効率的に処理できるようになります。
  • パイプラインの実行中にタスクを並行して実行できるワーカー スレッドの数を調整することで、並列処理を向上させます。
  • 大規模なデータセットの処理に必要なワーカーの数を削減できるため、コストを削減できます。

サポートと制限事項

  • Dynamic Thread Scaling は、Java、Python、Go SDK を使用するパイプラインで使用できます。
  • Dataflow ジョブは、Runner v2 を使用する必要があります。
  • バッチ パイプラインのみがサポートされています。
  • CPU またはメモリを大量に消費するパイプラインでは、Dynamic Thread Scaling のメリットが得られない可能性があります。
  • Dynamic Thread Scaling は、Dataflow ジョブの完了にかかる時間を短縮しません。

仕組み

Dynamic Thread Scaling では、自動チューニングの原則を使用して、Dataflow ワーカープール内の各ワーカーでスレッド数を動的にスケールアップまたはスケールダウンします。スレッド数は、ワーカーごとに個別にスケーリングされます。各スレッドがタスクを実行します。スレッド数を増やすと、ワーカーでより多くのタスクを並列実行できます。タスクが完了し、スレッドが不要になると、スレッド数はスケールダウンされます。各ワーカーに必要なスレッド数は、アルゴリズムによって決定します。

次の条件が両方とも満たされると、ワーカーのスレッド数は vCPU あたり最大 2 つのスレッドにスケールアップされます。

  • ワーカーのメモリ使用率が 50% 未満の場合。
  • ワーカーの CPU 使用率が 65% 未満の場合。

次の条件が満たされると、ワーカーのスレッド数は vCPU あたり最小 1 スレッドにスケールダウンされます。

  • ワーカーのメモリ使用率が 70% を超えている場合。

ジョブのメモリと CPU の使用率を確認するには、Dataflow ウェブ インターフェースの [ジョブの指標] タブを使用します。

有効な推奨事項を提供するため、Dataflow は、リソースの使用率が安定してからワーカーに推奨事項を送信します。たとえば、メモリと CPU の使用率がスケーリングの範囲に入っていても、リソース使用率がまだ増加している場合、Dataflow は推奨事項を送信しません。リソース使用率が安定すると、Dataflow は推奨事項を送信します。

メモリ不足(OOM)エラーが発生すると、スレッド スケーリングは自動的に無効になり、パイプラインは vCPU ごとに 1 つのスレッドで実行されます。

Dynamic Thread Scaling を有効にする

Dynamic Thread Scaling を有効にするには、次の Dataflow サービス オプションを使用します。

Java

--dataflowServiceOptions=enable_dynamic_thread_scaling

Python

--dataflow_service_options=enable_dynamic_thread_scaling

Go

--dataflow_service_options=enable_dynamic_thread_scaling

Dynamic Thread Scaling が有効になっている場合は、実行中にパイプラインで使用できるワーカーの初期数と最大数を設定することもできます。詳細については、パイプライン オプションをご覧ください。

Dynamic Thread Scaling が有効になっていることを確認する

Dynamic Thread Scaling が有効になっている場合、次のメッセージがワーカー ログファイルに表示されます。

Enabling thread vertical scaling feature in worker.

ワーカー ログファイルを表示するには、ログ エクスプローラ[クエリ] ペインを使用して、ログ名でログをフィルタします。フィルタで次のログ名を使用します。

projects/PROJECT_ID/logs/dataflow.googleapis.com%2Fharness

ワーカー ログファイルに推奨されるスレッド数を確認できます。次のメッセージには、推奨されるスレッド数が含まれています。

worker_thread_scaling_report_response { recommended_thread_count: NUMBER }

リソース使用率がスケーリングの範囲にない場合、表示される値はワーカーの vCPU 数と同じです。

Dynamic Thread Scaling が有効どうかは、Google Cloud コンソールでも確認できます。有効になっている場合、Dataflow の [ジョブ情報] パネルの [パイプライン オプション] セクションで dataflowServiceOptions 行に enable_dynamic_thread_scaling が表示されています。

トラブルシューティング

このセクションでは、Dynamic Thread Scaling に関連する一般的な問題のトラブルシューティングについて説明します。

Dynamic Thread Scaling を有効にするとパフォーマンスが低下する

次のような場合、スレッド数を増やすとパフォーマンスの問題が発生する可能性があります。

  • 複数のプロセスで同じリソースを使用しようとすると、1 つのプロセスでリソースを使用できますが、他のプロセスでは待機する必要があります。この状況をリソース競合といいます。リソース競合が発生すると、パイプラインのパフォーマンスが低下する可能性があります。
  • メモリ不足エラーが発生すると、Dynamic Thread Scaling は無効になります。メモリ不足エラーが原因でパイプラインが失敗することもあります。

スレッド数の増加を確認します。推奨スレッド数を確認する方法については、このページのスレッド スケーリングが有効になっていることを確認するをご覧ください。

スレッド スケーリングが有効になっている場合は、問題を解決するため、パイプラインの実行時に Dynamic Thread Scaling サービス オプションを使用しないでください。

統合ワーカー ... 有効と無効

Dynamic Thread Scaling を有効にすると、次のエラーでジョブが失敗することがあります。

The workflow could not be created. Causes: (ID): Unified worker misconfigured by user and was both enabled and disabled.

このエラーは、Runner v2 が明示的に無効にされた場合に発生します。

この問題を解決するには、Runner v2 を有効にしてください。詳細については、「Dataflow Runner V2 を使用する」の Dataflow Runner v2 を有効にするセクションをご覧ください。

SDK をアップグレードする

Dynamic Thread Scaling を有効にすると、次のエラーでジョブが失敗することがあります。

Java

Dataflow Runner v2 requires the Apache Beam Java SDK version 2.29.0 or higher. Please upgrade your SDK and resubmit your job.

Python

Dataflow Runner v2 requires the Apache Beam SDK, version 2.21.0 or higher. Please upgrade your SDK and resubmit your job.

このエラーは、SDK バージョンが Runner v2 をサポートしていないため、Runner v2 を有効にできないときに発生します。

この問題を解決するには、Runner v2 をサポートする SDK バージョンを使用します。

スレッドの垂直スケーリング機能を有効にできない

Dynamic Thread Scaling を有効にすると、次のエラーでジョブが失敗することがあります。

The workflow could not be created. Causes: (ID): Thread vertical scaling feature can not be enabled while number_of_worker_harness_threads is specified.

このエラーは、パイプラインが numberOfWorkerHarnessThreads または number_of_worker_harness_threads パイプライン オプションを使用してワーカーあたりのスレッド数を明示的に設定した場合に発生します。

この問題を解決するには、パイプラインから numberOfWorkerHarnessThreads または number_of_worker_harness_threads パイプライン オプションを削除します。