Creating Templates

Cloud Dataflow templates use runtime parameters to accept values that are only available during pipeline execution. To customize the execution of a templated pipeline, you can pass these parameters to functions that run within the pipeline (such as a DoFn).

To create a template from your Apache Beam pipeline, you must modify your pipeline code to support runtime parameters, as described in the following sections. Then, create and stage your template.

Runtime parameters and the ValueProvider interface

The ValueProvider interface allows pipelines to accept runtime parameters. Apache Beam provides three types of ValueProvider objects.

RuntimeValueProvider

RuntimeValueProvider is the default ValueProvider type. RuntimeValueProvider allows your pipeline to accept a value that is only available during pipeline execution. The value is not available during pipeline construction, so you can't use the value to change your pipeline's workflow graph.

You can use isAccessible() to check if the value of a ValueProvider is available. If you call get() before pipeline execution, Apache Beam returns an error:
Value only available at runtime, but accessed from a non-runtime context.

Use RuntimeValueProvider when you do not know the value ahead of time.
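
For illustration, the following minimal Java sketch shows how isAccessible() can guard a get() call during pipeline construction. It assumes an option getInputFile() declared as ValueProvider<String>, as in the WordCount example later on this page:

  ValueProvider<String> inputFile = options.getInputFile();
  if (inputFile.isAccessible()) {
    // The value was supplied at construction time (for example, through
    // StaticValueProvider), so calling get() here is safe.
    System.out.println("Input file: " + inputFile.get());
  } else {
    // The value is a runtime parameter. Calling get() here would fail with
    // "Value only available at runtime, but accessed from a non-runtime context."
  }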

StaticValueProvider

StaticValueProvider allows you to provide a static value to your pipeline. The value is available during pipeline construction, so you can use the value to change your pipeline's workflow graph.

Use StaticValueProvider when you know the value ahead of time. See the StaticValueProvider section for examples.

NestedValueProvider

NestedValueProvider allows you to compute a value from another ValueProvider object. NestedValueProvider wraps a ValueProvider, and the type of the wrapped ValueProvider determines whether the value is accessible during pipeline construction.

Use NestedValueProvider when you want to use the value to compute another value at runtime. See the NestedValueProvider section for examples.

Note: The Apache Beam SDK for Python does not support NestedValueProvider.

Modifying your code to use runtime parameters

This section walks through how to use ValueProvider, StaticValueProvider, and NestedValueProvider.

Using ValueProvider in your pipeline options

Use ValueProvider for all pipeline options that you want to set or use at runtime.

For example, the following WordCount code snippet does not support runtime parameters. The code adds an input file option, creates a pipeline, and reads lines from the input file:

Java: SDK 2.x

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Java: SDK 1.x

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()));
    ...

To add runtime parameter support, modify the input file option to use ValueProvider.

Java: SDK 2.x

Use ValueProvider<String> instead of String for the type of the input file option.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

Replace add_argument with add_value_provider_argument.

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Java: SDK 1.x

Use ValueProvider<String> instead of String for the type of the input file option.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    // Add .withoutValidation() when you use a RuntimeValueProvider with SDK 1.x.
    // The value may not be available at validation time.
    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()).withoutValidation());
    ...

Using ValueProvider in your functions

To use runtime parameter values in your own functions, update the functions to use ValueProvider parameters.

The following example contains an integer ValueProvider option, and a simple function that adds an integer. The function depends on the ValueProvider integer. During execution, the pipeline applies MySumFn to every integer in a PCollection that contains [1, 2, 3]. If the runtime value is 10, the resulting PCollection contains [11, 12, 13].

Java: SDK 2.x

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.utils.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

Java: SDK 1.x

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply(MapElements.via(
        new SimpleFunction<Integer, String>() {
          public String apply(Integer i) {
            return i.toString();
          }
        }))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Using StaticValueProvider

To provide a static value to your pipeline, use StaticValueProvider.

This example uses MySumFn, which is a DoFn that takes a ValueProvider<Integer>. If you know the value of the parameter ahead of time, you can use StaticValueProvider to specify your static value as a ValueProvider.

Java: SDK 2.x

This code gets the value at pipeline runtime:

  .apply(ParDo.of(new MySumFn(options.getInt())))

Instead, you can use StaticValueProvider with a static value:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

This code gets the value at pipeline runtime:

  beam.ParDo(MySumFn(user_options.templated_int))

Instead, you can use StaticValueProvider with a static value:

  beam.ParDo(MySumFn(StaticValueProvider(int, 10)))

Java: SDK 1.x

This code gets the value at pipeline runtime:

  .apply(ParDo.of(new MySumFn(options.getInt())))

Instead, you can use StaticValueProvider with a static value:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

You can also use StaticValueProvider when you implement an I/O module that supports both regular parameters and runtime parameters. StaticValueProvider reduces the code duplication from implementing two similar methods.

Java: SDK 2.x

The source code for this example is from Apache Beam's TextIO.java on GitHub.

  // Create a StaticValueProvider from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

In this example, a single constructor accepts either a string or a ValueProvider argument. If the argument is a string, it is converted to a StaticValueProvider.

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, basestring):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

Java: SDK 1.x

The source code for this example is from Apache Beam's TextIO.java on GitHub.

  // Create a StaticValueProvider from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Using NestedValueProvider

Note: The Apache Beam SDK for Python does not support NestedValueProvider.

To compute a value from another ValueProvider object, use NestedValueProvider.

NestedValueProvider takes a ValueProvider and a SerializableFunction translator as input. When you call .get() on a NestedValueProvider, the translator creates a new value based on the ValueProvider value. This translation allows you to use a ValueProvider value to create the final value that you want:

  • Example 1: The user provides a file name file.txt. The transform prepends the file path gs://directory_name/ to the file name. Calling .get() returns gs://directory_name/file.txt.
  • Example 2: The user provides a substring for a BigQuery query, such as a specific date. The transform uses the substring to create the full query. Calling .get() returns the full query. (A Java sketch of this pattern appears at the end of this section.)

Note: NestedValueProvider accepts only one value input. You can't use a NestedValueProvider to combine two different values.

The following code uses NestedValueProvider to implement the first example: the user provides a file name, and the transform prepends the file path to the file name.

Java: SDK 2.x

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

Python

The Apache Beam SDK for Python does not support NestedValueProvider.

Java: SDK 1.x

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply(TextIO.Write.named("OutputNums").to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }
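
The second example listed earlier (building a full BigQuery query from a user-supplied substring) follows the same pattern. The following minimal Java sketch assumes a hypothetical getDate() option of type ValueProvider<String>; the dataset and query text are illustrative only:

  // Compute the full query from the runtime date value.
  ValueProvider<String> query = NestedValueProvider.of(
      options.getDate(),
      new SerializableFunction<String, String>() {
        @Override
        public String apply(String date) {
          return "SELECT * FROM `my_project.my_dataset.events`"
              + " WHERE event_date = '" + date + "'";
        }
      });
  // The resulting ValueProvider<String> can then be passed to any read method
  // that accepts a ValueProvider, such as a BigQuery query parameter.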

Metadata

You can extend your templates with additional metadata so that custom parameters are validated when the template executes. If you want to create metadata for your template, you need to:

  1. Create a JSON-formatted file named <template-name>_metadata using the parameters listed under Metadata parameters below.

    Note: Do not name the file you create <template-name>_metadata.json. While the file contains JSON, it cannot end in the .json file extension.

  2. Store the JSON file in Cloud Storage in the same folder as the template. An example copy command follows these steps.

    Note: The template should be stored in <template-name> and the metadata should be stored in <template-name>_metadata.
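
For example, if your template is staged at gs://[YOUR_BUCKET_NAME]/templates/MyTemplate (as in the staging commands later on this page), you could copy a metadata file named MyTemplate_metadata into the same folder with a command like the following:

  gsutil cp MyTemplate_metadata gs://[YOUR_BUCKET_NAME]/templates/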

Metadata parameters

The metadata file uses the following keys:

  • name (required): The name of your template.
  • description (optional): A short paragraph of text describing the template.
  • parameters (optional; defaults to an empty array): An array of additional parameters used by the template. Each parameter in the array is an object with the following keys:
      • name (required): The name of the parameter used in your template.
      • label (required): A human-readable label that is used in the UI to label the parameter.
      • help_text (required): A short paragraph of text describing the parameter.
      • is_optional (optional; defaults to false): true if the parameter is optional and false if the parameter is required.
      • regexes (optional; defaults to an empty array): An array of POSIX-egrep regular expressions in string form that are used to validate the value of the parameter. For example, ["^[a-zA-Z][a-zA-Z0-9]+"] is a single regular expression that validates that the value starts with a letter and is followed by one or more characters.

Example metadata file

{
  "name": "WordCount",
  "description": "An example pipeline that counts words in the input file.",
  "parameters": [{
    "name": "inputFile",
    "label": "Input Cloud Storage File(s)",
    "help_text": "Path of the file pattern glob to read from.",
    "regexes": ["^gs:\/\/[^\n\r]+$"],
    "is_optional": true
  },
  {
    "name": "output",
    "label": "Output Cloud Storage File Prefix",
    "help_text": "Path and filename prefix for writing output files. ex: gs://MyBucket/counts",
    "regexes": ["^gs:\/\/[^\n\r]+$"]
  }]
}

Pipeline I/O and runtime parameters

Java: SDK 2.x

Some I/O connectors contain methods that accept ValueProvider objects. To determine support for a specific connector and method, see the API reference documentation for the I/O connector. Supported methods have an overload with a ValueProvider. If a method does not have an overload, the method does not support runtime parameters. The following I/O connectors have at least partial ValueProvider support:

  • File-based IOs: TextIO, AvroIO, FileIO, TFRecordIO, XmlIO
  • BigQueryIO*
  • BigtableIO (requires SDK 2.3.0 or later)
  • PubSubIO
  • SpannerIO

* Note: If you want to run a batch pipeline that reads from BigQuery, you must use .withTemplateCompatibility() on all BigQuery reads.
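
For example, a templated batch read from BigQuery might look like the following sketch, where getInputTable() is an assumed ValueProvider<String> option rather than part of the samples above:

  p.apply("ReadFromBigQuery", BigQueryIO.readTableRows()
      // The table specification is a runtime parameter.
      .from(options.getInputTable())
      // Required for batch BigQuery reads in templates.
      .withTemplateCompatibility());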

Python

Some I/O connectors contain methods that accept ValueProvider objects. To determine support for I/O connectors and their methods, see the API reference documentation for the connector. The following I/O connectors accept runtime parameters:

  • File-based IOs: textio, avroio, tfrecordio

Java: SDK 1.x

The following is the complete list of methods that accept runtime parameters:

  • BigQuery*: BigQueryIO.Read.from()*, BigQueryIO.Read.fromQuery()*, BigQueryIO.Write.to()*, BigQueryIO.Write.withSchema()*
  • Cloud Pub/Sub: PubsubIO.Read.subscription(), PubsubIO.Read.topic(), PubsubIO.Write.topic()
  • TextIO: TextIO.Read.from(), TextIO.Write.to()

* You can only execute BigQuery batch pipeline templates one time, as the BigQuery job ID is set at template creation time.

Creating and staging templates

After you write your pipeline, you must create and stage your template file. Use the command for your SDK version.

Note: After you create and stage a template, the staging location contains additional files that are necessary to execute your template. If you delete the staging location, template execution will fail.

Java: SDK 2.x

This Maven command creates and stages a template at the Cloud Storage location specified with --templateLocation.

Replace [YOUR_PROJECT_ID] with your project ID, and replace [YOUR_BUCKET_NAME] with the name of your Cloud Storage bucket.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=[YOUR_PROJECT_ID] \
                  --stagingLocation=gs://[YOUR_BUCKET_NAME]/staging \
                  --templateLocation=gs://[YOUR_BUCKET_NAME]/templates/MyTemplate"
    

Python

This Python command creates and stages a template at the Cloud Storage location specified with --template_location.

Replace [YOUR_PROJECT_ID] with your project ID, and replace [YOUR_BUCKET_NAME] with the name of your Cloud Storage bucket.

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project [YOUR_PROJECT_ID] \
    --staging_location gs://[YOUR_BUCKET_NAME]/staging \
    --temp_location gs://[YOUR_BUCKET_NAME]/temp \
    --template_location gs://[YOUR_BUCKET_NAME]/templates/mytemplate

Java: SDK 1.x

This Maven command creates and stages a template at the Cloud Storage location specified with --dataflowJobFile.

Replace [YOUR_PROJECT_ID] with your project ID, and replace [YOUR_BUCKET_NAME] with the name of your Cloud Storage bucket.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=TemplatingDataflowPipelineRunner \
                  --project=[YOUR_PROJECT_ID] \
                  --stagingLocation=gs://[YOUR_BUCKET_NAME]/staging \
                  --dataflowJobFile=gs://[YOUR_BUCKET_NAME]/templates/MyTemplate"
    

After you create and stage your template, your next step is to execute the template.
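
For reference, a template staged this way can later be run with the gcloud command-line tool. The job name and parameter below are examples only:

  gcloud dataflow jobs run my-wordcount-job \
    --gcs-location gs://[YOUR_BUCKET_NAME]/templates/MyTemplate \
    --parameters inputFile=gs://[YOUR_BUCKET_NAME]/input/my_input.txt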
