Specifying pipeline execution parameters

Once your Apache Beam program has constructed a pipeline, you'll need to have the pipeline executed. Pipeline execution is separate from your Apache Beam program's execution; your Apache Beam program constructs the pipeline, and the code you've written generates a series of steps to be executed by a pipeline runner. The pipeline runner can be the Dataflow managed service on Google Cloud, a third-party runner service, or a local pipeline runner that executes the steps directly in the local environment.

You can specify the pipeline runner and other execution options by using the Apache Beam SDK class PipelineOptions. You use PipelineOptions to configure how and where your pipeline executes and what resources it uses.

Much of the time, you'll want your pipeline to run on managed Google Cloud resources by using the Dataflow runner service. Running your pipeline with the Dataflow service creates a Dataflow job, which uses Compute Engine and Cloud Storage resources in your Google Cloud project.

You can also run your pipeline locally. When you run your pipeline locally, the pipeline transforms are executed on the same machine where your Dataflow program executes. Local execution is useful for testing and debugging purposes, especially if your pipeline can use smaller in-memory datasets.

Setting PipelineOptions

You pass PipelineOptions when you create your Pipeline object in your Dataflow program. When the Dataflow service runs your pipeline, it sends a copy of the PipelineOptions to each worker instance.
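
For example, here is a minimal Python sketch (the empty pipeline body is a placeholder) showing an options object being created and handed to the Pipeline:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Build PipelineOptions (here with default values) and pass it to the Pipeline.
options = PipelineOptions()

with beam.Pipeline(options=options) as pipeline:
  pass  # Build your pipeline here; the options travel with it to the runner.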

Java: SDK 2.x

Note: You can access PipelineOptions inside any ParDo's DoFn instance by using the method ProcessContext.getPipelineOptions.

Python

This feature is not yet supported in the Apache Beam SDK for Python.

Setting PipelineOptions from command-line arguments

While you can configure your pipeline by creating a PipelineOptions object and setting the fields directly, the Apache Beam SDKs include a command-line parser that you can use to set fields in PipelineOptions using command-line arguments.

Java: SDK 2.x

To read options from the command line, construct your PipelineOptions object using the method PipelineOptionsFactory.fromArgs, as in the following example code:

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

Note: Appending the method .withValidation causes Dataflow to check for required command-line arguments and validate argument values.

Using PipelineOptionsFactory.fromArgs interprets command-line arguments that follow the format:

--<option>=<value>

Building your PipelineOptions this way lets you specify any of the options in any subinterface of org.apache.beam.sdk.options.PipelineOptions as a command-line argument.

Python

To read options from the command line, create your PipelineOptions object, as in the following example code:

from apache_beam.options.pipeline_options import PipelineOptions

# argv is the list of command-line arguments (for example, sys.argv[1:]).
options = PipelineOptions(flags=argv)

Passing flags=argv to PipelineOptions interprets command-line arguments that follow the format:

--<option>=<value>

Building your PipelineOptions this way lets you specify any of the options defined in any subclass of PipelineOptions as a command-line argument.
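
For example, once the flags are parsed, you can read any predefined option by viewing the options object as the subclass that declares it. The following is a minimal sketch; the flag values shown are placeholders:

from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions

# Placeholder flags, as they might arrive from the command line.
options = PipelineOptions(['--project=my-project-id', '--job_name=my-job'])

# View the options as the subclass that declares the fields you need.
gcloud_options = options.view_as(GoogleCloudOptions)
print(gcloud_options.project)   # my-project-id
print(gcloud_options.job_name)  # my-job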

Creating custom options

You can add your own custom options in addition to the standard PipelineOptions. Dataflow's command-line parser can also set your custom options using command-line arguments specified in the same format.

Java: SDK 2.x

To add your own options, define an interface with getter and setter methods for each option, as in the following example:

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Python

To add your own options, use the add_argument() method (which behaves exactly like Python's standard argparse module), as in the following example:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')
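
Once the class is defined, you can read the custom values by viewing your parsed PipelineOptions as that class. The following sketch reuses the MyOptions class and import above; the flag values are placeholders:

# Placeholder flags, as they might arrive from the command line.
options = PipelineOptions(['--input=gs://my-bucket/input', '--output=gs://my-bucket/output'])

my_options = options.view_as(MyOptions)
print(my_options.input)   # gs://my-bucket/input
print(my_options.output)  # gs://my-bucket/output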

You can also specify a description, which appears when a user passes --help as a command-line argument, and a default value.

Java: SDK 2.x

You set the description and default value using annotations, as follows:

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("DEFAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

We recommend that you register your interface with PipelineOptionsFactory and then pass the interface when creating the PipelineOptions object. When you register your interface with PipelineOptionsFactory, the --help command can find your custom options interface and add it to its output. PipelineOptionsFactory also validates that your custom options are compatible with all other registered options.

The following example code shows how to register your custom options interface with PipelineOptionsFactory:

  PipelineOptionsFactory.register(MyOptions.class);
  MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                            .withValidation()
                                            .as(MyOptions.class);

Now your pipeline can accept --myCustomOption=value as a command-line argument.

Python

You set the description and default value as follows:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        help='Input for the pipeline',
        default='gs://my-bucket/input')
    parser.add_argument(
        '--output',
        help='Output for the pipeline',
        default='gs://my-bucket/output')

Configuring PipelineOptions for execution on the Cloud Dataflow service

To execute your pipeline using the Dataflow managed service, you must set the following fields in PipelineOptions:

Java: SDK 2.x

  • project - The ID of your Google Cloud project.
  • runner - The pipeline runner that will parse your program and construct your pipeline. For cloud execution, this must be DataflowRunner.
  • gcpTempLocation - A Cloud Storage path for Dataflow to stage any temporary files. You must create this bucket before running your pipeline. If you don't specify gcpTempLocation, you can specify the pipeline option tempLocation; gcpTempLocation is then set to the value of tempLocation. If neither is specified, a default gcpTempLocation is created.
  • stagingLocation - A Cloud Storage bucket for Dataflow to stage your binary files. If you do not set this option, the value you specified for tempLocation is also used for the staging location.

Note: A default gcpTempLocation is created if neither it nor tempLocation is specified. If tempLocation is specified and gcpTempLocation is not, tempLocation must be a Cloud Storage path, and gcpTempLocation defaults to it. If tempLocation is not specified and gcpTempLocation is, tempLocation is not populated.

Note: If you use the Apache Beam SDK for Java 2.15.0 or later, you must also specify region.

Python

  • project - The ID of your Google Cloud project.
  • runner - The pipeline runner that will parse your program and construct your pipeline. For cloud execution, this must be DataflowRunner.
  • staging_location - A Cloud Storage path for Dataflow to stage code packages needed by workers executing the job.
  • temp_location - A Cloud Storage path for Dataflow to stage temporary job files created during the execution of the pipeline.

Note: If you use the Apache Beam SDK for Python 2.15.0 or later, you must also specify region.

You can set these options programmatically, or specify them on the command line. The following example code shows how to construct a pipeline by programmatically setting the runner and other necessary options to execute the pipeline using the Dataflow managed service.

Java: SDK 2.x

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For Cloud execution, set the Cloud Platform project, staging location,
  // and specify DataflowRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
options = PipelineOptions(
    flags=argv,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')

# Create the Pipeline with the specified options.
with beam.Pipeline(options=options) as pipeline:
  pass  # build your pipeline here.

After you've constructed your pipeline, specify all the pipeline reads, transforms, and writes, and run the pipeline.

The following example code shows how to set the required options for Dataflow service execution by using the command line:

Java: SDK 2.x

  // Create and set your PipelineOptions.
  MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                             .withValidation()
                                             .as(MyOptions.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam

parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')
# argv is the full list of command-line arguments (for example, sys.argv[1:]).
args, beam_args = parser.parse_known_args(argv)

# Create the Pipeline with remaining arguments.
with beam.Pipeline(argv=beam_args) as pipeline:
  lines = pipeline | 'Read files' >> beam.io.ReadFromText(args.input)
  lines | 'Write files' >> beam.io.WriteToText(args.output)

After you've constructed your pipeline, specify all the pipeline reads, transforms, and writes, and run the pipeline.

Java: SDK 2.x

When passing the required options on the command line, use the --project, --runner, --gcpTempLocation, and, optionally, the --stagingLocation options.

Python

When passing the required options on the command line, use the --project, --runner, and --staging_location options.

Asynchronous execution

Java: SDK 2.x

Using DataflowRunner causes your pipeline to execute asynchronously on Google's cloud. While your pipeline executes you can monitor the job's progress, view details on execution, and receive updates on the pipeline's results by using the Dataflow Monitoring Interface or the Dataflow Command-line Interface.

Python

Using DataflowRunner causes your pipeline to execute asynchronously on Google's cloud. While your pipeline executes you can monitor the job's progress, view details on execution, and receive updates on the pipeline's results by using the Dataflow Monitoring Interface or the Dataflow Command-line Interface.

Synchronous execution

Java: SDK 2.x

To block until your pipeline completes, specify DataflowRunner as the pipeline runner and explicitly call pipeline.run().waitUntilFinish().

When you use DataflowRunner and call waitUntilFinish() on the PipelineResult object returned from pipeline.run(), the pipeline executes on the cloud but the local code waits for the cloud job to finish and return the final DataflowPipelineJob object. While the job runs, the Dataflow service prints job status updates and console messages while it waits.

If you were a Java SDK 1.x user and used --runner BlockingDataflowPipelineRunner on the command line to interactively induce your main program to block until the pipeline has terminated, then with Java 2.x your main program needs to explicitly call waitUntilFinish().

Python

To block until pipeline completion, use the wait_until_finish() method of the PipelineResult object, returned from the run() method of the runner.
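
A minimal sketch of blocking on completion (the pipeline body itself is elided):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

pipeline = beam.Pipeline(options=PipelineOptions())
# ... build your pipeline here ...

result = pipeline.run()      # Returns a PipelineResult.
result.wait_until_finish()   # Blocks until the job reaches a terminal state.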

Note: Typing Ctrl+C from the command line does not cancel your job. The Dataflow service is still executing the job on Google Cloud; to cancel the job, you'll need to use the Dataflow Monitoring Interface or the Dataflow Command-line Interface.

Streaming execution

Java: SDK 2.x

If your pipeline reads from an unbounded data source, such as Pub/Sub, the pipeline automatically executes in streaming mode.

If your pipeline uses unbounded data sources and sinks, it is necessary to pick a Windowing strategy for your unbounded PCollections before you use any aggregation such as a GroupByKey.

Python

If your pipeline uses an unbounded data source or sink (such as Pub/Sub), you must set the streaming option to true.
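
For example, the streaming option can be set either with the --streaming flag or programmatically through StandardOptions, as in this minimal sketch:

from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True  # Equivalent to passing --streaming.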

Streaming jobs use a Compute Engine machine type of n1-standard-2 or higher by default. You must not override this, as n1-standard-2 is the minimum required machine type for running streaming jobs.

If your pipeline uses unbounded data sources and sinks, you must pick a Windowing strategy for your unbounded PCollections before you use any aggregation such as a GroupByKey.
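
For example, the following sketch applies a fixed window to an unbounded PCollection before grouping; the Pub/Sub subscription path is a placeholder:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms import window

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as pipeline:
  (pipeline
   | 'Read' >> beam.io.ReadFromPubSub(subscription='projects/my-project/subscriptions/my-sub')
   | 'PairWithOne' >> beam.Map(lambda msg: (msg, 1))
   | 'Window' >> beam.WindowInto(window.FixedWindows(60))  # 60-second fixed windows.
   | 'Group' >> beam.GroupByKey())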

Setting other Cloud Dataflow pipeline options

To run your pipeline on the cloud, you can programmatically set the following fields in your PipelineOptions object:

Java: SDK 2.x

Each field is listed with its type (in parentheses), a description, and its default value.

  • runner (Class (NameOfRunner)) - The PipelineRunner to use. This field allows you to determine the PipelineRunner at runtime. Default: DirectRunner (local mode).
  • streaming (boolean) - Specifies whether streaming mode is enabled or disabled; true if enabled. Default: true if your pipeline reads from an unbounded source; otherwise, false.
  • project (String) - The project ID for your Google Cloud project. This is required if you want to run your pipeline using the Dataflow managed service. Default: if not set, defaults to the currently configured project in the Cloud SDK.
  • jobName (String) - The name of the Dataflow job being executed as it appears in Dataflow's jobs list and job details. Also used when updating an existing pipeline. Default: Dataflow generates a unique name automatically.
  • gcpTempLocation (String) - Cloud Storage path for temporary files. Must be a valid Cloud Storage URL, beginning with gs://.
  • stagingLocation (String) - Cloud Storage path for staging local files. Must be a valid Cloud Storage URL, beginning with gs://. Default: if not set, defaults to what you specified for tempLocation.
  • autoscalingAlgorithm (String) - The autoscaling mode for your Dataflow job. Possible values are THROUGHPUT_BASED to enable autoscaling, or NONE to disable. See Autotuning features to learn more about how autoscaling works in the Dataflow managed service. Default: THROUGHPUT_BASED for all batch Dataflow jobs and for streaming jobs that use Streaming Engine; NONE for streaming jobs that do not use Streaming Engine.
  • numWorkers (int) - The initial number of Compute Engine instances to use when executing your pipeline. This option determines how many workers the Dataflow service starts up when your job begins. Default: if unspecified, the Dataflow service determines an appropriate number of workers.
  • maxNumWorkers (int) - The maximum number of Compute Engine instances to be made available to your pipeline during execution. Note that this can be higher than the initial number of workers (specified by numWorkers) to allow your job to scale up, automatically or otherwise. Default: if unspecified, the Dataflow service determines an appropriate number of workers.
  • numberOfWorkerHarnessThreads (int) - The number of threads per worker harness. Default: if unspecified, the Dataflow service determines an appropriate number of threads per worker.
  • region (String) - Specifies a regional endpoint for deploying your Dataflow jobs. Default: us-central1.
  • workerRegion (String) - Specifies a Compute Engine region for launching worker instances to run your pipeline. This option is used to run workers in a different location than the region used to deploy, manage, and monitor jobs. The zone for workerRegion is automatically assigned. Note: This option cannot be combined with workerZone or zone. Default: if not set, defaults to the value set for region.
  • workerZone (String) - Specifies a Compute Engine zone for launching worker instances to run your pipeline. This option is used to run workers in a different location than the region used to deploy, manage, and monitor jobs. Note: This option cannot be combined with workerRegion or zone. Default: if you specify either region or workerRegion, workerZone defaults to a zone from the corresponding region. You can override this behavior by specifying a different zone.
  • zone (String) - (Deprecated) For Apache Beam SDK 2.17.0 or earlier, this specifies the Compute Engine zone for launching worker instances to run your pipeline. Default: if you specify region, zone defaults to a zone from the corresponding region. You can override this behavior by specifying a different zone.
  • dataflowKmsKey (String) - Specifies the customer-managed encryption key (CMEK) used to encrypt data at rest. You can control the encryption key through Cloud KMS. You must also specify gcpTempLocation to use this feature. Default: if unspecified, Dataflow uses the default Google Cloud encryption instead of a CMEK.
  • flexRSGoal (String) - Specifies Flexible Resource Scheduling (FlexRS) for autoscaled batch jobs. Affects the numWorkers, autoscalingAlgorithm, zone, region, and workerMachineType parameters. To turn on FlexRS, you must specify the value COST_OPTIMIZED to allow the Dataflow service to choose any available discounted resources. For more information, see the FlexRS pipeline options section. Default: SPEED_OPTIMIZED, which is the same as omitting this flag.
  • filesToStage (List<String>) - A list of local files, directories of files, or archives (.jar, .zip) to make available to each worker. If you set this option, only the files you specify are uploaded (the Java classpath is ignored). You must specify all of your resources in the correct classpath order. Resources are not limited to code; they can also include configuration files and other resources to make available to all workers. Your code can access the listed resources using Java's standard resource lookup methods. Cautions: Specifying a directory path is suboptimal, because Dataflow zips the files before uploading, which involves a higher startup time cost. Also, don't use this option to transfer data to workers that is meant to be processed by the pipeline, because doing so is significantly slower than using native Cloud Storage/BigQuery APIs combined with the appropriate Dataflow data source. Default: if filesToStage is blank, Dataflow infers the files to stage based on the Java classpath. The considerations and cautions in the description (types of files to list and how to access them from your code) also apply here.
  • network (String) - The Compute Engine network for launching Compute Engine instances to run your pipeline. See how to specify your network. Default: if not set, Google Cloud assumes that you intend to use a network named default.
  • subnetwork (String) - The Compute Engine subnetwork for launching Compute Engine instances to run your pipeline. See how to specify your subnetwork. Default: the Dataflow service determines the default value.
  • usePublicIps (boolean) - Specifies whether Dataflow workers use public IP addresses. If the value is set to false, Dataflow workers use private IP addresses for all communication. In this case, if the subnetwork option is specified, the network option is ignored. Make sure that the specified network or subnetwork has Private Google Access enabled. Default: true; Dataflow workers use public IP addresses.
  • enableStreamingEngine (boolean) - Specifies whether Dataflow Streaming Engine is enabled or disabled; true if enabled. Enabling Streaming Engine allows you to run the steps of your streaming pipeline in the Dataflow service backend, thus conserving CPU, memory, and Persistent Disk storage resources. Default: false; the steps of your streaming pipeline are executed entirely on worker VMs.
  • diskSizeGb (int) - The disk size, in gigabytes, to use on each remote Compute Engine worker instance. If set, specify at least 30 GB to account for the worker boot image and local logs. For batch jobs using Dataflow Shuffle, this option sets the size of a worker VM's boot disk. For batch jobs not using Dataflow Shuffle, this option sets the size of the disks used to store shuffled data; the boot disk size is not affected. For streaming jobs using Streaming Engine, this option sets the size of the boot disks. For streaming jobs not using Streaming Engine, this option sets the size of each additional persistent disk created by the Dataflow service; the boot disk is not affected. If a streaming job does not use Streaming Engine, you can set the boot disk size with the experiment flag streaming_boot_disk_size_gb. For example, specify --experiments=streaming_boot_disk_size_gb=80 to create boot disks of 80 GB. Set to 0 to use the default size defined in your Cloud Platform project. Warning: Lowering the disk size reduces available shuffle I/O. Shuffle-bound jobs not using Dataflow Shuffle or Streaming Engine may result in increased runtime and job cost. Default: 25 GB for batch jobs using Dataflow Shuffle, otherwise 250 GB; 30 GB for streaming jobs using Streaming Engine, otherwise 400 GB.
  • serviceAccount (String) - Specifies a user-managed controller service account, using the format my-service-account-name@<project-id>.iam.gserviceaccount.com. For more information, see the Controller service account section of the Cloud Dataflow security and permissions page. Default: if not set, workers use your project's Compute Engine service account as the controller service account.
  • workerDiskType (String) - The type of persistent disk to use, specified by a full URL of the disk type resource. For example, use compute.googleapis.com/projects//zones//diskTypes/pd-ssd to specify an SSD persistent disk. For more information, see the Compute Engine API reference page for diskTypes. Default: the Dataflow service determines the default value.
  • workerMachineType (String) - The Compute Engine machine type that Dataflow uses when starting worker VMs. You can use any of the available Compute Engine machine type families as well as custom machine types. For best results, use n1 machine types. Shared core machine types, such as f1 and g1 series workers, are not supported under the Dataflow Service Level Agreement. Note that Dataflow bills by the number of vCPUs and GB of memory in workers; billing is independent of the machine type family. Default: the Dataflow service chooses the machine type based on your job if you do not set this option.

See the Java API reference documentation for the PipelineOptions interface (and its subinterfaces) for the complete list of pipeline configuration options.

Python

Each field is listed with its type (in parentheses), a description, and its default value.

  • runner (str) - The PipelineRunner to use. This field can be either DirectRunner or DataflowRunner. Default: DirectRunner (local mode).
  • streaming (bool) - Specifies whether streaming mode is enabled or disabled; true if enabled. Default: false.
  • project (str) - The project ID for your Google Cloud project. This is required if you want to run your pipeline using the Dataflow managed service. Default: if not set, throws an error.
  • job_name (str) - The name of the Dataflow job being executed as it appears in Dataflow's jobs list and job details. Default: Dataflow generates a unique name automatically.
  • temp_location (str) - Cloud Storage path for temporary files. Must be a valid Cloud Storage URL, beginning with gs://. You must specify at least one of temp_location or staging_location to run your pipeline on Google Cloud. Default: if not set, defaults to the value for staging_location.
  • staging_location (str) - Cloud Storage path for staging local files. Must be a valid Cloud Storage URL, beginning with gs://. You must specify at least one of temp_location or staging_location to run your pipeline on Google Cloud. Default: if not set, defaults to a staging directory within temp_location.
  • autoscaling_algorithm (str) - The autoscaling mode for your Dataflow job. Possible values are THROUGHPUT_BASED to enable autoscaling, or NONE to disable. See Autotuning features to learn more about how autoscaling works in the Dataflow managed service. Default: THROUGHPUT_BASED for all batch Dataflow jobs and for streaming jobs that use Streaming Engine; NONE for streaming jobs that do not use Streaming Engine.
  • num_workers (int) - The number of Compute Engine instances to use when executing your pipeline. Default: if unspecified, the Dataflow service determines an appropriate number of workers.
  • max_num_workers (int) - The maximum number of Compute Engine instances to be made available to your pipeline during execution. Note that this can be higher than the initial number of workers (specified by num_workers) to allow your job to scale up, automatically or otherwise. Default: if unspecified, the Dataflow service determines an appropriate number of workers.
  • number_of_worker_harness_threads (int) - The number of threads per worker harness. To use this parameter, you also need to use the flag --experiments=use_runner_v2. Default: if unspecified, the Dataflow service determines an appropriate number of threads per worker.
  • region (str) - Specifies a regional endpoint for deploying your Dataflow jobs. Default: us-central1.
  • worker_region (str) - Specifies a Compute Engine region for launching worker instances to run your pipeline. This option is used to run workers in a different location than the region used to deploy, manage, and monitor jobs. The zone for worker_region is automatically assigned. Note: This option cannot be combined with worker_zone or zone. Default: if not set, defaults to the value set for region.
  • worker_zone (str) - Specifies a Compute Engine zone for launching worker instances to run your pipeline. This option is used to run workers in a different location than the region used to deploy, manage, and monitor jobs. Note: This option cannot be combined with worker_region or zone. Default: if you specify either region or worker_region, worker_zone defaults to a zone from the corresponding region. You can override this behavior by specifying a different zone.
  • zone (str) - (Deprecated) For Apache Beam SDK 2.17.0 or earlier, this specifies the Compute Engine zone for launching worker instances to run your pipeline. Default: if you specify region, zone defaults to a zone from the corresponding region. You can override this behavior by specifying a different zone.
  • dataflow_kms_key (str) - Specifies the customer-managed encryption key (CMEK) used to encrypt data at rest. You can control the encryption key through Cloud KMS. You must also specify temp_location to use this feature. Default: if unspecified, Dataflow uses the default Google Cloud encryption instead of a CMEK.
  • flexrs_goal (str) - Specifies Flexible Resource Scheduling (FlexRS) for autoscaled batch jobs. Affects the num_workers, autoscaling_algorithm, zone, region, and machine_type parameters. To turn on FlexRS, you must specify the value COST_OPTIMIZED to allow the Dataflow service to choose any available discounted resources. For more information, see the FlexRS pipeline options section. Default: SPEED_OPTIMIZED, which is the same as omitting this flag.
  • network (str) - The Compute Engine network for launching Compute Engine instances to run your pipeline. See how to specify your network. Default: if not set, Google Cloud assumes that you intend to use a network named default.
  • subnetwork (str) - The Compute Engine subnetwork for launching Compute Engine instances to run your pipeline. See how to specify your subnetwork. Default: the Dataflow service determines the default value.
  • use_public_ips (bool) - Specifies whether Dataflow workers use public IP addresses. If the value is set to false, Dataflow workers use private IP addresses for all communication. In this case, if the subnetwork option is specified, the network option is ignored. Make sure that the specified network or subnetwork has Private Google Access enabled. This option requires the Beam SDK for Python; the deprecated Dataflow SDK for Python does not support it. Default: if not set, Dataflow workers use public IP addresses.
  • enable_streaming_engine (bool) - Specifies whether Dataflow Streaming Engine is enabled or disabled; true if enabled. Enabling Streaming Engine allows you to run the steps of your streaming pipeline in the Dataflow service backend, thus conserving CPU, memory, and Persistent Disk storage resources. Default: false; the steps of your streaming pipeline are executed entirely on worker VMs.
  • disk_size_gb (int) - The disk size, in gigabytes, to use on each remote Compute Engine worker instance. If set, specify at least 30 GB to account for the worker boot image and local logs. For batch jobs using Dataflow Shuffle, this option sets the size of a worker VM's boot disk. For batch jobs not using Dataflow Shuffle, this option sets the size of the disks used to store shuffled data; the boot disk size is not affected. For streaming jobs using Streaming Engine, this option sets the size of the boot disks. For streaming jobs not using Streaming Engine, this option sets the size of each additional persistent disk created by the Dataflow service; the boot disk is not affected. If a streaming job does not use Streaming Engine, you can set the boot disk size with the experiment flag streaming_boot_disk_size_gb. For example, specify --experiments=streaming_boot_disk_size_gb=80 to create boot disks of 80 GB. Set to 0 to use the default size defined in your Cloud Platform project. Warning: Lowering the disk size reduces available shuffle I/O. Shuffle-bound jobs not using Dataflow Shuffle or Streaming Engine may result in increased runtime and job cost. Default: 25 GB for batch jobs using Dataflow Shuffle, otherwise 250 GB; 30 GB for streaming jobs using Streaming Engine, otherwise 400 GB.
  • service_account_email (str) - Specifies a user-managed controller service account, using the format my-service-account-name@<project-id>.iam.gserviceaccount.com. For more information, see the Controller service account section of the Cloud Dataflow security and permissions page. Default: if not set, workers use your project's Compute Engine service account as the controller service account.
  • worker_disk_type (str) - The type of persistent disk to use, specified by a full URL of the disk type resource. For example, use compute.googleapis.com/projects//zones//diskTypes/pd-ssd to specify an SSD persistent disk. For more information, see the Compute Engine API reference page for diskTypes. Default: the Dataflow service determines the default value.
  • machine_type (str) - The Compute Engine machine type that Dataflow uses when starting worker VMs. You can use any of the available Compute Engine machine type families as well as custom machine types. For best results, use n1 machine types. Shared core machine types, such as f1 and g1 series workers, are not supported under the Dataflow Service Level Agreement. Note that Dataflow bills by the number of vCPUs and GB of memory in workers; billing is independent of the machine type family. Default: the Dataflow service chooses the machine type based on your job if you do not set this option.

Configuring PipelineOptions for local execution

Instead of running your pipeline on managed cloud resources, you can choose to execute your pipeline locally. Local execution has certain advantages for testing, debugging, or running your pipeline over small data sets. For example, local execution removes the dependency on the remote Dataflow service and associated Google Cloud Project.

When you use local execution, we highly recommend that you run your pipeline with datasets small enough to fit in local memory. You can create a small in-memory data set using a Create transform, or you can use a Read transform to work with small local or remote files. Local execution provides a fast and easy way to perform testing and debugging with fewer external dependencies, but will be limited by the memory available in your local environment.
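
For example, the following minimal sketch counts words from a small in-memory dataset built with a Create transform and runs on the default local runner:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# DirectRunner is the default, so no runner needs to be specified.
with beam.Pipeline(options=PipelineOptions()) as pipeline:
  (pipeline
   | 'CreateWords' >> beam.Create(['to', 'be', 'or', 'not', 'to', 'be'])
   | 'PairWithOne' >> beam.Map(lambda word: (word, 1))
   | 'CountPerWord' >> beam.CombinePerKey(sum)
   | 'Print' >> beam.Map(print))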

The following example code shows how to construct a pipeline that executes in your local environment.

Java: SDK 2.x

  // Create and set our Pipeline Options.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Note: For local mode, you do not need to set the runner since the DirectRunner is already the default. However, you do need to either explicitly include the DirectRunner as a dependency or add it to the classpath.

Python

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Create and set your Pipeline Options.
# argv is the list of command-line arguments (for example, sys.argv[1:]).
options = PipelineOptions(flags=argv)
my_options = options.view_as(MyOptions)  # MyOptions: the custom options class defined earlier.

with beam.Pipeline(options=options) as pipeline:
  pass  # build your pipeline here.

Note: For local mode, you do not need to set the runner since the DirectRunner is already the default.

After you've constructed your pipeline, run it.

Setting other local pipeline options

When executing your pipeline locally, the default values for the properties in PipelineOptions are generally sufficient.

Java: SDK 2.x

You can find the default values for Java PipelineOptions in the Java API reference; see the PipelineOptions class listing for complete details.

If your pipeline uses Google Cloud services such as BigQuery or Cloud Storage for IO, you might need to set certain Google Cloud project and credential options. In such cases, you should use GcpOptions.setProject to set your Google Cloud Project ID. You may also need to set credentials explicitly. See the GcpOptions class for complete details.

Python

You can find the default values for Python PipelineOptions in the Python API reference; see the PipelineOptions module listing for complete details.

If your pipeline uses Google Cloud services such as BigQuery or Cloud Storage for IO, you might need to set certain Google Cloud project and credential options. In such cases, you should use options.view_as(GoogleCloudOptions).project to set your Google Cloud Project ID. You may also need to set credentials explicitly. See the GoogleCloudOptions class for complete details.
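
A minimal sketch of setting the project and a temporary location through GoogleCloudOptions; the project ID and bucket are placeholders:

from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions

options = PipelineOptions()
gcloud_options = options.view_as(GoogleCloudOptions)
gcloud_options.project = 'my-project-id'              # Placeholder project ID.
gcloud_options.temp_location = 'gs://my-bucket/temp'  # Placeholder bucket.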
