動的作業再調整

Dataflow サービスの動的作業再調整機能を使用すると、実行時の条件に基づいて処理を動的に再パーティショニングできます。これには、次のような条件があります。

  • 作業割り当ての不均衡
  • 終了に予想より長い時間がかかるワーカー
  • 予想よりも早く終了するワーカー

Dataflow サービスはこれらの状態を自動的に検出し、未使用のワーカーや十分に使用されていないワーカーに作業を動的に割り当て、ジョブの全体的な処理時間を短縮します。

制限事項

動的作業再調整は、Dataflow サービスが一部の入力データを並列に処理している場合にのみ行われます。データを外部入力ソースから読み取っている場合、実体化された中間 PCollection を操作している場合、または GroupByKey などの集約の結果を操作している場合です。ジョブの多くの手順が融合される場合、ジョブの中間 PCollection は少なくなり、動的作業再調整は、ソースの実体化された PCollection 内の要素数に限定されます。パイプラインで特定の PCollection に動的作業再調整が確実に適用されるようにする場合、動的並列処理を保証するいくつかの異なる方法で融合を防止できます。

動的作業再調整は、単一のレコードよりも細かくデータを再並列化できません。データに処理時間の大幅な遅延を引き起こす個別レコードが含まれている場合は、それらによってジョブが遅延する可能性が残ります。Dataflow は、個々の「ホット」レコードを分割して複数のワーカーに再分散できません。

Java

パイプラインの最終出力に固定数のシャードを設定した場合(TextIO.Write.withNumShards を使用してデータを書き込むことなどにより)、Dataflow は選択したシャード数に基づいて並列化を制限します。

Python

パイプラインの最終出力に固定数のシャードを設定した場合(beam.io.WriteToText(..., num_shards=...) を使用してデータを書き込むことなどにより)、Dataflow は選択したシャード数に基づいて並列化を制限します。

Go

パイプラインの最終出力に固定数のシャードを設定した場合は、Dataflow により、選択したシャード数に基づいて並列化が制限されます。

カスタム データソースの操作

Java

自分が提供するカスタム データソースをパイプラインで使用する場合は、splitAtFraction メソッドを実装して、ソースが動的作業再調整機能と連動できるようにする必要があります。

splitAtFraction の実装を誤ると、ソースからのレコードが重複して表示されることや、失われることがあります。splitAtFraction の実装のヘルプとヒントについては、RangeTracker の API リファレンス情報をご覧ください。

Python

自分が提供するカスタム データソースをパイプラインで使用する場合は、RangeTrackertry_claimtry_splitposition_at_fractionfraction_consumed を実装して、ソースが動的作業再調整機能と連動できるようにする必要があります。

詳しくは、RangeTracker の API リファレンス情報をご覧ください。

Go

自分が提供するカスタム データソースをパイプラインで使用する場合は、有効な RTracker を実装して、ソースが動的作業再調整機能と連動できるようにする必要があります。

詳細については、RTracker API リファレンスの情報をご覧ください。

動的作業再調整では、アクティブ化にカスタムソースの getProgress() メソッドの戻り値が使用されます。getProgress() のデフォルト実装では null が返されます。自動スケーリングがアクティブになるように、カスタムソースで getProgress() をオーバーライドして適切な値を返すことを確認してください。