Setting pipeline options

This page explains how to set pipeline options for your Dataflow jobs. These pipeline options configure how and where your pipeline executes and which resources it uses.

Pipeline execution is separate from your Apache Beam program's execution. The Apache Beam program that you've written constructs a pipeline for deferred execution. This means that the program generates a series of steps that any supported Apache Beam runner can execute. Compatible runners include the Dataflow runner on Google Cloud and the direct runner that executes the pipeline directly in a local environment.

Using pipeline options

You can set the pipeline runner and other execution parameters by using the Apache Beam SDK class PipelineOptions.

There are two methods for specifying pipeline options:

  • Set them programmatically by supplying a list of pipeline options.
  • Set them directly on the command line when you run your pipeline code.

Setting pipeline options programmatically

You can set pipeline options programmatically by creating and modifying a PipelineOptions object.

Java

Construct a PipelineOptions object using the method PipelineOptionsFactory.fromArgs.

For an example, view the Launching on Dataflow sample.

Python

Create a PipelineOptions object.

For an example, view the Launching on Dataflow sample.

Go

Setting pipeline options programmatically using PipelineOptions is not supported in the Apache Beam SDK for Go. Use Go command-line arguments.

For an example, view the Launching on Dataflow sample.

Setting pipeline options on the command line

You can set pipeline options using command-line arguments.

Java

To view an example of this syntax, see the Java quickstart samples.

Python

To view an example of this syntax, see the Python quickstart samples.

Go

To view an example of this syntax, see the Go quickstart samples.

Accessing the pipeline options object

You pass PipelineOptions when you create your Pipeline object in your Apache Beam program. When the Dataflow service runs your pipeline, it sends a copy of the PipelineOptions to each worker.

Java

You can access PipelineOptions inside any ParDo's DoFn instance by using the method ProcessContext.getPipelineOptions.

Python

This feature is not supported in the Apache Beam SDK for Python.

Go

You can access pipeline options using beam.PipelineOptions.

Launching on Dataflow

You can run your job on managed Google Cloud resources by using the Dataflow runner service. Running your pipeline with Dataflow creates a Dataflow job, which uses Compute Engine and Cloud Storage resources in your Google Cloud project.

Setting required options

To execute your pipeline using Dataflow, set the following pipeline options:

Java

  • project: the ID of your Google Cloud project.
  • runner: the pipeline runner that executes your pipeline. For Google Cloud execution, this must be DataflowRunner.
  • gcpTempLocation: a Cloud Storage path for Dataflow to stage most temporary files. If you want to specify a bucket, you must create the bucket ahead of time. If you don't set gcpTempLocation, you can set the pipeline option tempLocation, and gcpTempLocation is then set to the value of tempLocation. If neither is specified, a default gcpTempLocation is created.
  • stagingLocation: a Cloud Storage bucket for Dataflow to stage your binary files. If you're using the Apache Beam SDK 2.28 or higher, do not set this option. For the Apache Beam SDK 2.28 or lower, if you do not set this option, the value of tempLocation is used as the staging location.

    A default gcpTempLocation is created if neither it nor tempLocation is specified. If tempLocation is specified and gcpTempLocation is not, tempLocation must be a Cloud Storage path, and gcpTempLocation defaults to it. If tempLocation is not specified and gcpTempLocation is, tempLocation is not populated.
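The defaulting rules above can be modeled as a small function. The following is only an illustrative sketch, written in Python for brevity; it is not the SDK's implementation. The function name resolve_temp_locations and the placeholder string that stands in for the default location are assumptions made for this example.

```python
from typing import Optional, Tuple

# Hypothetical placeholder for the default location that is created
# when neither option is set. The real value is chosen by the service.
DEFAULT_GCP_TEMP = "<default location created for the job>"


def resolve_temp_locations(
    temp_location: Optional[str],
    gcp_temp_location: Optional[str],
) -> Tuple[Optional[str], str]:
    """Returns (tempLocation, gcpTempLocation) after applying the rules."""
    if gcp_temp_location is not None:
        # gcpTempLocation is used as given. tempLocation is left as is,
        # so it stays unset if the user did not set it.
        return temp_location, gcp_temp_location
    if temp_location is not None:
        # gcpTempLocation falls back to tempLocation, which must then be
        # a Cloud Storage path.
        if not temp_location.startswith("gs://"):
            raise ValueError(
                "tempLocation must be a Cloud Storage path when "
                "gcpTempLocation is not set")
        return temp_location, temp_location
    # Neither option is set: a default gcpTempLocation is created.
    return None, DEFAULT_GCP_TEMP


print(resolve_temp_locations("gs://my-bucket/temp", None))
print(resolve_temp_locations(None, "gs://my-bucket/gcp-temp"))
print(resolve_temp_locations(None, None))
```

In this model, only the second rule copies a value from one option to the other; setting gcpTempLocation alone never populates tempLocation.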

Python

  • project: your Google Cloud project ID.
  • region: the regional endpoint for your Dataflow job.
  • runner: the pipeline runner that executes your pipeline. For Google Cloud execution, this must be DataflowRunner.
  • temp_location: a Cloud Storage path for Dataflow to stage temporary job files created during the execution of the pipeline.

Go

  • project: your Google Cloud project ID.
  • region: the regional endpoint for your Dataflow job.
  • runner: the pipeline runner that executes your pipeline. For Google Cloud execution, this must be dataflow.
  • staging_location: a Cloud Storage path for Dataflow to stage temporary job files created during the execution of the pipeline.

Setting pipeline options programmatically

The following example code shows how to construct a pipeline by programmatically setting the runner and other required options to execute the pipeline using Dataflow.

Java

// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

// For cloud execution, set the Google Cloud project, the staging location,
// and the runner (DataflowRunner).
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']

# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
  pass  # build your pipeline here.

Go

The Apache Beam SDK for Go uses Go command-line arguments. Use flag.Set() to set flag values.

// Use the Go flag package to parse custom options.
flag.Parse()

// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")

beam.Init()

// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()

After you've constructed your pipeline, specify all the pipeline reads, transforms, and writes, and run the pipeline.

Using pipeline options from the command line

The following example shows how to use pipeline options that are specified on the command line. This example doesn't set the pipeline options programmatically.

Java

// Set your PipelineOptions to the specified command-line options.
// MyOptions is your custom options interface that extends PipelineOptions.
MyOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

Use the Python argparse module to parse command-line options.

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# For more details on how to use argparse, take a look at:
#   https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | 'Read files' >> beam.io.ReadFromText(args.input_file)
      | 'Write files' >> beam.io.WriteToText(args.output_path))
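
To see what parse_known_args() hands back in the example above, the following standalone sketch uses only the Python standard library and an explicit argument list instead of sys.argv. The bucket, project, and region values are placeholders chosen for illustration. Arguments that the parser declares end up in args; everything else is returned untouched in beam_args, which is the list that the example passes to PipelineOptions.

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')

# Placeholder command line: one custom option plus three options that the
# parser does not declare.
argv = [
    '--output-path', 'gs://my-bucket/results/out',
    '--runner=DataflowRunner',
    '--project=my-project-id',
    '--region=us-central1',
]
args, beam_args = parser.parse_known_args(argv)

# argparse converts dashes in option names to underscores in attribute names.
print(args.input_file)
print(args.output_path)
# The undeclared arguments are kept in their original order.
print(beam_args)
```

Because the split is purely by declared name, a misspelled custom option is not rejected by argparse here; it is passed along in beam_args with the rest.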

Go

Use the Go flag package to parse command-line options. You must parse the options before you call beam.Init(). In this example, output is a command-line option.

// Define configuration options
var (
  output = flag.String("output", "", "Output file (required).")
)

// Parse options before beam.Init()
flag.Parse()

beam.Init()

// Input validation must be done after beam.Init()
if *output == "" {
  log.Fatal("No output provided!")
}

p := beam.NewPipeline()

After you've constructed your pipeline, specify all the pipeline reads, transforms, and writes, and run the pipeline.

Controlling execution modes

When an Apache Beam program runs a pipeline on a service such as Dataflow, the program can either run the pipeline asynchronously, or can block until pipeline completion. You can change this behavior by using the following guidance.

Java

When an Apache Beam Java program runs a pipeline on a service such as Dataflow, it is typically executed asynchronously. To run a pipeline and wait until the job completes, set DataflowRunner as the pipeline runner and explicitly call pipeline.run().waitUntilFinish().

When you use DataflowRunner and call waitUntilFinish() on the PipelineResult object returned from pipeline.run(), the pipeline executes on Google Cloud, but the local code waits for the cloud job to finish and return the final DataflowPipelineJob object. While the job runs, the Dataflow service prints job status updates and console messages.

Python

When an Apache Beam Python program runs a pipeline on a service such as Dataflow, it is typically executed asynchronously. To block until pipeline completion, use the wait_until_finish() method of the PipelineResult object, returned from the run() method of the runner.

Go

When an Apache Beam Go program runs a pipeline on Dataflow, it is synchronous by default and blocks until pipeline completion. If you don't want to block, there are two options:

  1. Start the job in a goroutine.

    go func() {
      pr, err := beamx.Run(ctx, p)
      if err != nil {
        // Handle the error.
      }
      // Send beam.PipelineResult into a channel.
      results <- pr
    }()
    // Do other operations while the pipeline runs.
    
  2. Use the --async command-line flag, which is in the jobopts package.

To view execution details, monitor progress, and verify job completion status, use the Dataflow monitoring interface or the Dataflow command-line interface.

Using streaming sources

Java

If your pipeline reads from an unbounded data source, such as Pub/Sub, the pipeline automatically executes in streaming mode.

Python

If your pipeline uses an unbounded data source, such as Pub/Sub, you must set the streaming option to true.

Go

If your pipeline reads from an unbounded data source, such as Pub/Sub, the pipeline automatically executes in streaming mode.

Streaming jobs use a Compute Engine machine type of n1-standard-2 or higher by default.

Launching locally

Instead of running your pipeline on managed cloud resources, you can choose to execute your pipeline locally. Local execution has certain advantages for testing, debugging, or running your pipeline over small data sets. For example, local execution removes the dependency on the remote Dataflow service and associated Google Cloud project.

When you use local execution, you must run your pipeline with datasets small enough to fit in local memory. You can create a small in-memory data set using a Create transform, or you can use a Read transform to work with small local or remote files. Local execution provides a fast and easy way to perform testing and debugging with fewer external dependencies but is limited by the memory available in your local environment.

The following example code shows how to construct a pipeline that executes in your local environment.

Java

// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()

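# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
# MyOptions is the custom options class shown in "Creating custom pipeline
# options". It defines the input_file and output_path options used below.
args = beam_options.view_as(MyOptions)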

with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | beam.io.ReadFromText(args.input_file)
      | beam.io.WriteToText(args.output_path))

Go

// Parse options before beam.Init()
flag.Parse()

beam.Init()

p := beam.NewPipeline()

After you've constructed your pipeline, run it.

Creating custom pipeline options

You can add your own custom options in addition to the standard PipelineOptions. Apache Beam's command-line parser can also parse custom options that are specified as command-line arguments in the same format.

Java

To add your own options, define an interface with getter and setter methods for each option, as in the following example:

public interface MyOptions extends PipelineOptions {
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Python

To add your own options, use the add_argument() method, which behaves exactly like the one in Python's standard argparse module, as in the following example:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input-file')
    parser.add_argument('--output-path')

Go

To add your own options, use the Go flag package as shown in the following example:

var (
  input  = flag.String("input", "", "")
  output = flag.String("output", "", "")
)

You can also specify a description, which appears when a user passes --help as a command-line argument, and a default value.

Java

You set the description and default value using annotations, as follows:

public interface MyOptions extends PipelineOptions {
  @Description("My custom command line argument.")
  @Default.String("DEFAULT")
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

We recommend that you register your interface with PipelineOptionsFactory and then pass the interface when creating the PipelineOptions object. When you register your interface with PipelineOptionsFactory, the --help command can find your custom options interface and add it to its output. PipelineOptionsFactory also validates that your custom options are compatible with all other registered options.

The following example code shows how to register your custom options interface with PipelineOptionsFactory:

PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                          .withValidation()
                                          .as(MyOptions.class);

Now your pipeline can accept --myCustomOption=value as a command-line argument.

Python

You set the description and default value as follows:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input-file',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='The file path for the input text to process.')
    parser.add_argument(
        '--output-path',
        required=True,
        help='The path prefix for output files.')
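
Because add_argument() follows argparse semantics, the effect of default, required, and help can be checked with a plain argparse parser, without running a pipeline. The following is a minimal sketch under that assumption; the helper names build_parser and exits_on are hypothetical and exist only for this illustration.

```python
import argparse


def build_parser():
    """Builds a plain argparse parser with the same two options."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input-file',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='The file path for the input text to process.')
    parser.add_argument(
        '--output-path',
        required=True,
        help='The path prefix for output files.')
    return parser


def exits_on(argv):
    """Returns True if parsing argv makes argparse exit with an error."""
    try:
        build_parser().parse_args(argv)
    except SystemExit:
        return True
    return False


# The default is used when --input-file is omitted.
opts = build_parser().parse_args(['--output-path', 'gs://my-bucket/out'])
print(opts.input_file)

# Omitting a required option makes argparse print a usage error and exit.
print(exits_on([]))

# The help strings are what a user sees after passing --help.
help_text = build_parser().format_help()
print(help_text)
```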

Go

You set the description and default value as follows:

var (
  input  = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
  output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)