テンプレートの作成

Cloud Dataflow テンプレートでは、ランタイムのパラメータを使用して、パイプラインの実行中にのみ使用可能な値を受け入れます。テンプレート化されたパイプラインの実行をカスタマイズするには、これらのパラメータをパイプライン内で実行される関数(DoFn など)に渡します。

Apache Beam パイプラインからテンプレートを作成するには、ランタイムのパラメータをサポートするように、次の変更をパイプライン コードに加える必要があります。

その後、テンプレートを作成してステージングします

ランタイムのパラメータと ValueProvider インターフェース

ValueProvider インターフェースを使用すると、パイプラインでランタイムのパラメータを受け入れることができます。Apache Beam には、3 つのタイプの ValueProvider オブジェクトが用意されています。

名前 説明
RuntimeValueProvider

RuntimeValueProvider は、ValueProvider のデフォルトのタイプです。 RuntimeValueProvider を使用すると、パイプラインの実行中にのみ使用可能な値をパイプラインで受け入れることができます。この値はパイプラインの作成中には使用できないため、この値を使用してパイプラインのワークフロー グラフを変更することはできません。

isAccessible() を使用すると、ValueProvider の値が使用可能かどうかを確認できます。パイプラインの実行前に get() を呼び出すと、Apache Beam から次のエラーが返されます。
Value only available at runtime, but accessed from a non-runtime context.

RuntimeValueProvider は、事前に値がわからない場合に使用します。

StaticValueProvider

StaticValueProvider を使用すると、パイプラインに静的な値を指定できます。この値はパイプラインの作成中に使用できるため、この値を使用してパイプラインのワークフロー グラフを変更できます。

StaticValueProvider は、事前に値がわかっている場合に使用します。例については、StaticValueProvider のセクションをご覧ください。

NestedValueProvider

NestedValueProvider を使用すると、別の ValueProvider オブジェクトから値を計算できます。NestedValueProviderValueProvider をラップしたもので、ラップされた ValueProvider のタイプによってパイプラインの作成中に値にアクセスできるかどうかが決定します。

NestedValueProvider は、値を使用して実行時に別の値を計算する場合に使用します。例については、NestedValueProvider のセクションをご覧ください。

注: Apache Beam SDK for Python では NestedValueProvider はサポートされていません。

コードを変更してランタイムのパラメータを使用する

このセクションでは、ValueProviderStaticValueProviderNestedValueProvider の使用方法について説明します。

ValueProvider をパイプライン オプションで使用する

実行時に設定または使用するすべてのパイプライン オプションで ValueProvider を使用します。

たとえば、次の WordCount コード スニペットではランタイムのパラメータはサポートされていません。このコードは、入力ファイルのオプションを追加し、パイプラインを作成し、入力ファイルから行を読み取ります。

Java: SDK 2.x

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Java: SDK 1.x

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()));
    ...

ランタイムのパラメータのサポートを追加するには、ValueProvider を使用するように入力ファイルのオプションを変更します。

Java: SDK 2.x

入力ファイルのオプションのタイプには、String ではなく ValueProvider<String> を使用します。

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

add_argumentadd_value_provider_argument で置き換えます。

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Java: SDK 1.x

入力ファイルのオプションのタイプには、String ではなく ValueProvider<String> を使用します。

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    // Add .withoutValidation() when you use a RuntimeValueProvider with SDK 1.x.
    // The value may not be available at validation time.
    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()).withoutValidation());
    ...

ValueProvider を関数で使用する

ランタイムのパラメータ値を独自の関数で使用するには、ValueProvider パラメータを使用するように関数を更新します。

次の例には、整数 ValueProvider のオプションと、整数を追加する単純な関数が含まれています。この関数は整数 ValueProvider に依存します。このパイプラインが実行されると、[1, 2, 3] を含む PCollection のすべての整数に MySumFn が適用されます。ランタイムの値が 10 の場合、PCollection には [11, 12, 13] が含まれます。

Java: SDK 2.x

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.utils.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

Java: SDK 1.x

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply(MapElements.via(
        new SimpleFunction<Integer, String>() {
          public String apply(Integer i) {
            return i.toString();
          }
        }))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

StaticValueProvider を使用する

パイプラインに静的な値を指定するには、StaticValueProvider を使用します。

この例では、ValueProvider<Integer> を取る DoFn である MySumFn を使用します。事前にパラメータの値がわかっている場合は、StaticValueProvider を使用して静的な値を ValueProvider として指定できます。

Java: SDK 2.x

