Creating classic Dataflow templates

In this document, you learn how to create a custom classic template from your Dataflow pipeline code. Classic templates package existing Dataflow pipelines to create reusable templates that you can customize for each job by changing specific pipeline parameters. Rather than writing the template, you use a command to generate the template from an existing pipeline.

The following is a brief overview of the process. Details of this process are provided in subsequent sections.

  1. In your pipeline code, use the ValueProvider interface for all pipeline options that you want to set or use at runtime. Use DoFn objects that accept runtime parameters.
  2. Extend your template with additional metadata so that custom parameters are validated when the classic template is run. Examples of such metadata include the name of your custom classic template and optional parameters.
  3. Check if the pipeline I/O connectors support ValueProvider objects, and make changes as required.
  4. Create and stage the custom classic template.
  5. Run the custom classic template.

To learn about the different kinds of Dataflow templates, their benefits, and when to choose a classic template, see Dataflow templates.

Required permissions for running a classic template

The permissions that you need to run the Dataflow classic template depend on where you run the template, and whether your source and sink for the pipeline are in another project.

For more information about running Dataflow pipelines either locally or by using Google Cloud, see Dataflow security and permissions.

For a list of Dataflow roles and permissions, see Dataflow access control.

Limitations

  • The following pipeline option isn't supported with classic templates. If you need to control the number of worker harness threads, use Flex Templates.

    Java

    numberOfWorkerHarnessThreads
      

    Python

    number_of_worker_harness_threads
      
  • The Dataflow runner doesn't support ValueProvider options for Pub/Sub topic and subscription parameters. If you require Pub/Sub options in your runtime parameters, use Flex Templates.

About runtime parameters and the ValueProvider interface

The ValueProvider interface allows pipelines to accept runtime parameters. Apache Beam provides three types of ValueProvider objects.

Name Description
RuntimeValueProvider

RuntimeValueProvider is the default ValueProvider type. RuntimeValueProvider allows your pipeline to accept a value that is only available during pipeline execution. The value is not available during pipeline construction, so you can't use the value to change your pipeline's workflow graph.

You can use isAccessible() to check if the value of a ValueProvider is available. If you call get() before pipeline execution, Apache Beam returns an error:
Value only available at runtime, but accessed from a non-runtime context.

Use RuntimeValueProvider when you do not know the value ahead of time. To change the parameter values at runtime, do not set values for the parameters in the template. Set the values for the parameters when you create jobs from the template.

StaticValueProvider

StaticValueProvider lets you provide a static value to your pipeline. The value is available during pipeline construction, so you can use the value to change your pipeline's workflow graph.

Use StaticValueProvider when you know the value ahead of time. See the StaticValueProvider section for examples.

NestedValueProvider

NestedValueProvider lets you compute a value from another ValueProvider object. NestedValueProvider wraps a ValueProvider, and the type of the wrapped ValueProvider determines whether the value is accessible during pipeline construction.

Use NestedValueProvider when you want to use the value to compute another value at runtime. See the NestedValueProvider section for examples.
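
The difference between a value that is available during pipeline construction and one that is available only during execution can be illustrated with a small, self-contained model. The classes below are simplified stand-ins written for this illustration (ToyRuntimeValueProvider, ToyStaticValueProvider, and start_job are hypothetical names, not Apache Beam classes); they only mimic the is-accessible and get behavior described above.

```python
class ToyRuntimeValueProvider:
    """Stand-in for a provider whose value exists only during execution."""

    # Shared job parameters; None until the "job" starts (an assumption of this toy model).
    _runtime_options = None

    def __init__(self, option_name):
        self.option_name = option_name

    @classmethod
    def start_job(cls, options):
        # Simulates creating a job from the template with concrete values.
        cls._runtime_options = dict(options)

    def is_accessible(self):
        return self._runtime_options is not None

    def get(self):
        if not self.is_accessible():
            raise RuntimeError(
                "Value only available at runtime, but accessed from a "
                "non-runtime context.")
        return self._runtime_options[self.option_name]


class ToyStaticValueProvider:
    """Stand-in for a provider whose value is known ahead of time."""

    def __init__(self, value):
        self.value = value

    def is_accessible(self):
        return True

    def get(self):
        return self.value


input_path = ToyRuntimeValueProvider("input")
fixed_increment = ToyStaticValueProvider(10)

# "Pipeline construction": only the static value can be read.
construction_time = (input_path.is_accessible(), fixed_increment.get())

# "Pipeline execution": the job is created with concrete parameter values.
ToyRuntimeValueProvider.start_job({"input": "gs://BUCKET_NAME/input.txt"})
execution_time = (input_path.is_accessible(), input_path.get())
```

In this model, reading input_path before start_job raises the error, which is why code that shapes the pipeline graph can rely only on static values.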

Use runtime parameters in your pipeline code

This section walks through how to use ValueProvider, StaticValueProvider, and NestedValueProvider.

Use ValueProvider in your pipeline options

Use ValueProvider for all pipeline options that you want to set or use at runtime.

For example, the following WordCount code snippet does not support runtime parameters. The code adds an input file option, creates a pipeline, and reads lines from the input file:

Java

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

To add runtime parameter support, modify the input file option to use ValueProvider.

Java

Use ValueProvider<String> instead of String for the type of the input file option.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

Replace add_argument with add_value_provider_argument.

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Use ValueProvider in your functions

To use runtime parameter values in your own functions, update the functions to use ValueProvider parameters.

The following example contains an integer ValueProvider option, and a simple function that adds an integer. The function depends on the ValueProvider integer. During execution, the pipeline applies MySumFn to every integer in a PCollection that contains [1, 2, 3]. If the runtime value is 10, the resulting PCollection contains [11, 12, 13].

Java

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  result = (p
            | 'ReadCollection' >> beam.io.ReadFromText(
                'gs://some/integer_collection')
            | 'StringToInt' >> beam.Map(lambda w: int(w))
            | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
            | 'WriteResultingCollection' >> WriteToText('some/output_path'))

  # Run the pipeline. With --template_location set, this stages the
  # template instead of starting a job.
  p.run()

Use StaticValueProvider

To provide a static value to your pipeline, use StaticValueProvider.

This example uses MySumFn, which is a DoFn that takes a ValueProvider<Integer>. If you know the value of the parameter ahead of time, you can use StaticValueProvider to specify your static value as a ValueProvider.

Java

This code gets the value at pipeline runtime:

  .apply(ParDo.of(new MySumFn(options.getInt())))

Instead, you can use StaticValueProvider with a static value:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

This code gets the value at pipeline runtime:

  beam.ParDo(MySumFn(user_options.templated_int))

Instead, you can use StaticValueProvider with a static value:

  beam.ParDo(MySumFn(StaticValueProvider(int, 10)))

You can also use StaticValueProvider when you implement an I/O module that supports both regular parameters and runtime parameters. StaticValueProvider reduces the code duplication from implementing two similar methods.

Java

The source code for this example is from Apache Beam's TextIO.java on GitHub.

  // Create a StaticValueProvider<String> from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

In this example, a single constructor accepts either a string or a ValueProvider argument. If the argument is a string, it is converted to a StaticValueProvider.

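  from apache_beam.options.value_provider import StaticValueProvider

  class Read:

    def __init__(self, filepattern):
      if isinstance(filepattern, str):
        # Create a StaticValueProvider from a regular string parameter
        filepattern = StaticValueProvider(str, filepattern)

      # In both cases, a ValueProvider is stored, so later code can call .get()
      self.filepattern = filepattern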

Use NestedValueProvider

To compute a value from another ValueProvider object, use NestedValueProvider.

NestedValueProvider takes a ValueProvider and a SerializableFunction translator as input. When you call .get() on a NestedValueProvider, the translator creates a new value based on the ValueProvider value. This translation lets you use a ValueProvider value to create the final value that you want.

In the following example, the user provides the filename file.txt. The transform prepends the path gs://directory_name/ to the filename. Calling .get() returns gs://directory_name/file.txt.

Java

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directory_name/" + file;
          }
        })));

    p.run();
  }
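
The deferred translation can also be sketched in a few lines of plain Python. This is a toy model under stated assumptions, not the Apache Beam implementation: ToyValueProvider, ToyNestedValueProvider, and set_at_runtime are hypothetical names introduced only to show that the translator runs when get is called and that accessibility follows the wrapped provider.

```python
class ToyValueProvider:
    """Holds a value that might not be set until the job runs."""

    _UNSET = object()  # sentinel meaning "no value yet"

    def __init__(self, value=_UNSET):
        self._value = value

    def set_at_runtime(self, value):
        # Simulates the service supplying the parameter at job creation.
        self._value = value

    def is_accessible(self):
        return self._value is not ToyValueProvider._UNSET

    def get(self):
        if not self.is_accessible():
            raise RuntimeError("Value only available at runtime.")
        return self._value


class ToyNestedValueProvider:
    """Wraps another provider and applies a translator on each get()."""

    def __init__(self, wrapped, translator):
        self._wrapped = wrapped
        self._translator = translator

    def is_accessible(self):
        # Accessibility is decided entirely by the wrapped provider.
        return self._wrapped.is_accessible()

    def get(self):
        # The translator is not called until the value is requested.
        return self._translator(self._wrapped.get())


file_name = ToyValueProvider()
full_path = ToyNestedValueProvider(
    file_name, lambda name: "gs://directory_name/" + name)

accessible_before = full_path.is_accessible()
file_name.set_at_runtime("file.txt")
resolved = full_path.get()
```

Because nothing is computed at construction, the same nested provider works whether the wrapped value is static or supplied at runtime.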

Use metadata in your pipeline code

You can extend your template with additional metadata so that custom parameters are validated when the template is run. If you want to create metadata for your template, follow these steps:

  1. Create a JSON-formatted file named TEMPLATE_NAME_metadata using the parameters in Metadata parameters and the format in Example metadata file. Replace TEMPLATE_NAME with the name of your template.

    Ensure the metadata file does not have a filename extension. For example, if your template name is myTemplate, then its metadata file must be myTemplate_metadata.

  2. Store the metadata file in Cloud Storage in the same folder as the template.
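
As a rough sketch of step 1, the metadata file can be generated from a dictionary so that the JSON is always well formed and the filename has no extension. The helper name write_metadata_file and the temporary local directory are assumptions made for this illustration; copying the resulting file into the Cloud Storage folder that holds the template is not shown.

```python
import json
import tempfile
from pathlib import Path


def write_metadata_file(template_dir, template_name, metadata):
    """Write metadata as JSON to TEMPLATE_NAME_metadata (no extension)."""
    path = Path(template_dir) / f"{template_name}_metadata"
    path.write_text(json.dumps(metadata, indent=2), encoding="utf-8")
    return path


# Illustrative metadata for a template with one custom parameter.
metadata = {
    "name": "Word Count",
    "description": "An example pipeline that counts words in the input file.",
    "parameters": [
        {
            "name": "inputFile",
            "label": "Input Cloud Storage file(s)",
            "helpText": "Path of the file pattern glob to read from",
            "regexes": ["^gs:\\/\\/[^\\n\\r]+$"],
        }
    ],
}

# A temporary directory stands in for a local staging folder.
with tempfile.TemporaryDirectory() as staging_dir:
    metadata_path = write_metadata_file(staging_dir, "myTemplate", metadata)
    file_name = metadata_path.name
    round_trip = json.loads(metadata_path.read_text(encoding="utf-8"))
```

Building the file with json.dumps also handles the backslash escaping that the regular expressions need inside JSON strings.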

Metadata parameters

Parameter key Required Description of the value
name Yes The name of your template.
description No A short paragraph of text describing the template.
streaming No If true, this template supports streaming. The default value is false.
supportsAtLeastOnce No If true, this template supports at-least-once processing. The default value is false. Set this parameter to true if the template is designed to work with at-least-once streaming mode.
supportsExactlyOnce No If true, this template supports exactly-once processing. The default value is true.
parameters No An array of additional parameters that the template uses. An empty array is used by default. Each entry in the array is an object with the following keys:
  name Yes The name of the parameter that is used in your template.
  label Yes A human-readable string that is used in the Google Cloud console to label the parameter.
  helpText Yes A short paragraph of text that describes the parameter.
  isOptional No false if the parameter is required and true if the parameter is optional. The default value is false, so a parameter whose metadata omits this key is treated as required.
  regexes No An array of POSIX-egrep regular expressions, in string form, that are used to validate the value of the parameter. For example, ["^[a-zA-Z][a-zA-Z0-9]+"] is a single regular expression that validates that the value starts with a letter followed by one or more letters or digits. An empty array is used by default.
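
To get a feel for how these keys interact, the following sketch checks supplied parameter values against a metadata dictionary before a job is submitted. It is a local approximation only: it uses Python's re module rather than POSIX egrep, it assumes a search-style match, and the function name validate_parameters and the jobLabel parameter are hypothetical. The Dataflow service's own validation may differ in detail.

```python
import re


def validate_parameters(metadata, supplied):
    """Return a list of problems found in the supplied parameter values."""
    problems = []
    for spec in metadata.get("parameters", []):
        name = spec["name"]
        if name not in supplied:
            # isOptional defaults to false, so a missing value is a problem.
            if not spec.get("isOptional", False):
                problems.append(f"missing required parameter: {name}")
            continue
        # An absent regexes key behaves like an empty array: nothing to check.
        for pattern in spec.get("regexes", []):
            if re.search(pattern, supplied[name]) is None:
                problems.append(f"{name} does not match {pattern}")
    return problems


metadata = {
    "name": "Word Count",
    "parameters": [
        {
            "name": "inputFile",
            "label": "Input Cloud Storage file(s)",
            "helpText": "Path of the file pattern glob to read from",
            "regexes": [r"^gs:\/\/[^\n\r]+$"],
        },
        {
            "name": "jobLabel",  # hypothetical optional parameter
            "label": "Job label",
            "helpText": "A short label for the job",
            "isOptional": True,
            "regexes": ["^[a-zA-Z][a-zA-Z0-9]+"],
        },
    ],
}

ok = validate_parameters(metadata, {"inputFile": "gs://BUCKET_NAME/kinglear.txt"})
bad = validate_parameters(metadata, {"jobLabel": "9lives"})
```

Here ok is empty, while bad reports both the missing required inputFile and the jobLabel value that starts with a digit.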

Example metadata file

Java

The Dataflow service uses the following metadata to validate the WordCount template's custom parameters:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Python

The Dataflow service uses the following metadata to validate the WordCount template's custom parameters:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "input",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

You can download metadata files for the Google-provided templates from the Dataflow template directory.

Supported pipeline I/O connectors and ValueProvider

Java

Some I/O connectors contain methods that accept ValueProvider objects. To determine support for a specific connector and method, see the API reference documentation for the I/O connector. Supported methods have an overload with a ValueProvider. If a method does not have an overload, the method does not support runtime parameters. The following I/O connectors have at least partial ValueProvider support:

  • File-based IOs: TextIO, AvroIO, FileIO, TFRecordIO, XmlIO
  • BigQueryIO*
  • BigtableIO (requires SDK 2.3.0 or later)
  • PubsubIO
  • SpannerIO

Python

Some I/O connectors contain methods that accept ValueProvider objects. To determine support for I/O connectors and their methods, see the API reference documentation for the connector. The following I/O connectors accept runtime parameters:

  • File-based IOs: textio, avroio, tfrecordio

Create and stage a classic template

After you write your pipeline, you must create and stage your template file. When you create and stage a template, the staging location contains additional files that are necessary to run your template. If you delete the staging location, the template fails to run. The Dataflow job does not run immediately after you stage the template. To run a custom template-based Dataflow job, you can use the Google Cloud console, the Dataflow REST API, or the gcloud CLI.

The following example shows how to stage a template file:

Java

This Maven command creates and stages a template at the Cloud Storage location specified with --templateLocation.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=PROJECT_ID \
                  --stagingLocation=gs://BUCKET_NAME/staging \
                  --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \
                  --region=REGION" \
     -P dataflow-runner
    

Verify that the templateLocation path is correct. Replace the following:

  • com.example.myclass: your Java class
  • PROJECT_ID: your project ID
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • TEMPLATE_NAME: the name of your template
  • REGION: the region to deploy your Dataflow job in

Python

This Python command creates and stages a template at the Cloud Storage location specified with --template_location.

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --staging_location gs://BUCKET_NAME/staging \
    --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \
    --region REGION

Verify that the template_location path is correct. Replace the following:

  • examples.mymodule: your Python module
  • PROJECT_ID: your project ID
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • TEMPLATE_NAME: the name of your template
  • REGION: the region to deploy your Dataflow job in

After you create and stage your template, your next step is to run the template.
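
As one illustration of running a staged template through the Dataflow REST API, the sketch below only assembles the request for the projects.locations.templates.launch method; it does not send it, and authentication is omitted. The helper name build_launch_request, the job name, and the parameter values are assumptions made for this example; the parameter names must match the options that your pipeline defines.

```python
import json
from urllib.parse import quote, urlencode


def build_launch_request(project_id, region, template_path, job_name, parameters):
    """Return the URL and JSON body for a templates:launch request."""
    url = (
        "https://dataflow.googleapis.com/v1b3/"
        f"projects/{quote(project_id)}/locations/{quote(region)}/templates:launch"
        # The staged template's Cloud Storage path is passed as a query parameter.
        f"?{urlencode({'gcsPath': template_path})}"
    )
    # Runtime parameter values are supplied here, when the job is created.
    body = {"jobName": job_name, "parameters": dict(parameters)}
    return url, json.dumps(body)


url, body = build_launch_request(
    project_id="PROJECT_ID",
    region="REGION",
    template_path="gs://BUCKET_NAME/templates/TEMPLATE_NAME",
    job_name="wordcount-example",  # hypothetical job name
    parameters={
        "inputFile": "gs://dataflow-samples/shakespeare/kinglear.txt",
        "output": "gs://BUCKET_NAME/counts",
    },
)
```

The request would be sent as an HTTP POST with an OAuth 2.0 access token; the Google Cloud console and the gcloud CLI are alternatives to calling the REST API directly.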