Pipeline lifecycle

This page provides an overview of the pipeline lifecycle from pipeline code to a Dataflow job.

This page explains the following:

  • What an execution graph is, and how an Apache Beam pipeline becomes a Dataflow job.
  • How Dataflow automatically parallelizes and distributes the processing logic in your pipeline to the workers performing your job.
  • How Dataflow handles errors.
  • Job optimizations that Dataflow may make.

Execution graph

When you run your Dataflow pipeline, Dataflow creates an execution graph from the code that constructs your Pipeline object, including all of the transforms and their associated processing functions (such as DoFns). This is the pipeline execution graph, and the phase is called graph construction time.

During graph construction, Apache Beam locally executes the code from the main entry point of the pipeline code, stopping at the calls to a source, sink, or transform step, and turning these calls into nodes of the graph. Consequently, a piece of code in a pipeline's entry point (the main() method in Java and Go, or the top level of a Python script) executes locally on the machine that runs the pipeline, while the same code declared in a method of a DoFn object executes on the Dataflow workers.
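The split between construction time and execution time can be illustrated with a toy model in plain Python. This is a sketch, not the Apache Beam API: `ToyPipeline`, `apply`, and `run` are hypothetical names, and the "worker" here is the same local process. It only shows that entry-point code runs while the graph is being built, whereas the bodies of the processing functions run later, when the graph is executed.

```python
events = []  # records the order in which code actually runs


class ToyPipeline:
    """Hypothetical stand-in for a pipeline: it records steps, it does not run them."""

    def __init__(self):
        self.nodes = []  # the "execution graph": an ordered list of (name, fn)

    def apply(self, name, fn):
        # Construction time: the call becomes a graph node; fn is NOT invoked.
        events.append(f"construct:{name}")
        self.nodes.append((name, fn))
        return self

    def run(self, elements):
        # Execution time: on Dataflow this part would happen on the workers.
        for name, fn in self.nodes:
            events.append(f"execute:{name}")
            elements = [fn(e) for e in elements]
        return elements


def to_upper(word):
    return word.upper()


def add_length(word):
    return (word, len(word))


pipeline = ToyPipeline()
pipeline.apply("ToUpper", to_upper).apply("AddLength", add_length)
events.append("entry point finished building the graph")

result = pipeline.run(["beam", "dataflow"])
print(events)
print(result)
```

All `construct:` events are recorded before any `execute:` event, which mirrors why side effects placed in the entry point happen on the launching machine and not on the workers.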

For example, the WordCount sample included with the Apache Beam SDKs contains a series of transforms to read, extract, count, format, and write the individual words in a collection of text, along with an occurrence count for each word. The following diagram shows how the transforms in the WordCount pipeline are expanded into an execution graph:

The transforms in the WordCount example program expanded into an execution graph of steps to be executed by the Dataflow service.

Figure 1: WordCount example execution graph

The execution graph often differs from the order in which you specified your transforms when you constructed the pipeline. This is because the Dataflow service performs various optimizations and fusions on the execution graph before it runs on managed cloud resources. The Dataflow service respects data dependencies when executing your pipeline; however, steps without data dependencies between them may be executed in any order.
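To illustrate what respecting data dependencies means, the following sketch uses Python's standard `graphlib` module (Python 3.9 or later) on a small hypothetical graph. The step names are invented for illustration, and this is not how Dataflow schedules work; it only shows that any order is valid as long as every step runs after the steps it depends on.

```python
from graphlib import TopologicalSorter

# Hypothetical graph: each step maps to the set of steps it depends on.
dependencies = {
    "Read": set(),
    "ExtractWords": {"Read"},
    "CountWords": {"ExtractWords"},
    "CountLines": {"Read"},  # independent of ExtractWords and CountWords
    "Write": {"CountWords", "CountLines"},
}

order = list(TopologicalSorter(dependencies).static_order())
position = {step: i for i, step in enumerate(order)}

# Every step appears after all of its dependencies.
for step, deps in dependencies.items():
    for dep in deps:
        assert position[dep] < position[step]

print(order)
```

In this sketch, `CountLines` may land before or after `ExtractWords` and `CountWords`, because no data dependency connects them.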

You can see the unoptimized execution graph that Dataflow has generated for your pipeline when you select your job in the Dataflow monitoring interface. For more information about viewing jobs, see Using the Dataflow monitoring interface.

During graph construction, Apache Beam validates that any resources referenced by the pipeline (like Cloud Storage buckets, BigQuery tables, and Pub/Sub topics or subscriptions) actually exist and are accessible. The validation is done through standard API calls to the respective services, so it's vital that the user account used to run a pipeline has proper connectivity to the necessary services and is authorized to call their APIs. Before submitting the pipeline to the Dataflow service, Apache Beam also checks for other errors, and ensures that the pipeline graph doesn't contain any illegal operations.

The execution graph is then translated into JSON format, and the JSON execution graph is transmitted to the Dataflow service endpoint.

The Dataflow service then validates the JSON execution graph. When the graph is validated, it becomes a job on the Dataflow service. You'll be able to see your job, its execution graph, status, and log information by using the Dataflow monitoring interface.

Java

The Dataflow service sends a response to the machine where you ran your Dataflow program. This response is encapsulated in the object DataflowPipelineJob, which contains your Dataflow job's jobId. You can use the jobId to monitor, track, and troubleshoot your job using the Dataflow monitoring interface and the Dataflow command-line interface. See the API reference for DataflowPipelineJob for more information.

Python

The Dataflow service sends a response to the machine where you ran your Dataflow program. This response is encapsulated in the object DataflowPipelineResult, which contains your Dataflow job's job_id. You can use the job_id to monitor, track, and troubleshoot your job using the Dataflow monitoring interface and the Dataflow command-line interface.

Go

The Dataflow service sends a response to the machine where you ran your Dataflow program. This response is encapsulated in the object dataflowPipelineResult, which contains your Dataflow job's jobID. You can use the jobID to monitor, track, and troubleshoot your job using the Dataflow monitoring interface and the Dataflow command-line interface.

Parallelization and distribution

The Dataflow service automatically parallelizes and distributes the processing logic in your pipeline to the workers you've allotted to perform your job. Dataflow uses the abstractions in the programming model to represent parallel processing functions; for example, the ParDo transforms in a pipeline cause Dataflow to automatically distribute processing code, represented by DoFns, to multiple workers to be run in parallel.
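As a rough local analogy, the following sketch splits elements into bundles and applies the same processing function to each bundle on a thread pool. The names `process_element`, `process_bundle`, and `make_bundles` are hypothetical, and threads in one process only stand in for Dataflow workers; the actual service decides bundling and distribution itself.

```python
from concurrent.futures import ThreadPoolExecutor


def process_element(word):
    # Stand-in for the per-element logic of a DoFn.
    return len(word)


def process_bundle(bundle):
    # Each "worker" applies the same function to every element of its bundle.
    return [process_element(e) for e in bundle]


def make_bundles(elements, bundle_size):
    # Split the input into consecutive chunks of at most bundle_size elements.
    return [elements[i:i + bundle_size] for i in range(0, len(elements), bundle_size)]


words = ["to", "be", "or", "not", "to", "be", "that"]
bundles = make_bundles(words, bundle_size=3)

with ThreadPoolExecutor(max_workers=3) as pool:
    # map() preserves bundle order even though bundles run concurrently.
    per_bundle = list(pool.map(process_bundle, bundles))

lengths = [n for bundle in per_bundle for n in bundle]
print(lengths)
```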

Error and exception handling

Your pipeline may throw exceptions while processing data. Some of these errors are transient (e.g., temporary difficulty accessing an external service), but some are permanent, such as errors caused by corrupt or unparseable input data, or null pointers during computation.

Dataflow processes elements in arbitrary bundles, and retries the complete bundle when an error is thrown for any element in that bundle. When running in batch mode, bundles including a failing item are retried 4 times. The pipeline will fail completely when a single bundle has failed 4 times. When running in streaming mode, a bundle including a failing item will be retried indefinitely, which may cause your pipeline to permanently stall.
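The batch retry behavior described above can be modeled with a small sketch. The function names and the simulated failures are hypothetical, and treating the limit as four attempts in total is an interpretation of the text above; the real retry logic is internal to the Dataflow service.

```python
MAX_ATTEMPTS = 4  # batch mode: a bundle that fails 4 times fails the pipeline


def run_bundle_with_retries(bundle, process_element):
    """Process a whole bundle, retrying the complete bundle on any error."""
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            # An exception for any element discards the partial results,
            # and the whole bundle is attempted again.
            return [process_element(e) for e in bundle], attempt
        except Exception as error:
            last_error = error
    raise RuntimeError(f"bundle failed {MAX_ATTEMPTS} times") from last_error


calls = {"count": 0}


def flaky_parse(value):
    # Transient failure: the first two calls fail, later calls succeed.
    calls["count"] += 1
    if calls["count"] <= 2:
        raise ConnectionError("temporary difficulty accessing a service")
    return int(value)


def strict_parse(value):
    # Permanent failure: unparseable input fails on every attempt.
    return int(value)


results, attempts = run_bundle_with_retries(["1", "2", "3"], flaky_parse)
print(results, attempts)

try:
    run_bundle_with_retries(["1", "oops", "3"], strict_parse)
    pipeline_failed = False
except RuntimeError:
    pipeline_failed = True
print(pipeline_failed)
```

The transient error clears up on the third attempt, while the unparseable element fails every attempt and ends the run. In streaming mode there is no such limit, which is why a permanently failing element can stall a streaming pipeline.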

Startup worker errors, such as a failure to install packages on the workers, are treated as transient. They are therefore retried indefinitely, which may cause your pipeline to stall permanently.

Fusion optimization

Once the JSON form of your pipeline's execution graph has been validated, the Dataflow service may modify the graph to perform optimizations. Such optimizations can include fusing multiple steps or transforms in your pipeline's execution graph into single steps. Fusing steps prevents the Dataflow service from needing to materialize every intermediate PCollection in your pipeline, which can be costly in terms of memory and processing overhead.

While all the transforms you've specified in your pipeline construction are executed on the service, they may be executed in a different order, or as part of a larger fused transform to ensure the most efficient execution of your pipeline. The Dataflow service respects data dependencies between the steps in the execution graph, but otherwise steps may be executed in any order.

Fusion example

The following diagram shows how the execution graph from the WordCount example included with the Apache Beam SDK for Java might be optimized and fused by the Dataflow service for efficient execution:

The execution graph for the WordCount example program optimized and with steps fused by the Dataflow service.

Figure 2: WordCount example optimized execution graph

Prevent fusion

There are a few cases in your pipeline where you may want to prevent the Dataflow service from performing fusion optimizations. These are cases in which the Dataflow service might incorrectly guess the optimal way to fuse operations in the pipeline, which could limit the Dataflow service's ability to make use of all available workers.

For example, one case in which fusion can limit Dataflow's ability to optimize worker usage is a "high fan-out" ParDo. In such an operation, you might have an input collection with relatively few elements, but the ParDo produces an output with hundreds or thousands of times as many elements, followed by another ParDo. If the Dataflow service fuses these ParDo operations together, parallelism in this step is limited to at most the number of items in the input collection, even though the intermediate PCollection contains many more elements.
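The effect on parallelism can be worked through with some simple arithmetic. The numbers below (10 input elements, a fan-out of 1,000, and 50 workers) are made up for illustration, and the model assumes, as a simplification, that a worker needs at least one element of the driving collection to do useful work.

```python
def max_parallelism(element_count, worker_count):
    # Simplified model: parallelism is capped by both elements and workers.
    return min(element_count, worker_count)


input_elements = 10      # small input collection
fan_out = 1000           # outputs produced per input element by the first ParDo
workers = 50             # workers available to the job

intermediate_elements = input_elements * fan_out

# Fused: both ParDos run together, driven by the 10 input elements.
fused = max_parallelism(input_elements, workers)

# Not fused: the second ParDo is driven by the materialized intermediate
# PCollection, so all workers can take part.
unfused_second_step = max_parallelism(intermediate_elements, workers)

print(fused, unfused_second_step)
```

Under these assumptions, the fused stage keeps at most 10 of the 50 workers busy, even though the intermediate collection holds 10,000 elements.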

You can prevent such a fusion by adding an operation to your pipeline that forces the Dataflow service to materialize your intermediate PCollection. Consider using one of the following operations:

  • You can insert a GroupByKey and ungroup after your first ParDo. The Dataflow service never fuses ParDo operations across an aggregation.
  • You can pass your intermediate PCollection as a side input to another ParDo. The Dataflow service always materializes side inputs.
  • You can insert a Reshuffle step. Reshuffle prevents fusion, checkpoints the data, and performs deduplication of records. Reshuffle is supported by Dataflow even though it is marked deprecated in the Apache Beam documentation.

Monitor fusion

You can access your optimized graph and fused stages in the Google Cloud console, by using the gcloud CLI, or by using the API.

Console

To view your graph's fused stages and steps in the console, in the Execution details tab for your Dataflow job, open the Stage workflow graph view.

To see the component steps that are fused for a stage, in the graph, click the fused stage. In the Stage info pane, the Component steps row displays the fused steps. Sometimes portions of a single composite transform are fused into multiple stages.

gcloud

To access your optimized graph and fused stages by using the gcloud CLI, run the following gcloud command:

  gcloud dataflow jobs describe --full JOB_ID --format json

Replace JOB_ID with the ID of your Dataflow job.

To extract the relevant bits, pipe the output of the gcloud command to jq:

  gcloud dataflow jobs describe --full JOB_ID --format json | jq '.pipelineDescription.executionPipelineStage[] | {"stage_id": .id, "stage_name": .name, "fused_steps": .componentTransform }'

To see the description of the fused stages in the output response, see the ComponentTransform array within each ExecutionStageSummary object.
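If jq is not available, the same extraction can be done in Python. The sketch below works on a hand-written sample that mimics only the fields used by the jq filter above (`id`, `name`, and `componentTransform`); the stage and step values are invented, and real output contains many more fields. The helper name `fused_stages` is hypothetical. In practice you would load the saved output of the gcloud command instead of `sample_output`.

```python
import json

# Hand-written sample; values are invented and most real fields are omitted.
sample_output = """
{
  "pipelineDescription": {
    "executionPipelineStage": [
      {"id": "S01", "name": "F12",
       "componentTransform": [{"name": "s1"}, {"name": "s2"}]},
      {"id": "S02", "name": "F13",
       "componentTransform": [{"name": "s3"}]}
    ]
  }
}
"""


def fused_stages(job_description):
    """Return one summary dict per stage, mirroring the jq filter."""
    stages = job_description.get("pipelineDescription", {}).get(
        "executionPipelineStage", []
    )
    return [
        {
            "stage_id": stage.get("id"),
            "stage_name": stage.get("name"),
            "fused_steps": stage.get("componentTransform", []),
        }
        for stage in stages
    ]


summary = fused_stages(json.loads(sample_output))
for stage in summary:
    print(stage["stage_id"], stage["stage_name"], len(stage["fused_steps"]))
```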

API

To access your optimized graph and fused stages by using the API, call projects.locations.jobs.get.

To see the description of the fused stages in the output response, see the ComponentTransform array within each ExecutionStageSummary object.

Combine optimization

Aggregation operations are an important concept in large-scale data processing. Aggregation brings together data that's conceptually far apart, which makes it extremely useful for correlating data. The Dataflow programming model represents aggregation operations as the GroupByKey, CoGroupByKey, and Combine transforms.

Dataflow's aggregation operations combine data across the entire data set, including data that may be spread across multiple workers. During such aggregation operations, it's often most efficient to combine as much data locally as possible before combining data across instances. When you apply a GroupByKey or other aggregating transform, the Dataflow service automatically performs partial combining locally before the main grouping operation.
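The idea of combining locally before combining across workers can be sketched in plain Python. The partitioning into three "workers" and the data are invented for illustration; this models the concept only, not Dataflow's implementation.

```python
from collections import Counter

# Hypothetical (key, value) pairs already spread across three workers.
worker_partitions = [
    [("cat", 1), ("dog", 1), ("cat", 1)],
    [("dog", 1), ("dog", 1)],
    [("cat", 1), ("bird", 1)],
]


def combine_locally(pairs):
    # Partial combining: each worker sums its own values per key first.
    partial = Counter()
    for key, value in pairs:
        partial[key] += value
    return partial


partials = [combine_locally(p) for p in worker_partitions]

# Only the partial sums cross worker boundaries, not every raw pair.
total = Counter()
for partial in partials:
    total.update(partial)

records_without_combining = sum(len(p) for p in worker_partitions)
records_with_combining = sum(len(p) for p in partials)
print(dict(total), records_without_combining, records_with_combining)
```

In this toy data set, local combining reduces the number of records exchanged between workers from 7 to 5; the saving grows as keys repeat more often within each worker.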

When performing partial or multi-level combining, the Dataflow service makes different decisions based on whether your pipeline is working with batch or streaming data. For bounded data, the service favors efficiency and will perform as much local combining as possible. For unbounded data, the service favors lower latency, and may not perform partial combining (as it may increase latency).