Dataflow クラシック テンプレートの作成

このドキュメントでは、Dataflow パイプライン コードからカスタム クラシック テンプレートを作成する方法について説明します。クラシック テンプレートは、既存の Dataflow パイプラインをパッケージ化しています。特定のパイプライン パラメータを変更することで、ジョブごとにカスタマイズできる再利用可能なテンプレートを作成できます。テンプレートを作成する代わりに、コマンドを使用して既存のパイプラインからテンプレートを生成します。

まず、このプロセスを簡単に紹介します。詳細については、以降のセクションで説明します。

  1. パイプラインのコードでは、実行時に設定または使用するすべてのパイプライン オプションに ValueProvider インターフェースを使用します。ランタイムのパラメータを受け入れる DoFn オブジェクトを使用します。
  2. 追加のメタデータを使用して、クラシック テンプレートの実行時にカスタム パラメータが検証されるようにテンプレートを拡張します。このようなメタデータの例としては、カスタム クラシック テンプレートの名前やオプションのパラメータなどがあります。
  3. パイプライン I/O コネクタが ValueProvider オブジェクトをサポートしているかどうかを確認し、必要に応じて変更します。
  4. カスタム クラシック テンプレートを作成してステージングします。
  5. カスタム クラシック テンプレートを実行します。

さまざまな種類の Dataflow テンプレート、それぞれのメリット、従来のテンプレートを選択するタイミングについては、Dataflow テンプレートをご覧ください。

クラシック テンプレートの実行に必要な権限

Dataflow のクラシック テンプレートを実行するために必要な権限は、テンプレートを実行する場所と、パイプラインのソースとシンクが別のプロジェクト内にあるかどうかによって異なります。

ローカルまたは Google Cloud で Dataflow パイプラインを実行する詳しい方法については、Dataflow のセキュリティと権限をご覧ください。

Dataflow のロールと権限のリストについては、Dataflow アクセス制御をご覧ください。

制限事項

  • 次のパイプライン オプションは、クラシック テンプレートではサポートされていません。ワーカー ハーネス スレッドの数を制御する必要がある場合は、Flex テンプレートを使用します。

    Java

    numberOfWorkerHarnessThreads
      

    Python

    number_of_worker_harness_threads
      
  • Dataflow ランナーは、Pub/Sub トピックとサブスクリプション パラメータの ValueProvider オプションをサポートしていません。ランタイムのパラメータで Pub/Sub オプションが必要な場合は、Flex テンプレートを使用します。

ランタイムのパラメータと ValueProvider インターフェースについて

ValueProvider インターフェースを使用すると、パイプラインでランタイムのパラメータを受け入れることができます。Apache Beam には、3 タイプの ValueProvider オブジェクトが用意されています。

名前 説明
RuntimeValueProvider

RuntimeValueProvider は、ValueProvider のデフォルトのタイプです。RuntimeValueProvider を使用すると、パイプラインの実行中にのみ使用可能な値をパイプラインで受け入れることができます。この値はパイプラインの作成中には使用できないため、この値を使用してパイプラインのワークフロー グラフを変更することはできません。

isAccessible() を使用すると、ValueProvider の値が使用可能かどうかを確認できます。パイプラインの実行前に get() を呼び出すと、Apache Beam から次のエラーが返されます。
Value only available at runtime, but accessed from a non-runtime context.

RuntimeValueProvider は、事前に値がわからない場合に使用します。実行時にパラメータ値を変更する場合は、テンプレートのパラメータの値を設定しないでください。テンプレートからジョブを作成するときに、パラメータの値を設定します。

StaticValueProvider

StaticValueProvider を使用すると、パイプラインに静的な値を指定できます。この値はパイプラインの作成中に使用できるため、この値を使用してパイプラインのワークフロー グラフを変更できます。

StaticValueProvider は、事前に値がわかっている場合に使用します。例については、StaticValueProvider のセクションをご覧ください。

NestedValueProvider

NestedValueProvider を使用すると、別の ValueProvider オブジェクトから値を計算できます。NestedValueProviderValueProvider をラップしたもので、ラップされた ValueProvider のタイプによってパイプラインの作成中に値にアクセスできるかどうかが決定されます。

NestedValueProvider は、値を使用して実行時に別の値を計算する場合に使用します。例については、NestedValueProvider のセクションをご覧ください。

パイプライン コードでランタイム パラメータを使用する