このコードは、パイプラインの実行時に値を取得します。

  .apply(ParDo.of(new MySumFn(options.getInt())))

代わりに、StaticValueProvider を使用して静的な値を指定できます。

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

このコードは、パイプラインの実行時に値を取得します。

  beam.ParDo(MySumFn(user_options.templated_int))

代わりに、StaticValueProvider を使用して静的な値を指定できます。

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

Java: SDK 1.x

このコードは、パイプラインの実行時に値を取得します。

  .apply(ParDo.of(new MySumFn(options.getInt())))

代わりに、StaticValueProvider を使用して静的な値を指定できます。

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

通常のパラメータとランタイムのパラメータの両方をサポートする I/O モジュールを実装する場合は、StaticValueProvider を使用することもできます。 StaticValueProvider によって、2 つの類似したメソッドを実装することによるコードの重複が減ります。

Java: SDK 2.x

この例のソースコードは、GitHub の Apache Beam の TextIO.java のものです。

  // Create a StaticValueProvider from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

この例には、引数として string または ValueProvider を受け入れる単一のコンストラクタがあります。引数が string の場合は、StaticValueProvider に変換されます。

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, basestring):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

Java: SDK 1.x

この例のソースコードは、GitHub の Apache Beam の TextIO.java のものです。

  // Create a StaticValueProvider from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

NestedValueProvider を使用する

注: Apache Beam SDK for Python では NestedValueProvider はサポートされていません。

別の ValueProvider オブジェクトから値を計算するには、NestedValueProvider を使用します。

NestedValueProvider は、ValueProviderSerializableFunction トランスレータを入力として受け入れます。NestedValueProvider.get() を呼び出すと、トランスレータは ValueProvider 値に基づいて新しい値を作成します。この変換により、ValueProvider 値を使用して必要となる最終的な値を作成できます。

  • 例 1: ユーザーがファイル名 file.txt を指定します。変換によってファイル名の前にファイルパス gs://directory_name/ が付加されます。.get() を呼び出すと、gs://directory_name/file.txt が返されます。
  • 例 2: ユーザーが BigQuery クエリに部分文字列(特定の日付など)を指定します。変換では部分文字列を使用して完全なクエリを作成します。.get() を呼び出すと、完全なクエリが返されます。

注: NestedValueProvider は、単一の値の入力のみを受け入れます。NestedValueProvider を使用して 2 つの異なる値を組み合わせることはできません。

次のサンプルコードでは、NestedValueProvider を使用して最初の例を実装します。つまり、ユーザーがファイル名を指定し、変換によってファイル名の前にファイルパスが付加されます。

Java: SDK 2.x

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

Python

Apache Beam SDK for Python では NestedValueProvider はサポートされていません。

Java: SDK 1.x

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply(TextIO.Write.named("OutputNums").to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

メタデータ

追加のメタデータを使用して、テンプレートの実行時にカスタムのパラメータを確認するようにテンプレートを拡張します。テンプレートのメタデータを作成する場合は、次の作業が必要です。

  1. 下記の表のパラメータを使用して、<template-name>_metadata という名前の JSON 形式のファイルを作成します。

    注: 作成するファイルに <template-name>_metadata.json という名前を付けないでください。このファイルには JSON が含まれていますが、.json ファイル拡張子で終えることはできません。

  2. JSON ファイルを Cloud Storage でテンプレートと同じフォルダに保存します。

    注: テンプレートは <template-name> に保存し、メタデータは <template-name>_metadata に保存してください。

メタデータのパラメータ

パラメータキー 必須 値の説明
name テンプレートの名前。
description × テンプレートを説明する短い文章。
parameters ×。デフォルトは空の配列です。 テンプレートで使用される追加のパラメータの配列。
name テンプレートで使用されるパラメータの名前。
label パラメータにラベルを付けるために UI で使用される、人が読める形式のラベル。
help_text テンプレートを説明する短い文章。
is_optional ×。デフォルトは false です。 パラメータが必須の場合は true、パラメータが省略可能な場合は false
regexes ×。デフォルトは空の配列です。 パラメータの値を検証するために使用される文字列形式の POSIX-egrep 正規表現の配列。たとえば、["^[a-zA-Z][a-zA-Z0-9]+"] は、値が文字で始まり、その後に 1 つ以上の文字が続くことを検証する単一の正規表現です。

メタデータ ファイルの例

Cloud Dataflow サービスでは、次のメタデータを使用して、WordCount テンプレートのカスタム パラメータを検証します。

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from. ex: gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input GCS file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files. ex: gs://MyBucket/counts",
      "label": "Output GCS file(s)"
    }
  ]
}

