This page explains how to set pipeline options for your Dataflow jobs. These pipeline options configure how and where your pipeline runs and which resources it uses.
Pipeline execution is separate from your Apache Beam program's execution. The Apache Beam program that you've written constructs a pipeline for deferred execution. This means that the program generates a series of steps that any supported Apache Beam runner can execute. Compatible runners include the Dataflow runner on Google Cloud and the direct runner that executes the pipeline directly in a local environment.
You can pass parameters into a Dataflow job at runtime. For additional information about setting pipeline options at runtime, see Configuring pipeline options.
Use pipeline options with Apache Beam SDKs
You can use the following SDKs to set pipeline options for Dataflow jobs:
- Apache Beam SDK for Python
- Apache Beam SDK for Java
- Apache Beam SDK for Go
To use the SDKs, you set the pipeline runner and other execution parameters by using the Apache Beam SDK class PipelineOptions.
There are two methods for specifying pipeline options:
- Set pipeline options programmatically by supplying a list of pipeline options.
- Set pipeline options directly on the command line when you run your pipeline code.
Set pipeline options programmatically
You can set pipeline options programmatically by creating and modifying a PipelineOptions object.
Java
Construct a PipelineOptions object using the method PipelineOptionsFactory.fromArgs.
For an example, see the Launch on Dataflow sample section on this page.
Python
Create a PipelineOptions object.
For an example, see the Launch on Dataflow sample section on this page.
Go
Setting pipeline options programmatically using PipelineOptions is not supported in the Apache Beam SDK for Go. Use Go command-line arguments.
For an example, see the Launch on Dataflow sample section on this page.
Set pipeline options on the command line
You can set pipeline options by using command-line arguments.
Java
The following example syntax is from the WordCount pipeline in the Java quickstart.
mvn -Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=PROJECT_ID \
--gcpTempLocation=gs://BUCKET_NAME/temp/ \
--output=gs://BUCKET_NAME/output \
--runner=DataflowRunner \
--region=REGION"
Replace the following:
- PROJECT_ID: your Google Cloud project ID
- BUCKET_NAME: the name of your Cloud Storage bucket
- REGION: a Dataflow region, for example us-central1
Python
The following example syntax is from the WordCount pipeline in the Python quickstart.
python -m apache_beam.examples.wordcount \
--region DATAFLOW_REGION \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://STORAGE_BUCKET/results/outputs \
--runner DataflowRunner \
--project PROJECT_ID \
--temp_location gs://STORAGE_BUCKET/tmp/
Replace the following:
- DATAFLOW_REGION: the region where you want to deploy the Dataflow job, for example europe-west1. The --region flag overrides the default region that is set in the metadata server, your local client, or environment variables.
- STORAGE_BUCKET: the Cloud Storage bucket name
- PROJECT_ID: the Google Cloud project ID
Go
The following example syntax is from the WordCount pipeline in the Go quickstart.
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://BUCKET_NAME/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://BUCKET_NAME/binaries/
Replace the following:
- BUCKET_NAME: the Cloud Storage bucket name
- PROJECT_ID: the Google Cloud project ID
- DATAFLOW_REGION: the region where you want to deploy the Dataflow job, for example europe-west1. The --region flag overrides the default region that is set in the metadata server, your local client, or environment variables.
Set experimental pipeline options
In the Java, Python, and Go SDKs, the experiments pipeline option enables experimental or pre-GA Dataflow features.
Set programmatically
To set the experiments option programmatically, use the following syntax.
Java
In your PipelineOptions object, include the experiments option by using the following syntax.
This example sets the boot disk size to 80 GB with the experiment flag.
options.setExperiments(Arrays.asList("streaming_boot_disk_size_gb=80"));
For an example that shows how to create the PipelineOptions object, see the Launch on Dataflow sample section on this page.
Python
In your PipelineOptions object, include the experiments option by using the following syntax.
This example sets the boot disk size to 80 GB with the experiment flag.
beam_options = PipelineOptions(
    beam_args,
    experiments=['streaming_boot_disk_size_gb=80'])
For an example that shows how to create the PipelineOptions object, see the Launch on Dataflow sample section on this page.
Go
Setting pipeline options programmatically using PipelineOptions is not supported in the Apache Beam SDK for Go. Use Go command-line arguments.
Set on the command line
To set the experiments option on the command line, use the following syntax.
Java
This example sets the boot disk size to 80 GB with the experiment flag.
--experiments=streaming_boot_disk_size_gb=80
Python
This example sets the boot disk size to 80 GB with the experiment flag.
--experiments=streaming_boot_disk_size_gb=80
Go
This example sets the boot disk size to 80 GB with the experiment flag.
--experiments=streaming_boot_disk_size_gb=80
Set in a template
To enable an experimental feature when running a Dataflow template, use the --additional-experiments flag.
Classic template
gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]
Flex template
gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]
Access the pipeline options object
When you create your Pipeline object in your Apache Beam program, pass PipelineOptions. When the Dataflow service runs your pipeline, it sends a copy of the PipelineOptions to each worker.
Java
Access PipelineOptions inside any ParDo transform's DoFn instance by using the method ProcessContext.getPipelineOptions.
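For example, a DoFn can read a custom option inside its processing method. The following sketch assumes a custom options interface named MyOptions, like the one defined in the Create custom pipeline options section on this page; the class name is hypothetical:
static class EchoOptionFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Retrieve the pipeline options that Dataflow sent to this worker.
    MyOptions options = c.getPipelineOptions().as(MyOptions.class);
    c.output(c.element() + " " + options.getMyCustomOption());
  }
}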
Python
This feature is not supported in the Apache Beam SDK for Python.
Go
Access pipeline options using beam.PipelineOptions.
Launch on Dataflow
Run your job on managed Google Cloud resources by using the Dataflow runner service. Running your pipeline with Dataflow creates a Dataflow job, which uses Compute Engine and Cloud Storage resources in your Google Cloud project. For information about Dataflow permissions, see Dataflow security and permissions.
Dataflow jobs use Cloud Storage to store temporary files during pipeline execution. To avoid being billed for unnecessary storage costs, turn off the soft delete feature on buckets that your Dataflow jobs use for temporary storage. For more information, see Remove a soft delete policy from a bucket.
Set required options
To run your pipeline using Dataflow, set the following pipeline options:
Java
- project: the ID of your Google Cloud project.
- runner: the pipeline runner that executes your pipeline. For Google Cloud execution, this must be DataflowRunner.
- gcpTempLocation: a Cloud Storage path for Dataflow to stage most temporary files. If you want to specify a bucket, you must create the bucket ahead of time. If you don't set gcpTempLocation, you can set the pipeline option tempLocation and then gcpTempLocation is set to the value of tempLocation. If neither is specified, a default gcpTempLocation is created.
- stagingLocation: a Cloud Storage path for Dataflow to stage your binary files. If you're using the Apache Beam SDK 2.28 or higher, don't set this option. For the Apache Beam SDK 2.28 or lower, if you don't set this option, the value you specified for tempLocation is used for the staging location.

A default gcpTempLocation is created if neither it nor tempLocation is specified. If tempLocation is specified and gcpTempLocation is not, tempLocation must be a Cloud Storage path, and gcpTempLocation defaults to it. If tempLocation is not specified and gcpTempLocation is, tempLocation is not populated.
Python
- project: your Google Cloud project ID.
- region: the region for your Dataflow job.
- runner: the pipeline runner that executes your pipeline. For Google Cloud execution, this must be DataflowRunner.
- temp_location: a Cloud Storage path for Dataflow to stage temporary job files created during the execution of the pipeline.
Go
- project: your Google Cloud project ID.
- region: the region for your Dataflow job.
- runner: the pipeline runner that executes your pipeline. For Google Cloud execution, this must be dataflow.
- staging_location: a Cloud Storage path for Dataflow to stage temporary job files created during the execution of the pipeline.
Set pipeline options programmatically
The following example code shows how to construct a pipeline by programmatically setting the runner and other required options to execute the pipeline using Dataflow.
Java
// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()
# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']
# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
  pass  # build your pipeline here.
Go
The Apache Beam SDK for Go uses Go command-line arguments. Use flag.Set() to set flag values.
// Use the Go flag package to parse custom options.
flag.Parse()
// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")
beam.Init()
// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()
After you've constructed your pipeline, specify all the pipeline reads, transforms, and writes, and run the pipeline.
Use pipeline options from the command line
The following example shows how to use pipeline options that are specified on the command line. This example doesn't set the pipeline options programmatically.
Java
// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
Use the Python argparse module to parse command-line options.
# Use Python argparse module to parse custom arguments
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# For more details on how to use argparse, take a look at:
# https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()
# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | 'Read files' >> beam.io.ReadFromText(args.input_file)
      | 'Write files' >> beam.io.WriteToText(args.output_path))
Go
Use the Go flag package to parse command-line options. You must parse the options before you call beam.Init(). In this example, output is a command-line option.
// Define configuration options
var (
  output = flag.String("output", "", "Output file (required).")
)
// Parse options before beam.Init()
flag.Parse()
beam.Init()
// Input validation must be done after beam.Init()
if *output == "" {
  log.Fatal("No output provided!")
}
p := beam.NewPipeline()
After you construct your pipeline, specify all the pipeline reads, transforms, and writes, and then run the pipeline.
Control execution modes
When an Apache Beam program runs a pipeline on a service such as Dataflow, the program can either run the pipeline asynchronously or block until the pipeline completes. You can change this behavior by using the following guidance.
Java
When an Apache Beam Java program runs a pipeline on a service such as Dataflow, it is typically executed asynchronously. To run a pipeline and wait until the job completes, set DataflowRunner as the pipeline runner and explicitly call pipeline.run().waitUntilFinish().
When you use DataflowRunner and call waitUntilFinish() on the PipelineResult object returned from pipeline.run(), the pipeline executes on Google Cloud but the local code waits for the cloud job to finish and return the final DataflowPipelineJob object. While the job runs, the Dataflow service prints job status updates and console messages.
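A minimal sketch of the blocking pattern, assuming p is the Pipeline object constructed earlier:
// Run the pipeline on Dataflow and block until the job finishes.
PipelineResult result = p.run();
result.waitUntilFinish();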
Python
When an Apache Beam Python program runs a pipeline on a service such as Dataflow, it is typically executed asynchronously. To block until pipeline completion, use the wait_until_finish() method of the PipelineResult object, returned from the run() method of the runner.
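For example, the following sketch calls run() explicitly instead of using the with statement and then blocks until the Dataflow job completes; beam_options is a PipelineOptions object like the ones shown earlier:
pipeline = beam.Pipeline(options=beam_options)
# ... build your pipeline here ...

# Run the pipeline and block until the Dataflow job completes.
result = pipeline.run()
result.wait_until_finish()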
Go
When an Apache Beam Go program runs a pipeline on Dataflow, it is synchronous by default and blocks until pipeline completion. If you don't want to block, there are two options:
- Start the job in a Go routine:
  go func() {
    pr, err := beamx.Run(ctx, p)
    if err != nil {
      // Handle the error.
    }
    // Send beam.PipelineResult into a channel.
    results <- pr
  }()
  // Do other operations while the pipeline runs.
- Use the --async command-line flag, which is in the jobopts package, as shown in the command example after this list.
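For example, you might append the flag to the WordCount launch command shown earlier on this page (a sketch that reuses the same placeholder values):
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://BUCKET_NAME/results/outputs \
  --runner dataflow \
  --project PROJECT_ID \
  --region DATAFLOW_REGION \
  --staging_location gs://BUCKET_NAME/binaries/ \
  --async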
To view execution details, monitor progress, and verify job completion status, use the Dataflow monitoring interface or the Dataflow command line interface.
Use streaming sources
Java
If your pipeline reads from an unbounded data source, such as Pub/Sub, the pipeline automatically executes in streaming mode.
Python
If your pipeline uses an unbounded data source, such as Pub/Sub, you must set the streaming option to true.
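For example, you can pass --streaming on the command line, or set the option programmatically, as in the following sketch (beam_args as in the earlier samples):
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Enable streaming mode for the pipeline.
beam_options = PipelineOptions(beam_args)
beam_options.view_as(StandardOptions).streaming = True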
Go
If your pipeline reads from an unbounded data source, such as Pub/Sub, the pipeline automatically executes in streaming mode.
Streaming jobs use a Compute Engine machine type of n1-standard-2 or higher by default.
Launch locally
Instead of running your pipeline on managed cloud resources, you can choose to execute your pipeline locally. Local execution has certain advantages for testing, debugging, or running your pipeline over small data sets. For example, local execution removes the dependency on the remote Dataflow service and associated Google Cloud project.
When you use local execution, you must run your pipeline with datasets small enough to fit in local memory. You can create a small in-memory dataset using a Create transform, or you can use a Read transform to work with small local or remote files. Local execution typically provides a faster and easier way to perform testing and debugging with fewer external dependencies, but is limited by the memory that is available in your local environment.
The following example code shows how to construct a pipeline that executes in your local environment.
Java
// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()
# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
# MyOptions is the custom options class defined in the Create custom pipeline options section.
args = beam_options.view_as(MyOptions)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | beam.io.ReadFromText(args.input)
      | beam.io.WriteToText(args.output))
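For example, if this code is saved in a file such as my_pipeline.py (a hypothetical name), you can run it locally with local paths; because no runner is specified, the direct runner is used:
python my_pipeline.py \
  --input /path/to/input.txt \
  --output /path/to/output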
Go
// Parse options before beam.Init()
flag.Parse()
beam.Init()
p := beam.NewPipeline()
After you've constructed your pipeline, run it.
Create custom pipeline options
You can add your own custom options in addition to the standard PipelineOptions. Apache Beam's command line can also parse custom options using command-line arguments specified in the same format.
Java
To add your own options, define an interface with getter and setter methods for each option, as in the following example:
public interface MyOptions extends PipelineOptions {
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}
Python
To add your own options, use the add_argument() method (which behaves exactly like Python's standard argparse module), as in the following example:
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')
Go
To add your own options, use the Go flag package as shown in the following example:
var (
  input  = flag.String("input", "", "")
  output = flag.String("output", "", "")
)
You can also specify a description, which appears when a user passes --help as a command-line argument, and a default value.
Java
You set the description and default value using annotations, as follows:
public interface MyOptions extends PipelineOptions {
  @Description("My custom command line argument.")
  @Default.String("DEFAULT")
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}
We recommend that you register your interface with PipelineOptionsFactory and then pass the interface when creating the PipelineOptions object. When you register your interface with PipelineOptionsFactory, the --help command can find your custom options interface and add it to the --help output. PipelineOptionsFactory also validates that your custom options are compatible with all other registered options.
The following example code shows how to register your custom options interface with PipelineOptionsFactory:
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(MyOptions.class);
Now your pipeline can accept --myCustomOption=value as a command-line argument.
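For example, after the options are parsed, your code can read the value through the getter defined on the interface:
// Returns the value supplied with --myCustomOption, or "DEFAULT" if the flag isn't set.
String myCustomOption = options.getMyCustomOption();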
Python
You set the description and default value as follows:
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='The file path for the input text to process.')
    parser.add_argument(
        '--output', required=True, help='The path prefix for output files.')
Go
You set the description and default value as follows:
var (
  input  = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
  output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)