Specifying Pipeline Execution Parameters

Once your Cloud Dataflow program has constructed a pipeline, you'll need to have the pipeline executed. Pipeline execution is separate from your Cloud Dataflow program's execution; your Cloud Dataflow program constructs the pipeline, and the code you've written generates a series of steps to be executed by a pipeline runner. The pipeline runner can be the Cloud Dataflow managed service on Google Cloud Platform, a third-party runner service, or a local pipeline runner that executes the steps directly in the local environment.

You can specify the pipeline runner and other execution options by using the Dataflow SDK class PipelineOptions. You use PipelineOptions to configure how and where your pipeline executes and what resources it uses.

Much of the time, you'll want your pipeline to run on managed Cloud Platform resources by using the Cloud Dataflow runner service. Running your pipeline with the Cloud Dataflow service creates a Cloud Dataflow job, which uses the Google Compute Engine and Google Cloud Storage resources in your Cloud Platform project.

You can also run your pipeline locally. When you run your pipeline locally, the pipeline transforms are executed on the same machine where your Cloud Dataflow program executes. Local execution is useful for testing and debugging purposes, especially if your pipeline can use smaller in-memory datasets.

Setting PipelineOptions

You pass PipelineOptions when you create your Pipeline object in your Dataflow program. (See Constructing Your Pipeline for more information.) When the Dataflow service runs your pipeline, it sends a copy of the PipelineOptions to each worker instance.

Java: SDK 1.x

Note: You can access PipelineOptions inside any ParDo's DoFn instance by using the method ProcessContext.getPipelineOptions.

Java: SDK 2.x

Note: You can access PipelineOptions inside any ParDo's DoFn instance by using the method ProcessContext.getPipelineOptions.
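For example, the following sketch (TagWithJobNameFn is a hypothetical DoFn, not part of the SDK) shows how a DoFn can read a value from the pipeline options at processing time on the worker:

  // A minimal sketch: read the job name from the PipelineOptions that were
  // serialized to the worker along with the pipeline.
  static class TagWithJobNameFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      // getPipelineOptions() returns the options the pipeline was created with.
      String jobName = c.getPipelineOptions().getJobName();
      c.output(jobName + ": " + c.element());
    }
  }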

Python

This feature is not yet supported in the Dataflow SDK for Python.

Setting PipelineOptions from Command-Line Arguments

While you can configure your pipeline by creating a PipelineOptions object and setting the fields directly, the Dataflow SDKs include a command-line parser that you can use to set fields in PipelineOptions using command-line arguments.

Java: SDK 1.x

To read options from the command-line, construct your PipelineOptions object using the method PipelineOptionsFactory.fromArgs, as in the following example code:

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

Note: Appending the method .withValidation causes Dataflow to check for required command-line arguments and validate argument values.

Using PipelineOptionsFactory.fromArgs interprets command-line arguments that follow the format:

--<option>=<value>

Building your PipelineOptions this way lets you specify any of the options in any subinterface of com.google.cloud.dataflow.sdk.options.PipelineOptions as a command-line argument.

Java: SDK 2.x

To read options from the command-line, construct your PipelineOptions object using the method PipelineOptionsFactory.fromArgs, as in the following example code:

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

Note: Appending the method .withValidation causes Dataflow to check for required command-line arguments and validate argument values.

Using PipelineOptionsFactory.fromArgs interprets command-line arguments that follow the format:

--<option>=<value>

Building your PipelineOptions this way lets you specify any of the options in any subinterface of org.apache.beam.sdk.options.PipelineOptions as a command-line argument.
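For example, withValidation works together with option annotations such as @Validation.Required: if a required option is missing from the command line, the options object fails to build instead of the job failing later. A minimal sketch (ValidatedOptions is a hypothetical interface defined for illustration):

  public interface ValidatedOptions extends PipelineOptions {
    @Validation.Required
    @Description("Output path that must be supplied on the command line.")
    String getOutputPath();
    void setOutputPath(String value);
  }

  // Fails at construction time (IllegalArgumentException) if --outputPath is missing.
  ValidatedOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(ValidatedOptions.class);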

Python

To read options from the command-line, create your PipelineOptions object, as in the following example code:

options = PipelineOptions(flags=argv)

The argument (flags=argv) to PipelineOptions interprets command-line arguments that follow the format:

--<option>=<value>

Building your PipelineOptions this way lets you specify any option defined in any subclass of PipelineOptions as a command-line argument.

Creating Custom Options

You can add your own custom options in addition to the standard PipelineOptions. Dataflow's command-line parser can also set your custom options using command-line arguments specified in the same format.

Java: SDK 1.x

To add your own options, define an interface with getter and setter methods for each option, as in the following example:

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Java: SDK 2.x

To add your own options, define an interface with getter and setter methods for each option, as in the following example:

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Python

To add your own options, use the add_argument() method (which behaves exactly like Python's standard argparse module), as in the following example:

class MyOptions(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

You can also specify a description, which appears when a user passes --help as a command-line argument, and a default value.

Java: SDK 1.x

You set the description and default value using annotations, as follows:

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("D-FAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

It's recommended that you register your interface with PipelineOptionsFactory and then pass the interface when creating the PipelineOptions object. When you register your interface with PipelineOptionsFactory, the --help command can find your custom options interface and add it to its output. PipelineOptionsFactory also validates that your custom options are compatible with all other registered options.

The following example code shows how to register your custom options interface with PipelineOptionsFactory:

PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                                .withValidation()
                                                .as(MyOptions.class);

Now your pipeline can accept --myCustomOption=value as a command-line argument.

Java: SDK 2.x

You set the description and default value using annotations, as follows:

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("D-FAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

It's recommended that you register your interface with PipelineOptionsFactory and then pass the interface when creating the PipelineOptions object. When you register your interface with PipelineOptionsFactory, the --help command can find your custom options interface and add it to its output. PipelineOptionsFactory also validates that your custom options are compatible with all other registered options.

The following example code shows how to register your custom options interface with PipelineOptionsFactory:

PipelineOptionsFactory.register(MyOptions.class);
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
...
MyOptions myOptions = options.as(MyOptions.class);

Now your pipeline can accept --myCustomOption=value as a command-line argument.
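Once the options object exists, your pipeline code can read the custom value like any other option. A minimal sketch (using the value as a TextIO input path is only an illustration):

  Pipeline p = Pipeline.create(myOptions);
  // Read the value supplied via --myCustomOption=value (or the @Default value).
  String customValue = myOptions.getMyCustomOption();
  p.apply("ReadInput", TextIO.read().from(customValue));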

Python

You set the description and default value as follows:

class MyOptions(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input',
                        help='Input for the pipeline',
                        default='gs://my-bucket/input')
    parser.add_argument('--output',
                        help='Output for the pipeline',
                        default='gs://my-bucket/output')

Configuring PipelineOptions for Execution on the Cloud Dataflow Service

To execute your pipeline using the Cloud Dataflow managed service, you'll need to set the following fields in PipelineOptions:

Java: SDK 1.x

  • project - The ID of your Cloud Platform project.
  • runner - The pipeline runner that will parse your program and construct your pipeline. For cloud execution, this must be either DataflowPipelineRunner or BlockingDataflowPipelineRunner.
  • stagingLocation - A Google Cloud Storage bucket for Dataflow to stage your binary and any temporary files. You must create this bucket ahead of time, before running your pipeline.

Java: SDK 2.x

  • project - The ID of your Cloud Platform project.
  • runner - The pipeline runner that will parse your program and construct your pipeline. For cloud execution, this must be DataflowRunner.
  • gcpTempLocation - A Google Cloud Storage path for Dataflow to stage any temporary files. You must create this bucket ahead of time, before running your pipeline. If you don't specify gcpTempLocation, you can specify the pipeline option tempLocation; gcpTempLocation is then set to the value of tempLocation. If neither is specified, a default gcpTempLocation is created.
  • stagingLocation - A Google Cloud Storage bucket for Dataflow to stage your binary files. If you do not set this option, the value you specified for tempLocation is also used for the staging location.

Note: A default gcpTempLocation is created if neither it nor tempLocation is specified. If tempLocation is specified and gcpTempLocation is not, tempLocation must be a Cloud Storage path, and gcpTempLocation defaults to it. If tempLocation is not specified and gcpTempLocation is, tempLocation is not populated.

Python

  • job_name - The name of the Cloud Dataflow job being executed.
  • project - The ID of your Cloud Platform project.
  • runner - The pipeline runner that will parse your program and construct your pipeline. For cloud execution, this must be DataflowRunner.
  • staging_location - A Google Cloud Storage path for Dataflow to stage code packages needed by workers executing the job.
  • temp_location - A Google Cloud Storage path for Dataflow to stage temporary job files created during the execution of the pipeline.

You can set these options programmatically, or specify them using the command-line. The following example code shows how to construct a pipeline by programmatically setting the runner and other necessary options to execute the pipeline using the Cloud Dataflow managed service.

Java: SDK 1.x

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For Cloud execution, set the Cloud Platform project, staging location,
  // and specify DataflowPipelineRunner or BlockingDataflowPipelineRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowPipelineRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Java: SDK 2.x

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For Cloud execution, set the Cloud Platform project, staging location,
  // and specify DataflowRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);
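The example above sets only the staging location; if you prefer to set the temporary-file location explicitly as well, rather than letting gcpTempLocation be derived from tempLocation, a minimal sketch with a hypothetical bucket path:

  // Optionally set gcpTempLocation explicitly; otherwise Dataflow derives it
  // from tempLocation or creates a default location.
  options.setGcpTempLocation("gs://my-bucket/temp");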

Python

# Create and set your PipelineOptions.
options = PipelineOptions(flags=argv)

# For Cloud execution, set the Cloud Platform project, job_name,
# staging location, temp_location and specify DataflowRunner.
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'my-project-id'
google_cloud_options.job_name = 'myjob'
google_cloud_options.staging_location = 'gs://my-bucket/binaries'
google_cloud_options.temp_location = 'gs://my-bucket/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'

# Create the Pipeline with the specified options.
p = Pipeline(options=options)

After you've constructed your pipeline, specify all the pipeline reads, transforms, and writes, and run the pipeline.

The following example code shows how to set the required options for Dataflow service execution using the command-line:

Java: SDK 1.x

  // Create and set your PipelineOptions.
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Java: SDK 2.x

  // Create and set your PipelineOptions.
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

# Use Python argparse module to parse custom arguments
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')
known_args, pipeline_args = parser.parse_known_args(argv)

# Create the Pipeline with remaining arguments.
with beam.Pipeline(argv=pipeline_args) as p:
  lines = p | 'ReadFromText' >> beam.io.ReadFromText(known_args.input)
  lines | 'WriteToText' >> beam.io.WriteToText(known_args.output)

After you've constructed your pipeline, specify all the pipeline reads, transforms, and writes, and run the pipeline.

Java: SDK 1.x

When passing the required options on the command-line, use the --project, --runner, and --stagingLocation options.

Java: SDK 2.x

When passing the required options on the command-line, use the --project, --runner, --tempLocation, and, optionally, the --stagingLocation options.

Python

When passing the required options on the command-line, use the --project, --runner, --staging_location, and --temp_location options.

Asynchronous Execution

Java: SDK 1.x

Using DataflowPipelineRunner causes your pipeline to execute asynchronously on Google's cloud. While your pipeline executes you can monitor the job's progress, view details on execution, and receive updates on the pipeline's results by using the Dataflow Monitoring Interface or the Dataflow Command-line Interface.

Java: SDK 2.x

Using DataflowRunner causes your pipeline to execute asynchronously on Google's cloud. While your pipeline executes you can monitor the job's progress, view details on execution, and receive updates on the pipeline's results by using the Dataflow Monitoring Interface or the Dataflow Command-line Interface.
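In practice, an asynchronous launch is just a call to run(); the returned PipelineResult is a handle you can use to query the job later. A minimal sketch (assuming p is the Pipeline constructed earlier):

  // run() submits the job to the Cloud Dataflow service and returns immediately
  // with a handle to the running job.
  PipelineResult result = p.run();
  // The main program can continue (or exit) while the job runs on the service;
  // for example, you can poll the job's current state.
  PipelineResult.State state = result.getState();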

Python

Using DataflowRunner causes your pipeline to execute asynchronously on Google's cloud. While your pipeline executes you can monitor the job's progress, view details on execution, and receive updates on the pipeline's results by using the Dataflow Monitoring Interface or the Dataflow Command-line Interface.

Blocking Execution

Java: SDK 1.x

Specify BlockingDataflowPipelineRunner as the pipeline runner.

When you use BlockingDataflowPipelineRunner, the pipeline executes on the cloud in the same way as with DataflowPipelineRunner, but waits for the launched job to finish. While the job runs, the Cloud Dataflow service prints job status updates and console messages. Using BlockingDataflowPipelineRunner returns the final DataflowPipelineJob object.

Java: SDK 2.x

Specify DataflowRunner as the pipeline runner and explicitly call pipeline.run().waitUntilFinish().

When you use DataflowRunner and call waitUntilFinish() on the PipelineResult object returned from pipeline.run(), the pipeline executes on the cloud just as with asynchronous execution, but waits for the launched job to finish and returns the final DataflowPipelineJob object. While the job runs, the Cloud Dataflow service prints job status updates and console messages.

If you were a Java SDK 1.x user and used --runner BlockingDataflowPipelineRunner on the command line to make your main program block until the pipeline terminated, then with the Java SDK 2.x your main program needs to explicitly call waitUntilFinish().
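A minimal sketch (assuming p is the Pipeline constructed earlier):

  // Submit the job, then block the main program until it reaches a terminal state.
  PipelineResult result = p.run();
  // waitUntilFinish() returns the job's terminal state (for example, DONE or FAILED).
  PipelineResult.State state = result.waitUntilFinish();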

Python

To block until pipeline completion, use the wait_until_finish() method of the PipelineResult object, returned from the run() method of the runner.

Note: Typing Ctrl+C from the command line does not cancel your job. The Cloud Dataflow service is still executing the job on Cloud Platform; to cancel the job, you'll need to use the Dataflow Monitoring Interface or the Dataflow Command-line Interface.

Streaming Execution

Java: SDK 1.x

If your pipeline uses an unbounded data source or sink (such as Pub/Sub), you must set the streaming option in PipelineOptions to true. You can set the streaming option programmatically as shown in the following example:

  DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
  dataflowOptions.setStreaming(true);

If your pipeline uses unbounded data sources and sinks, it is necessary to pick a Windowing strategy for your unbounded PCollections before any aggregation such as a GroupByKey can be used.

Java: SDK 2.x

If your pipeline reads from an unbounded data source (such as Pub/Sub), the pipeline will automatically be executed in streaming mode.

If your pipeline uses unbounded data sources and sinks, it is necessary to pick a Windowing strategy for your unbounded PCollections before any aggregation such as a GroupByKey can be used.
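For example, the following sketch (with a hypothetical Pub/Sub topic) windows an unbounded PCollection into one-minute fixed windows before counting elements; without the Window transform, the GroupByKey performed inside Count would be rejected for an unbounded, unwindowed collection:

  PCollection<String> messages = p.apply("ReadFromPubSub",
      PubsubIO.readStrings().fromTopic("projects/my-project/topics/my-topic"));

  PCollection<KV<String, Long>> counts = messages
      // Assign each element to a one-minute fixed window.
      .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
      // Count.perElement() aggregates per window using a GroupByKey internally.
      .apply(Count.<String>perElement());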

Python

This feature is not yet supported in the Dataflow SDK for Python.

Setting Other Cloud Pipeline Options

To run your pipeline on the cloud, you can programmatically set the following fields in your PipelineOptions object:

Java: SDK 1.x

  • runner (Class) - The PipelineRunner to use. This field allows you to determine the PipelineRunner at runtime. Default: DirectPipelineRunner (local mode).
  • streaming (boolean) - Whether streaming mode is enabled or disabled; true if enabled. Default: false.
  • project (String) - The project ID for your Google Cloud Project. This is required if you want to run your pipeline using the Cloud Dataflow managed service. Default: if not set, defaults to the currently configured project in the Cloud SDK.
  • tempLocation (String) - Google Cloud Storage path for temporary files. Must be a valid Cloud Storage URL, beginning with gs://. Default: if not set, defaults to the value for stagingLocation. You must specify at least one of tempLocation or stagingLocation to run your pipeline on the Google cloud.
  • stagingLocation (String) - Google Cloud Storage path for staging local files. Must be a valid Cloud Storage URL, beginning with gs://. Default: if not set, defaults to a staging directory within tempLocation. You must specify at least one of tempLocation or stagingLocation to run your pipeline on the Google cloud.
  • autoscalingAlgorithm (String) - The Autoscaling mode to use for your Dataflow job. Possible values are THROUGHPUT_BASED to enable Autoscaling, or NONE to disable. See Autotuning Features to learn more about how Autoscaling works in the Dataflow managed service. Default: THROUGHPUT_BASED for all batch Dataflow jobs that use Dataflow SDK for Java version 1.6.0 or later; NONE for streaming jobs or batch jobs using earlier versions of the Dataflow SDK for Java.
  • numWorkers (int) - The initial number of Google Compute Engine instances to use when executing your pipeline. This option determines how many workers the Dataflow service starts up when your job begins. Default: if unspecified, the Dataflow service will determine an appropriate number of workers.
  • maxNumWorkers (int) - The maximum number of Google Compute Engine instances to be made available to your pipeline during execution. Note that this can be higher than the initial number of workers (specified by numWorkers) to allow your job to scale up, automatically or otherwise. Default: if unspecified, the Dataflow service will determine an appropriate number of workers.
  • zone (String) - The Compute Engine availability zone for launching worker instances to run your pipeline. Default: if not set, defaults to us-central1-f. You can specify a different availability zone to ensure, for example, that your workers are launched in the EU.
  • filesToStage (List<String>) - A list of local files, directories of files, or archives (.jar, .zip) to make available to each worker. If you set this option, then only those files you specify will be uploaded (the Java classpath will be ignored). You must specify all of your resources in the correct classpath order. Resources are not limited to code, but can also include configuration files and other resources to make available to all workers. Your code can access the listed resources using Java's standard resource lookup methods. Cautions: Specifying a directory path is sub-optimal since Cloud Dataflow will zip the files before uploading, which involves a higher startup time cost. Also, don't use this option to transfer data to workers that is meant to be processed by the pipeline, since doing so is significantly slower than using native Cloud Storage/BigQuery APIs combined with the appropriate Cloud Dataflow data source. Default: if filesToStage is blank, Cloud Dataflow will infer the files to stage based on the Java classpath; the considerations and cautions above (types of files to list and how to access them from your code) also apply in that case.
  • diskSizeGb (int) - The disk size, in gigabytes, to use on each remote Compute Engine worker instance. The minimum disk size is 30 GB, to account for the worker boot image and local logs. If your pipeline shuffles data, you should allocate more than the minimum. Default: set to 0 to use the default size defined in your Cloud Platform project. Warning: Lowering the disk size reduces available shuffle I/O; shuffle-bound jobs may result in increased runtime and job cost.
  • network (String) - The Google Compute Engine network for launching Compute Engine instances to run your pipeline. Default: the Dataflow service determines the default value.
  • workerMachineType (String) - The Google Compute Engine machine type that Dataflow will use when spinning up worker VMs. Dataflow supports n1 series and custom machine types. Shared core machine types such as f1 and g1 series workers are not supported under Dataflow's Service Level Agreement. Default: the Dataflow service will choose the machine type based on your job if you do not set this option.

See the API for Java reference documentation for the PipelineOptions interface (and its subinterfaces) for the complete list of pipeline configuration options.

Java: SDK 2.x

  • runner (Class) - The PipelineRunner to use. This field allows you to determine the PipelineRunner at runtime. Default: DirectRunner (local mode).
  • streaming (boolean) - Whether streaming mode is enabled or disabled; true if enabled. Default: true if your pipeline reads from an unbounded source; otherwise, false.
  • project (String) - The project ID for your Google Cloud Project. This is required if you want to run your pipeline using the Cloud Dataflow managed service. Default: if not set, defaults to the currently configured project in the Cloud SDK.
  • gcpTempLocation (String) - Google Cloud Storage path for temporary files. Must be a valid Cloud Storage URL, beginning with gs://.
  • stagingLocation (String) - Google Cloud Storage path for staging local files. Must be a valid Cloud Storage URL, beginning with gs://. Default: if not set, defaults to what you specified for tempLocation.
  • autoscalingAlgorithm (String) - The Autoscaling mode to use for your Dataflow job. Possible values are THROUGHPUT_BASED to enable Autoscaling, or NONE to disable. See Autotuning Features to learn more about how Autoscaling works in the Dataflow managed service. Default: THROUGHPUT_BASED for all batch Dataflow jobs that use Dataflow SDK for Java version 1.6.0 or later; NONE for streaming jobs or batch jobs using earlier versions of the Dataflow SDK for Java.
  • numWorkers (int) - The initial number of Google Compute Engine instances to use when executing your pipeline. This option determines how many workers the Dataflow service starts up when your job begins. Default: if unspecified, the Dataflow service will determine an appropriate number of workers.
  • maxNumWorkers (int) - The maximum number of Google Compute Engine instances to be made available to your pipeline during execution. Note that this can be higher than the initial number of workers (specified by numWorkers) to allow your job to scale up, automatically or otherwise. Default: if unspecified, the Dataflow service will determine an appropriate number of workers.
  • region (beta) (String) - Specifying a regional endpoint allows you to define a region for deploying your Cloud Dataflow jobs. Default: if not set, defaults to us-central1.
  • zone (String) - The Compute Engine availability zone for launching worker instances to run your pipeline. Default: if you specified the region parameter, the zone parameter defaults to an availability zone from the corresponding region. You can override this behavior by specifying a different availability zone.
  • filesToStage (List<String>) - A list of local files, directories of files, or archives (.jar, .zip) to make available to each worker. If you set this option, then only those files you specify will be uploaded (the Java classpath will be ignored). You must specify all of your resources in the correct classpath order. Resources are not limited to code, but can also include configuration files and other resources to make available to all workers. Your code can access the listed resources using Java's standard resource lookup methods. Cautions: Specifying a directory path is sub-optimal since Cloud Dataflow will zip the files before uploading, which involves a higher startup time cost. Also, don't use this option to transfer data to workers that is meant to be processed by the pipeline, since doing so is significantly slower than using native Cloud Storage/BigQuery APIs combined with the appropriate Cloud Dataflow data source. Default: if filesToStage is blank, Cloud Dataflow will infer the files to stage based on the Java classpath; the considerations and cautions above (types of files to list and how to access them from your code) also apply in that case.
  • diskSizeGb (int) - The disk size, in gigabytes, to use on each remote Compute Engine worker instance. The minimum disk size is 30 GB, to account for the worker boot image and local logs. If your pipeline shuffles data, you should allocate more than the minimum. Default: set to 0 to use the default size defined in your Cloud Platform project. Warning: Lowering the disk size reduces available shuffle I/O; shuffle-bound jobs may result in increased runtime and job cost.
  • network (String) - The Google Compute Engine network for launching Compute Engine instances to run your pipeline. Default: the Dataflow service determines the default value.
  • workerMachineType (String) - The Google Compute Engine machine type that Dataflow will use when spinning up worker VMs. Dataflow supports n1 series and custom machine types. Shared core machine types such as f1 and g1 series workers are not supported under Dataflow's Service Level Agreement. Default: the Dataflow service will choose the machine type based on your job if you do not set this option.

See the API for Java reference documentation for the PipelineOptions interface (and its subinterfaces) for the complete list of pipeline configuration options.
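For example, a minimal sketch (with hypothetical values) of setting several of the options above programmatically on DataflowPipelineOptions:

  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  options.setNumWorkers(5);                      // initial worker count
  options.setMaxNumWorkers(20);                  // upper bound for scaling up
  options.setWorkerMachineType("n1-standard-4"); // Compute Engine machine type
  options.setDiskSizeGb(100);                    // per-worker disk size in GB
  options.setRegion("us-central1");              // regional endpoint (beta)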

Python

  • runner (str) - The PipelineRunner to use. This field can be either DirectRunner or DataflowRunner. Default: DirectRunner (local mode).
  • streaming (bool) - Whether streaming mode is enabled or disabled; true if enabled. Default: false.
  • project (str) - The project ID for your Google Cloud Project. This is required if you want to run your pipeline using the Cloud Dataflow managed service. Default: if not set, throws an error.
  • temp_location (str) - Google Cloud Storage path for temporary files. Must be a valid Cloud Storage URL, beginning with gs://. Default: if not set, defaults to the value for staging_location. You must specify at least one of temp_location or staging_location to run your pipeline on the Google cloud.
  • staging_location (str) - Google Cloud Storage path for staging local files. Must be a valid Cloud Storage URL, beginning with gs://. Default: if not set, defaults to a staging directory within temp_location. You must specify at least one of temp_location or staging_location to run your pipeline on the Google cloud.
  • autoscaling_algorithm (str) - The Autoscaling mode to use for your Dataflow job. Possible values are THROUGHPUT_BASED to enable Autoscaling, or NONE to disable. See Autotuning Features to learn more about how Autoscaling works in the Dataflow managed service. Default: THROUGHPUT_BASED for all batch Dataflow jobs.
  • num_workers (int) - The number of Google Compute Engine instances to use when executing your pipeline. Default: set to 0 to use the default size determined by the Dataflow service.
  • region (beta) (str) - Specifying a regional endpoint allows you to define a region for deploying your Cloud Dataflow jobs. Default: if not set, defaults to us-central1.
  • zone (str) - The Compute Engine availability zone for launching worker instances to run your pipeline. Default: if you specified the region parameter, the zone parameter defaults to an availability zone from the corresponding region. You can override this behavior by specifying a different availability zone.
  • disk_size_gb (int) - The disk size, in gigabytes, to use on each remote Compute Engine worker instance. The minimum disk size is 30 GB, to account for the worker boot image and local logs. If your pipeline shuffles data, you should allocate more than the minimum. Default: set to 0 to use the default size defined in your Cloud Platform project.
  • network (str) - The Google Compute Engine network for launching Compute Engine instances to run your pipeline. Default: the Dataflow service determines the default value.
  • worker_machine_type (str) - The Google Compute Engine machine type that Dataflow will use when spinning up worker VMs. Default: the Dataflow service will choose the machine type based on your job if you do not set this option.

Configuring PipelineOptions for Local Execution

Instead of running your pipeline on managed cloud resources, you can choose to execute your pipeline locally. Local execution has certain advantages for testing, debugging, or running your pipeline over small data sets. For example, local execution removes the dependency on the remote Cloud Dataflow service and associated Google Cloud Project.

When you use local execution, it is highly recommended that you run your pipeline with datasets small enough to fit in local memory. You can create a small in-memory data set using a Create transform, or you can use a Read transform to work with small local or remote files. Local execution provides a fast and easy way to perform testing and debugging with fewer external dependencies, but will be limited by the memory available in your local environment.

The following example code shows how to construct a pipeline that executes in your local environment.

Java: SDK 1.x

  // Create and set our Pipeline Options.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Note: For local mode, you do not need to set the runner since the DirectPipelineRunner is already the default.

Java: SDK 2.x

  // Create and set our Pipeline Options.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Note: For local mode, you do not need to set the runner since the DirectRunner is already the default. However, you do need to either explicitly include the DirectRunner as a dependency or add it to the classpath.
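Building on the note above, a minimal sketch of a complete, locally runnable pipeline that processes a small in-memory dataset created with the Create transform; the output path is hypothetical:

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  p.apply("CreateInMemoryData", Create.of("apple", "banana", "apple"))
   // Count occurrences of each element; runs entirely in local memory.
   .apply("CountElements", Count.<String>perElement())
   // Format each KV pair as a "key,value" string for text output.
   .apply("FormatResults", ToString.kvs())
   .apply("WriteLocal", TextIO.write().to("/tmp/wordcounts"));

  p.run().waitUntilFinish();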

Python

# Create and set your Pipeline Options.
options = PipelineOptions()
p = Pipeline(options=options)

Note: For local mode, you do not need to set the runner since the DirectRunner is already the default.

After you've constructed your pipeline, run it.

Setting Other Local Pipeline Options

When executing your pipeline locally, the default values for the properties in PipelineOptions are generally sufficient.

Java: SDK 1.x

You can find the default values for Java PipelineOptions in the Java API reference; see the PipelineOptions class listing for complete details.

If your pipeline uses cloud services such as BigQuery or Google Cloud Storage for IO, you might need to set certain cloud project and credential options. In such cases, you should use GcpOptions.setProject to set your Google Cloud Project ID. You may also need to set credentials explicitly if the default credentials are not sufficient; see the GcpOptions class for complete details.

Java: SDK 2.x

You can find the default values for Java PipelineOptions in the Java API reference; see the PipelineOptions class listing for complete details.

If your pipeline uses cloud services such as BigQuery or Google Cloud Storage for IO, you might need to set certain cloud project and credential options. In such cases, you should use GcpOptions.setProject to set your Google Cloud Project ID. You may also need to set credentials explicitly if the default credentials are not sufficient; see the GcpOptions class for complete details.
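For example, a minimal sketch (with a hypothetical project ID) of setting the project for a local run that reads from or writes to Google Cloud services:

  PipelineOptions options = PipelineOptionsFactory.create();
  // GcpOptions carries the project and credential settings used by GCP IO connectors.
  options.as(GcpOptions.class).setProject("my-project-id");
  Pipeline p = Pipeline.create(options);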

Python

You can find the default values for Python PipelineOptions in the options.py module.

If your pipeline uses cloud services such as BigQuery or Google Cloud Storage for IO, you might need to set certain cloud project and credential options. In such cases, you should use options.view_as(GoogleCloudOptions).project to set your Google Cloud Project ID. You may also need to set credentials explicitly if the default credentials are not sufficient; see the GoogleCloudOptions class in the options.py module for complete details.
