This page provides an overview of the pipeline lifecycle, from pipeline code to a Dataflow job. It explains the following:
- What an execution graph is, and how an Apache Beam pipeline becomes a Dataflow job.
- How Dataflow automatically parallelizes and distributes the processing logic in your pipeline to the workers performing your job.
- How Dataflow handles errors.
- Job optimizations that Dataflow may make.
Execution graph
When you run your Dataflow pipeline, Dataflow creates an execution graph from the code that constructs your Pipeline object, including all of the transforms and their associated processing functions (such as DoFn objects). This is the pipeline execution graph, and the phase is called graph construction time.
During graph construction, Apache Beam locally executes the code from the main entry point of the pipeline code, stopping at the calls to a source, sink, or transform step, and turning these calls into nodes of the graph. Consequently, code in a pipeline's entry point (the Java or Go main() method, or the top level of a Python script) executes locally on the machine that runs the pipeline, while the same code declared in a method of a DoFn object executes on the Dataflow workers.
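To make the distinction concrete, the following Python sketch (the LogElementFn class and the logging statements are hypothetical, and runner options are omitted) contrasts where the two kinds of code run when the pipeline is submitted to Dataflow: the top-level statements execute on the launching machine during graph construction, while the body of process executes on the workers.

```python
import logging

import apache_beam as beam


class LogElementFn(beam.DoFn):
    def process(self, element):
        # Runs on Dataflow workers at execution time, once per element.
        logging.info('Worker is processing: %s', element)
        yield element


if __name__ == '__main__':
    # Runs locally on the launcher machine during graph construction.
    logging.info('Building the execution graph')
    with beam.Pipeline() as pipeline:
        # Applying transforms here only adds nodes to the graph;
        # no elements are processed until the job runs on the service.
        (pipeline
         | beam.Create(['a', 'b', 'c'])
         | beam.ParDo(LogElementFn()))
```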
For example, the WordCount sample included with the Apache Beam SDKs contains a series of transforms to read, extract, count, format, and write the individual words in a collection of text, along with an occurrence count for each word. The following diagram shows how the transforms in the WordCount pipeline are expanded into an execution graph:
Figure 1: WordCount example execution graph
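For reference, a condensed Python version of the WordCount pipeline looks roughly like the following (the Cloud Storage paths are placeholders); each labeled transform corresponds to a node in the execution graph shown in Figure 1.

```python
import re

import apache_beam as beam

with beam.Pipeline() as pipeline:
    (pipeline
     | 'Read' >> beam.io.ReadFromText('gs://your-bucket/input/*.txt')
     | 'ExtractWords' >> beam.FlatMap(lambda line: re.findall(r"[A-Za-z']+", line))
     | 'PairWithOne' >> beam.Map(lambda word: (word, 1))
     | 'CountWords' >> beam.CombinePerKey(sum)
     | 'FormatResults' >> beam.MapTuple(lambda word, count: '%s: %d' % (word, count))
     | 'Write' >> beam.io.WriteToText('gs://your-bucket/output/wordcounts'))
```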
The execution graph often differs from the order in which you specified your transforms when you constructed the pipeline. This is because the Dataflow service performs various optimizations and fusions on the execution graph before it runs on managed cloud resources. The Dataflow service respects data dependencies when executing your pipeline; however, steps without data dependencies between them may be executed in any order.
You can see the unoptimized execution graph that Dataflow has generated for your pipeline when you select your job in the Dataflow monitoring interface. For more information about viewing jobs, see Using the Dataflow monitoring interface.
During graph construction, Apache Beam validates that any resources referenced by the pipeline (like Cloud Storage buckets, BigQuery tables, and Pub/Sub topics or subscriptions) actually exist and are accessible. The validation is done through standard API calls to the respective services, so it's vital that the user account used to run a pipeline has proper connectivity to the necessary services and is authorized to call their APIs. Before submitting the pipeline to the Dataflow service, Apache Beam also checks for other errors, and ensures that the pipeline graph doesn't contain any illegal operations.
The execution graph is then translated into JSON format, and the JSON execution graph is transmitted to the Dataflow service endpoint.
The Dataflow service then validates the JSON execution graph. When the graph is validated, it becomes a job on the Dataflow service. You'll be able to see your job, its execution graph, status, and log information by using the Dataflow monitoring interface.
Java
The Dataflow service sends a response to the machine where you ran your Dataflow program. This response is encapsulated in the DataflowPipelineJob object, which contains your Dataflow job's jobId. You can use the jobId to monitor, track, and troubleshoot your job using the Dataflow monitoring interface and the Dataflow command-line interface. See the API reference for DataflowPipelineJob for more information.
Python
The Dataflow service sends a response to the machine where you ran your Dataflow program. This response is encapsulated in the DataflowPipelineResult object, which contains your Dataflow job's job_id. You can use the job_id to monitor, track, and troubleshoot your job using the Dataflow monitoring interface and the Dataflow command-line interface.
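As a minimal sketch (assuming the Python SDK's DataflowPipelineResult exposes a job_id() method, and that your pipeline options select the Dataflow runner), you can capture the job ID when you launch the pipeline:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder options; in practice the project, region, and temp_location
# come from your own configuration or command-line flags.
options = PipelineOptions(runner='DataflowRunner')

pipeline = beam.Pipeline(options=options)
pipeline | beam.Create([1, 2, 3]) | beam.Map(print)

# With the Dataflow runner, run() returns a DataflowPipelineResult.
result = pipeline.run()
print('Submitted Dataflow job:', result.job_id())  # use this ID to monitor the job
```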
Go
The Dataflow service sends a response to the machine where you ran your Dataflow program. This response is encapsulated in the dataflowPipelineResult object, which contains your Dataflow job's jobID. You can use the jobID to monitor, track, and troubleshoot your job using the Dataflow monitoring interface and the Dataflow command-line interface.
Parallelization and distribution
The Dataflow service automatically parallelizes and distributes the processing logic in your pipeline to the workers you've allotted to perform your job. Dataflow uses the abstractions in the programming model to represent parallel processing functions; for example, the ParDo transforms in a pipeline cause Dataflow to automatically distribute processing code, represented by DoFn objects, to multiple workers to be run in parallel.
Error and exception handling
Your pipeline may throw exceptions while processing data. Some of these errors are transient (e.g., temporary difficulty accessing an external service), but some are permanent, such as errors caused by corrupt or unparseable input data, or null pointers during computation.
Dataflow processes elements in arbitrary bundles, and retries the complete bundle when an error is thrown for any element in that bundle. When running in batch mode, bundles that include a failing item are retried four times. The pipeline fails completely when a single bundle has failed four times. When running in streaming mode, a bundle that includes a failing item is retried indefinitely, which may cause your pipeline to permanently stall.
Worker startup errors, such as failure to install packages on the workers, are transient, which results in indefinite retries and may cause your pipeline to permanently stall.
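Because a single bad element causes its whole bundle to be retried (indefinitely, in streaming mode), a common defensive pattern is to catch permanent errors inside your DoFn and route failing elements to a separate dead-letter output instead of letting them fail the bundle. The following Python sketch illustrates the idea; the ParseRecordFn class and the output tag names are hypothetical.

```python
import json
import logging

import apache_beam as beam
from apache_beam.pvalue import TaggedOutput


class ParseRecordFn(beam.DoFn):
    # Hypothetical tag for records that fail to parse.
    DEAD_LETTER_TAG = 'dead_letter'

    def process(self, element):
        try:
            # Permanent errors (for example, corrupt or unparseable input)
            # are caught here so they don't fail the whole bundle.
            yield json.loads(element)
        except Exception as error:
            logging.warning('Failed to parse %r: %s', element, error)
            yield TaggedOutput(self.DEAD_LETTER_TAG, element)


with beam.Pipeline() as pipeline:
    results = (
        pipeline
        | beam.Create(['{"id": 1}', 'not valid json'])
        | beam.ParDo(ParseRecordFn()).with_outputs(
            ParseRecordFn.DEAD_LETTER_TAG, main='parsed'))

    parsed = results.parsed             # good records continue through the pipeline
    dead_letters = results.dead_letter  # bad records can be written elsewhere for inspection
```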
Fusion optimization
Once the JSON form of your pipeline's execution graph has been validated, the Dataflow service may modify the graph to perform optimizations. Such optimizations can include fusing multiple steps or transforms in your pipeline's execution graph into single steps. Fusing steps prevents the Dataflow service from needing to materialize every intermediate PCollection in your pipeline, which can be costly in terms of memory and processing overhead.
While all the transforms you've specified in your pipeline construction are executed on the service, they may be executed in a different order, or as part of a larger fused transform to ensure the most efficient execution of your pipeline. The Dataflow service respects data dependencies between the steps in the execution graph, but otherwise steps may be executed in any order.
Fusion example
The following diagram shows how the execution graph from the WordCount example included with the Apache Beam SDK for Java might be optimized and fused by the Dataflow service for efficient execution:
Figure 2: WordCount example optimized execution graph
Preventing fusion
In a few cases, you may want to prevent the Dataflow service from performing fusion optimizations: cases in which the service might incorrectly guess the optimal way to fuse operations in the pipeline, which could limit its ability to make use of all available workers.
For example, one case in which fusion can limit Dataflow's ability to optimize worker usage is a "high fan-out" ParDo. In such an operation, you might have an input collection with relatively few elements, but the ParDo produces an output with hundreds or thousands of times as many elements, followed by another ParDo. If the Dataflow service fuses these ParDo operations together, parallelism in this step is limited to at most the number of items in the input collection, even though the intermediate PCollection contains many more elements.
You can prevent such a fusion by adding an operation to your pipeline that forces the Dataflow service to materialize your intermediate PCollection. Consider using one of the following operations:
- You can insert a GroupByKey and ungroup after your first ParDo. The Dataflow service never fuses ParDo operations across an aggregation.
- You can pass your intermediate PCollection as a side input to another ParDo. The Dataflow service always materializes side inputs.
- You can insert a Reshuffle step, as shown in the sketch after this list. Reshuffle prevents fusion, checkpoints the data, and performs deduplication of records. Reshuffle is supported by Dataflow even though it is marked deprecated in the Apache Beam documentation.
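As a minimal sketch (the generate_neighbors function and the element counts are hypothetical), inserting beam.Reshuffle() after a high fan-out ParDo forces Dataflow to materialize the large intermediate PCollection, so the subsequent step can be parallelized beyond the size of the small input collection:

```python
import apache_beam as beam


def generate_neighbors(seed):
    # Hypothetical high fan-out step: each input element expands into
    # thousands of output elements.
    for i in range(10000):
        yield (seed, i)


with beam.Pipeline() as pipeline:
    (pipeline
     | 'SmallInput' >> beam.Create(range(10))            # few elements
     | 'HighFanOut' >> beam.FlatMap(generate_neighbors)  # many more elements
     | 'PreventFusion' >> beam.Reshuffle()               # materializes the intermediate PCollection
     | 'Process' >> beam.MapTuple(lambda seed, i: '%d,%d' % (seed, i))
     | 'Write' >> beam.io.WriteToText('gs://your-bucket/output/results'))
```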
Monitoring fusion
You can access your optimized graph and fused stages by calling projects.locations.jobs.get or by running the following gcloud command:
gcloud dataflow jobs describe --full $JOB_ID --format json
The fused stages are described in the ExecutionStageSummary object within the ComponentTransform array of the response. To extract the relevant parts, you can pipe the output to jq using the following command:
gcloud dataflow jobs describe --full $JOB_ID --format json | jq '.pipelineDescription.executionPipelineStage[] | {"stage_id": .id, "stage_name": .name, "fused_steps": .componentTransform }'
Combine optimization
Aggregation operations are an important concept in large-scale data processing. Aggregation brings together data that's conceptually far apart, making it extremely useful for correlating. The Dataflow programming model represents aggregation operations as the GroupByKey, CoGroupByKey, and Combine transforms.
Dataflow's aggregation operations combine data across the entire data set, including data that may be spread across multiple workers. During such aggregation operations, it's often most efficient to combine as much data locally as possible before combining data across instances. When you apply a GroupByKey or other aggregating transform, the Dataflow service automatically performs partial combining locally before the main grouping operation.
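As an illustration (the input key-value pairs are made up), a keyed Combine with an associative, commutative function such as sum lets the service compute partial sums on each worker before the final per-key combination, which is the local pre-combining described above:

```python
import apache_beam as beam

with beam.Pipeline() as pipeline:
    (pipeline
     | beam.Create([('a', 1), ('b', 2), ('a', 3), ('b', 4)])
     # CombinePerKey with an associative, commutative function (sum)
     # allows partial sums to be computed locally on each worker
     # before the per-key results are combined across workers.
     | beam.CombinePerKey(sum)
     | beam.Map(print))
```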
When performing partial or multi-level combining, the Dataflow service makes different decisions based on whether your pipeline is working with batch or streaming data. For bounded data, the service favors efficiency and will perform as much local combining as possible. For unbounded data, the service favors lower latency, and may not perform partial combining (as it may increase latency).