Dataflow templates use runtime parameters to accept values that are only available during pipeline execution. To customize the execution of a templated pipeline, you can pass these parameters to functions that run within the pipeline (such as a DoFn).
To create a template from your Apache Beam pipeline, you must modify your pipeline code to support runtime parameters:
- Use ValueProvider for all pipeline options that you want to set or use at runtime.
- Call I/O methods that accept runtime parameters wherever you want to parameterize your pipeline.
- Use DoFn objects that accept runtime parameters.
Then, create and stage your template.
Runtime parameters and the ValueProvider interface
The ValueProvider interface allows pipelines to accept runtime parameters. Apache Beam provides three types of ValueProvider objects.
Name | Description |
---|---|
RuntimeValueProvider | RuntimeValueProvider is the default ValueProvider type. It allows your pipeline to accept a value that is only available during pipeline execution. The value is not available during pipeline construction, so you can't use it to change the pipeline's workflow graph. You can use isAccessible() to check whether the value of a ValueProvider is available yet. Use RuntimeValueProvider when you do not know the value ahead of time. |
StaticValueProvider | StaticValueProvider lets you provide a static value to your pipeline. The value is available during pipeline construction, so you can use it to change the pipeline's workflow graph. Use StaticValueProvider when you know the value ahead of time. |
NestedValueProvider | NestedValueProvider lets you compute a value from another ValueProvider object. Use NestedValueProvider when you need to transform a runtime value before using it. Note: The Apache Beam SDK for Python does not support NestedValueProvider. |
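For illustration, the following minimal Java sketch (not part of the documented examples) contrasts the two most common types. It assumes a pipeline option getInputFile() declared as ValueProvider<String>, as in the WordCount example later on this page:

```java
// Minimal sketch, assuming options.getInputFile() is declared as
// ValueProvider<String> in your PipelineOptions subinterface.
ValueProvider<String> staticValue =
    StaticValueProvider.of("gs://dataflow-samples/shakespeare/kinglear.txt");
staticValue.isAccessible(); // true: the value is known during pipeline construction
staticValue.get();          // returns the string immediately

ValueProvider<String> runtimeValue = options.getInputFile(); // a RuntimeValueProvider
runtimeValue.isAccessible(); // false during pipeline construction, true during execution
// Calling runtimeValue.get() during construction fails; call get() only at runtime,
// for example inside a DoFn's @ProcessElement method.
```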
Modifying your code to use runtime parameters
This section walks through how to use ValueProvider, StaticValueProvider, and NestedValueProvider.
Using ValueProvider in your pipeline options
Use ValueProvider for all pipeline options that you want to set or use at runtime.
For example, the following WordCount code snippet does not support runtime parameters. The code adds an input file option, creates a pipeline, and reads lines from the input file:
Java: SDK 2.x
```java
public interface WordCountOptions extends PipelineOptions {
  @Description("Path of the file to read from")
  @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
  String getInputFile();
  void setInputFile(String value);
}

public static void main(String[] args) {
  WordCountOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation()
          .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);

  p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
  ...
```
Python
```python
class WordcountOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='Path of the file to read from')
    parser.add_argument(
        '--output',
        required=True,
        help='Output file to write results to.')

pipeline_options = PipelineOptions(['--output', 'some/output_path'])
p = beam.Pipeline(options=pipeline_options)

wordcount_options = pipeline_options.view_as(WordcountOptions)
lines = p | 'read' >> ReadFromText(wordcount_options.input)
```
To add runtime parameter support, modify the input file option to use ValueProvider.
Java: SDK 2.x
Use ValueProvider<String> instead of String for the type of the input file option.
```java
public interface WordCountOptions extends PipelineOptions {
  @Description("Path of the file to read from")
  @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
  ValueProvider<String> getInputFile();
  void setInputFile(ValueProvider<String> value);
}

public static void main(String[] args) {
  WordCountOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation()
          .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);

  p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
  ...
```
Python
Replace add_argument with add_value_provider_argument.
```python
class WordcountOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    # Use add_value_provider_argument for arguments to be templatable.
    # Use add_argument as usual for non-templatable arguments.
    parser.add_value_provider_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='Path of the file to read from')
    parser.add_argument(
        '--output',
        required=True,
        help='Output file to write results to.')

pipeline_options = PipelineOptions(['--output', 'some/output_path'])
p = beam.Pipeline(options=pipeline_options)

wordcount_options = pipeline_options.view_as(WordcountOptions)
lines = p | 'read' >> ReadFromText(wordcount_options.input)
```
Using ValueProvider in your functions
To use runtime parameter values in your own functions, update the functions to use ValueProvider parameters.
The following example contains an integer ValueProvider option, and a simple function that adds an integer. The function depends on the ValueProvider integer. During execution, the pipeline applies MySumFn to every integer in a PCollection that contains [1, 2, 3]. If the runtime value is 10, the resulting PCollection contains [11, 12, 13].
Java: SDK 2.x
```java
public interface SumIntOptions extends PipelineOptions {
  // New runtime parameter, specified by the --int
  // option at runtime.
  ValueProvider<Integer> getInt();
  void setInt(ValueProvider<Integer> value);
}

class MySumFn extends DoFn<Integer, Integer> {
  ValueProvider<Integer> mySumInteger;

  MySumFn(ValueProvider<Integer> sumInt) {
    // Store the value provider
    this.mySumInteger = sumInt;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Get the value of the value provider and add it to
    // the element's value.
    c.output(c.element() + mySumInteger.get());
  }
}

public static void main(String[] args) {
  SumIntOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation()
          .as(SumIntOptions.class);

  Pipeline p = Pipeline.create(options);

  p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
      .apply(ParDo.of(new MySumFn(options.getInt())))
      .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
      .apply("OutputNums", TextIO.write().to("numvalues"));

  p.run();
}
```
Python
```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import StaticValueProvider
from apache_beam.io import WriteToText

class UserOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument('--templated_int', type=int)

class MySumFn(beam.DoFn):
  def __init__(self, templated_int):
    self.templated_int = templated_int

  def process(self, an_int):
    yield self.templated_int.get() + an_int

pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

user_options = pipeline_options.view_as(UserOptions)

sum = (p
       | 'ReadCollection' >> beam.io.ReadFromText(
           'gs://some/integer_collection')
       | 'StringToInt' >> beam.Map(lambda w: int(w))
       | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
       | 'WriteResultingCollection' >> WriteToText('some/output_path'))
```
Using StaticValueProvider
To provide a static value to your pipeline, use StaticValueProvider.
This example uses MySumFn, which is a DoFn that takes a ValueProvider<Integer>. If you know the value of the parameter ahead of time, you can use StaticValueProvider to specify your static value as a ValueProvider.
Java: SDK 2.x
This code gets the value at pipeline runtime:
.apply(ParDo.of(new MySumFn(options.getInt())))
Instead, you can use StaticValueProvider with a static value:
.apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))
Python
This code gets the value at pipeline runtime:
beam.ParDo(MySumFn(user_options.templated_int))
Instead, you can use StaticValueProvider with a static value:
beam.ParDo(MySumFn(StaticValueProvider(int, 10)))
You can also use StaticValueProvider when you implement an I/O module that supports both regular parameters and runtime parameters. StaticValueProvider reduces the code duplication from implementing two similar methods.
Java: SDK 2.x
The source code for this example is from Apache Beam's TextIO.java on GitHub.
```java
// Create a StaticValueProvider from a regular String parameter
// value, and then call .from() with this new StaticValueProvider.
public Read from(String filepattern) {
  checkNotNull(filepattern, "Filepattern cannot be empty.");
  return from(StaticValueProvider.of(filepattern));
}

// This method takes a ValueProvider parameter.
public Read from(ValueProvider<String> filepattern) {
  checkNotNull(filepattern, "Filepattern cannot be empty.");
  return toBuilder().setFilepattern(filepattern).build();
}
```
Python
In this example, there is a single constructor that accepts either a string or a ValueProvider argument. If the argument is a string, it is converted to a StaticValueProvider.
```python
class Read():
  def __init__(self, filepattern):
    if isinstance(filepattern, str):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern
```
Using NestedValueProvider
Note: The Apache Beam SDK for Python does not support NestedValueProvider.
To compute a value from another ValueProvider object, use NestedValueProvider.
NestedValueProvider takes a ValueProvider and a SerializableFunction translator as input. When you call .get() on a NestedValueProvider, the translator creates a new value based on the ValueProvider value. This translation allows you to use a ValueProvider value to create the final value that you want:
- Example 1: The user provides a file name file.txt. The transform prepends the file path gs://directory_name/ to the file name. Calling .get() returns gs://directory_name/file.txt.
- Example 2: The user provides a substring for a BigQuery query, such as a specific date. The transform uses the substring to create the full query. Calling .get() returns the full query. (A Java sketch of this example appears at the end of this section.)
Note: NestedValueProvider accepts only one value input. You can't use a NestedValueProvider to combine two different values.
The following code uses NestedValueProvider to implement the first example: the user provides a file name, and the transform prepends the file path to the file name.
Java: SDK 2.x
```java
public interface WriteIntsOptions extends PipelineOptions {
  // New runtime parameter, specified by the --fileName
  // option at runtime.
  ValueProvider<String> getFileName();
  void setFileName(ValueProvider<String> value);
}

public static void main(String[] args) {
  WriteIntsOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation()
          .as(WriteIntsOptions.class);
  Pipeline p = Pipeline.create(options);

  p.apply(Create.of(1, 2, 3))
      // Write to the computed complete file path.
      .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
          options.getFileName(),
          new SerializableFunction<String, String>() {
            @Override
            public String apply(String file) {
              return "gs://directoryname/" + file;
            }
          })));

  p.run();
}
```
Python
The Apache Beam SDK for Python does not support NestedValueProvider.
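In Java, the second example (building a BigQuery query from a user-provided date) is not shown in the samples above, but a minimal sketch might look like the following. The --queryDate runtime option and the table name are hypothetical:

```java
// Minimal sketch: compute a full query from a hypothetical --queryDate runtime option
// declared as ValueProvider<String> getQueryDate() in the pipeline options.
ValueProvider<String> query = NestedValueProvider.of(
    options.getQueryDate(),
    new SerializableFunction<String, String>() {
      @Override
      public String apply(String date) {
        // Build the full query from the user-provided date substring.
        return "SELECT * FROM `my_project.my_dataset.my_table` WHERE date = '" + date + "'";
      }
    });
// The resulting ValueProvider can then be passed to a transform that accepts one,
// for example BigQueryIO.readTableRows().fromQuery(query).
```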
Metadata
You can extend your templates with additional metadata so that custom parameters are validated when the template executes. If you want to create metadata for your template, you need to:
- Create a JSON-formatted file named <template-name>_metadata using the parameters from the table below. Note: Do not name the file you create <template-name>_metadata.json. While the file contains JSON, it cannot end in the .json file extension.
- Store the JSON file in Cloud Storage in the same folder as the template. Note: The template should be stored in <template-name> and the metadata should be stored in <template-name>_metadata.
Metadata parameters
Parameter Key | Required | Description of the value |
---|---|---|
name | Yes | The name of your template. |
description | No | A short paragraph of text describing the template. |
parameters | No. Defaults to an empty array. | An array of additional parameters that will be used by the template. Each entry in the array uses the keys in the following table. |

Parameter Key | Required | Description of the value |
---|---|---|
name | Yes | The name of the parameter used in your template. |
label | Yes | A human-readable label that will be used in the UI to label the parameter. |
helpText | Yes | A short paragraph of text describing the parameter. |
isOptional | No. Defaults to false. | false if the parameter is required and true if the parameter is optional. |
regexes | No. Defaults to an empty array. | An array of POSIX-egrep regular expressions in string form that will be used to validate the value of the parameter. For example, ["^[a-zA-Z][a-zA-Z0-9]+"] is a single regular expression that validates that the value starts with a letter and then has one or more characters. |
Example metadata file
The Dataflow service uses the following metadata to validate the WordCount template's custom parameters:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "inputFile", "helpText": "Path of the file pattern glob to read from. ex: gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files. ex: gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
You can download this metadata file from the Dataflow template directory.
Pipeline I/O and runtime parameters
Java: SDK 2.x
Some I/O connectors contain methods that accept ValueProvider objects. To determine support for a specific connector and method, see the API reference documentation for the I/O connector. Supported methods have an overload with a ValueProvider. If a method does not have an overload, the method does not support runtime parameters. The following I/O connectors have at least partial ValueProvider support:
- File-based IOs: TextIO, AvroIO, FileIO, TFRecordIO, XmlIO
- BigQueryIO*
- BigtableIO (requires SDK 2.3.0 or later)
- PubSubIO
- SpannerIO
* Note: If you want to run a batch pipeline that reads from BigQuery, you must use .withTemplateCompatibility() on all BigQuery reads.
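For example, a templated batch read from BigQuery might look like the following minimal sketch; the getInputTable() option is an assumed ValueProvider<String> pipeline option, not part of the examples above:

```java
// Minimal sketch: a batch read from BigQuery in a templated pipeline.
// options.getInputTable() is an assumed ValueProvider<String> pipeline option.
p.apply("ReadFromBigQuery",
    BigQueryIO.readTableRows()
        .withTemplateCompatibility()
        .from(options.getInputTable()));
```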
Python
Some I/O connectors contain methods that accept ValueProvider objects. To determine support for I/O connectors and their methods, see the API reference documentation for the connector. The following I/O connectors accept runtime parameters:
- File-based IOs: textio, avroio, tfrecordio
Creating and staging templates
After you write your pipeline, you must create and stage your template file. Use the command for your SDK version.
Note: After you create and stage a template, the staging location contains additional files that are necessary to execute your template. If you delete the staging location, template execution will fail.
Java: SDK 2.x
This Maven command creates and stages a template at the Cloud Storage location specified with --templateLocation.
- Replace YOUR_PROJECT_ID with your project ID.
- Replace YOUR_BUCKET_NAME with the name of your Cloud Storage bucket.
- Replace YOUR_TEMPLATE_NAME with the name of your template.
- Replace com.example.myclass with your Java class.
- Verify that the templateLocation path is correct.
```
mvn compile exec:java \
  -Dexec.mainClass=com.example.myclass \
  -Dexec.args="--runner=DataflowRunner \
               --project=YOUR_PROJECT_ID \
               --stagingLocation=gs://YOUR_BUCKET_NAME/staging \
               --templateLocation=gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME"
```
Python
This Python command creates and stages a template at the Cloud Storage location specified with --template_location. Make the following changes to the command:
- Replace YOUR_PROJECT_ID with your project ID.
- Replace YOUR_BUCKET_NAME with the name of your Cloud Storage bucket.
- Replace YOUR_TEMPLATE_NAME with the name of your template.
- Replace examples.mymodule with your Python module.
- Verify that the template_location path is correct.
```
python -m examples.mymodule \
  --runner DataflowRunner \
  --project YOUR_PROJECT_ID \
  --staging_location gs://YOUR_BUCKET_NAME/staging \
  --temp_location gs://YOUR_BUCKET_NAME/temp \
  --template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME
```
After you create and stage your template, your next step is to execute the template.