This page explains how to set pipeline options for your Dataflow jobs. These pipeline options configure how and where your pipeline executes and which resources it uses.
Pipeline execution is separate from your Apache Beam program's execution. The Apache Beam program that you've written constructs a pipeline for deferred execution. This means that the program generates a series of steps that any supported Apache Beam runner can execute. Compatible runners include the Dataflow runner on Google Cloud and the direct runner that executes the pipeline directly in a local environment.
You can pass parameters into a Dataflow job at runtime. For additional information about setting pipeline options at runtime, see Use runtime parameters in your pipeline code and Configuring pipeline options.
Using pipeline options with Apache Beam SDKs
You can use the following SDKs to set pipeline options for Dataflow jobs:
- Apache Beam SDK for Python
- Apache Beam SDK for Java
- Apache Beam SDK for Go
To use the SDKs, you set the pipeline runner and other execution parameters by using the Apache Beam SDK class PipelineOptions.
There are two methods for specifying pipeline options:
- Set them programmatically by supplying a list of pipeline options.
- Set them directly on the command line when you run your pipeline code.
Setting pipeline options programmatically
You can set pipeline options programmatically by creating and modifying a PipelineOptions object.
Java
Construct a PipelineOptions object using the method PipelineOptionsFactory.fromArgs.
For an example, view the Launching on Dataflow sample.
Python
Create a PipelineOptions object.
For an example, view the Launching on Dataflow sample.
Go
Setting pipeline options programmatically using PipelineOptions is not supported in the Apache Beam SDK for Go. Use Go command-line arguments.
For an example, view the Launching on Dataflow sample.
Setting pipeline options on the command line
You can set pipeline options using command-line arguments.
Java
To view an example of this syntax, see the Java quickstart samples.
Python
To view an example of this syntax, see the Python quickstart samples.
Go
To view an example of this syntax, see the Go quickstart samples.
Accessing the pipeline options object
You pass PipelineOptions when you create your Pipeline object in your Apache Beam program. When the Dataflow service runs your pipeline, it sends a copy of the PipelineOptions to each worker.
Java
You can access PipelineOptions inside any ParDo's DoFn instance by using the method ProcessContext.getPipelineOptions.
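For example, the following minimal sketch (the DoFn class name is illustrative, and MyOptions is a custom options interface like the one defined later under Creating custom pipeline options) reads an option from inside a DoFn:
// Illustrative sketch: read pipeline options inside a DoFn.
static class AppendOptionFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // The Dataflow service sends a copy of the PipelineOptions to each worker.
    MyOptions options = c.getPipelineOptions().as(MyOptions.class);
    c.output(c.element() + ":" + options.getMyCustomOption());
  }
}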
Python
This feature is not supported in the Apache Beam SDK for Python.
Go
You can access pipeline options using beam.PipelineOptions.
Launching on Dataflow
You can run your job on managed Google Cloud resources by using the Dataflow runner service. Running your pipeline with Dataflow creates a Dataflow job, which uses Compute Engine and Cloud Storage resources in your Google Cloud project. For information about Dataflow permissions, see Dataflow security and permissions.
Setting required options
To execute your pipeline using Dataflow, set the following pipeline options:
Java
- project: the ID of your Google Cloud project.
- runner: the pipeline runner that executes your pipeline. For Google Cloud execution, this must be DataflowRunner.
- gcpTempLocation: a Cloud Storage path for Dataflow to stage most temporary files. If you want to specify a bucket, you must create the bucket ahead of time. If you don't set gcpTempLocation, you can set the pipeline option tempLocation and then gcpTempLocation is set to the value of tempLocation. If neither is specified, a default gcpTempLocation is created.
- stagingLocation: a Cloud Storage path for Dataflow to stage your binary files. If you're using the Apache Beam SDK 2.28 or higher, do not set this option. For the Apache Beam SDK 2.28 or lower, if you do not set this option, the value you specified for tempLocation is used for the staging location.
A default gcpTempLocation is created if neither it nor tempLocation is specified. If tempLocation is specified and gcpTempLocation is not, tempLocation must be a Cloud Storage path, and gcpTempLocation defaults to it. If tempLocation is not specified and gcpTempLocation is, tempLocation is not populated.
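For example, the following minimal sketch (the project ID and bucket name are placeholders) relies on the tempLocation fallback described above:
// Because tempLocation is a Cloud Storage path and gcpTempLocation is not set,
// gcpTempLocation defaults to the value of tempLocation.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject("my-project-id");
options.setRunner(DataflowRunner.class);
options.setTempLocation("gs://my-bucket/temp");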
Python
- project: your Google Cloud project ID.
- region: the regional endpoint for your Dataflow job.
- runner: the pipeline runner that executes your pipeline. For Google Cloud execution, this must be DataflowRunner.
- temp_location: a Cloud Storage path for Dataflow to stage temporary job files created during the execution of the pipeline.
Go
- project: your Google Cloud project ID.
- region: the regional endpoint for your Dataflow job.
- runner: the pipeline runner that executes your pipeline. For Google Cloud execution, this must be dataflow.
- staging_location: a Cloud Storage path for Dataflow to stage temporary job files created during the execution of the pipeline.
Setting pipeline options programmatically
The following example code shows how to construct a pipeline by programmatically setting the runner and other required options to execute the pipeline using Dataflow.
Java
// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()
# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of strings.
# e.g. dataflow_service_options=['enable_prime']
# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
  pass  # build your pipeline here.
Go
The Apache Beam SDK for Go uses Go command-line arguments. Use flag.Set() to set flag values.
// Use the Go flag package to parse custom options.
flag.Parse()
// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")
beam.Init()
// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()
After you've constructed your pipeline, specify all the pipeline reads, transforms, and writes, and run the pipeline.
Using pipeline options from the command line
The following example shows how to use pipeline options that are specified on the command line. This example doesn't set the pipeline options programmatically.
Java
// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
Use the Python argparse module to parse command-line options.
# Use Python argparse module to parse custom arguments
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# For more details on how to use argparse, take a look at:
# https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()
# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | 'Read files' >> beam.io.ReadFromText(args.input_file)
      | 'Write files' >> beam.io.WriteToText(args.output_path))
Go
Use the Go flag package to parse command-line options. You must parse the options before you call beam.Init(). In this example, output is a command-line option.
// Define configuration options
var (
  output = flag.String("output", "", "Output file (required).")
)
// Parse options before beam.Init()
flag.Parse()
beam.Init()
// Input validation must be done after beam.Init()
if *output == "" {
  log.Fatal("No output provided!")
}
p := beam.NewPipeline()
After you've constructed your pipeline, specify all the pipeline reads, transforms, and writes, and run the pipeline.
Controlling execution modes
When an Apache Beam program runs a pipeline on a service such as Dataflow, the program can either run the pipeline asynchronously, or can block until pipeline completion. You can change this behavior by using the following guidance.
Java
When an Apache Beam Java program runs a pipeline on a service such as Dataflow, it is typically executed asynchronously. To run a pipeline and wait until the job completes, set DataflowRunner as the pipeline runner and explicitly call pipeline.run().waitUntilFinish().
When you use DataflowRunner and call waitUntilFinish() on the PipelineResult object returned from pipeline.run(), the pipeline executes on Google Cloud, but the local code waits for the cloud job to finish and return the final DataflowPipelineJob object. While the job runs, the Dataflow service prints job status updates and console messages.
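The following minimal sketch shows blocking execution, assuming the Pipeline object p was created with Dataflow options as shown earlier on this page:
// Run the pipeline and block until the Dataflow job reaches a terminal state.
PipelineResult result = p.run();
// waitUntilFinish() returns the job's terminal state, for example DONE.
PipelineResult.State state = result.waitUntilFinish();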
Python
When an Apache Beam Python program runs a pipeline on a service such as Dataflow, it is typically executed asynchronously. To block until pipeline completion, use the wait_until_finish() method of the PipelineResult object, returned from the run() method of the runner.
Go
When an Apache Beam Go program runs a pipeline on Dataflow, it is synchronous by default and blocks until pipeline completion. If you don't want to block, there are two options:
- Start the job in a goroutine, as shown in the following example:
go func() {
  pr, err := beamx.Run(ctx, p)
  if err != nil {
    // Handle the error.
  }
  // Send beam.PipelineResult into a channel.
  results <- pr
}()
// Do other operations while the pipeline runs.
- Use the --async command-line flag, which is in the jobopts package.
To view execution details, monitor progress, and verify job completion status, use the Dataflow monitoring interface or the Dataflow command line interface.
Using streaming sources
Java
If your pipeline reads from an unbounded data source, such as Pub/Sub, the pipeline automatically executes in streaming mode.
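For example, the following minimal sketch (the topic path is a placeholder) reads from a Pub/Sub topic, so the pipeline runs as a streaming job:
// Reading from an unbounded source such as Pub/Sub switches the job to streaming mode.
p.apply("ReadFromPubSub",
    PubsubIO.readStrings().fromTopic("projects/my-project-id/topics/my-topic"));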
Python
If your pipeline uses an unbounded data source, such as Pub/Sub, you must set the streaming option to true.
Go
If your pipeline reads from an unbounded data source, such as Pub/Sub, the pipeline automatically executes in streaming mode.
Streaming jobs use a Compute Engine machine type of n1-standard-2 or higher by default.
Launching locally
Instead of running your pipeline on managed cloud resources, you can choose to execute your pipeline locally. Local execution has certain advantages for testing, debugging, or running your pipeline over small data sets. For example, local execution removes the dependency on the remote Dataflow service and associated Google Cloud project.
When you use local execution, you must run your pipeline with datasets small
enough to fit in local memory. You can create a small in-memory
data set using a Create
transform, or you can use a Read
transform to
work with small local or remote files. Local execution provides a fast and easy
way to perform testing and debugging with fewer external dependencies but is
limited by the memory available in your local environment.
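For example, the following minimal Java sketch (the element values are illustrative) builds a small in-memory PCollection with the Create transform:
// Create a small in-memory dataset for local testing.
PCollection<String> words = p.apply(Create.of("To", "be", "or", "not", "to", "be"));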
The following example code shows how to construct a pipeline that executes in your local environment.
Java
// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()
# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
# MyOptions is a custom options class; see "Creating custom pipeline options" later on this page.
args = beam_options.view_as(MyOptions)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | beam.io.ReadFromText(args.input)
      | beam.io.WriteToText(args.output))
Go
// Parse options before beam.Init()
flag.Parse()
beam.Init()
p := beam.NewPipeline()
After you've constructed your pipeline, run it.
Creating custom pipeline options
You can add your own custom options in addition to the standard PipelineOptions. Apache Beam's command line can also parse custom options using command-line arguments specified in the same format.
Java
To add your own options, define an interface with getter and setter methods for each option, as in the following example:
public interface MyOptions extends PipelineOptions {
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}
Python
To add your own options, use the add_argument() method (which behaves exactly like Python's standard argparse module), as in the following example:
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')
Go
To add your own options, use the Go flag package as shown in the following example:
var (
  input  = flag.String("input", "", "")
  output = flag.String("output", "", "")
)
You can also specify a description, which appears when a user passes --help as a command-line argument, and a default value.
Java
You set the description and default value using annotations, as follows:
public interface MyOptions extends PipelineOptions {
  @Description("My custom command line argument.")
  @Default.String("DEFAULT")
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}
We recommend that you register your interface with PipelineOptionsFactory and then pass the interface when creating the PipelineOptions object. When you register your interface with PipelineOptionsFactory, the --help flag can find your custom options interface and add it to the output of the --help command. PipelineOptionsFactory validates that your custom options are compatible with all other registered options.
The following example code shows how to register your custom options interface with PipelineOptionsFactory:
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(MyOptions.class);
Now your pipeline can accept --myCustomOption=value as a command-line argument.
Python
You set the description and default value as follows:
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='The file path for the input text to process.')
    parser.add_argument(
        '--output', required=True, help='The path prefix for output files.')
Go
You set the description and default value as follows:
var (
  input  = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
  output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)