Setting pipeline options

This page explains how to set pipeline options for your Dataflow jobs. These pipeline options configure how and where your pipeline executes and which resources it uses.

Pipeline execution is separate from your Apache Beam program's execution. The Apache Beam program that you've written constructs a pipeline for deferred execution. This means that the program generates a series of steps that any supported Apache Beam runner can execute. Compatible runners include the Dataflow runner on Google Cloud and the direct runner that executes the pipeline directly in a local environment.

Using pipeline options

You can set the pipeline runner and other execution parameters by using the Apache Beam SDK class PipelineOptions.

There are two methods for specifying pipeline options:

  • Set them programmatically by supplying a list of pipeline options.
  • Set them directly on the command line when you run your pipeline code.

Setting pipeline options programmatically

You can set pipeline options programmatically by creating and modifying a PipelineOptions object.

Java

Construct your PipelineOptions object using the method PipelineOptionsFactory.fromArgs.

For an example, view the Launching on Dataflow sample. This example sets the streaming pipeline option programmatically.

Python

Create a PipelineOptions object.

For an example, view the Launching on Dataflow sample. This example sets the streaming pipeline option programmatically.
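The general pattern, shown here as a minimal sketch rather than the full sample, is to construct a PipelineOptions object (keyword arguments become pipeline options) and then set individual options programmatically through a typed view such as StandardOptions. The project ID below is a placeholder:

from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Construct the options object; keyword arguments become pipeline options.
beam_options = PipelineOptions(project='my-project-id')

# Set further options programmatically through a typed view, for example
# the streaming option.
beam_options.view_as(StandardOptions).streaming = True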

Setting pipeline options on the command line

You can set pipeline options using command line arguments.

To view an example of this syntax, read the Java quickstart and Python quickstart samples.

Accessing the pipeline options object

You pass PipelineOptions when you create your Pipeline object in your Apache Beam program. When the Dataflow service runs your pipeline, it sends a copy of the PipelineOptions to each worker.

Java

You can access PipelineOptions inside any ParDo's DoFn instance by using the method ProcessContext.getPipelineOptions.

Python

This feature is not supported in the Apache Beam SDK for Python.

Launching on Dataflow

You can run your job on managed Google Cloud resources by using the Dataflow runner service. Running your pipeline with Dataflow creates a Dataflow job, which uses Compute Engine and Cloud Storage resources in your Google Cloud project.

Setting required options

To execute your pipeline using Dataflow, set the following pipeline options:

Java

  • project: the ID of your Google Cloud project.
  • runner: the pipeline runner that executes your pipeline. For Google Cloud execution, this must be DataflowRunner.
  • gcpTempLocation: a Cloud Storage path for Dataflow to stage most temporary files. You must create this bucket before running your pipeline. If you don't set gcpTempLocation, you can set the pipeline option tempLocation instead; gcpTempLocation then defaults to the value of tempLocation, which must be a Cloud Storage path. If you set gcpTempLocation but not tempLocation, tempLocation is not populated. If you set neither, a default gcpTempLocation is created.
  • stagingLocation: a Cloud Storage bucket for Dataflow to stage your binary files. If you're using Apache Beam SDK version 2.28 or higher, do not set this option. For Apache Beam SDK version 2.28 or lower, if you do not set this option, the value that you specified for tempLocation is used for the staging location.

Python

  • project: your Google Cloud project ID.
  • region: the regional endpoint for your Dataflow job.
  • runner: the pipeline runner that executes your pipeline. For Google Cloud execution, this must be DataflowRunner.
  • temp_location: a Cloud Storage path for Dataflow to stage temporary job files created during the execution of the pipeline.

Setting pipeline options programmatically

The following example code shows how to construct a pipeline by programmatically setting the runner and other required options to execute the pipeline using Dataflow.

Java

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For cloud execution, set the Google Cloud project, staging location,
  // and set DataflowRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')

# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
  pass  # build your pipeline here.

After you've constructed your pipeline, specify all the pipeline reads, transforms, and writes, and run the pipeline.

Setting pipeline options using the command line

The following example shows how to set pipeline options using the command line. This example doesn't set the pipeline options programmatically:

Java

  // Create and set your PipelineOptions.
  MyOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# For more details on how to use argparse, take a look at:
#   https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | 'Read files' >> beam.io.ReadFromText(args.input_file)
      | 'Write files' >> beam.io.WriteToText(args.output_path))

After you've constructed your pipeline, specify all the pipeline reads, transforms, and writes, and run the pipeline.
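For reference, launching the preceding Python example on Dataflow might look something like the following command. The script name, bucket paths, project ID, and region are placeholders rather than values taken from the sample:

python my_pipeline.py \
  --output-path=gs://my-bucket/results/output \
  --runner=DataflowRunner \
  --project=my-project-id \
  --region=us-central1 \
  --temp_location=gs://my-bucket/temp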

Controlling execution modes

When an Apache Beam program is configured to run a pipeline on a service like Dataflow, it is typically executed asynchronously. You can run a pipeline and wait until the job completes by using the following:

Java

Set DataflowRunner as the pipeline runner and explicitly call pipeline.run().waitUntilFinish().

When you use DataflowRunner and call waitUntilFinish() on the PipelineResult object returned from pipeline.run(), the pipeline executes on Google Cloud, but the local code waits for the cloud job to finish and return the final DataflowPipelineJob object. While the job runs, the Dataflow service prints job status updates and console messages.

Python

To block until pipeline completion, use the wait_until_finish() method of the PipelineResult object, returned from the run() method of the runner.
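A minimal sketch of this pattern follows; the pipeline is built without the with block, which would otherwise run the pipeline and wait for it automatically:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Set the runner, project, and other options as needed.
beam_options = PipelineOptions()

# Build the pipeline explicitly so that run() returns a PipelineResult.
pipeline = beam.Pipeline(options=beam_options)
# ... apply your reads, transforms, and writes here ...

result = pipeline.run()
result.wait_until_finish()  # Blocks until the job completes.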

To view execution details, monitor progress, and verify job completion status, use the Dataflow monitoring interface or the Dataflow command line interface.

Using streaming sources

Java

If your pipeline reads from an unbounded data source, such as Pub/Sub, the pipeline automatically executes in streaming mode.

Python

If your pipeline uses an unbounded data source, such as Pub/Sub, you must set the streaming option to true.

Streaming jobs use a Compute Engine machine type of n1-standard-2 or higher by default.
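For example, a sketch of a Python pipeline that sets the streaming option and reads from a Pub/Sub subscription might look like the following; the subscription path is a hypothetical placeholder:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# The streaming option is required for unbounded sources such as Pub/Sub.
beam_options = PipelineOptions(streaming=True)

with beam.Pipeline(options=beam_options) as pipeline:
  messages = (
      pipeline
      | beam.io.ReadFromPubSub(
          subscription='projects/my-project-id/subscriptions/my-subscription'))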

Launching locally

Instead of running your pipeline on managed cloud resources, you can choose to execute your pipeline locally. Local execution has certain advantages for testing, debugging, or running your pipeline over small data sets. For example, local execution removes the dependency on the remote Dataflow service and associated Google Cloud project.

When you use local execution, you must run your pipeline with datasets small enough to fit in local memory. You can create a small in-memory data set using a Create transform, or you can use a Read transform to work with small local or remote files. Local execution provides a fast and easy way to perform testing and debugging with fewer external dependencies but is limited by the memory available in your local environment.

The following example code shows how to construct a pipeline that executes in your local environment.

Java

  // Create and set your PipelineOptions.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

# Create your PipelineOptions from the remaining arguments. Because no runner
# is specified, the pipeline runs locally on the direct runner.
beam_options = PipelineOptions(beam_args)

with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | beam.io.ReadFromText(args.input_file)
      | beam.io.WriteToText(args.output_path))

After you've constructed your pipeline, run it.
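You can also test locally against a small in-memory dataset by using a Create transform instead of reading files, as described earlier in this section. A minimal sketch:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# With no runner specified, the pipeline runs on the local direct runner.
beam_options = PipelineOptions()

with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | beam.Create(['hello', 'world'])
      | beam.Map(str.upper)
      | beam.Map(print))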

Creating custom pipeline options

You can add your own custom options in addition to the standard PipelineOptions. Apache Beam's command line parser can also parse your custom options using command line arguments specified in the same format.

Java

To add your own options, define an interface with getter and setter methods for each option, as in the following example:

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Python

To add your own options, use the add_argument() method (which behaves exactly like Python's standard argparse module), as in the following example:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input-file')
    parser.add_argument('--output-path')

You can also specify a description, which appears when a user passes --help as a command line argument, and a default value.

Java

You set the description and default value using annotations, as follows:

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("DEFAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

We recommend that you register your interface with PipelineOptionsFactory and then pass the interface when creating the PipelineOptions object. When you register your interface with PipelineOptionsFactory, --help can find your custom options interface and add it to the output of the --help command. PipelineOptionsFactory also validates that your custom options are compatible with all other registered options.

The following example code shows how to register your custom options interface with PipelineOptionsFactory:

  PipelineOptionsFactory.register(MyOptions.class);
  MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                            .withValidation()
                                            .as(MyOptions.class);

Now your pipeline can accept --myCustomOption=value as a command line argument.

Python

You set the description and default value as follows:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input-file',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='The file path for the input text to process.')
    parser.add_argument(
        '--output-path',
        required=True,
        help='The path prefix for output files.')
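
To use these options in your pipeline code, you might then create the PipelineOptions object from the command line arguments and view it as MyOptions. The following sketch assumes the MyOptions class defined above:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# With no arguments, PipelineOptions parses the command line (sys.argv).
beam_options = PipelineOptions()
my_options = beam_options.view_as(MyOptions)

with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | beam.io.ReadFromText(my_options.input_file)
      | beam.io.WriteToText(my_options.output_path))

Your pipeline can then accept --input-file and --output-path as command line arguments.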