Creating classic templates

Dataflow templates use runtime parameters to accept values that are only available during pipeline execution. To customize the execution of a templated pipeline, you can pass these parameters to functions that run within the pipeline (such as a DoFn).

To create a template from your Apache Beam pipeline, you must modify your pipeline code to support runtime parameters. Then, create and stage your template.

Runtime parameters and the ValueProvider interface

The ValueProvider interface allows pipelines to accept runtime parameters. Apache Beam provides three types of ValueProvider objects.

RuntimeValueProvider

RuntimeValueProvider is the default ValueProvider type. RuntimeValueProvider allows your pipeline to accept a value that is only available during pipeline execution. The value is not available during pipeline construction, so you can't use the value to change your pipeline's workflow graph.

You can use isAccessible() to check if the value of a ValueProvider is available. If you call get() before pipeline execution, Apache Beam returns an error:
Value only available at runtime, but accessed from a non-runtime context.

Use RuntimeValueProvider when you do not know the value ahead of time.
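The deferred-access behavior described above can be illustrated with a minimal pure-Python stand-in. The class below is not Apache Beam's implementation — the names `DeferredValue` and `_set_runtime_value` are hypothetical — but it mimics the contract: `is_accessible()` reports whether the value exists yet, and `get()` raises if called before the runner supplies a value.

```python
# Illustrative stand-in for RuntimeValueProvider's deferred access.
# Not the Apache Beam class; names here are hypothetical.
class DeferredValue:
    def __init__(self):
        self._value = None
        self._runtime = False  # becomes True once the pipeline executes

    def is_accessible(self):
        return self._runtime

    def get(self):
        if not self._runtime:
            raise RuntimeError(
                "Value only available at runtime, "
                "but accessed from a non-runtime context.")
        return self._value

    def _set_runtime_value(self, value):
        # In Beam, the runner performs this step at execution time.
        self._runtime = True
        self._value = value

provider = DeferredValue()
print(provider.is_accessible())  # False: graph-construction time

try:
    provider.get()  # too early: raises, like Beam's error above
except RuntimeError as e:
    print(e)

provider._set_runtime_value("gs://bucket/input.txt")
print(provider.get())  # now accessible
```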

StaticValueProvider

StaticValueProvider allows you to provide a static value to your pipeline. The value is available during pipeline construction, so you can use the value to change your pipeline's workflow graph.

Use StaticValueProvider when you know the value ahead of time. See the StaticValueProvider section for examples.

NestedValueProvider

NestedValueProvider allows you to compute a value from another ValueProvider object. NestedValueProvider wraps a ValueProvider, and the type of the wrapped ValueProvider determines whether the value is accessible during pipeline construction.

Use NestedValueProvider when you want to use the value to compute another value at runtime. See the NestedValueProvider section for examples.

The Dataflow runner does not support ValueProvider options for Pub/Sub topics and subscription parameters. If you require Pub/Sub options in your runtime parameters, switch to using Flex Templates.

Modifying your code to use runtime parameters

This section walks through how to use ValueProvider, StaticValueProvider, and NestedValueProvider.

Using ValueProvider in your pipeline options

Use ValueProvider for all pipeline options that you want to set or use at runtime.

For example, the following WordCount code snippet does not support runtime parameters. The code adds an input file option, creates a pipeline, and reads lines from the input file:

Java: SDK 2.x

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

To add runtime parameter support, modify the input file option to use ValueProvider.

Java: SDK 2.x

Use ValueProvider<String> instead of String for the type of the input file option.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

Replace add_argument with add_value_provider_argument.

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Using ValueProvider in your functions

To use runtime parameter values in your own functions, update the functions to use ValueProvider parameters.

The following example contains an integer ValueProvider option and a simple function that adds an integer. The function depends on the ValueProvider integer. During execution, the pipeline applies MySumFn to every integer in a PCollection that contains [1, 2, 3]. If the runtime value is 10, the resulting PCollection contains [11, 12, 13].

Java: SDK 2.x

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))
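Because MySumFn only calls .get() inside process, its logic can be exercised outside a pipeline by handing it any object with a matching get() method. The sketch below is self-contained: StandInProvider and MySumFnLogic are illustrative stand-ins (a plain-Python copy of the DoFn body, without the beam.DoFn base class), not Beam classes.

```python
# Illustrative stand-in: exercising the DoFn's core logic outside a pipeline.
class StandInProvider:
    """Mimics the only ValueProvider behavior the DoFn relies on: get()."""
    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value

class MySumFnLogic:
    """Plain-Python copy of MySumFn.process, without the beam.DoFn base."""
    def __init__(self, templated_int):
        self.templated_int = templated_int

    def process(self, an_int):
        yield self.templated_int.get() + an_int

fn = MySumFnLogic(StandInProvider(10))
results = [out for n in [1, 2, 3] for out in fn.process(n)]
print(results)  # [11, 12, 13]
```

With a runtime value of 10, the elements [1, 2, 3] become [11, 12, 13], matching the behavior described above.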

Using StaticValueProvider

To provide a static value to your pipeline, use StaticValueProvider.

This example uses MySumFn, which is a DoFn that takes a ValueProvider<Integer>. If you know the value of the parameter ahead of time, you can use StaticValueProvider to specify your static value as a ValueProvider.

Java: SDK 2.x

This code gets the value at pipeline runtime:

  .apply(ParDo.of(new MySumFn(options.getInt())))

Instead, you can use StaticValueProvider with a static value:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

This code gets the value at pipeline runtime:

  beam.ParDo(MySumFn(user_options.templated_int))

Instead, you can use StaticValueProvider with a static value:

  beam.ParDo(MySumFn(StaticValueProvider(int, 10)))

You can also use StaticValueProvider when you implement an I/O module that supports both regular parameters and runtime parameters. StaticValueProvider reduces the code duplication from implementing two similar methods.

Java: SDK 2.x

The source code for this example is from Apache Beam's TextIO.java on GitHub.

  // Create a StaticValueProvider from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

In this example, a single constructor accepts either a string or a ValueProvider argument. If the argument is a string, it is converted to a StaticValueProvider.

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, str):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern
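With this pattern, both call forms end up holding an object with the same get() behavior. The sketch below uses illustrative stand-in classes (StaticProvider and a simplified Read), not Beam's own, to show that the two forms resolve identically:

```python
# Illustrative stand-ins, not Beam classes.
class StaticProvider:
    """Mimics StaticValueProvider: wraps a value that is already known."""
    def __init__(self, value_type, value):
        self.value = value_type(value)

    def get(self):
        return self.value

class Read:
    def __init__(self, filepattern):
        # Accept either a plain string or a provider-like object.
        if isinstance(filepattern, str):
            filepattern = StaticProvider(str, filepattern)
        self.filepattern = filepattern

# Both forms resolve to the same value when .get() is called.
a = Read('gs://bucket/input*.txt')
b = Read(StaticProvider(str, 'gs://bucket/input*.txt'))
print(a.filepattern.get() == b.filepattern.get())  # True
```

This is why the conversion removes the need for two parallel implementations: downstream code only ever deals with the provider form.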

Using NestedValueProvider

To compute a value from another ValueProvider object, use NestedValueProvider.

NestedValueProvider takes a ValueProvider and a SerializableFunction translator as input. When you call .get() on a NestedValueProvider, the translator creates a new value based on the ValueProvider value. This translation allows you to use a ValueProvider value to create the final value that you want.

In the following example, the user provides the filename file.txt. The transform prepends the path gs://directory_name/ to the filename. Calling .get() returns gs://directory_name/file.txt.

Java: SDK 2.x

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directory_name/" + file;
          }
        })));

    p.run();
  }
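The same wrap-and-translate idea can be sketched in plain Python. The classes below are illustrative stand-ins, not Beam's StaticValueProvider or NestedValueProvider; the point is that the translator runs only when get() is called, so it also works when the wrapped value exists only at runtime.

```python
# Illustrative stand-ins for the wrap-and-translate pattern.
class StaticProvider:
    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value

class NestedProvider:
    """Wraps another provider and applies a translator on every get()."""
    def __init__(self, value_provider, translator):
        self._provider = value_provider
        self._translator = translator

    def get(self):
        # Translation is deferred until get() is called, so it works
        # even when the wrapped value is only available at runtime.
        return self._translator(self._provider.get())

file_name = StaticProvider('file.txt')
full_path = NestedProvider(file_name, lambda f: 'gs://directory_name/' + f)
print(full_path.get())  # gs://directory_name/file.txt
```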

Metadata

You can extend your template with additional metadata so that custom parameters are validated when the template is run. To create metadata for your template, follow these steps:

  1. Create a JSON-formatted file named TEMPLATE_NAME_metadata using the parameters in Metadata parameters and the format in Example metadata file. Replace TEMPLATE_NAME with the name of your template.

    Ensure the metadata file does not have a filename extension. For example, if your template name is myTemplate, then its metadata file must be named myTemplate_metadata.

  2. Store the metadata file in Cloud Storage in the same folder as the template.

Metadata parameters

Parameter key Required Description of the value
name Yes The name of your template.
description No A short paragraph of text that describes the template.
parameters No An array of additional parameters that the template uses. An empty array is used by default.

Each object in the parameters array uses the following keys:

Parameter key Required Description of the value
name Yes The name of the parameter that is used in your template.
label Yes A human-readable string that is used in the Cloud Console to label the parameter.
helpText Yes A short paragraph of text that describes the parameter.
isOptional No false if the parameter is required and true if the parameter is optional. Unless set with a value, isOptional defaults to false. If you omit this key, the parameter is required.
regexes No An array of POSIX-egrep regular expressions, in string form, used to validate the value of the parameter. For example, ["^[a-zA-Z][a-zA-Z0-9]+"] is a single regular expression that validates that the value starts with a letter and is followed by one or more alphanumeric characters. An empty array is used by default.
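Before staging, a missing required key is easy to catch with a few lines of Python. The sketch below checks only the required keys listed above; it is a quick sanity check, not an official validator, and check_metadata is a hypothetical helper name.

```python
import json

# Required keys, per the metadata parameters table above.
REQUIRED_TOP = ("name",)
REQUIRED_PARAM = ("name", "label", "helpText")

def check_metadata(text):
    """Return a list of required keys missing from a metadata file."""
    meta = json.loads(text)
    missing = [k for k in REQUIRED_TOP if k not in meta]
    for i, param in enumerate(meta.get("parameters", [])):
        missing += ["parameters[%d].%s" % (i, k)
                    for k in REQUIRED_PARAM if k not in param]
    return missing

sample = '{"name": "Word Count", "parameters": [{"name": "inputFile"}]}'
print(check_metadata(sample))  # ['parameters[0].label', 'parameters[0].helpText']
```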

Example metadata file

Java

The Dataflow service uses the following metadata to validate the WordCount template's custom parameters:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Python

The Dataflow service uses the following metadata to validate the WordCount template's custom parameters:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "input",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}
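The regexes in these files use POSIX-egrep syntax; for a pattern like the one above, Python's re module behaves the same way, so it can be used to sanity-check a pattern before staging. Note the JSON escaping: "\\/" in the file is the two characters "\/" in the regex. The sample values below are illustrative.

```python
import json
import re

# A parameter entry as it appears in the metadata file.
entry = json.loads(r'''
{
  "name": "inputFile",
  "regexes": ["^gs:\\/\\/[^\\n\\r]+$"]
}
''')

pattern = re.compile(entry["regexes"][0])
print(bool(pattern.match("gs://dataflow-samples/shakespeare/kinglear.txt")))  # True
print(bool(pattern.match("/local/path/kinglear.txt")))                        # False
```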

You can download metadata files for the Google-provided templates from the Dataflow template directory.

Pipeline I/O and runtime parameters

Java: SDK 2.x

Some I/O connectors contain methods that accept ValueProvider objects. To determine support for a specific connector and method, see the API reference documentation for the I/O connector. Supported methods have an overload that takes a ValueProvider. If a method has no such overload, it does not support runtime parameters. The following I/O connectors have at least partial ValueProvider support:

  • File-based IOs: TextIO, AvroIO, FileIO, TFRecordIO, XmlIO
  • BigQueryIO
  • BigtableIO (requires SDK 2.3.0 or later)
  • PubSubIO
  • SpannerIO

Python

Some I/O connectors contain methods that accept ValueProvider objects. To determine support for I/O connectors and their methods, see the API reference documentation for the connector. The following I/O connectors accept runtime parameters:

  • File-based IOs: textio, avroio, tfrecordio

Creating and staging templates

After you write your pipeline, you must create and stage your template file.

The following examples show how to create and stage a template file:

Java: SDK 2.x

This Maven command creates and stages a template at the Cloud Storage location specified with --templateLocation.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=PROJECT_ID \
                  --stagingLocation=gs://BUCKET_NAME/staging \
                  --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \
                  --region=REGION"
    

Verify that the templateLocation path is correct. Replace the following:

  • com.example.myclass: your Java class
  • PROJECT_ID: your project ID
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • TEMPLATE_NAME: the name of your template
  • REGION: the regional endpoint to deploy your Dataflow job

Python

This Python command creates and stages a template at the Cloud Storage location specified with --template_location.

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --staging_location gs://BUCKET_NAME/staging \
    --temp_location gs://BUCKET_NAME/temp \
    --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \
    --region REGION

Verify that the template_location path is correct. Replace the following:

  • examples.mymodule: your Python module
  • PROJECT_ID: your project ID
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • TEMPLATE_NAME: the name of your template
  • REGION: the regional endpoint to deploy your Dataflow job

After you create and stage your template, your next step is to execute the template.