Creating Templates

You can create your own Google Cloud Dataflow templates that use custom runtime parameters. Runtime parameters allow your pipeline to accept values that are available only during pipeline execution, not during pipeline construction. These parameters can be passed to functions that run within the pipeline (such as DoFns), letting you customize the execution of a templated pipeline.

If you want to create a template based on your existing pipeline, you must:

  • Modify your pipeline options to use runtime parameters.
  • Use I/O methods that support runtime parameters wherever you want to parameterize your pipeline.
  • Use DoFns that support runtime parameters.
  • Create and stage your template in Google Cloud Storage.

ValueProvider interface

ValueProvider is an interface that allows pipelines to accept runtime parameters. You can use ValueProviders to:

  • Control behavior inside your DoFns.
  • Use built-in I/O transforms that support ValueProviders.
  • Use custom I/O transforms that support ValueProviders.

There are three types of ValueProvider objects.

RuntimeValueProvider

RuntimeValueProvider is the default ValueProvider type, and allows your pipeline to accept a value that is only available during pipeline execution. Because the value is not available during pipeline construction, you can't use it to change your pipeline's workflow graph. Attempting to call get() prior to pipeline execution returns an error. You can use isAccessible() to determine if a ValueProvider's value is currently available.

RuntimeValueProvider is useful when you do not know the value ahead of time.

StaticValueProvider

StaticValueProvider allows you to provide a static value to your pipeline. The value is available during pipeline construction, so you can use it to change your pipeline's workflow graph.

StaticValueProvider is useful when you know the value ahead of time. A common use case is when you use an API that expects a ValueProvider, but you already have a specific value that you want to use.

NestedValueProvider

NestedValueProvider allows you to compute a value from another ValueProvider object. The wrapped ValueProvider determines whether the computed ValueProvider is accessible during pipeline construction.

NestedValueProvider is useful when the pipeline option is something simple (such as a table name), but the actual value needed at runtime must be computed from the pipeline option.

Note: The Dataflow SDK for Python does not currently support NestedValueProvider.
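
For example, construction-time code can use isAccessible() to decide whether it is safe to call get() on a RuntimeValueProvider. The following is a minimal sketch (not part of the official samples) that assumes a pipeline options interface with a ValueProvider<String> getInputFile() method, like the one defined later on this page:

  ValueProvider<String> inputFile = options.getInputFile();
  if (inputFile.isAccessible()) {
    // The value was supplied at construction time (for example, through a
    // StaticValueProvider), so it is safe to read it here.
    System.out.println("Reading from " + inputFile.get());
  } else {
    // The value is a runtime parameter. Defer calling get() until the
    // pipeline executes, for example inside a DoFn's @ProcessElement method.
  }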

Modifying your code to use runtime parameters

This section walks through how to use ValueProvider, StaticValueProvider, and NestedValueProvider.

Using ValueProvider in your pipeline options

You must modify your code to use ValueProvider for any options that you may want to set or use at runtime.

For this example, we will modify the following code from WordCount. The code adds an input file option, creates a pipeline, and reads lines from the input file.

Java: SDK 1.x

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()));
    ...

Java: SDK 2.x

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Modify the input file option to use ValueProvider as shown below.

Java: SDK 1.x

Use ValueProvider<String> instead of String for the type of the input file option.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    // Add .withoutValidation() when you use a RuntimeValueProvider with SDK 1.x.
    // The value may not be available at validation time.
    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()).withoutValidation());
    ...

Java: SDK 2.x

Use ValueProvider<String> instead of String for the type of the input file option.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

Replace add_argument with add_value_provider_argument.

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Using ValueProvider in your functions

To use runtime parameter values in your own functions, update the functions to use ValueProvider parameters.

The following example defines an integer ValueProvider option and a simple DoFn, MySumFn, that adds that integer to each element it processes. When this pipeline runs, MySumFn is applied to every integer in a PCollection that contains [1, 2, 3]. If the runtime value is 10, the resulting PCollection contains [11, 12, 13].

Java: SDK 1.x

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3).withCoder(BigEndianIntegerCoder.of()))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply(TextIO.Write.named("OutputNums").to("numvalues"));

    p.run();
  }

Java: SDK 2.x

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3).withCoder(BigEndianIntegerCoder.of()))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.utils.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

Using StaticValueProvider

To provide a static value to your pipeline, use StaticValueProvider.

This simple example uses MySumFn from the previous ValueProvider example. MySumFn takes a ValueProvider<Integer>, but in this case, you already know what the value must be set to. You can use StaticValueProvider to specify your static value as a ValueProvider.

Java: SDK 1.x

Instead of getting the value at runtime as shown in this line of code from the previous example:

  .apply(ParDo.of(new MySumFn(options.getInt())))

You can use StaticValueProvider with a value of 10:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Java: SDK 2.x

Instead of getting the value at runtime as shown in this line of code from the previous example:

  .apply(ParDo.of(new MySumFn(options.getInt())))

You can use StaticValueProvider with a value of 10:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

Instead of getting the value at runtime as shown in this line of code from the previous example:

  beam.ParDo(MySumFn(user_options.templated_int))

You can use StaticValueProvider with a value of 10:

  beam.ParDo(MySumFn(StaticValueProvider(int, 10)))

Another situation where you might use StaticValueProvider is when you implement your own I/O module. If you want to support both regular parameters and runtime parameters, you can use StaticValueProvider to reduce the code duplication from implementing two similar methods.

Java: SDK 1.x

The source code for this example is from Apache Beam's TextIO.java on GitHub.

  // Create a StaticValueProvider from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Java: SDK 2.x

The source code for this example is from Apache Beam's TextIO.java on GitHub.

  // Create a StaticValueProvider from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

In this example, a single constructor accepts either a string or a ValueProvider argument. If the argument is a string, it is converted to a StaticValueProvider.

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, basestring):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

Using NestedValueProvider

Note: The Dataflow SDK for Python does not currently support NestedValueProvider.

To compute a value from another ValueProvider object, use NestedValueProvider.

NestedValueProvider takes a ValueProvider and a SerializableFunction translator as input. When you call .get() on a NestedValueProvider, the translator can create a new value based on the ValueProvider value. This mechanism allows you to use a ValueProvider value to create the actual value that you want to use.

Some example situations where you might want this behavior are:

  • Example 1: The user provides a file name file.txt. The transform prepends the file path gs://directory_name/ to the file name. Calling .get() returns gs://directory_name/file.txt.
  • Example 2: The user provides a substring for a BigQuery query, such as a specific date. The transform uses the substring to create the full query. Calling .get() returns the full query.

Note: NestedValueProvider only accepts one value input. You can't use a NestedValueProvider to combine two different values.

The following example code uses NestedValueProvider to implement the first example: the user provides a file name, and the transform prepends the file path to the file name.

Java: SDK 1.x

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3).withCoder(BigEndianIntegerCoder.of()))
     // Write to the computed complete file path.
     .apply(TextIO.Write.named("OutputNums").to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

Java: SDK 2.x

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3).withCoder(BigEndianIntegerCoder.of()))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }
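
The second example above can be handled the same way. The following minimal sketch (the --queryDate option and the table name are hypothetical, not part of the official samples) uses NestedValueProvider to build a complete BigQuery query string from a date substring supplied at runtime:

  public interface QueryOptions extends PipelineOptions {
      // Hypothetical runtime parameter, specified by the --queryDate
      // option at runtime (for example, "2017-01-01").
      ValueProvider<String> getQueryDate();
      void setQueryDate(ValueProvider<String> value);
  }

  // Compute the full query from the date substring. Calling .get() on this
  // ValueProvider at runtime returns the complete query string.
  ValueProvider<String> query = NestedValueProvider.of(
      options.getQueryDate(),
      new SerializableFunction<String, String>() {
        @Override
        public String apply(String date) {
          return "SELECT * FROM [myproject:mydataset.mytable] WHERE date = '"
              + date + "'";
        }
      });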

Python

The Dataflow SDK for Python does not currently support NestedValueProvider.

Metadata

You can extend your templates with additional metadata so that custom parameters may be validated when the template is executed. If you want to create metadata for your template, you need to:

  1. Create a JSON-formatted file named <template-name>_metadata using the parameters described under Metadata parameters below.

    Note: Do not name the file you create <template-name>_metadata.json. While the file contains JSON, it cannot end in the .json file extension.

  2. Store that file in Cloud Storage in the same folder as the template.

    Note: The template should be stored in <template-name> and the metadata should be stored in <template-name>_metadata.
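
For example, assuming the template from the example metadata file below is staged at gs://mybucket/templates/WordCount (a hypothetical path), you could copy the metadata file next to it with gsutil:

  gsutil cp WordCount_metadata gs://mybucket/templates/WordCount_metadata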

Metadata parameters

The metadata file supports the following parameter keys:

  • name (required): The name of your template.
  • description (optional): A short paragraph of text describing the template.
  • parameters (optional; defaults to an empty array): An array of additional parameters that will be used by the template. Each entry in the array can contain the following keys:
      • name (required): The name of the parameter used in your template.
      • label (required): A human-readable label that will be used in the UI to label the parameter.
      • help_text (required): A short paragraph of text describing the parameter.
      • is_optional (optional; defaults to false): true if the parameter is optional and false if the parameter is required.
      • regexes (optional; defaults to an empty array): An array of POSIX regular expressions in string form that will be used to validate the value of the parameter. For example, ["^[a-zA-Z]\w+"] is a single regular expression that validates that the value starts with a letter and then has one or more word characters.

Example metadata file

{
  "name": "WordCount",
  "description": "An example pipeline that counts words in the input file.",
  "parameters": [{
    "name": "inputFile",
    "label": "Input Cloud Storage File(s)",
    "help_text": "Path of the file pattern glob to read from.",
    "regexes": ["^gs:\/\/[^\n\r]+$"],
    "is_optional": true
  },
  {
    "name": "output",
    "label": "Output Cloud Storage File Prefix",
    "help_text": "Path and filename prefix for writing output files. ex: gs://MyBucket/counts",
    "regexes": ["^gs:\/\/[^\n\r]+$"]
  }]
}

Pipeline I/O and runtime parameters

You can use specific Cloud Dataflow I/O methods that accept standard runtime parameters. The following is the complete list of methods that accept runtime parameters. If a method is not listed, it does not currently accept runtime parameters.

Java: SDK 1.x

  • BigQuery*: BigQueryIO.Read.from()*, BigQueryIO.Read.fromQuery()*, BigQueryIO.Write.to()*, BigQueryIO.Write.withSchema()*
  • Cloud Pub/Sub: PubsubIO.Read.subscription(), PubsubIO.Read.topic(), PubsubIO.Write.topic()
  • TextIO: TextIO.Read.from(), TextIO.Write.to()

* For BigQuery batch pipelines, templates can only be executed once, as the BigQuery job ID is set at template creation time.

Java: SDK 2.x

  • BigQuery*: BigQueryIO.Read.from()*, BigQueryIO.Read.fromQuery()*, BigQueryIO.Write.to()*, BigQueryIO.Write.withSchema()*
  • Cloud Pub/Sub: PubsubIO.Read.subscription(), PubsubIO.Read.topic(), PubsubIO.Write.topic()
  • TextIO: TextIO.read().from(), TextIO.write().to()

* For BigQuery batch pipelines, templates can only be executed once, as the BigQuery job ID is set at template creation time. This restriction will be removed in a future release.

Python

  • TextIO: apache_beam.io.ReadFromText(), apache_beam.io.WriteToText()

Creating and staging templates

After you write your parameterized pipeline, you must create and stage your template file. Follow the example for your Cloud Dataflow SDK version.

Note: After you create and stage a template, the staging location contains additional files that are necessary to execute your template. If you delete the staging location, template execution will fail.

Java: SDK 1.x

This Maven command creates and stages a template at the Cloud Storage location specified with dataflowJobFile.

Replace [YOUR_PROJECT_ID] with your project ID, and replace [YOUR_BUCKET_NAME] with the name of your Cloud Storage bucket.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=TemplatingDataflowPipelineRunner \
                  --project=[YOUR_PROJECT_ID] \
                  --stagingLocation=gs://[YOUR_BUCKET_NAME]/staging \
                  --output=gs://[YOUR_BUCKET_NAME]/output \
                  --dataflowJobFile=gs://[YOUR_BUCKET_NAME]/templates/MyTemplate"
    

Java: SDK 2.x

This Maven command creates and stages a template at the Cloud Storage location specified with templateLocation.

Replace [YOUR_PROJECT_ID] with your project ID, and replace [YOUR_BUCKET_NAME] with the name of your Cloud Storage bucket.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=[YOUR_PROJECT_ID] \
                  --stagingLocation=gs://[YOUR_BUCKET_NAME]/staging \
                  --output=gs://[YOUR_BUCKET_NAME]/output \
                  --templateLocation=gs://[YOUR_BUCKET_NAME]/templates/MyTemplate"
    

Python

This Python command creates and stages a template at the Cloud Storage location specified with template_location.

Replace [YOUR_PROJECT_ID] with your project ID, and replace [YOUR_BUCKET_NAME] with the name of your Cloud Storage bucket.

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project [YOUR_PROJECT_ID] \
    --staging_location gs://[YOUR_BUCKET_NAME]/staging \
    --temp_location gs://[YOUR_BUCKET_NAME]/temp \
    --output gs://[YOUR_BUCKET_NAME]/output \
    --template_location gs://[YOUR_BUCKET_NAME]/templates/mytemplate

After you create and stage your template, your next step is to execute the template.
