水平自動スケーリングを使用すると、Dataflow はジョブに適切な数のワーカー インスタンスを選択し、必要に応じてワーカーの追加または削除を行うことができます。Dataflow は、ワーカーの平均 CPU 使用率とパイプラインの並列処理に基づいてスケーリングを行います。パイプラインの並列処理とは、特定の時点でデータを最も効率的に処理するために必要なスレッド数の見積もりです。
水平自動スケーリングは、バッチ パイプラインとストリーミング パイプラインの両方でサポートされています。
バッチ自動スケーリング
水平自動スケーリングは、すべてのバッチ パイプラインでデフォルトで有効になっています。Dataflow は、パイプラインの各ステージでの推定合計作業量に基づいてワーカー数を自動的に選択します。この推定値は、入力サイズと現在のスループットによって異なります。Dataflow は 30 秒ごとに実行の進行状況に応じて作業量を再評価します。推定合計作業量が増減すると、Dataflow はワーカー数を動的にスケールアップまたはスケールダウンします。
ワーカー数は作業量に比例しません。たとえば、ジョブで作業量が 2 倍になっても、ワーカー数が 2 倍になるとは限りません。
次のいずれかの条件が発生した場合、Dataflow はアイドル状態のリソースを節約するために、ワーカー数を維持するか減らします。
- ワーカーの平均 CPU 使用率が 5% 未満の場合。
- 並列処理が、圧縮ファイルや分割されない I/O モジュールによって発生する分割不能なデータなど、並列化できない作業によって制限される場合。
- 並列度が一定の場合(Cloud Storage 内の既存のファイルに書き込む場合など)。
ワーカー数に上限を設定するには、--maxNumWorkers
パイプライン オプションを設定します。デフォルト値は 2,000
です。
ワーカー数の下限を設定するには、--min_num_workers
サービス オプションを設定します。これらのフラグは省略可能です。
ストリーミング自動スケーリング
ストリーミング ジョブの場合、水平自動スケーリングを使用すると、Dataflow は負荷とリソース使用率の変化に応じてワーカー数を適宜変更します。
Streaming Engine を使用するストリーミング ジョブでは、水平自動スケーリングがデフォルトで有効になります。Streaming Engine を使用しないストリーミング ジョブで水平自動スケーリングを有効にするには、パイプラインの起動時に次のパイプライン オプションを設定します。
Java
--autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=MAX_WORKERS
MAX_WORKERS は、ワーカー インスタンスの最大数に置き換えます。
Python
--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=MAX_WORKERS
MAX_WORKERS は、ワーカー インスタンスの最大数に置き換えます。
Go
--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=MAX_WORKERS
MAX_WORKERS は、ワーカー インスタンスの最大数に置き換えます。
ワーカー数の下限を設定するには、--min_num_workers
サービス オプションを設定します。この値を設定すると、水平自動スケーリングは、指定されたワーカー数を下回ってスケーリングされません。このフラグは省略可能です。
水平自動スケーリングを無効にする
水平自動スケーリングを無効にするには、ジョブの実行時に次のパイプライン オプションを設定します。
Java
--autoscalingAlgorithm=NONE
水平自動スケーリングを無効にすると、Dataflow は --numWorkers
オプションに基づいてワーカー数を設定します。
Python
--autoscaling_algorithm=NONE
水平自動スケーリングを無効にすると、Dataflow は --num_workers
オプションに基づいてワーカー数を設定します。
Go
--autoscaling_algorithm=NONE
水平自動スケーリングを無効にすると、Dataflow は --num_workers
オプションに基づいてワーカー数を設定します。
カスタムソース
カスタム データソースを作成する場合、より多くの情報を提供するメソッドを水平自動スケーリング アルゴリズムに実装することで、パフォーマンスを改善できる可能性があります。
Java
制限のあるソース
BoundedSource
サブクラスで、メソッドgetEstimatedSizeBytes
を実装します。Dataflow サービスでは、パイプラインに使用するワーカーの初期数を計算するときに、getEstimatedSizeBytes
が使用されます。BoundedReader
サブクラスで、メソッドgetFractionConsumed
を実装します。Dataflow サービスは、getFractionConsumed
を使用して読み取りの進行状況を追跡し、読み取り中に使用する適切なワーカー数に収束します。
制限のないソース
ソースは、バックログについて Dataflow サービスに通知する必要があります。バックログは、ソースによってまだ処理されていない入力の推定値(バイト単位)です。バックログについてサービスに通知するには、UnboundedReader
クラスに次のメソッドのいずれかを実装します。
getSplitBacklogBytes()
- ソースの現在のスプリットのバックログ。サービスで、すべてのスプリットのバックログを集約します。getTotalBacklogBytes()
- すべてのスプリットのグローバル バックログ。スプリットごとにバックログを取得できず、すべてのスプリットでのみバックログを計算できる場合があります。最初のスプリット(スプリット ID「0」)のみが合計バックログを提供する必要があります。
Apache Beam リポジトリには、UnboundedReader
クラスを実装するカスタムソースの例がいくつか含まれています。
Python
制限のあるソース
BoundedSource
サブクラスで、メソッドestimate_size
を実装します。Dataflow サービスでは、パイプラインに使用するワーカーの初期数を計算するときに、estimate_size
が使用されます。RangeTracker
サブクラスで、メソッドfraction_consumed
を実装します。Dataflow サービスは、fraction_consumed
を使用して読み取りの進行状況を追跡し、読み取り中に使用する適切なワーカー数に収束します。
Go
制限のあるソース
RangeTracker
で、メソッドGetProgress()
を実装します。Dataflow サービスは、GetProgress
を使用して読み取りの進行状況を追跡し、読み取り中に使用する適切なワーカー数に収束します。
制限事項
- Dataflow Prime を実行するジョブでは、水平自動スケーリングは垂直自動スケーリングの実行中か、終了から最長で 10 分後に無効になります。詳細については、水平自動スケーリングへの影響をご覧ください。
- Dataflow Shuffle を使用していないパイプラインの場合、ワーカーがローカル ディスクに格納されているデータをシャッフルしている可能性があるため、Dataflow はワーカーを効果的にスケールダウンできない可能性があります。
- PeriodicImpulse 変換は、Apache Beam SDK バージョン 2.60.0 以降において、ストリーミング自動スケーリングでサポートされています。パイプラインに以前の SDK バージョンの
PeriodicImpulse
が使用されている場合、Dataflow ワーカーは想定どおりにスケールダウンしません。