Once your Apache Beam program has constructed a pipeline, you'll need to have the pipeline executed. Pipeline execution is separate from your Apache Beam program's execution; your Apache Beam program constructs the pipeline, and the code you've written generates a series of steps to be executed by a pipeline runner. The pipeline runner can be the Dataflow managed service on Google Cloud, a third-party runner service, or a local pipeline runner that executes the steps directly in the local environment.
You can specify the pipeline runner and other execution options by using the Apache Beam SDK class PipelineOptions. You use PipelineOptions to configure how and where your pipeline executes and what resources it uses.
Much of the time, you'll want your pipeline to run on managed Google Cloud resources by using the Dataflow runner service. Running your pipeline with the Dataflow service creates a Dataflow job, which uses Compute Engine and Cloud Storage resources in your Google Cloud project.
You can also run your pipeline locally. When you run your pipeline locally, the pipeline transforms are executed on the same machine where your Dataflow program executes. Local execution is useful for testing and debugging purposes, especially if your pipeline can use smaller in-memory datasets.
Setting PipelineOptions
You pass PipelineOptions when you create your Pipeline object in your Dataflow program. When the Dataflow service runs your pipeline, it sends a copy of the PipelineOptions to each worker instance.
Java: SDK 2.x
Note: You can access PipelineOptions inside any ParDo's DoFn instance by using the method ProcessContext.getPipelineOptions.
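For example, a DoFn can read a pipeline option at processing time through its ProcessContext. The following is a minimal sketch; MyOptions and its getMyCustomOption() getter are assumed to be defined as in the "Creating custom options" section later on this page.
import org.apache.beam.sdk.transforms.DoFn;

// Sketch: a DoFn that reads a custom pipeline option while processing elements.
class EchoOptionFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // getPipelineOptions() returns the worker's copy of the pipeline options;
    // as(MyOptions.class) views it as the custom subinterface.
    MyOptions opts = c.getPipelineOptions().as(MyOptions.class);
    c.output(opts.getMyCustomOption() + ": " + c.element());
  }
}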
Python
This feature is not yet supported in the Apache Beam SDK for Python.
Java: SDK 1.x
Note: You can access PipelineOptions inside any ParDo's DoFn instance by using the method ProcessContext.getPipelineOptions.
Setting PipelineOptions from command-line arguments
While you can configure your pipeline by creating a PipelineOptions object and setting the fields directly, the Apache Beam SDKs include a command-line parser that you can use to set fields in PipelineOptions using command-line arguments.
Java: SDK 2.x
To read options from the command line, construct your PipelineOptions object using the method PipelineOptionsFactory.fromArgs, as in the following example code:
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Note: Appending the method .withValidation causes Dataflow to check for required command-line arguments and validate argument values.
Using PipelineOptionsFactory.fromArgs interprets command-line arguments that follow the format:
--<option>=<value>
Building your PipelineOptions this way lets you specify any of the options in any subinterface of org.apache.beam.sdk.options.PipelineOptions as a command-line argument.
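For example, if the program is launched with arguments such as --tempLocation=gs://my-bucket/temp (a placeholder value), fromArgs maps each --<option>=<value> pair onto the matching property of PipelineOptions or one of its subinterfaces. A minimal sketch:
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ParseArgsExample {
  public static void main(String[] args) {
    // args might contain, for example: --tempLocation=gs://my-bucket/temp
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().create();
    // The parsed value is now available through the corresponding getter.
    System.out.println(options.getTempLocation());
  }
}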
Python
To read options from the command line, create your PipelineOptions object, as in the following example code:
options = PipelineOptions(flags=argv)
The argument (flags=argv) to PipelineOptions interprets command-line arguments that follow the format:
--<option>=<value>
Building your PipelineOptions this way lets you specify any of the options by subclassing from PipelineOptions.
Java: SDK 1.x
To read options from the command line, construct your PipelineOptions object using the method PipelineOptionsFactory.fromArgs, as in the following example code:
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Note: Appending the method .withValidation causes Dataflow to check for required command-line arguments and validate argument values.
Using PipelineOptionsFactory.fromArgs interprets command-line arguments that follow the format:
--<option>=<value>
Building your PipelineOptions this way lets you specify any of the options in any subinterface of com.google.cloud.dataflow.sdk.options.PipelineOptions as a command-line argument.
Creating custom options
You can add your own custom options in addition to the standard PipelineOptions. Dataflow's command-line parser can also set your custom options using command-line arguments specified in the same format.
Java: SDK 2.x
To add your own options, define an interface with getter and setter methods for each option, as in the following example:
public interface MyOptions extends PipelineOptions {
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}
Python
To add your own options, use the add_argument() method (which behaves exactly like Python's standard argparse module), as in the following example:
class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')
Java: SDK 1.x
To add your own options, define an interface with getter and setter methods for each option, as in the following example:
public interface MyOptions extends PipelineOptions {
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}
You can also specify a description, which appears when a user passes --help as a command-line argument, and a default value.
Java: SDK 2.x
You set the description and default value using annotations, as follows:
public interface MyOptions extends PipelineOptions {
  @Description("My custom command line argument.")
  @Default.String("DEFAULT")
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}
We recommend that you register your interface with PipelineOptionsFactory and then pass the interface when creating the PipelineOptions object. When you register your interface with PipelineOptionsFactory, the --help command can find your custom options interface and add it to the output of the --help command. PipelineOptionsFactory also validates that your custom options are compatible with all other registered options.
The following example code shows how to register your custom options interface with PipelineOptionsFactory:
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                          .withValidation()
                                          .as(MyOptions.class);
Now your pipeline can accept --myCustomOption=value as a command-line argument.
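After parsing, the custom value is available through the generated getter. A short sketch, continuing the example above:
MyOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
// Returns the value passed via --myCustomOption=value, or "DEFAULT" if the
// argument was omitted (per the @Default.String annotation above).
String myCustomOption = options.getMyCustomOption();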
Python
You set the description and default value as follows:
class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input',
                        help='Input for the pipeline',
                        default='gs://my-bucket/input')
    parser.add_argument('--output',
                        help='Output for the pipeline',
                        default='gs://my-bucket/output')
Java: SDK 1.x
You set the description and default value using annotations, as follows:
public interface MyOptions extends PipelineOptions {
  @Description("My custom command line argument.")
  @Default.String("DEFAULT")
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}
We recommend that you register your interface with PipelineOptionsFactory and then pass the interface when creating the PipelineOptions object. When you register your interface with PipelineOptionsFactory, the --help command can find your custom options interface and add it to the output of the --help command. PipelineOptionsFactory also validates that your custom options are compatible with all other registered options.
The following example code shows how to register your custom options interface with PipelineOptionsFactory:
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                          .withValidation()
                                          .as(MyOptions.class);
Now your pipeline can accept --myCustomOption=value as a command-line argument.
Configuring PipelineOptions for execution on the Cloud Dataflow service
To execute your pipeline using the Dataflow managed service, you must set the following fields in PipelineOptions:
Java: SDK 2.x
project - The ID of your Google Cloud project.
runner - The pipeline runner that parses your program and constructs your pipeline. For cloud execution, this must be DataflowRunner.
gcpTempLocation - A Cloud Storage path for Dataflow to stage any temporary files. You must create this bucket ahead of time, before running your pipeline. If you don't specify gcpTempLocation, you can specify the pipeline option tempLocation, and gcpTempLocation is then set to the value of tempLocation. If neither is specified, a default gcpTempLocation is created.
stagingLocation - A Cloud Storage bucket for Dataflow to stage your binary files. If you do not set this option, the value you specified for tempLocation is used for the staging location as well.
A default gcpTempLocation is created if neither it nor tempLocation is specified. If tempLocation is specified and gcpTempLocation is not, tempLocation must be a Cloud Storage path, and gcpTempLocation defaults to it. If tempLocation is not specified and gcpTempLocation is, tempLocation is not populated.
Note: If you use the Apache Beam SDK for Java 2.15.0 or later, you must also specify region.
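If you prefer to set these fields in code rather than on the command line, a sketch might look like the following (the project ID, bucket path, and region are placeholder values; a fuller programmatic example appears later in this section):
// Sketch: required Dataflow service options set programmatically.
DataflowPipelineOptions options =
    PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject("my-project-id");
options.setRunner(DataflowRunner.class);
// A Cloud Storage bucket you have already created.
options.setGcpTempLocation("gs://my-bucket/temp");
// Required with the Apache Beam SDK for Java 2.15.0 or later.
options.setRegion("us-central1");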
Python
job_name - The name of the Dataflow job being executed.
project - The ID of your Google Cloud project.
runner - The pipeline runner that parses your program and constructs your pipeline. For cloud execution, this must be DataflowRunner.
staging_location - A Cloud Storage path for Dataflow to stage code packages needed by workers executing the job.
temp_location - A Cloud Storage path for Dataflow to stage temporary job files created during the execution of the pipeline.
Note: If you use the Apache Beam SDK for Python 2.15.0 or later, you must also specify region.
Java: SDK 1.x
project - The ID of your Google Cloud project.
runner - The pipeline runner that parses your program and constructs your pipeline. For cloud execution, this must be either DataflowPipelineRunner or BlockingDataflowPipelineRunner.
stagingLocation - A Cloud Storage bucket for Dataflow to stage your binary and any temporary files. You must create this bucket ahead of time, before running your pipeline.
You can set these options programmatically, or specify them using the command-line. The following example code shows how to construct a pipeline by programmatically setting the runner and other necessary options to execute the pipeline using the Dataflow managed service.
Java: SDK 2.x
// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

// For Cloud execution, set the Cloud Platform project, staging location,
// and specify DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
# Create and set your PipelineOptions.
options = PipelineOptions(flags=argv)

# For Cloud execution, set the Cloud Platform project, job_name,
# staging location, temp_location and specify DataflowRunner.
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'my-project-id'
google_cloud_options.job_name = 'myjob'
google_cloud_options.staging_location = 'gs://my-bucket/binaries'
google_cloud_options.temp_location = 'gs://my-bucket/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'

# Create the Pipeline with the specified options.
p = Pipeline(options=options)
Java: SDK 1.x
// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

// For Cloud execution, set the Cloud Platform project, staging location,
// and specify DataflowPipelineRunner or BlockingDataflowPipelineRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowPipelineRunner.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
After you've constructed your pipeline, specify all the pipeline reads, transforms, and writes, and run the pipeline.
The following example code shows how to set the required options for Dataflow service execution using the command-line:
Java: SDK 2.x
// Create and set your PipelineOptions.
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
# Use Python argparse module to parse custom arguments
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')
known_args, pipeline_args = parser.parse_known_args(argv)

# Create the Pipeline with remaining arguments.
with beam.Pipeline(argv=pipeline_args) as p:
  lines = p | 'ReadFromText' >> beam.io.ReadFromText(known_args.input)
  lines | 'WriteToText' >> beam.io.WriteToText(known_args.output)
Java: SDK 1.x
// Create and set your PipelineOptions.
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
After you've constructed your pipeline, specify all the pipeline reads, transforms, and writes, and run the pipeline.
Java: SDK 2.x
When passing the required options on the command line, use the --project, --runner, --gcpTempLocation, and, optionally, the --stagingLocation options.
Python
When passing the required options on the command line, use the --project, --runner, and --stagingLocation options.
Java: SDK 1.x
When passing the required options on the command line, use the --project, --runner, and --stagingLocation options.
Asynchronous execution
Java: SDK 2.x
Using DataflowRunner causes your pipeline to execute asynchronously on Google's cloud. While your pipeline executes, you can monitor the job's progress, view details on execution, and receive updates on the pipeline's results by using the Dataflow Monitoring Interface or the Dataflow Command-line Interface.
Python
Using DataflowRunner causes your pipeline to execute asynchronously on Google's cloud. While your pipeline executes, you can monitor the job's progress, view details on execution, and receive updates on the pipeline's results by using the Dataflow Monitoring Interface or the Dataflow Command-line Interface.
Java: SDK 1.x
Using DataflowPipelineRunner causes your pipeline to execute asynchronously on Google's cloud. While your pipeline executes, you can monitor the job's progress, view details on execution, and receive updates on the pipeline's results by using the Dataflow Monitoring Interface or the Dataflow Command-line Interface.
Blocking execution
Java: SDK 2.x
Specify DataflowRunner as the pipeline runner and explicitly call pipeline.run().waitUntilFinish().
When you use DataflowRunner and call waitUntilFinish() on the PipelineResult object returned from pipeline.run(), the pipeline executes on the cloud in the same way as DataflowRunner, but waits for the launched job to finish and returns the final DataflowPipelineJob object. While the job runs, the Dataflow service prints job status updates and console messages.
If you were a Java SDK 1.x user and used --runner BlockingDataflowPipelineRunner on the command line to interactively induce your main program to block until the pipeline terminated, then with Java 2.x your main program needs to explicitly call waitUntilFinish().
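A minimal sketch, assuming p is a Pipeline configured with DataflowRunner as shown earlier:
// run() submits the job and returns a PipelineResult without waiting;
// waitUntilFinish() then blocks until the Dataflow job reaches a terminal state.
PipelineResult result = p.run();
result.waitUntilFinish();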
Python
To block until pipeline completion, use the wait_until_finish() method of the PipelineResult object, returned from the run() method of the runner.
Java: SDK 1.x
Specify BlockingDataflowPipelineRunner as the pipeline runner.
When you use BlockingDataflowPipelineRunner, the pipeline executes on the cloud in the same way as DataflowPipelineRunner, but waits for the launched job to finish. While the job runs, the Dataflow service prints job status updates and console messages. Using BlockingDataflowPipelineRunner returns the final DataflowPipelineJob object.
Note: Typing Ctrl+C from the command line does not cancel your job. The Dataflow service is still executing the job on Google Cloud; to cancel the job, you'll need to use the Dataflow Monitoring Interface or the Dataflow Command-line Interface.
Streaming execution
Java: SDK 2.x
If your pipeline reads from an unbounded data source, such as Pub/Sub, the pipeline automatically executes in streaming mode.
If your pipeline uses unbounded data sources and sinks, you must pick a Windowing strategy for your unbounded PCollections before you use any aggregation such as a GroupByKey.
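For example, the following sketch windows an unbounded PCollection read from Pub/Sub before grouping. The subscription name is a placeholder, and ParseToKvFn stands in for a hypothetical DoFn that turns each message into a KV<String, Integer>:
PCollection<KV<String, Integer>> unbounded =
    p.apply(PubsubIO.readStrings()
        .fromSubscription("projects/my-project/subscriptions/my-subscription"))
     .apply(ParDo.of(new ParseToKvFn()));  // hypothetical DoFn producing KV<String, Integer>

// Assign each element to a fixed one-minute window so that GroupByKey can
// emit results per window instead of waiting for the never-ending input.
PCollection<KV<String, Iterable<Integer>>> grouped =
    unbounded
        .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(GroupByKey.<String, Integer>create());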
Python
If your pipeline uses an unbounded data source or sink (such as Pub/Sub), you must set the streaming option to true.
Streaming jobs use a Compute Engine machine type of n1-standard-2 or higher by default. You must not override this, as n1-standard-2 is the minimum required machine type for running streaming jobs.
If your pipeline uses unbounded data sources and sinks, you must pick a Windowing strategy for your unbounded PCollections before you use any aggregation such as a GroupByKey.
Java: SDK 1.x
If your pipeline uses an unbounded data source or sink (such as Pub/Sub), you must set the streaming option in PipelineOptions to true. You can set the streaming option programmatically as shown in the following example:
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
dataflowOptions.setStreaming(true);
If your pipeline uses unbounded data sources and sinks, you must pick a Windowing strategy for your unbounded PCollections before you use any aggregation such as a GroupByKey.
Setting other Cloud Dataflow pipeline options
To run your pipeline on the cloud, you can programmatically set the following fields in your PipelineOptions object:
Java: SDK 2.x
Field | Type | Description | Default Value
---|---|---|---
runner | Class (NameOfRunner) | The PipelineRunner to use. This field allows you to determine the PipelineRunner at runtime. | DirectRunner (local mode)
streaming | boolean | Specifies whether streaming mode is enabled or disabled; true if enabled. | If your pipeline reads from an unbounded source, the default value is true. Otherwise, false.
project | String | The project ID for your Google Cloud Project. This is required if you want to run your pipeline using the Dataflow managed service. | If not set, defaults to the currently configured project in the Cloud SDK.
gcpTempLocation | String | Cloud Storage path for temporary files. Must be a valid Cloud Storage URL, beginning with gs://. |
stagingLocation | String | Cloud Storage path for staging local files. Must be a valid Cloud Storage URL, beginning with gs://. | If not set, defaults to what you specified for tempLocation.
autoscalingAlgorithm | String | The autoscaling mode for your Dataflow job. Possible values are THROUGHPUT_BASED to enable autoscaling, or NONE to disable. See Autotuning features to learn more about how autoscaling works in the Dataflow managed service. | Defaults to THROUGHPUT_BASED for all batch Dataflow jobs that use Dataflow SDK for Java version 1.6.0 or later; defaults to NONE for streaming jobs or batch jobs using earlier versions of the Dataflow SDK for Java.
numWorkers | int | The initial number of Compute Engine instances to use when executing your pipeline. This option determines how many workers the Dataflow service starts up when your job begins. | If unspecified, the Dataflow service determines an appropriate number of workers.
maxNumWorkers | int | The maximum number of Compute Engine instances to be made available to your pipeline during execution. Note that this can be higher than the initial number of workers (specified by numWorkers) to allow your job to scale up, automatically or otherwise. | If unspecified, the Dataflow service determines an appropriate number of workers.
region | String | Specifying a regional endpoint allows you to define a region for deploying your Dataflow jobs. | If not set, defaults to us-central1.
zone | String | The Compute Engine zone for launching worker instances to run your pipeline. | If you specified the region parameter, the zone parameter defaults to a zone from the corresponding region. You can override this behavior by specifying a different zone.
dataflowKmsKey | String | Specifies the customer-managed encryption key (CMEK) used to encrypt data at rest. You can control the encryption key through KMS. You must also specify gcpTempLocation to use this feature. | If unspecified, Dataflow uses the default Google Cloud encryption instead of a CMEK.
flexRSGoal | String | Specifies Flexible Resource Scheduling (FlexRS) for autoscaled batch jobs. Affects the numWorkers, autoscalingAlgorithm, zone, region, and workerMachineType parameters. For more information, see the FlexRS pipeline options section. | If unspecified, defaults to SPEED_OPTIMIZED, which is the same as omitting this flag. To turn on FlexRS, you must specify the value COST_OPTIMIZED to allow the Dataflow service to choose any available discounted resources.
filesToStage | List<String> | A list of local files, directories of files, or archives (.jar, .zip) to make available to each worker. If you set this option, then only those files you specify will be uploaded (the Java classpath will be ignored). You must specify all of your resources in the correct classpath order. Resources are not limited to code, but can also include configuration files and other resources to make available to all workers. Your code can access the listed resources using Java's standard resource lookup methods. Cautions: Specifying a directory path is sub-optimal since Dataflow will zip the files before uploading, which involves a higher startup time cost. Also, don't use this option to transfer data to workers that is meant to be processed by the pipeline, since doing so is significantly slower than using native Cloud Storage/BigQuery APIs combined with the appropriate Dataflow data source. | If filesToStage is blank, Dataflow will infer the files to stage based on the Java classpath. The considerations and cautions mentioned in the Description column also apply here (types of files to list and how to access them from your code).
network | String | The Compute Engine network for launching Compute Engine instances to run your pipeline. See how to specify your network. | If not set, Google Cloud assumes that you intend to use a network named default.
subnetwork | String | The Compute Engine subnetwork for launching Compute Engine instances to run your pipeline. See how to specify your subnetwork. | The Dataflow service determines the default value.
usePublicIps | boolean | Specifies whether Dataflow workers use public IP addresses. If the value is set to false, Dataflow workers use private IP addresses for all communication. In this case, if the subnetwork option is specified, the network option is ignored. Make sure that the specified network or subnetwork has Private Google Access enabled. | If not set, the default value is true and Dataflow workers use public IP addresses.
enableStreamingEngine | boolean | Specifies whether Dataflow Streaming Engine is enabled or disabled; true if enabled. Enabling Streaming Engine allows you to run the steps of your streaming pipeline in the Dataflow service backend, thus conserving CPU, memory, and Persistent Disk storage resources. | The default value is false. This means that the steps of your streaming pipeline are executed entirely on worker VMs.
diskSizeGb | int | The disk size, in gigabytes, to use on each remote Compute Engine worker instance. If set, specify at least 30 GB to account for the worker boot image and local logs. For batch jobs, this option sets the size of a worker VM's boot disk. If a streaming job uses Streaming Engine, this option sets the size of the boot disks. For streaming jobs not using Streaming Engine, this option sets the size of each additional persistent disk created by the Dataflow service; the boot disk is not affected. | If a batch job uses Dataflow Shuffle, then the default is 25 GB; otherwise, the default is 250 GB. If a streaming job uses Streaming Engine, then the default is 30 GB; otherwise, the default is 400 GB. Warning: Lowering the disk size reduces available shuffle I/O. Shuffle-bound jobs not using Dataflow Shuffle or Streaming Engine may result in increased runtime and job cost.
serviceAccount | String | Specifies a user-managed controller service account, using the format my-service-account-name@<project-id>.iam.gserviceaccount.com. For more information, see the Controller service account section of the Cloud Dataflow security and permissions page. | If not set, workers use your project's Compute Engine service account as the controller service account.
workerDiskType | String | The type of persistent disk to use, specified by a full URL of the disk type resource. For example, use compute.googleapis.com/projects//zones//diskTypes/pd-ssd to specify an SSD persistent disk. For more information, see the Compute Engine API reference page for diskTypes. | The Dataflow service determines the default value.
workerMachineType | String | The Compute Engine machine type that Dataflow will use when spinning up worker VMs. Dataflow supports n1 series and custom machine types. Shared core machine types such as f1 and g1 series workers are not supported under Dataflow's Service Level Agreement. | The Dataflow service will choose the machine type based on your job if you do not set this option.
See the API for Java reference documentation for the PipelineOptions interface (and its subinterfaces) for the complete list of pipeline configuration options.
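As an illustration of how a few of these fields map onto setters, the following sketch sets worker-related options programmatically on DataflowPipelineOptions. The specific values are arbitrary examples, not recommendations:
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
options.setWorkerMachineType("n1-standard-4");  // workerMachineType
options.setMaxNumWorkers(20);                   // maxNumWorkers
options.setDiskSizeGb(50);                      // diskSizeGb
options.setRegion("us-central1");               // region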
Python
Field | Type | Description | Default Value
---|---|---|---
runner | str | The PipelineRunner to use. This field can be either DirectRunner or DataflowRunner. | DirectRunner (local mode)
streaming | bool | Specifies whether streaming mode is enabled or disabled; true if enabled. | false
project | str | The project ID for your Google Cloud Project. This is required if you want to run your pipeline using the Dataflow managed service. | If not set, throws an error.
temp_location | str | Cloud Storage path for temporary files. Must be a valid Cloud Storage URL, beginning with gs://. | If not set, defaults to the value for staging_location. You must specify at least one of temp_location or staging_location to run your pipeline on the Google cloud.
staging_location | str | Cloud Storage path for staging local files. Must be a valid Cloud Storage URL, beginning with gs://. | If not set, defaults to a staging directory within temp_location. You must specify at least one of temp_location or staging_location to run your pipeline on the Google cloud.
autoscaling_algorithm | str | The autoscaling mode for your Dataflow job. Possible values are THROUGHPUT_BASED to enable autoscaling, or NONE to disable. See Autotuning features to learn more about how autoscaling works in the Dataflow managed service. | Defaults to THROUGHPUT_BASED for all batch Dataflow jobs and NONE for all streaming Dataflow jobs.
num_workers | int | The number of Compute Engine instances to use when executing your pipeline. | If unspecified, the Dataflow service determines an appropriate number of workers.
max_num_workers | int | The maximum number of Compute Engine instances to be made available to your pipeline during execution. Note that this can be higher than the initial number of workers (specified by num_workers) to allow your job to scale up, automatically or otherwise. | If unspecified, the Dataflow service determines an appropriate number of workers.
region | str | Specifying a regional endpoint allows you to define a region for deploying your Dataflow jobs. | If not set, defaults to us-central1.
zone | str | The Compute Engine zone for launching worker instances to run your pipeline. | If you specified the region parameter, the zone parameter defaults to a zone from the corresponding region. You can override this behavior by specifying a different zone.
dataflow_kms_key | str | Specifies the customer-managed encryption key (CMEK) used to encrypt data at rest. You can control the encryption key through KMS. You must also specify temp_location to use this feature. | If unspecified, Dataflow uses the default Google Cloud encryption instead of a CMEK.
flexrs_goal | str | Specifies Flexible Resource Scheduling (FlexRS) for autoscaled batch jobs. Affects the num_workers, autoscaling_algorithm, zone, region, and machine_type parameters. For more information, see the FlexRS pipeline options section. | If unspecified, defaults to SPEED_OPTIMIZED, which is the same as omitting this flag. To turn on FlexRS, you must specify the value COST_OPTIMIZED to allow the Dataflow service to choose any available discounted resources.
network | str | The Compute Engine network for launching Compute Engine instances to run your pipeline. See how to specify your network. | If not set, Google Cloud assumes that you intend to use a network named default.
subnetwork | str | The Compute Engine subnetwork for launching Compute Engine instances to run your pipeline. See how to specify your subnetwork. | The Dataflow service determines the default value.
no_use_public_ips | bool | Specifies that Dataflow workers must not use public IP addresses. As a result, Dataflow workers use private IP addresses for all communication. If the subnetwork option is specified, the network option is ignored. Make sure that the specified network or subnetwork has Private Google Access enabled. The public IPs parameter requires the Beam SDK for Python; the Dataflow SDK for Python does not support this parameter. | If not set, Dataflow workers use public IP addresses.
enable_streaming_engine | bool | Specifies whether Dataflow Streaming Engine is enabled or disabled; true if enabled. Enabling Streaming Engine allows you to run the steps of your streaming pipeline in the Dataflow service backend, thus conserving CPU, memory, and Persistent Disk storage resources. | The default value is false. This means that the steps of your streaming pipeline are executed entirely on worker VMs.
disk_size_gb | int | The disk size, in gigabytes, to use on each remote Compute Engine worker instance. If set, specify at least 30 GB to account for the worker boot image and local logs. For batch jobs, this option sets the size of a worker VM's boot disk. If a streaming job uses Streaming Engine, this option sets the size of the boot disks. For streaming jobs not using Streaming Engine, this option sets the size of each additional persistent disk created by the Dataflow service; the boot disk is not affected. | If a batch job uses Dataflow Shuffle, then the default is 25 GB; otherwise, the default is 250 GB. If a streaming job uses Streaming Engine, then the default is 30 GB; otherwise, the default is 400 GB. Warning: Lowering the disk size reduces available shuffle I/O. Shuffle-bound jobs not using Dataflow Shuffle or Streaming Engine may result in increased runtime and job cost.
service_account_email | str | Specifies a user-managed controller service account, using the format my-service-account-name@<project-id>.iam.gserviceaccount.com. For more information, see the Controller service account section of the Cloud Dataflow security and permissions page. | If not set, workers use your project's Compute Engine service account as the controller service account.
worker_disk_type | str | The type of persistent disk to use, specified by a full URL of the disk type resource. For example, use compute.googleapis.com/projects//zones//diskTypes/pd-ssd to specify an SSD persistent disk. For more information, see the Compute Engine API reference page for diskTypes. | The Dataflow service determines the default value.
machine_type | str | The Compute Engine machine type that Dataflow will use when spinning up worker VMs. Note that worker_machine_type is an alias flag. | The Dataflow service will choose the machine type based on your job if you do not set this option.
Java: SDK 1.x
Field | Type | Description | Default Value
---|---|---|---
runner | Class (NameOfRunner) | The PipelineRunner to use. This field allows you to determine the PipelineRunner at runtime. | DirectPipelineRunner (local mode)
streaming | boolean | Specifies whether streaming mode (currently in beta) is enabled or disabled; true if enabled. | false
project | String | The project ID for your Google Cloud Project. This is required if you want to run your pipeline using the Dataflow managed service. | If not set, defaults to the currently configured project in the Cloud SDK.
tempLocation | String | Cloud Storage path for temporary files. Must be a valid Cloud Storage URL, beginning with gs://. | If not set, defaults to the value for stagingLocation. You must specify at least one of tempLocation or stagingLocation to run your pipeline on the Google cloud.
stagingLocation | String | Cloud Storage path for staging local files. Must be a valid Cloud Storage URL, beginning with gs://. | If not set, defaults to a staging directory within tempLocation. You must specify at least one of tempLocation or stagingLocation to run your pipeline on the Google cloud.
autoscalingAlgorithm | String | The Autoscaling mode to use for your Dataflow job. Possible values are THROUGHPUT_BASED to enable Autoscaling, or NONE to disable. See Autotuning Features to learn more about how Autoscaling works in the Dataflow managed service. | Defaults to THROUGHPUT_BASED for all batch Dataflow jobs that use Dataflow SDK for Java version 1.6.0 or later; defaults to NONE for streaming jobs or batch jobs using earlier versions of the Dataflow SDK for Java.
numWorkers | int | The initial number of Compute Engine instances to use when executing your pipeline. This option determines how many workers the Dataflow service starts up when your job begins. | If unspecified, the Dataflow service determines an appropriate number of workers.
maxNumWorkers | int | The maximum number of Compute Engine instances to be made available to your pipeline during execution. Note that this can be higher than the initial number of workers (specified by numWorkers) to allow your job to scale up, automatically or otherwise. | If unspecified, the Dataflow service determines an appropriate number of workers.
zone | String | The Compute Engine zone for launching worker instances to run your pipeline. | If not set, defaults to us-central1-f. You can specify a different zone to ensure, for example, that your workers are launched in the EU.
filesToStage | List<String> | A list of local files, directories of files, or archives (.jar, .zip) to make available to each worker. If you set this option, then only those files you specify will be uploaded (the Java classpath will be ignored). You must specify all of your resources in the correct classpath order. Resources are not limited to code, but can also include configuration files and other resources to make available to all workers. Your code can access the listed resources using Java's standard resource lookup methods. Cautions: Specifying a directory path is sub-optimal since Dataflow will zip the files before uploading, which involves a higher startup time cost. Also, don't use this option to transfer data to workers that is meant to be processed by the pipeline, since doing so is significantly slower than using native Cloud Storage/BigQuery APIs combined with the appropriate Dataflow data source. | If filesToStage is blank, Dataflow will infer the files to stage based on the Java classpath. The considerations and cautions mentioned in the Description column also apply here (types of files to list and how to access them from your code).
network | String | The Compute Engine network for launching Compute Engine instances to run your pipeline. See how to specify your network. | If not set, Google Cloud assumes that you intend to use a network named default.
subnetwork | String | The Compute Engine subnetwork for launching Compute Engine instances to run your pipeline. See how to specify your subnetwork. | The Dataflow service determines the default value.
diskSizeGb | int | The disk size, in gigabytes, to use on each remote Compute Engine worker instance. If set, specify at least 30 GB to account for the worker boot image and local logs. For batch jobs, this option sets the size of a worker VM's boot disk. If a streaming job uses Streaming Engine, this option sets the size of the boot disks. For streaming jobs not using Streaming Engine, this option sets the size of each additional persistent disk created by the Dataflow service; the boot disk is not affected. | If a batch job uses Dataflow Shuffle, then the default is 25 GB; otherwise, the default is 250 GB. If a streaming job uses Streaming Engine, then the default is 30 GB; otherwise, the default is 400 GB. Warning: Lowering the disk size reduces available shuffle I/O. Shuffle-bound jobs not using Dataflow Shuffle or Streaming Engine may result in increased runtime and job cost.
workerDiskType | String | The type of persistent disk to use, specified by a full URL of the disk type resource. For example, use compute.googleapis.com/projects//zones//diskTypes/pd-ssd to specify an SSD persistent disk. For more information, see the Compute Engine API reference page for diskTypes. | The Dataflow service determines the default value.
workerMachineType | String | The Compute Engine machine type that Dataflow will use when spinning up worker VMs. Dataflow supports n1 series and custom machine types. Shared core machine types such as f1 and g1 series workers are not supported under Dataflow's Service Level Agreement. | The Dataflow service will choose the machine type based on your job if you do not set this option.
See the API for Java reference documentation for the PipelineOptions interface (and its subinterfaces) for the complete list of pipeline configuration options.
Configuring PipelineOptions for local execution
Instead of running your pipeline on managed cloud resources, you can choose to execute your pipeline locally. Local execution has certain advantages for testing, debugging, or running your pipeline over small data sets. For example, local execution removes the dependency on the remote Dataflow service and associated Google Cloud Project.
When you use local execution, we highly recommend that you run your pipeline with datasets small enough to fit in local memory. You can create a small in-memory dataset using a Create transform, or you can use a Read transform to work with small local or remote files. Local execution provides a fast and easy way to perform testing and debugging with fewer external dependencies, but it is limited by the memory available in your local environment.
The following example code shows how to construct a pipeline that executes in your local environment.
Java: SDK 2.x
// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Note: For local mode, you do not need to set the runner since the DirectRunner is already the default. However, you do need to either explicitly include the DirectRunner as a dependency or add it to the classpath.
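For example, with the DirectRunner dependency (the beam-runners-direct-java artifact) on the classpath, a local pipeline can also set the runner explicitly; this sketch is equivalent to relying on the default:
PipelineOptions options = PipelineOptionsFactory.create();
// Optional: the DirectRunner is already the default when it is on the classpath.
options.setRunner(DirectRunner.class);
Pipeline p = Pipeline.create(options);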
Python
# Create and set your Pipeline Options.
options = PipelineOptions()
p = Pipeline(options=options)
Note: For local mode, you do not need to set the runner since the DirectRunner is already the default.
Java: SDK 1.x
// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Note: For local mode, you do not need to set the runner since the DirectPipelineRunner is already the default.
After you've constructed your pipeline, run it.
Setting other local pipeline options
When executing your pipeline locally, the default values for the properties in PipelineOptions are generally sufficient.
Java: SDK 2.x
You can find the default values for Java PipelineOptions in the Java API reference; see the PipelineOptions class listing for complete details.
If your pipeline uses Google Cloud services such as BigQuery or Cloud Storage for IO, you might need to set certain Google Cloud project and credential options. In such cases, you should use GcpOptions.setProject to set your Google Cloud Project ID. You may also need to set credentials explicitly. See the GcpOptions class for complete details.
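A minimal sketch for a locally executed pipeline that uses Google Cloud IO (the project ID is a placeholder, and default application credentials are assumed to be available):
PipelineOptions options = PipelineOptionsFactory.create();
// Set the project used for Google Cloud IO such as BigQuery or Cloud Storage.
options.as(GcpOptions.class).setProject("my-project-id");
Pipeline p = Pipeline.create(options);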
Python
You can find the default values for Python PipelineOptions in the Python API reference; see the PipelineOptions module listing for complete details.
If your pipeline uses Google Cloud services such as BigQuery or Cloud Storage for IO, you might need to set certain Google Cloud project and credential options. In such cases, you should use options.view_as(GoogleCloudOptions).project to set your Google Cloud Project ID. You may also need to set credentials explicitly. See the GoogleCloudOptions class for complete details.
Java: SDK 1.x
You can find the default values for Java PipelineOptions in the Java API reference; see the PipelineOptions class listing for complete details.
If your pipeline uses Google Cloud services such as BigQuery or Cloud Storage for IO, you might need to set certain Google Cloud project and credential options. In such cases, you should use GcpOptions.setProject to set your Google Cloud Project ID. You may also need to set credentials explicitly. See the GcpOptions class for complete details.