Dynamic work rebalancing

The Dynamic Work Rebalancing feature of the Dataflow service allows the service to dynamically repartition work based on runtime conditions. These conditions might include the following:

  • Imbalances in work assignments
  • Workers taking longer than expected to finish
  • Workers finishing faster than expected

The Dataflow service automatically detects these conditions and can dynamically assign work to unused or underused workers to decrease the overall processing time of your job.

Limitations

Dynamic work rebalancing only happens when the Dataflow service is processing some input data in parallel: when reading data from an external input source, when working with a materialized intermediate PCollection, or when working with the result of an aggregation like GroupByKey. If a large number of steps in your job are fused, your job has fewer intermediate PCollections, and dynamic work rebalancing is limited to the number of elements in the source materialized PCollection. If you want to ensure that dynamic work rebalancing can be applied to a particular PCollection in your pipeline, you can prevent fusion in a few different ways to ensure dynamic parallelism.
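
For example, one way to break fusion and keep an intermediate PCollection eligible for rebalancing is to insert a Reshuffle transform between the step that produces the collection and the expensive step that consumes it. The following minimal sketch uses the Apache Beam Python SDK; the input path and the expand_element function are hypothetical placeholders.

    import apache_beam as beam

    def expand_element(element):
        # Hypothetical fan-out: each input element produces many output elements.
        for i in range(1000):
            yield (element, i)

    with beam.Pipeline() as pipeline:
        _ = (
            pipeline
            | 'Read' >> beam.io.ReadFromText('gs://my-bucket/input*.txt')
            # Reshuffle materializes the intermediate PCollection, preventing the
            # read step from being fused with the fan-out step so that Dataflow
            # can rebalance the fan-out work across workers.
            | 'PreventFusion' >> beam.Reshuffle()
            | 'FanOut' >> beam.FlatMap(expand_element))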

Dynamic work rebalancing cannot reparallelize data finer than a single record. If your data contains individual records that cause large delays in processing time, they might still delay your job. Dataflow can't subdivide and redistribute an individual "hot" record to multiple workers.

Java

If you set a fixed number of shards for the final output of your pipeline (for example, by writing data using TextIO.Write.withNumShards), Dataflow limits parallelization based on the number of shards that you choose.

Python

If you set a fixed number of shards for the final output of your pipeline (for example, by writing data using beam.io.WriteToText(..., num_shards=...)), Dataflow limits parallelization based on the number of shards that you choose.
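
For example, the following minimal sketch fixes the final output at three shards, which caps the parallelism of the write step; omitting num_shards (or leaving it at its default of 0) lets the service choose the sharding. The output path is a hypothetical placeholder.

    import apache_beam as beam

    with beam.Pipeline() as pipeline:
        _ = (
            pipeline
            | 'Create' >> beam.Create(['a', 'b', 'c'])
            # num_shards=3 forces exactly three output files, so Dataflow cannot
            # rebalance the final write across more than three bundles.
            | 'Write' >> beam.io.WriteToText('gs://my-bucket/output', num_shards=3))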

Go

If you set a fixed number of shards for the final output of your pipeline, Dataflow limits parallelization based on the number of shards that you choose.

Working with custom data sources

Java

If your pipeline uses a custom data source that you provide, you must implement the method splitAtFraction to allow your source to work with the dynamic work rebalancing feature.

If you implement splitAtFraction incorrectly, records from your source might appear to get duplicated or dropped. See the API reference information on RangeTracker for help and tips on implementing splitAtFraction.

Python

If your pipeline uses a custom data source that you provide, your RangeTracker must implement try_claim, try_split, position_at_fraction, and fraction_consumed to allow your source to work with the dynamic work rebalancing feature.

See the API reference information on RangeTracker for more information.
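
As a concrete sketch, the following bounded source delegates range tracking to the SDK's OffsetRangeTracker, which already provides try_claim, try_split, position_at_fraction, and fraction_consumed for integer offsets; the read loop only needs to call try_claim before emitting each record so that the tracker can honor split requests from the service. This is a minimal illustration of an offset-based source, not production code.

    from apache_beam.io import iobase
    from apache_beam.io.range_trackers import OffsetRangeTracker

    class CountingSource(iobase.BoundedSource):
        """Emits the integers [0, count) and supports dynamic work rebalancing."""

        def __init__(self, count):
            self._count = count

        def estimate_size(self):
            return self._count

        def get_range_tracker(self, start_position, stop_position):
            if start_position is None:
                start_position = 0
            if stop_position is None:
                stop_position = self._count
            # OffsetRangeTracker implements try_claim, try_split,
            # position_at_fraction, and fraction_consumed for offset ranges.
            return OffsetRangeTracker(start_position, stop_position)

        def read(self, range_tracker):
            for position in range(range_tracker.start_position(),
                                  range_tracker.stop_position()):
                # try_claim returns False after the remainder of the range has
                # been split off to another worker; this reader must then stop.
                if not range_tracker.try_claim(position):
                    return
                yield position

        def split(self, desired_bundle_size, start_position=None, stop_position=None):
            if start_position is None:
                start_position = 0
            if stop_position is None:
                stop_position = self._count
            bundle_start = start_position
            while bundle_start < stop_position:
                bundle_stop = min(stop_position, bundle_start + desired_bundle_size)
                yield iobase.SourceBundle(
                    weight=(bundle_stop - bundle_start),
                    source=self,
                    start_position=bundle_start,
                    stop_position=bundle_stop)
                bundle_start = bundle_stop

A source written this way can then be read in a pipeline with beam.io.Read(CountingSource(10000)).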

Go

If your pipeline uses a custom data source that you provide, you must implement a valid RTracker to allow your source to work with the dynamic work rebalancing feature.

For more information, see the RTracker API reference information.

Dynamic work rebalancing uses the return value of the getProgress() method of your custom source to activate. The default implementation of getProgress() returns null. To ensure that dynamic work rebalancing activates, make sure that your custom source overrides getProgress() to return an appropriate value.