Cloud Dataflow テンプレート ディレクトリからこのメタデータ ファイルをダウンロードできます。

パイプライン I/O とラインタイムのパラメータ

Java: SDK 2.x

一部の I/O コネクタには、ValueProvider オブジェクトを受け入れるメソッドが含まれています。特定のコネクタまたはメソッドがサポートされるかどうかは、I/O コネクタの API リファレンス ドキュメントで確認してください。サポートされているメソッドには、ValueProvider のオーバーロードがあります。メソッドにオーバーロードがない場合、そのメソッドではランタイムのパラメータがサポートされません。次の I/O コネクタでは、ValueProvider が少なくとも部分的にサポートされます。

  • ファイルベースの I/O: TextIOAvroIOFileIOTFRecordIOXmlIO
  • BigQueryIO*
  • BigtableIO(SDK 2.3.0 以降が必要)
  • PubSubIO
  • SpannerIO

* 注: BigQuery からの読み込みを行うバッチ パイプラインを実行する場合は、BigQuery からのすべての読み込みで .withTemplateCompatibility() を使用する必要があります。

Python

一部の I/O コネクタには、ValueProvider オブジェクトを受け入れるメソッドが含まれています。I/O コネクタとメソッドがサポートされているかどうかは、コネクタの API リファレンス ドキュメントで確認してください。次の I/O コネクタは、ランタイムのパラメータを受け入れます。

  • ファイルベースの I/O: textioavroiotfrecordio

Java: SDK 1.x

次の表に、ランタイムのパラメータを受け入れるメソッドの完全なリストを示します。

I/O メソッド
BigQuery* BigQueryIO.Read.from()*
BigQueryIO.Read.fromQuery()*
BigQueryIO.Write.to()*
BigQueryIO.Write.withSchema()*
Cloud Pub/Sub PubsubIO.Read.subscription()
PubsubIO.Read.topic()
PubsubIO.Write.topic()
TextIO TextIO.Read.from()
TextIO.Write.to()

* BigQuery のバッチ パイプラインでは、BigQuery のジョブ ID がテンプレートの作成時に設定されるため、テンプレートを実行できるのは 1 回のみです。

テンプレートの作成とステージング

パイプラインを作成したら、テンプレート ファイルを作成してステージングする必要があります。お使いの SDK バージョンのコマンドを使用してください。

注: テンプレートの作成とステージングが完了したら、ステージング場所にテンプレートの実行に必要なその他のファイルが格納されます。ステージング場所を削除すると、テンプレートの実行は失敗します。

Java: SDK 2.x

この Maven コマンドは、--templateLocation で指定された Cloud Storage の場所にテンプレートを作成してステージングします。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。

  • YOUR_BUCKET_NAME は、Cloud Storage バケットの名前に置き換えます。

  • YOUR_TEMPLATE_NAME をテンプレートの名前に置き換えます。

  • com.example.myclass を Java クラスに置き換えます。

  • templateLocation パスが正しいことを確認します。

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=YOUR_PROJECT_ID \
                  --stagingLocation=gs://YOUR_BUCKET_NAME/staging \
                  --templateLocation=gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME"
    

Python

この Python コマンドは、--template_location で指定された Cloud Storage の場所にテンプレートを作成してステージングします。このコマンドを次のように変更します。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。

  • YOUR_BUCKET_NAME は、Cloud Storage バケットの名前に置き換えます。

  • YOUR_TEMPLATE_NAME をテンプレートの名前に置き換えます。

  • examples.mymodule を Python モジュールに置き換えます。

  • template_location パスが正しいことを確認します。

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project YOUR_PROJECT_ID \
    --staging_location gs://YOUR_BUCKET_NAME/staging \
    --temp_location gs://YOUR_BUCKET_NAME/temp \
    --template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME

Java: SDK 1.x

この Maven コマンドは、--dataflowJobFile で指定された Cloud Storage の場所にテンプレートを作成してステージングします。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。

  • YOUR_BUCKET_NAME は、Cloud Storage バケットの名前に置き換えます。

  • YOUR_TEMPLATE_NAME をテンプレートの名前に置き換えます。

  • com.example.myclass を Java クラスに置き換えます。

  • dataflowJobFile パスが正しいことを確認します。

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=TemplatingDataflowPipelineRunner \
                  --project=YOUR_PROJECT_ID \
                  --stagingLocation=gs://YOUR_BUCKET_NAME/staging \
                  --dataflowJobFile=gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME"
    

テンプレートの作成とステージングが完了したら、次はテンプレートを実行します。