このセクションでは、ValueProviderStaticValueProviderNestedValueProvider の使用方法について説明します。

パイプライン オプションで ValueProvider を使用する

実行時に設定または使用するすべてのパイプライン オプションで ValueProvider を使用します。

たとえば、次の WordCount コード スニペットではランタイムのパラメータはサポートされていません。このコードは、入力ファイルのオプションを追加し、パイプラインを作成して、入力ファイルから行を読み取ります。

Java

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

ランタイムのパラメータのサポートを追加するには、ValueProvider を使用するように入力ファイルのオプションを変更します。

Java

入力ファイルのオプションのタイプには、String ではなく ValueProvider<String> を使用します。

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

add_argumentadd_value_provider_argument に置き換えます。

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

関数で ValueProvider を使用する

ランタイムのパラメータ値を独自の関数で使用するには、ValueProvider パラメータを使用するように関数を更新します。

次の例には、整数 ValueProvider のオプションと、整数を追加する単純な関数が含まれています。この関数は整数 ValueProvider に依存します。このパイプラインが実行されると、[1, 2, 3] を含む PCollection のすべての整数に MySumFn が適用されます。ランタイムの値が 10 の場合、PCollection には [11, 12, 13] が含まれます。

Java

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

StaticValueProvider を使用する

パイプラインに静的な値を指定するには、StaticValueProvider を使用します。

この例では、ValueProvider<Integer> を取る DoFnMySumFn を使用します。事前にパラメータの値がわかっている場合は、StaticValueProvider を使用して静的な値を ValueProvider として指定できます。

Java

このコードは、パイプラインの実行時に値を取得します。

  .apply(ParDo.of(new MySumFn(options.getInt())))

代わりに、StaticValueProvider を使用して静的な値を指定できます。

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

このコードは、パイプラインの実行時に値を取得します。

  beam.ParDo(MySumFn(user_options.templated_int))

代わりに、StaticValueProvider を使用して静的な値を指定できます。

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

通常のパラメータとランタイムのパラメータの両方をサポートする I/O モジュールを実装する場合は、StaticValueProvider を使用することもできます。StaticValueProvider によって、2 つの類似したメソッドを実装することによるコードの重複が減ります。

Java

この例のソースコードは、GitHub の Apache Beam の TextIO.java のものです。

  // Create a StaticValueProvider<String> from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

この例には、引数として string または ValueProvider を受け入れる単一のコンストラクタがあります。引数が string の場合は、StaticValueProvider に変換されます。

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, str):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

NestedStaticValueProvider を使用する

別の ValueProvider オブジェクトから値を計算するには、NestedValueProvider を使用します。

NestedValueProvider は、ValueProviderSerializableFunction トランスレータを入力として受け入れます。NestedValueProvider.get() を呼び出すと、トランスレータは ValueProvider 値に基づいて新しい値を作成します。この変換では、ValueProvider 値を使用して必要な最終的な値を作成できます。

次の例では、ユーザーがファイル名 file.txt を指定しています。変換により、ファイル名にパス gs://directory_name/ が追加されます。.get() を呼び出すと、gs://directory_name/file.txt が返されます。

Java

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

パイプライン コードでメタデータを使用する

追加のメタデータを使用して、テンプレートの実行時にカスタム パラメータが確認されるようにテンプレートを拡張できます。テンプレートのメタデータを作成する場合、手順は次のとおりです。

  1. メタデータ パラメータのパラメータとメタデータ ファイルの例の形式を使用して、TEMPLATE_NAME_metadata という名前の JSON 形式のファイルを作成します。TEMPLATE_NAME をテンプレートの名前に置き換えます。

    メタデータ ファイルにファイル名の拡張子が付いていないことを確認します。たとえば、テンプレート名が myTemplate の場合、メタデータ ファイルは myTemplate_metadata である必要があります。

  2. メタデータ ファイルを Cloud Storage でテンプレートと同じフォルダに保存します。

メタデータのパラメータ

