This page provides information about memory usage in Dataflow pipelines and steps for investigating and resolving issues with Dataflow out of memory (OOM) errors.
About Dataflow memory usage
To troubleshoot out of memory errors, it's helpful to understand how Dataflow pipelines use memory.
When Dataflow runs a pipeline, the processing is distributed
across multiple Compute Engine virtual machines (VMs), often called workers.
Workers process work items from the Dataflow service
and delegate the work items to Apache Beam SDK processes. An Apache Beam
SDK SDK process creates instances of DoFns. DoFn is an Apache Beam SDK class that defines a distributed processing function.
Dataflow launches several threads on each worker, and the memory of each worker is shared across all the threads. A thread is a single executable task running within a larger process. The default number of threads depends on multiple factors and varies between batch and streaming jobs.
If your pipeline needs more memory than the default amount of memory available on the workers, you might encounter out of memory errors.
Dataflow pipelines primarily use worker memory in three ways:
Worker operational memory
Dataflow workers need memory for their operating systems and system processes. This worker memory usage is typically less than 1 GB.
- Various processes on the worker use memory to ensure that your pipeline is in working order. Each of these processes might reserve a small amount of memory for its operation.
- When your pipeline doesn't use Streaming Engine, additional worker processes use memory.
SDK process memory
Apache Beam SDK processes might create objects and data that are shared between threads within the process, referred to on this page as SDK shared objects and data. Memory usage from these SDK shared objects and data is referred to as SDK process memory. The following list includes examples of SDK shared objects and data:
- Side inputs
- Machine learning models
- In-memory singleton objects
- Python objects created with the apache_beam.utils.shared module
- Data loaded from external sources, such as Cloud Storage or BigQuery
Streaming jobs that don't use Streaming Engine store side inputs in memory. For Java and Go pipelines, each worker has one copy of the side input. For Python pipelines, each Apache Beam SDK process has one copy of the side input.
Streaming jobs that use Streaming Engine have a side input size limit of 80 MB. Side inputs are stored outside of worker memory.
Memory usage from SDK shared objects and data grows linearly with the number of Apache Beam SDK processes. In Java and Go pipelines, one Apache Beam SDK process is started per worker. In Python pipelines, one Apache Beam SDK process is started per vCPU. SDK shared objects and data are reused across threads within the same Apache Beam SDK process.
DoFn memory usage
DoFn is an Apache Beam SDK class that defines a distributed processing function.
Each worker can run concurrent DoFn instances, and each thread runs one DoFn instance. When evaluating total memory usage, it might be helpful to calculate the working set size, that is, the amount of memory that an application needs to continue working. For example, if an individual DoFn uses a maximum of 5 MB of memory and a worker has 300 threads, then DoFn memory usage could peak at 1.5 GB, the per-instance memory multiplied by the number of threads. Depending on how workers are using memory, a spike in memory usage could cause workers to run out of memory.
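As a rough back-of-the-envelope check, you can multiply the peak memory of one DoFn instance by the number of threads on a worker. The following sketch restates the example above in Python; both input numbers are assumptions you would replace with measurements from your own pipeline.

```python
# Rough estimate of peak DoFn memory on one worker.
per_dofn_mb = 5            # assumed peak memory of a single DoFn instance, in MB
threads_per_worker = 300   # for example, a Java streaming worker without Streaming Engine

peak_gb = per_dofn_mb * threads_per_worker / 1024
print(f"Estimated peak DoFn memory per worker: {peak_gb:.1f} GB")  # ~1.5 GB
```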
It's hard to estimate how many instances of a DoFn Dataflow creates. The number depends on various factors, such as the SDK, the machine type, and so on. In addition, the DoFn might be used by multiple threads in succession. The Dataflow service does not guarantee how many times a DoFn is invoked, nor does it guarantee the exact number of DoFn instances created over the course of a pipeline. However, the following tables give some insight into the level of parallelism you can expect and estimate an upper bound on the number of DoFn instances.
Beam Python SDK

| | Batch | Streaming without Streaming Engine | Streaming Engine |
|---|---|---|---|
| Parallelism | 1 process per vCPU, 1 thread per process, 1 thread per vCPU | 1 process per vCPU, 12 threads per process, 12 threads per vCPU | 1 process per vCPU, 12 threads per process, 12 threads per vCPU |
| Maximum number of concurrent DoFn instances (All of these numbers are subject to change at any time.) | 1 DoFn per thread (1 per vCPU) | 1 DoFn per thread (12 per vCPU) | 1 DoFn per thread (12 per vCPU) |
Beam Java/Go SDK

| | Batch | Streaming without Streaming Engine | Streaming Engine |
|---|---|---|---|
| Parallelism | 1 process per worker VM, 1 thread per vCPU | 1 process per worker VM, 300 threads per process, 300 threads per worker VM | 1 process per worker VM, 500 threads per process, 500 threads per worker VM |
| Maximum number of concurrent DoFn instances (All of these numbers are subject to change at any time.) | 1 DoFn per thread (1 per vCPU) | 1 DoFn per thread (300 per worker VM) | 1 DoFn per thread (500 per worker VM) |
When you have a multi-language pipeline, and more than one Apache Beam SDK is running on the worker, the worker uses the lowest degree of thread-per-process parallelism possible.
Java, Go, and Python differences
Java, Go, and Python manage processes and memory differently. As a result, the approach that you should take when troubleshooting out of memory errors varies based on whether your pipeline uses Java, Go, or Python.
Java and Go pipelines
In Java and Go pipelines:
- Each worker starts one Apache Beam SDK process.
- SDK shared objects and data, like side inputs and caches, are shared among all threads on the worker.
- The memory used by SDK shared objects and data does not usually scale based on the number of vCPUs on the worker.
Python pipelines
In Python pipelines:
- Each worker starts one Apache Beam SDK process per vCPU.
- SDK shared objects and data, like side inputs and caches, are shared among all threads within each Apache Beam SDK process.
- The total number of threads on the worker scales linearly based on the number of vCPUs. As a result, the memory used by SDK shared objects and data grows linearly with the number of vCPUs.
- Threads performing the work are distributed across processes. New units of work are assigned either to a process with no work items, or to the process with the fewest work items currently assigned.
Find out of memory errors
To determine if your pipeline is running out of memory, use one of the following methods.
- On the Jobs details page, in the Logs pane, view the Diagnostics tab. This tab displays errors related to memory issues and how often the errors occur.
- In the Dataflow monitoring interface, use the Memory utilization chart to monitor worker memory capacity and usage.
- On the Jobs details page, in the Logs pane, select Worker logs, and then find the memory errors.
Java
The Java Memory Monitor, configured by the MemoryMonitorOptions interface, periodically reports garbage collection metrics. If the fraction of CPU time used for garbage collection exceeds a threshold of 50% for an extended period of time, the current SDK harness fails.
You might see an error similar to the following example:
Shutting down JVM after 8 consecutive periods of measured GC thrashing. Memory is used/total/max = ...
This memory error can occur when physical memory is still available. The error usually indicates that the memory usage of the pipeline is inefficient. To resolve this issue, optimize your pipeline.
If your job has high memory usage or out of memory errors, follow the recommendations on this page to optimize memory usage or to increase the amount of memory available.
Resolve out of memory errors
Changes to your Dataflow pipeline might resolve out of memory errors or reduce memory usage. Possible changes include the following actions:
- Optimize your pipeline to reduce its memory usage.
- Make more memory available to the workers.
Optimize your pipeline
Several pipeline operations can cause out of memory errors. This section provides options for reducing the memory usage of your pipeline. To identify the pipeline stages that consume the most memory, use Cloud Profiler to monitor pipeline performance.
You can use the following best practices to optimize your pipeline:
- Use Apache Beam built-in I/O connectors for reading files
- Redesign operations when using GroupByKey PTransforms
- Reduce ingress data from external sources
- Share objects across threads
- Use memory-efficient element representations
- Reduce the size of side inputs
Use Apache Beam built-in I/O connectors for reading files
Don't open large files inside a DoFn. To read files, use Apache Beam built-in I/O connectors. Files opened in a DoFn must fit into memory. Because multiple DoFn instances run concurrently, large files opened in DoFns can cause out of memory errors.
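For example, rather than downloading and parsing a large file inside a DoFn, you can let a built-in connector split the file into elements. The following Python sketch uses ReadFromText; the Cloud Storage path and the downstream transform are placeholders.

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions()) as p:
    line_lengths = (
        p
        # ReadFromText shards the file and emits one element per line,
        # so no single DoFn has to hold the whole file in memory.
        | "Read" >> beam.io.ReadFromText("gs://my-bucket/large-file.txt")  # placeholder path
        | "LineLength" >> beam.Map(len)
    )
```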
Redesign operations when using GroupByKey PTransforms
When you use a GroupByKey PTransform in Dataflow, the resulting per-key and per-window values are processed on a single thread. Because this data is passed as a stream from the Dataflow backend service to the workers, it doesn't need to fit in worker memory. However, if the values are collected in memory, the processing logic might cause out of memory errors.
For example, if you have a key that contains data for a window, and you add the key values to an in-memory object, such as a list, out of memory errors might occur. In this scenario, the worker might not have sufficient memory capacity to hold all of the objects.
For more information about GroupByKey PTransforms, see the Apache Beam Python GroupByKey and Java GroupByKey documentation.
The following list contains suggestions for designing your pipeline to minimize memory consumption when using GroupByKey PTransforms.
- To reduce the amount of data per key and per window, avoid keys with many values, also known as hot keys.
- To reduce the amount of data collected per window, use a smaller window size.
- If you're using values of a key in a window to calculate a number, use a Combine transform. Don't do the calculation in a single DoFn instance after collecting the values. A sketch of this pattern follows this list.
- Filter values or duplicates before processing. For more information, see the Python Filter and the Java Filter transform documentation.
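The following Python sketch illustrates the Combine suggestion from the list above: instead of grouping by key and summing an in-memory list of values, a CombinePerKey transform aggregates the values incrementally. The sample data is illustrative.

```python
import apache_beam as beam

with beam.Pipeline() as p:
    events = p | beam.Create([("user1", 3), ("user2", 5), ("user1", 7)])

    # Avoid: beam.GroupByKey() followed by a DoFn that builds a Python list
    # of every value for a key. For hot keys, that list might not fit in memory.
    #
    # Prefer: a Combine transform, which aggregates values incrementally
    # without holding all of them in memory at the same time.
    totals = events | "SumPerKey" >> beam.CombinePerKey(sum)

    totals | beam.Map(print)
```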
Reduce ingress data from external sources
If you're making calls to an external API or a database for data enrichment, the returned data must fit into worker memory.
If you're batching calls, use a GroupIntoBatches transform. If you encounter out of memory errors, reduce the batch size. For more information about grouping into batches, see the Python GroupIntoBatches and the Java GroupIntoBatches transform documentation.
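The following Python sketch shows one way to batch keyed elements before calling an external service. The batch size and the enrich function are placeholders; lower the batch size if workers run out of memory.

```python
import apache_beam as beam

def enrich(batch):
    # Hypothetical call to an external API for one batch of elements.
    # The response for a batch must fit in worker memory, so reduce
    # batch_size if you see out of memory errors.
    return batch

with beam.Pipeline() as p:
    enriched = (
        p
        | beam.Create([("key", i) for i in range(100)])
        # Elements must be keyed before GroupIntoBatches.
        | beam.GroupIntoBatches(batch_size=10)  # placeholder batch size
        | beam.MapTuple(lambda key, batch: enrich(list(batch)))
    )
```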
Share objects across threads
Sharing an in-memory data object across DoFn instances can improve space and access efficiency. The DoFn methods Setup, StartBundle, Process, FinishBundle, and Teardown are invoked for each DoFn instance, so data objects created in any of those methods are created separately for each instance. In Dataflow, each worker might have several DoFn instances. For more efficient memory usage, pass a data object as a singleton to share it across several DoFns. For more information, see the blog post Cache reuse across DoFns.
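In the Python SDK, one way to apply this pattern is the apache_beam.utils.shared module mentioned earlier on this page. The following sketch assumes a hypothetical load_model function and wrapper class; the Shared handle is created once and passed to the DoFn so that every instance in an SDK process reuses the same object.

```python
import apache_beam as beam
from apache_beam.utils import shared

class ModelWrapper:
    # The shared module holds the object through a weak reference, so wrap
    # plain containers such as dicts or lists in a class instance.
    def __init__(self, model):
        self.model = model

def load_model():
    # Hypothetical expensive load, for example from Cloud Storage.
    return ModelWrapper({"weights": [0.1, 0.2]})

class PredictDoFn(beam.DoFn):
    def __init__(self, shared_handle):
        self._shared_handle = shared_handle

    def setup(self):
        # acquire() runs load_model once per SDK process and returns the
        # same object to every DoFn instance that uses this handle.
        self._model = self._shared_handle.acquire(load_model)

    def process(self, element):
        yield (element, self._model.model["weights"])

shared_handle = shared.Shared()
# predictions = events | beam.ParDo(PredictDoFn(shared_handle))
```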
Use memory-efficient element representations
Evaluate whether you can use representations for PCollection elements that use less memory. When using coders in your pipeline, consider not only encoded but also decoded PCollection element representations. Sparse matrices can often benefit from this type of optimization.
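As a simple illustration of the idea, a mostly empty vector can be represented by its non-zero entries instead of a full dense list. This is a generic Python sketch rather than a Dataflow-specific API, and the sizes are illustrative.

```python
# Dense representation: memory grows with the vector length,
# even though almost every entry is zero.
dense = [0.0] * 1_000_000
dense[42] = 1.5

# Sparse representation: memory grows only with the number of
# non-zero entries, which is often far smaller.
sparse = {42: 1.5}
```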
Reduce the size of side inputs
If your DoFns use side inputs, reduce the size of the side input. For side inputs that are collections of elements, consider using iterable views, such as AsIterable or AsMultimap, instead of views that materialize the entire side input at the same time, such as AsList.
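In the Python SDK, the corresponding choice is between views such as beam.pvalue.AsList, which materialize the entire side input, and beam.pvalue.AsIter, which lets the DoFn iterate over it. The following sketch is illustrative; the data is placeholder.

```python
import apache_beam as beam

with beam.Pipeline() as p:
    main = p | "Main" >> beam.Create([10, 500, 900])
    lookup = p | "Lookup" >> beam.Create(range(1000))

    # AsIter lets the worker iterate over the side input instead of
    # materializing the whole collection in memory, as AsList would.
    counts = main | beam.Map(
        lambda x, side: sum(1 for v in side if v < x),
        side=beam.pvalue.AsIter(lookup),
    )
```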
Make more memory available
To increase available memory, you can increase the total amount of memory available on workers without changing the amount of memory available per thread. Alternatively, you can increase the amount of memory available per thread. When you increase the memory per thread, you also increase the total memory on the worker.
You can increase the amount of memory available per thread in four ways:
- Use a machine type with more memory per vCPU.
- Use a machine type with more vCPUs (Java and Go streaming pipelines).
- Reduce the number of threads.
- Use only one Apache Beam SDK process (Python streaming and Python Runner v2 pipelines).
Use a machine type with more memory per vCPU
To select a worker with more memory per vCPU, use one of the following methods.
- Use a high-memory machine type in the general-purpose machine family. High-memory machine types have higher memory per vCPU than the standard machine types. Using a high-memory machine type increases both the memory available to each worker and the memory available per thread, because the number of vCPUs remains the same. As a result, using a high-memory machine type can be a cost-effective way to select a worker with more memory per vCPU.
- For more flexibility when specifying the number of vCPUs and the amount of memory, you can use a custom machine type. With custom machine types, you can increase memory in 256 MB increments. These machine types are priced differently than standard machine types.
- Some machine families let you use extended memory custom machine types. Extended memory enables a higher memory-per-vCPU ratio. The cost is higher.
To set worker types, use the following pipeline option. For more information, see Setting pipeline options and Pipeline options.
Java
Use the --workerMachineType pipeline option.
Python
Use the --machine_type pipeline option.
Go
Use the --worker_machine_type pipeline option.
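For example, with the Python SDK you might request high-memory workers when constructing pipeline options. The project, region, and bucket values are placeholders, and the machine type is only an example.

```python
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner="DataflowRunner",
    project="my-project",                  # placeholder
    region="us-central1",                  # placeholder
    temp_location="gs://my-bucket/temp",   # placeholder
    # n1-highmem-4 keeps 4 vCPUs but provides more memory per vCPU than
    # n1-standard-4. A custom type such as "custom-4-32768" is another option.
    machine_type="n1-highmem-4",
)
```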
Use a machine type with more vCPUs
This option is only recommended for Java and Go streaming pipelines. Machine types with more
vCPUs have more total memory, because the amount of memory scales linearly
with the number of vCPUs. For example, an n1-standard-4
machine type with four
vCPUs has 15 GB of memory. An n1-standard-8
machine type with eight vCPUs
has 30 GB of memory. For more information about predefined machine types, see
General-purpose machine family.
Using workers with a higher number of vCPUs might
increase the cost of your pipeline significantly. However, you can use horizontal
autoscaling to reduce the number of total workers so that parallelism
remains the same. For example, if you have 50 workers using an n1-standard-4
machine type, and
you switch to an n1-standard-8
machine type, you can use horizontal autoscaling
and set the maximum number of workers to reduce the total number of
workers in your pipeline to about 25. This configuration results in a pipeline with a similar cost.
To set the maximum number of workers, use the following pipeline option.
Java
Use the --maxNumWorkers pipeline option.
For more information, see Pipeline options.
Go
Use the --max_num_workers pipeline option.
For more information, see Pipeline options.
This method is not recommended for Python pipelines. When you use the
Python SDK, if you switch to a worker with a higher number of
vCPUs, you not only increase memory, but you also increase the number of
Apache Beam SDK processes. For example, the n1-standard-4
machine type has the same memory
per thread as the n1-standard-8
machine type for Python pipelines. Therefore, with Python pipelines,
the recommendation is to use a high-memory machine type, reduce the number of
threads, or use only one Apache Beam SDK process.
Reduce the number of threads
If using a high-memory machine type doesn't solve your issue, increase the memory
available per thread by reducing the maximum number of threads that run DoFn
instances.
This change reduces parallelism. To reduce the number of Apache Beam
SDK threads that run DoFn
instances, use the following pipeline option.
Java
Use the --numberOfWorkerHarnessThreads pipeline option.
For more information, see Pipeline options.
Python
Use the --number_of_worker_harness_threads pipeline option.
For more information, see Pipeline options.
Go
Use the --number_of_worker_harness_threads pipeline option.
For more information, see Pipeline options.
To reduce the number of threads for Java and Go batch pipelines, set the value
of the flag to a number that is less than the number of vCPUs on the worker. For streaming pipelines,
set the value of the flag to a number that is less than the number of threads per
Apache Beam SDK process.
To estimate threads per process, see the tables in the DoFn memory usage section on this page.
This customization is not available for Python pipelines running on Apache Beam SDK 2.20.0 or earlier, or for Python pipelines that don't use Runner v2.
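For example, a Python streaming pipeline might cap the thread count per SDK process below the default shown in the tables earlier on this page. The value here is a placeholder to tune against your own memory measurements.

```python
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner="DataflowRunner",
    streaming=True,
    # Fewer threads per Apache Beam SDK process means more memory available
    # to each thread, at the cost of reduced parallelism.
    number_of_worker_harness_threads=6,  # placeholder value
)
```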
Use only one Apache Beam SDK process
For Python streaming pipelines and Python pipelines that use Runner v2, you can force Dataflow to start only one Apache Beam SDK process per worker. Before trying this option, first try to resolve the issue using the other methods. To configure Dataflow worker VMs to start only one containerized Python process, use the following pipeline option:
--experiments=no_use_multiple_sdk_containers
With this configuration, Python pipelines create one Apache Beam SDK process per worker. This configuration prevents the shared objects and data from being replicated multiple times for each Apache Beam SDK process. However, it limits the efficient use of the compute resources available on the worker.
Reducing the number of Apache Beam SDK processes to one does not necessarily reduce the total number of threads started on the worker. In addition, having all the threads in a single Apache Beam SDK process might cause slow processing or cause the pipeline to get stuck. Therefore, you might also have to reduce the number of threads, as described in the Reduce the number of threads section on this page.
You can also force workers to use only one Apache Beam SDK process by using a machine type with only one vCPU.
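For example, a Python streaming pipeline might combine the experiment with a reduced thread count, as discussed above. The thread count is a placeholder.

```python
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner="DataflowRunner",
    streaming=True,
    # Start a single Apache Beam SDK process per worker so that SDK shared
    # objects and data are held only once.
    experiments=["no_use_multiple_sdk_containers"],
    # Optionally also cap the thread count, as described in the
    # "Reduce the number of threads" section.
    number_of_worker_harness_threads=12,  # placeholder value
)
```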