Creating Templates

You can create your own Google Cloud Dataflow templates that use custom parameters. If you want to create a template based on your existing pipeline, you must:

  • Modify your pipeline code to use runtime parameters.
  • Use the TemplatingDataflowPipelineRunner to create and stage your template in Google Cloud Storage.

Note: To create templates, you must have Cloud Dataflow Java SDK version 1.9.0 or higher.

Modifying code to use runtime parameters

ValueProvider is an interface that allows pipelines to accept parameters at runtime. You must modify your code to use ValueProvider for any options that you may want to set or use at runtime.
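Conceptually, a ValueProvider wraps a value whose concrete contents may not be known until the job runs: the pipeline graph holds the provider, and calling get() reads the value at execution time. The following is a minimal plain-Java sketch of that idea; the Provider and StaticProvider names are illustrative stand-ins, not the SDK's API.

```java
// Illustrative stand-in for the SDK's ValueProvider interface
// (simplified names, not the real Dataflow API).
interface Provider<T> {
    T get();
}

// A provider whose value is fixed at construction, similar in spirit
// to the SDK's StaticValueProvider.
class StaticProvider<T> implements Provider<T> {
    private final T value;

    StaticProvider(T value) {
        this.value = value;
    }

    public T get() {
        return value;
    }
}

public class ProviderDemo {
    public static void main(String[] args) {
        // The pipeline would hold only the provider; the actual value
        // is read by calling get() when the job executes.
        Provider<String> inputFile =
            new StaticProvider<>("gs://dataflow-samples/shakespeare/kinglear.txt");
        System.out.println(inputFile.get());
    }
}
```

In the real SDK, the runner substitutes the value supplied at template execution time before get() is called, which is why options read through ValueProvider can differ from one template run to the next.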

In this WordCount example, the existing pipeline code defines an options interface that extends PipelineOptions and adds an input file option, then creates a pipeline and passes in the options.

  public interface WordCountOptions extends PipelineOptions {

    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);

    ...
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()));

  ...

Modify the input file option to use ValueProvider&lt;String&gt;, as shown.

  public interface WordCountOptions extends PipelineOptions {

    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);

    ...
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()));

  ...

To use runtime parameter values in your own functions, update the functions to accept ValueProvider parameters. This example shows a simple DoFn that adds an integer to each element; the integer is supplied through a ValueProvider, and the function calls myInteger.get() to read the value at runtime.

  public interface Options extends PipelineOptions {
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MyFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> myInteger;

      MyFn(Options options) {
          // Store the provider; the value itself is read when the job runs.
          this.myInteger = options.getInt();
      }

      @Override
      public void processElement(ProcessContext c) {
          c.output(c.element() + myInteger.get());
      }
  }

  Options options = ...;

  ...
  pipeline.apply(ParDo.of(new MyFn(options)));
  ...

Pipeline I/O and runtime parameters

The Google BigQuery, Google Cloud Pub/Sub, and TextIO I/O APIs for Dataflow provide methods that accept runtime parameters.

  I/O             Methods that accept runtime parameters
  ---             --------------------------------------
  BigQuery*       BigQueryIO.Read.from()
                  BigQueryIO.Read.fromQuery()
                  BigQueryIO.Write.to()
                  BigQueryIO.Write.withSchema()
  Cloud Pub/Sub   PubsubIO.Read.subscription()
                  PubsubIO.Read.topic()
                  PubsubIO.Write.topic()
  TextIO          TextIO.Read.from()
                  TextIO.Write.to()

  * See the note below about BigQuery batch pipelines.

Note: For BigQuery batch pipelines, templates can only be executed once, as the BigQuery job ID is set at template creation time. This restriction will be removed in a future release.

Creating and staging templates

To create and stage your template file, use the TemplatingDataflowPipelineRunner.

Note: Template creation is currently limited to Java and Maven.

Java and Maven

This example generates a Maven project and uses the WordCount example pipeline to create and stage a template. Set runner to TemplatingDataflowPipelineRunner and dataflowJobFile to the Cloud Storage location where the template file will be saved.

mvn archetype:generate \
 -DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
 -DarchetypeGroupId=com.google.cloud.dataflow \
 -DgroupId=com.example \
 -DartifactId=first-dataflow \
 -Dversion="0.1" \
 -DinteractiveMode=false \
 -Dpackage=com.google.cloud.dataflow.examples

cd first-dataflow

mvn compile exec:java \
 -Dexec.mainClass=com.google.cloud.dataflow.examples.WordCount \
 -Dexec.args="--project={YOUR_PROJECT_ID} \
              --stagingLocation=gs://{YOUR_BUCKET}/staging \
              --output=gs://{YOUR_BUCKET}/output \
              --dataflowJobFile=gs://{YOUR_BUCKET}/templates/WordCount \
              --runner=TemplatingDataflowPipelineRunner"

After you create and stage your template, your next step is to execute the template.