パラメータキー 必須 値の説明
name テンプレートの名前。
description × テンプレートを説明する短い文章。
streaming × true の場合、このテンプレートはストリーミングをサポートします。デフォルト値は false です。
supportsAtLeastOnce × true の場合、このテンプレートは 1 回以上の処理をサポートします。デフォルト値は false です。テンプレートが 1 回以上のストリーミング モードで動作するように設計されている場合は、このパラメータを true に設定します。
supportsExactlyOnce × true の場合、このテンプレートは 1 回限りの処理をサポートします。デフォルト値は true です。
parameters × テンプレートで使用する追加のパラメータの配列。デフォルトで空の配列が使用されます。
name テンプレートで使用されるパラメータの名前。
label パラメータにラベルを付けるために Google Cloud コンソールで使用される、人が読める文字列。
helpText パラメータを説明する短い文章。
isOptional × パラメータが必須の場合は false、パラメータが省略可能な場合は true。値が設定されていない場合、isOptional はデフォルトで false に設定されます。このパラメータキーをメタデータに含めない場合、メタデータが必須パラメータになります。
regexes × パラメータの値を検証するために使用される文字列形式の POSIX-egrep 正規表現の配列。たとえば、["^[a-zA-Z][a-zA-Z0-9]+"] は、値がアルファベットで始まり、その後に文字が 1 つ以上続くことを検証する単独の正規表現です。デフォルトで空の配列が使用されます。

メタデータ ファイルの例

Java

Dataflow サービスでは、次のメタデータを使用して、WordCount テンプレートのカスタム パラメータを検証します。

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Python

Dataflow サービスでは、次のメタデータを使用して、WordCount テンプレートのカスタム パラメータを検証します。

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "input",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Google 提供のテンプレートのメタデータ ファイルは、Dataflow のテンプレート ディレクトリからダウンロードできます。

サポートされているパイプライン I/O コネクタと ValueProvider

Java

一部の I/O コネクタには、ValueProvider オブジェクトを受け入れるメソッドが含まれています。特定のコネクタまたはメソッドがサポートされるかどうかは、I/O コネクタの API リファレンス ドキュメントで確認してください。サポートされているメソッドには、ValueProvider のオーバーロードがあります。メソッドにオーバーロードがない場合、そのメソッドではランタイムのパラメータがサポートされません。次の I/O コネクタでは、ValueProvider が少なくとも部分的にサポートされます。

  • ファイルベースの I/O: TextIOAvroIOFileIOTFRecordIOXmlIO
  • BigQueryIO*
  • BigtableIO(SDK 2.3.0 以降が必要)
  • PubSubIO
  • SpannerIO

Python

一部の I/O コネクタには、ValueProvider オブジェクトを受け入れるメソッドが含まれています。I/O コネクタとメソッドがサポートされているかどうかは、コネクタの API リファレンス ドキュメントで確認してください。次の I/O コネクタは、ランタイムのパラメータを受け入れます。

  • ファイルベースの I/O: textioavroiotfrecordio

クラシック テンプレートを作成してステージングする

パイプラインを作成したら、テンプレート ファイルを作成してステージングする必要があります。テンプレートの作成とステージングが完了したら、ステージング場所にテンプレートの実行に必要なその他のファイルが格納されます。ステージング場所を削除すると、テンプレートの実行が失敗します。Dataflow ジョブはテンプレートをステージングした直後には実行されません。カスタム テンプレート ベースの Dataflow ジョブを実行するには、Google Cloud コンソールDataflow REST APIgcloud CLI を使用します。

次の例は、テンプレート ファイルをステージングする方法を示しています。

Java

この Maven コマンドは、--templateLocation で指定された Cloud Storage の場所にテンプレートを作成してステージングします。

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=PROJECT_ID \
                  --stagingLocation=gs://BUCKET_NAME/staging \
                  --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \
                  --region=REGION" \
     -P dataflow-runner
    

templateLocation パスが正しいことを確認します。次のように置き換えます。

  • com.example.myclass: Java クラス
  • PROJECT_ID: プロジェクト ID
  • BUCKET_NAME: Cloud Storage バケットの名前
  • TEMPLATE_NAME: テンプレートの名前
  • REGION: Dataflow ジョブをデプロイするリージョン

Python

この Python コマンドは、--template_location で指定された Cloud Storage の場所にテンプレートを作成してステージングします。

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --staging_location gs://BUCKET_NAME/staging \
    --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \
    --region REGION

template_location パスが正しいことを確認します。次のように置き換えます。

  • examples.mymodule: Python モジュール
  • PROJECT_ID: プロジェクト ID
  • BUCKET_NAME: Cloud Storage バケットの名前
  • TEMPLATE_NAME: テンプレートの名前
  • REGION: Dataflow ジョブをデプロイするリージョン

テンプレートの作成とステージングが完了したら、次はテンプレートを実行します。