Deploying a Pipeline

After you have constructed and tested your Dataflow pipeline, you can use the Cloud Dataflow managed service to deploy and execute it. Once on the Dataflow service, your pipeline code becomes a Dataflow job.

The Dataflow service fully manages Google Cloud Platform services such as Google Compute Engine and Google Cloud Storage to run your Dataflow job, automatically spinning up and tearing down the necessary resources. The Dataflow service provides visibility into your job through tools like the Dataflow Monitoring Interface and the Dataflow Command-line Interface.

You can control some aspects of how the Dataflow service runs your job by setting execution parameters in your pipeline code.
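
For example, here is a minimal sketch in Java (Beam SDK 2.x style) of setting a few execution parameters in code before launching a job; the project ID and bucket names are placeholders:

// Build Dataflow-specific pipeline options; the values below are placeholders.
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);          // run on the Cloud Dataflow service
options.setProject("my-project-id");              // placeholder project ID
options.setTempLocation("gs://my-bucket/temp");   // placeholder temp/staging location
options.setMaxNumWorkers(20);                     // example execution parameter

Pipeline p = Pipeline.create(options);
// Apply transforms here, then submit the job to the Dataflow service:
p.run();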

In addition to managing Cloud Platform resources, the Dataflow service automatically performs and optimizes many aspects of distributed parallel processing. These include:

  • Parallelization and Distribution. Dataflow automatically partitions your data and distributes your worker code to Compute Engine instances for parallel processing.
  • Optimization. Dataflow uses your pipeline code to create an execution graph that represents your pipeline's PCollections and transforms, and optimizes the graph for the most efficient performance and resource usage. Dataflow also automatically optimizes potentially costly operations, such as data aggregations.
  • Automatic Tuning features. The Dataflow service includes several features that provide on-the-fly adjustment of resource allocation and data partitioning, such as Autoscaling and Dynamic Work Rebalancing. These features help the Dataflow service execute your job as quickly and efficiently as possible.

Pipeline Lifecycle: From Pipeline Code to Dataflow Job

When you run your Dataflow program, Dataflow creates an execution graph from the code that constructs your Pipeline object, including all of the transforms and their associated processing functions (such as DoFns). This phase is called Graph Construction Time. During graph construction, Dataflow checks for various errors and ensures that your pipeline graph doesn't contain any illegal operations. The execution graph is translated into JSON format, and the JSON execution graph is transmitted to the Dataflow service endpoint.

Note: Graph construction also happens when you execute your pipeline locally, but the graph is not translated to JSON or transmitted to the service. Instead, the graph is run locally on the same machine where you launched your Dataflow program. See the documentation on configuring for local execution for more details.

The Cloud Dataflow service then validates the JSON execution graph. When the graph is validated, it becomes a job on the Dataflow service. You'll be able to see your job, its execution graph, status, and log information by using the Dataflow Monitoring Interface.

Java: SDK 1.x

The Dataflow service sends a response to the machine where you ran your Dataflow program. This response is encapsulated in the object DataflowPipelineJob, which contains your Dataflow job's jobId. You can use the jobId to monitor, track, and troubleshoot your job using the Dataflow Monitoring Interface and the Dataflow Command-line Interface. See the API reference information for DataflowPipelineJob for more information.

Java: SDK 2.x

The Dataflow service sends a response to the machine where you ran your Dataflow program. This response is encapsulated in the object DataflowPipelineJob, which contains your Dataflow job's jobId. You can use the jobId to monitor, track, and troubleshoot your job using the Dataflow Monitoring Interface and the Dataflow Command-line Interface. See the API reference information for DataflowPipelineJob for more information.
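
For example (a minimal sketch, Java SDK 2.x), you can capture the job ID immediately after submitting the pipeline; the cast assumes DataflowRunner is the configured runner:

// With DataflowRunner, the PipelineResult returned by run() is a DataflowPipelineJob.
DataflowPipelineJob job = (DataflowPipelineJob) p.run();
String jobId = job.getJobId();
System.out.println("Submitted Dataflow job: " + jobId);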

Python

The Dataflow service sends a response to the machine where you ran your Dataflow program. This response is encapsulated in the object DataflowPipelineResult, which contains your Dataflow job's job_id. You can use the job_id to monitor, track, and troubleshoot your job using the Dataflow Monitoring Interface and the Dataflow Command-line Interface.

Execution Graph

Dataflow builds a graph of steps that represents your pipeline, based on the transforms and data you used when you constructed your Pipeline object. This is the pipeline execution graph.

The WordCount Example Program included with the Cloud Dataflow SDKs contains a series of transforms to read, extract, count, format, and write the individual words in a collection of text, along with an occurrence count for each word. The following diagram shows how the transforms in the WordCount pipeline are expanded into an execution graph:

[Figure: The transforms in the WordCount example program expanded into an execution graph of steps to be executed by the Cloud Dataflow service.]
Figure 1: WordCount Example Execution Graph

The execution graph often differs from the order in which you specified your transforms when you constructed the pipeline. This is because the Dataflow service performs various optimizations and fusions on the execution graph before it runs on managed cloud resources. The Dataflow service respects data dependencies when executing your pipeline; however, steps without data dependencies between them may be executed in any order.

You can see the unoptimized execution graph that Dataflow has generated for your pipeline when you select your job in the Dataflow Monitoring Interface.

Resource Usage and Management

The Dataflow service fully manages resources in Google Cloud Platform on a per-job basis. This includes spinning up and shutting down Google Compute Engine instances (occasionally referred to as workers or VMs) and accessing your project's Google Cloud Storage buckets for both I/O and temporary file staging. However, if your pipeline interacts with Cloud Platform data storage technologies like Google BigQuery and Google Cloud Pub/Sub, you must manage the resources and quota for those services.

Dataflow uses a user-provided location in Google Cloud Storage specifically for staging files. This location is under your control, and you should ensure that it remains available for as long as any job is reading from it. You can reuse the same staging location for multiple job runs, as the SDK's built-in caching can speed up the start time for your jobs.

Jobs

You may run up to 25 concurrent Dataflow jobs per Cloud Platform project.

The Dataflow service is currently limited to processing job requests that are 10 MB in size or smaller. The size of the job request is specifically tied to the JSON representation of your pipeline; a larger pipeline means a larger request.

To estimate the size of your pipeline's JSON request, run your pipeline with the following option:

Java: SDK 1.x

--dataflowJobFile=< path to output file >

Java: SDK 2.x

--dataflowJobFile=< path to output file >

Python

--dataflow_job_file=< path to output file >

Running your pipeline with this option writes a JSON representation of your job to a file. The size of the serialized file is a good estimate of the size of the request; the actual size will be slightly larger due to some additional information included in the request.
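
For example, a Java pipeline might be launched with flags like the following (Java SDK 2.x flag spellings shown; the project, bucket, and output path are placeholders); the size of the resulting /tmp/job.json file then serves as the estimate described above:

--runner=DataflowRunner
--project=my-project
--tempLocation=gs://my-bucket/temp
--dataflowJobFile=/tmp/job.json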

Workers

The Dataflow service currently allows a maximum of 1000 Compute Engine instances per job. The default machine type is n1-standard-1 for a batch job, and n1-standard-4 for streaming; when using the default machine types, the Dataflow service can therefore allocate up to 4000 cores per job.

Dataflow supports n1 series workers as well as custom machine types. You can specify a machine type for your pipeline by setting the appropriate execution parameter at pipeline creation time.

Java: SDK 1.x

To change the machine type, set the --workerMachineType option.

Java: SDK 2.x

To change the machine type, set the --workerMachineType option.

Python

To change the machine type, set the --worker_machine_type option.
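
For example, you could launch your pipeline on larger or custom-shaped workers with a flag such as one of the following (Java spelling shown; the machine types are only illustrative, and the custom form follows Compute Engine's custom-<vCPUs>-<memory MB> naming):

--workerMachineType=n1-highmem-8
--workerMachineType=custom-4-5120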

Resource Quota

The Dataflow service checks to ensure that your Cloud Platform project has the Compute Engine resource quota required to run your job, both to start the job and scale to the maximum number of worker instances. Your job will fail to start if there is not enough resource quota available.

Dataflow's Autoscaling feature is limited by your project's available Compute Engine quota. If your job has sufficient quota when it starts, but another job uses the remainder of your project's available quota, the first job will run but not be able to fully scale.

However, the Dataflow service does not manage quota increases for jobs that exceed the resource quotas in your project. You are responsible for making any necessary requests for additional resource quota, for which you can use the Google Cloud Platform Console.

Persistent Disk Resources

The Dataflow service is currently limited to 15 persistent disks per worker instance when running a streaming job. Each persistent disk is local to an individual Compute Engine virtual machine. Your job may not have more workers than persistent disks; a 1:1 ratio between workers and disks is the minimum resource allotment.

The default size of each persistent disk is 250 GB in batch mode and 400 GB in streaming mode.

Locations

The Dataflow service deploys Compute Engine resources in the zone us-central1-f by default. You can override this setting by specifying the --zone option when you create your pipeline.

Parallelization and Distribution

The Dataflow service automatically parallelizes and distributes the processing logic in your pipeline to the workers you've allotted to perform your job. Dataflow uses the abstractions in the programming model to represent parallel processing functions; for example, your ParDo transforms cause Dataflow to automatically distribute your processing code (represented by DoFns) to multiple workers to be run in parallel.

Structuring your User Code

You can think of your DoFn code as small, independent entities: there can potentially be many instances running on different machines, each with no knowledge of the others. As such, pure functions (functions that do not depend on hidden or external state, that have no observable side effects, and are deterministic) are ideal code for the parallel and distributed nature of DoFns.

The pure function model is not strictly rigid, however; state information or external initialization data can be valid for DoFn and other function objects, so long as your code does not depend on things that the Dataflow service does not guarantee. When structuring your ParDo transforms and creating your DoFns, keep the following guidelines in mind:

  • The Dataflow service guarantees that every element in your input PCollection is processed by a DoFn instance exactly once.
  • The Dataflow service does not guarantee how many times a DoFn will be invoked.
  • The Dataflow service does not guarantee exactly how the distributed elements are grouped—that is, it does not guarantee which (if any) elements are processed together.
  • The Dataflow service does not guarantee the exact number of DoFn instances that will be created over the course of a pipeline.
  • The Dataflow service is fault-tolerant, and may retry your code multiple times in the case of worker issues. The Dataflow service may create backup copies of your code, and can have issues with manual side effects (such as if your code relies upon or creates temporary files with non-unique names).
  • The Dataflow service serializes element processing per DoFn instance. Your code does not need to be strictly thread-safe; however, any state shared between multiple DoFn instances must be thread-safe.

See Requirements for User-Provided Function Objects in the programming model documentation for more information on building your user code.
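
As an illustration, the following minimal sketch (Java SDK 2.x) shows a DoFn written as a pure function: its output depends only on its input element, it keeps no mutable shared state, and it has no side effects, so the service is free to run many copies of it, in any order, and to retry it safely:

// Splits each input line into words. Because the logic depends only on the element
// itself, retries and duplicate invocations of this DoFn are harmless.
static class ExtractWordsFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    for (String word : c.element().split("[^\\p{L}]+")) {
      if (!word.isEmpty()) {
        c.output(word);
      }
    }
  }
}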

Error and Exception Handling

Your pipeline may throw exceptions while processing data. Some of these errors are transient (e.g., temporary difficulty accessing an external service), but some are permanent, such as errors caused by corrupt or unparseable input data, or null pointers during computation.

Dataflow processes elements in arbitrary bundles, and will retry the complete bundle when an error is thrown for any element in that bundle. When running in batch mode, bundles including a failing item are retried 4 times. The pipeline will fail completely when a single bundle has failed 4 times. When running in streaming mode, a bundle including a failing item will be retried indefinitely, which may cause your pipeline to permanently stall.
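
One common way to keep a permanently failing element from repeatedly failing its bundle is to catch the exception inside your DoFn and route the bad record to a separate "dead letter" output for later inspection. This is a user-level pattern rather than a Dataflow service feature; in the minimal sketch below (Java SDK 2.x), input stands for a PCollection<String> of raw records and parseRecord is a hypothetical parser that may throw:

final TupleTag<KV<String, Integer>> parsedTag = new TupleTag<KV<String, Integer>>() {};
final TupleTag<String> deadLetterTag = new TupleTag<String>() {};

PCollectionTuple results = input.apply("ParseRecords",
    ParDo.of(new DoFn<String, KV<String, Integer>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        try {
          c.output(parseRecord(c.element()));   // hypothetical parser that may throw
        } catch (Exception e) {
          // Emit the raw record to the dead-letter output instead of failing the bundle.
          c.output(deadLetterTag, c.element());
        }
      }
    }).withOutputTags(parsedTag, TupleTagList.of(deadLetterTag)));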

Fusion Optimization

Once the JSON form of your pipeline's execution graph has been validated, the Dataflow service may modify the graph to perform optimizations. Such optimizations can include fusing multiple steps or transforms in your pipeline's execution graph into single steps. Fusing steps prevents the Dataflow service from needing to materialize every intermediate PCollection in your pipeline, which can be costly in terms of memory and processing overhead.

While all the transforms you've specified in your pipeline construction are executed on the service, they may be executed in a different order, or as part of a larger fused transform to ensure the most efficient execution of your pipeline. The Dataflow service will respect data dependencies between the steps in the execution graph, but otherwise steps may be executed in any order.

Fusion Example

The following diagram shows how the execution graph from the WordCount Example program included with the Dataflow SDK for Java might be optimized and fused by the Dataflow service for efficient execution:

[Figure: The execution graph for the WordCount example program, optimized and with steps fused by the Cloud Dataflow service.]
Figure 2: WordCount Example Optimized Execution Graph

Preventing Fusion

There are a few cases in your pipeline where you may want to prevent the Dataflow service from performing fusion optimizations. These are cases in which the Dataflow service might incorrectly guess the optimal way to fuse operations in the pipeline, which could limit the Dataflow service's ability to make use of all available workers.

For example, one case in which fusion can limit Dataflow's ability to optimize worker usage is a "high fan-out" ParDo. In such an operation, you might have an input collection with relatively few elements, but the ParDo produces an output with hundreds or thousands of times as many elements, followed by another ParDo. If the Dataflow service fuses these ParDo operations together, parallelism in this step is limited to at most the number of items in the input collection, even though the intermediate PCollection contains many more elements.

You can prevent such a fusion by adding an operation to your pipeline that forces the Dataflow service to materialize your intermediate PCollection. Consider using one of the following operations:

  • You can insert a GroupByKey and ungroup after your first ParDo (see the sketch after this list). The Dataflow service never fuses ParDo operations across an aggregation.
  • You can pass your intermediate PCollection as a side input to another ParDo. The Dataflow service always materializes side inputs.
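
The following minimal sketch (Java SDK 2.x) illustrates the first approach; expanded stands for the output of the high fan-out ParDo, and the key range of 256 is an arbitrary choice:

// Pair each element with a pseudo-random key, group, then ungroup. The GroupByKey forces
// the intermediate PCollection to be materialized, so the steps that follow it are not
// fused with the high fan-out ParDo and can use the full pool of workers.
PCollection<String> materialized = expanded
    .apply("AddArbitraryKey", ParDo.of(new DoFn<String, KV<Integer, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(KV.of(ThreadLocalRandom.current().nextInt(256), c.element()));
      }
    }))
    .apply(GroupByKey.<Integer, String>create())
    .apply("RemoveArbitraryKey", ParDo.of(new DoFn<KV<Integer, Iterable<String>>, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        for (String value : c.element().getValue()) {
          c.output(value);
        }
      }
    }));

Recent SDK releases also provide a Reshuffle transform that packages this same key-group-ungroup pattern.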

Combine Optimization

Aggregation operations are an important concept in large-scale data processing. Aggregation brings together data that's conceptually far apart, making it extremely useful for correlating. The Dataflow programming model represents aggregation operations as the GroupByKey, CoGroupByKey, and Combine transforms.

Dataflow's aggregation operations combine data across the entire data set, including data that may be spread across multiple workers. During such aggregation operations, it's often most efficient to combine as much data locally as possible before combining data across instances. When you apply a GroupByKey or other aggregating transform, the Dataflow service automatically performs partial combining locally before the main grouping operation.

When performing partial or multi-level combining, the Dataflow service makes different decisions based on whether your pipeline is working with batch or streaming data. For bounded data, the service favors efficiency and will perform as much local combining as possible. For unbounded data, the service favors lower latency, and may not perform partial combining (as it may increase latency).

Shuffle Optimization

Java: SDK 1.x

Shuffle is the base operation behind Cloud Dataflow transforms such as GroupByKey, CoGroupByKey and Combine. The Shuffle operation partitions and groups data by key in a scalable, efficient, fault-tolerant manner. Currently, Cloud Dataflow uses a shuffle implementation which runs entirely on worker virtual machines and consumes worker CPU, memory, and Persistent Disk storage. The new, service-based Dataflow Shuffle implementation, available in batch pipelines only, moves the shuffle operation out of the worker VMs and into the Cloud Dataflow service backend.

The benefits of the new service-based Dataflow Shuffle include faster execution time of batch pipelines for the majority of pipeline job types and a reduction in consumed CPU, memory, and Persistent Disk storage resources on the worker VMs. Furthermore, you can expect better autoscaling since VMs no longer hold any shuffle data and can therefore be scaled down earlier. Most of the reduction in worker resources comes from offloading the shuffle work to the Dataflow service. For that reason, there is a charge associated with the use of Dataflow Shuffle. However, the total bill for Dataflow pipelines using the service-based Shuffle implementation is expected to be less than or equal to the cost of Dataflow pipelines not using this option.

When using service-based Dataflow Shuffle, you do not have to attach large Persistent Disks to your worker VMs (Dataflow will still automatically attach a small 25 GB boot disk). Also, Dataflow Shuffle offers better fault tolerance; an unhealthy VM holding Shuffle data will not cause the entire job to fail, as would be the case if not using Shuffle.

Note: Service-based Dataflow Shuffle is currently available in beta in the us-central1 (Iowa) region only. It will become available in other regions in the future.

Service-based Dataflow Shuffle can be turned on in batch Dataflow jobs. To turn on service-based Dataflow Shuffle in your batch pipelines, specify the following parameter: --experiments=shuffle_mode=service.

Do not specify the --zone parameter if you want to use service-based Dataflow Shuffle. Dataflow will auto-select the best zone in the us-central1 region to run the Dataflow job in. If you specify a zone outside of the us-central1 region, along with the --experiments=shuffle_mode=service option, Dataflow will report an error.

Java: SDK 2.x

Shuffle is the base operation behind Cloud Dataflow transforms such as GroupByKey, CoGroupByKey and Combine. The Shuffle operation partitions and groups data by key in a scalable, efficient, fault-tolerant manner. Currently, Cloud Dataflow uses a shuffle implementation which runs entirely on worker virtual machines and consumes worker CPU, memory, and Persistent Disk storage. The new, service-based Dataflow Shuffle implementation, available in batch pipelines only, moves the shuffle operation out of the worker VMs and into the Cloud Dataflow service backend.

The benefits of the new service-based Dataflow Shuffle include faster execution time of batch pipelines for the majority of pipeline job types and a reduction in consumed CPU, memory, and Persistent Disk storage resources on the worker VMs. Furthermore, you can expect better autoscaling since VMs no longer hold any shuffle data and can therefore be scaled down earlier. Most of the reduction in worker resources comes from offloading the shuffle work to the Dataflow service. For that reason, there is a charge associated with the use of Dataflow Shuffle. However, the total bill for Dataflow pipelines using the service-based Shuffle implementation is expected to be less than or equal to the cost of Dataflow pipelines not using this option.

When using service-based Dataflow Shuffle, you do not have to attach large Persistent Disks to your worker VMs (Dataflow will still automatically attach a small 25 GB boot disk). Also, Dataflow Shuffle offers better fault tolerance; an unhealthy VM holding Shuffle data will not cause the entire job to fail, as would be the case if not using Shuffle.

Note: Service-based Dataflow Shuffle is currently available in beta in the us-central1 (Iowa) region only. It will become available in other regions in the future.

Service-based Dataflow Shuffle can be turned on in batch Dataflow jobs. To turn on service-based Dataflow Shuffle in your batch pipelines, specify the following parameter: --experiments=shuffle_mode=service.

Do not specify the --zone parameter if you want to use service-based Dataflow Shuffle. Dataflow will auto-select the best zone in the us-central1 region to run the Dataflow job in. If you specify a zone outside of the us-central1 region, along with the --experiments=shuffle_mode=service option, Dataflow will report an error.

Python

This feature is not yet supported in the Dataflow SDK for Python.

Autotuning Features

The Dataflow service contains several autotuning features that can further dynamically optimize your Dataflow job while it is running. These features include Autoscaling and Dynamic Work Rebalancing.

Autoscaling

With autoscaling enabled, the Dataflow service automatically chooses the appropriate number of worker instances required to run your job. The Dataflow service may also dynamically re-allocate more workers or fewer workers during runtime to account for the characteristics of your job. Certain parts of your pipeline may be computationally heavier than others, and the Dataflow service may automatically spin up additional workers during these phases of your job (and shut them down when they're no longer needed).

Java: SDK 1.x

Autoscaling is enabled by default on all batch Dataflow jobs created using the Dataflow SDK for Java version 1.6.0 or higher. You can explicitly disable autoscaling by specifying the option --autoscalingAlgorithm=NONE when you run your pipeline; if so, note that the Dataflow service sets the number of workers based on the --numWorkers option, which defaults to 3.

If your Dataflow job uses an earlier version of the SDK, you can enable autoscaling by specifying the option --autoscalingAlgorithm=THROUGHPUT_BASED when you run your pipeline.

Java: SDK 2.x

Autoscaling is enabled by default on all batch Dataflow jobs. You can explicitly disable autoscaling by specifying the option --autoscalingAlgorithm=NONE when you run your pipeline; if so, note that the Dataflow service sets the number of workers based on the --numWorkers option, which defaults to 3.

If your Dataflow job uses an earlier version of the SDK, you can enable autoscaling by specifying the option --autoscalingAlgorithm=THROUGHPUT_BASED when you run your pipeline.

Python

Autoscaling is enabled by default on all batch Dataflow jobs created using the Dataflow SDK for Python version 0.5.1 or higher. You can explicitly disable autoscaling by specifying the option --autoscaling_algorithm=NONE when you run your pipeline; if so, note that the Dataflow service sets the number of workers based on the --num_workers option, which defaults to 3.

If your Dataflow job uses an earlier version of the SDK, you can enable autoscaling by specifying the option --autoscaling_algorithm=THROUGHPUT_BASED when you run your pipeline.

Batch Autoscaling

For bounded data in batch mode, Dataflow automatically chooses the number of workers based on both the amount of work in each stage of your pipeline and the current throughput at that stage.

If your pipeline uses a custom data source that you've implemented, there are a few methods you can implement that provide more information to the Dataflow service's autoscaling algorithm and potentially improve performance:

Java: SDK 1.x

  • In your BoundedSource subclass, implement the method getEstimatedSizeBytes. The Dataflow service uses getEstimatedSizeBytes when calculating the initial number of workers to use for your pipeline.
  • In your BoundedReader subclass, implement the method getFractionConsumed. The Dataflow service uses getFractionConsumed to track read progress and converge on the correct number of workers to use during a read.

Java: SDK 2.x

  • In your BoundedSource subclass, implement the method getEstimatedSizeBytes (see the sketch after this list). The Dataflow service uses getEstimatedSizeBytes when calculating the initial number of workers to use for your pipeline.
  • In your BoundedReader subclass, implement the method getFractionConsumed. The Dataflow service uses getFractionConsumed to track read progress and converge on the correct number of workers to use during a read.
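
For example, a hypothetical BoundedSource that reads a single input file could report the file's length as its size estimate (a minimal sketch; inputPath is a field of this hypothetical source, not part of the SDK):

@Override
public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
  // The Dataflow service uses this estimate to choose the initial number of workers.
  return java.nio.file.Files.size(java.nio.file.Paths.get(inputPath));
}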

Python

  • In your BoundedSource subclass, implement the method estimate_size. The Dataflow service uses estimate_size when calculating the initial number of workers to use for your pipeline.
  • In your RangeTracker subclass, implement the method fraction_consumed. The Dataflow service uses fraction_consumed to track read progress and converge on the correct number of workers to use during a read.

Streaming Autoscaling

Java: SDK 1.x

Streaming autoscaling allows the Dataflow service to adaptively change the number of workers used to execute your streaming pipeline in response to changes in load and resource utilization. Streaming autoscaling is a free feature and is designed to reduce the costs of the resources used when executing streaming pipelines.

Without autoscaling, you would choose a fixed number of workers (by specifying --numWorkers) to execute your pipeline. As the input workload varies over time, this number can become either too high or too low. Provisioning too many workers results in unnecessary extra cost, and provisioning too few workers results in higher latency for processed data. By enabling autoscaling, resources are used only as they are needed.

To make scaling decisions, autoscaling relies on several signals that assess how busy workers are and whether they can keep up with the input stream. Key signals include CPU utilization, throughput, and backlog. The objective is to minimize backlog while maximizing worker utilization and throughput, and quickly react to spikes in load. By enabling autoscaling, you don't have to choose between provisioning for peak load and fresh results. Workers are added as CPU utilization and backlog increase and are removed as these metrics come down. This way, you’re paying only for what you need, and the job is processed as efficiently as possible.

If your pipeline uses a custom unbounded source, it is essential that the source informs the Dataflow service about backlog. Backlog is an estimate of the input in bytes that has not yet been processed by the source. To inform the service about backlog, implement one of the following two methods in your UnboundedReader class:

  • getSplitBacklogBytes() - Backlog for the current split of the source. The service aggregates backlog across all the splits.
  • getTotalBacklogBytes() - The global backlog across all the splits. In some cases the backlog is not available for each split and can only be calculated across all the splits. Only the first split (split id ‘0’) needs to provide total backlog.

Enable Streaming Autoscaling

To enable autoscaling, set the following execution parameters when you start your pipeline:

--autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=<N>

Autoscaling may oscillate between N/15 and N workers during the execution of a pipeline, where N is the value of --maxNumWorkers. For example, if your pipeline needs 3 or 4 workers in steady state, you could set --maxNumWorkers=15 and the pipeline will automatically scale between 1 and 15 workers.

Streaming pipelines are deployed with a fixed pool of persistent disks, equal in number to --maxNumWorkers. Take this into account when you specify --maxNumWorkers, and ensure this value is a sufficient number of disks for your pipeline.

Currently, PubSubIO is the only source that supports autoscaling on streaming pipelines. All SDK-provided sinks are supported. In this Beta release, autoscaling works smoothest when reading from Pub/Sub subscriptions tied to topics published with small batches and when writing to sinks with low latency. In extreme cases (for example, Pub/Sub subscriptions with large publishing batches or sinks with very high latency), autoscaling is known to become coarse-grained. This will be improved in future releases.

Usage and Pricing

Google Compute Engine usage is based on the average number of workers, while persistent disk usage is based on the exact number of --maxNumWorkers. Persistent disks are redistributed such that each worker gets an equal number of attached disks.

In the example above, where --maxNumWorkers=15, you will pay for between 1 and 15 Compute Engine instances and exactly 15 persistent disks.

Java: SDK 2.x

Streaming autoscaling allows the Dataflow service to adaptively change the number of workers used to execute your streaming pipeline in response to changes in load and resource utilization. Streaming autoscaling is a free feature and is designed to reduce the costs of the resources used when executing streaming pipelines.

Without autoscaling, you would choose a fixed number of workers (by specifying --numWorkers) to execute your pipeline. As the input workload varies over time, this number can become either too high or too low. Provisioning too many workers results in unnecessary extra cost, and provisioning too few workers results in higher latency for processed data. By enabling autoscaling, resources are used only as they are needed.

To make scaling decisions, autoscaling relies on several signals that assess how busy workers are and whether they can keep up with the input stream. Key signals include CPU utilization, throughput, and backlog. The objective is to minimize backlog while maximizing worker utilization and throughput, and quickly react to spikes in load. By enabling autoscaling, you don't have to choose between provisioning for peak load and fresh results. Workers are added as CPU utilization and backlog increase and are removed as these metrics come down. This way, you’re paying only for what you need, and the job is processed as efficiently as possible.

If your pipeline uses a custom unbounded source, it is essential that the source informs the Dataflow service about backlog. Backlog is an estimate of the input in bytes that has not yet been processed by the source. To inform the service about backlog, implement one of the following two methods in your UnboundedReader class (see the sketch after this list):

  • getSplitBacklogBytes() - Backlog for the current split of the source. The service aggregates backlog across all the splits.
  • getTotalBacklogBytes() - The global backlog across all the splits. In some cases the backlog is not available for each split and can only be calculated across all the splits. Only the first split (split id ‘0’) needs to provide total backlog.
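
For example, a reader for a hypothetical partitioned source might report its per-split backlog as follows (a minimal sketch; unreadBytesInPartition is an assumed helper, not an SDK method):

@Override
public long getSplitBacklogBytes() {
  // Estimated bytes published to this reader's partition that have not yet been read.
  long backlog = unreadBytesInPartition();
  // BACKLOG_UNKNOWN tells the service that no estimate is currently available.
  return backlog >= 0 ? backlog : BACKLOG_UNKNOWN;
}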

Enable Streaming Autoscaling

To enable autoscaling, set the following execution parameters when you start your pipeline:

--autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=<N>

Autoscaling may oscillate between N/15 and N workers during the execution of a pipeline, where N is the value of --maxNumWorkers. For example, if your pipeline needs 3 or 4 workers in steady state, you could set --maxNumWorkers=15 and the pipeline will automatically scale between 1 and 15 workers.

Streaming pipelines are deployed with a fixed pool of persistent disks, equal in number to --maxNumWorkers. Take this into account when you specify --maxNumWorkers, and ensure this value is a sufficient number of disks for your pipeline.

Currently, PubSubIO is the only source that supports autoscaling on streaming pipelines. All SDK-provided sinks are supported. In this Beta release, autoscaling works smoothest when reading from Pub/Sub subscriptions tied to topics published with small batches and when writing to sinks with low latency. In extreme cases (for example, Pub/Sub subscriptions with large publishing batches or sinks with very high latency), autoscaling is known to become coarse-grained. This will be improved in future releases.

Usage and Pricing

Google Compute Engine usage is based on the average number of workers, while persistent disk usage is based on the exact number of --maxNumWorkers. Persistent disks are redistributed such that each worker gets an equal number of attached disks.

In the example above, where --maxNumWorkers=15, you will pay for between 1 and 15 Compute Engine instances and exactly 15 persistent disks.

Python

This feature is not yet supported in the Dataflow SDK for Python.

Manually Scaling a Streaming Pipeline

Java: SDK 1.x

Until autoscaling is generally available in streaming mode, there is a workaround you can use to manually scale the number of workers running your streaming pipeline by using Dataflow's Update feature.

If you know you'll want to scale your streaming pipeline during execution, ensure that you set the following execution parameters when you start your pipeline:

  • Set --maxNumWorkers equal to the maximum number of workers you want available to your pipeline.
  • Set --numWorkers equal to the initial number of workers you want your pipeline to use when it starts running.

Once your pipeline is running, you can Update your pipeline and specify a new number of workers using the --numWorkers parameter. The value you set for the new --numWorkers must be between N and --maxNumWorkers, where N is equal to --maxNumWorkers / 15.

Update will replace your running job with a new job, using the new number of workers, while preserving all state information associated with the previous job.

Java: SDK 2.x

Until autoscaling is generally available in streaming mode, there is a workaround you can use to manually scale the number of workers running your streaming pipeline by using Dataflow's Update feature.

If you know you'll want to scale your streaming pipeline during execution, ensure that you set the following execution parameters when you start your pipeline:

  • Set --maxNumWorkers equal to the maximum number of workers you want available to your pipeline.
  • Set --numWorkers equal to the initial number of workers you want your pipeline to use when it starts running.

Once your pipeline is running, you can Update your pipeline and specify a new number of workers using the --numWorkers parameter. The value you set for the new --numWorkers must be between N and --maxNumWorkers, where N is equal to --maxNumWorkers / 15.

Update will replace your running job with a new job, using the new number of workers, while preserving all state information associated with the previous job.
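
For example, assuming a pipeline that was started with --maxNumWorkers=15, a hypothetical update to 10 workers would relaunch the pipeline with flags like the following (the job name must match the running job):

--update
--jobName=< existing job name >
--numWorkers=10
--maxNumWorkers=15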

Python

This feature is not yet supported in the Dataflow SDK for Python.

Dynamic Work Rebalancing

The Dataflow service's Dynamic Work Rebalancing feature allows the service to dynamically re-partition work based on runtime conditions. These conditions might include:

  • Imbalances in work assignments
  • Workers taking longer than expected to finish
  • Workers finishing faster than expected

The Dataflow service automatically detects these conditions and can dynamically reassign work to unused or underused workers to decrease your job's overall processing time.

Limitations

Dynamic Work Rebalancing only happens when the Dataflow service is processing some input data in parallel: when reading data from an external input source, when working with a materialized intermediate PCollection, or when working with the result of an aggregation like GroupByKey. If a large number of steps in your job are fused, there are fewer intermediate PCollections in your job and Dynamic Work Rebalancing will be limited to the number of elements in the source materialized PCollection. If you want to ensure that Dynamic Work Rebalancing can be applied to a particular PCollection in your pipeline, you can prevent fusion in a few different ways to ensure dynamic parallelism.

Dynamic Work Rebalancing cannot re-parallelize data finer than a single record. If your data contains individual records that cause large delays in processing time, they may still delay your job, since Dataflow cannot subdivide and redistribute an individual "hot" record to multiple workers.

Java: SDK 1.x

If you've set a fixed number of shards for your pipeline's final output (for example, by writing data using TextIO.Write.withNumShards), parallelization will be limited based on the number of shards that you've chosen.

Java: SDK 2.x

If you've set a fixed number of shards for your pipeline's final output (for example, by writing data using TextIO.Write.withNumShards), parallelization will be limited based on the number of shards that you've chosen.

Python

If you've set a fixed number of shards for your pipeline's final output (for example, by writing data using beam.io.WriteToText(..., num_shards=...)), Dataflow will limit parallelization based on the number of shards that you've chosen.

The fixed-shards limitation can be considered temporary, and may be subject to change in future releases of the Dataflow service.

Working with Custom Data Sources

Java: SDK 1.x

If your pipeline uses a custom data source that you provide, you must implement the method splitAtFraction to allow your source to work with the Dynamic Work Rebalancing feature.

Java: SDK 2.x

If your pipeline uses a custom data source that you provide, you must implement the method splitAtFraction to allow your source to work with the Dynamic Work Rebalancing feature.

Python

If your pipeline uses a custom data source that you provide, your RangeTracker must implement try_claim, try_split, position_at_fraction, and fraction_consumed to allow your source to work with the Dynamic Work Rebalancing feature.
