複合変換を作成する

Dataflow SDK 内の変換はネスト構造にすることができます。ネスト構造では、複数のシンプルな変換から複合変換を構成できます。この種の変換は、他の複数の変換オペレーションで構成される場合があります(つまり、1 つ以上の ParDoCombine、または GroupByKey を実行する場合があります)。これらの変換は、複合変換と呼ばれます。複合変換は、複数のステップで構成された再利用可能な変換を作成したい場合に便利です。

1 つの複合変換内で複数の変換をネストすると、Dataflow パイプラインに次のような利点を提供できます。

  • コードのモジュール化が促進され、コードがわかりやすくなり、コードの再利用がしやすくなる。
  • Dataflow Monitoring Interface では複合変換を名前で参照できるため、デベロッパーがパイプラインの実行時にその進捗状況を追跡・理解しやすくなる。

複合変換のサンプル

Dataflow SDK で事前に記述されている変換の多くは、複合変換です。

CountWords に含まれる WordCount example program 変換は、複合変換の一例です。CountWords は、ネストされた複数の変換で構成された PTransform サブクラスです。

その apply メソッドで、CountWords 変換は次の変換オペレーションを適用します。

  1. テキスト行を含んだ入力 PCollectionParDo を適用し、個別の単語を含んだ出力 PCollection を生成する。
  2. 単語の PCollection に Dataflow SDK ライブラリ変換 Count* を適用し、Key-Value ペアの PCollection を生成する。各キーはテキスト内の単語を表し、各値は元のデータ内での単語の出現回数を表します。
  3. 最終的な ParDo を Key-Value ペアの PCollection に適用し、出力ファイルへの書き込みに適した印刷可能文字列の PCollection を生成する。

図 1 は、CountWords を含んだパイプラインが複合変換を使用してどのように構築されるかを示した図です。

CountWords 変換は、2 つの ParDo オペレーションと、Count という SDK 付属の変換を使用する複合変換です。
図 1: 複合的な CountWords 変換の構造

Java

複合変換のパラメータと戻り値は、全体的な変換の初期入力型と最終戻り値に一致する必要があります。たとえば、CountWords.apply は入力の PCollection<String> を受け付け、変換の中間データによって型が複数回変更された場合でも、PCollection<String> を返します。

  static class CountWords
      extends PTransform<PCollection<String>, PCollection<String>> {
    @Override
    public PCollection<String> apply(PCollection<String> lines) {
      PCollection<String> words = lines.apply(
          ParDo
          .named("ExtractWords")
          .of(new ExtractWordsFn()));

      PCollection<KV<String, Integer>> wordCounts =
          words.apply(Count.<String>perElement());

      PCollection<String> results = wordCounts.apply(
          ParDo
          .named("FormatCounts")
          .of(new DoFn<KV<String, Integer>, String>() {
              @Override
              public void processElement(ProcessContext c) {
                c.output(c.element().getKey() + ": " + c.element().getValue());
              }
            }));

      return results;
    }
  }

複合変換の作成

デベロッパーは、Dataflow SDK 内の Ptransform クラスのサブクラスを作成し、apply メソッドをオーバーライドして実際の処理ロジックを指定することで、独自の複合変換を作成できます。その後は、その変換を SDK 内の組み込み変換と同様に使用できます。

Java

PTransform クラスの型パラメータについては、変換が入力として受け取り、出力として生成する PCollection 型を渡します。複数の PCollections を入力として受け取ったり、複数の PCollections を出力として生成するには、関連する型パラメータにマルチコレクション型のいずれかを使用します。

次のサンプルコードは、StringPCollection を入力として受け付け、IntegerPCollection を出力する PTransform の宣言方法を示したものです。

  static class ComputeWordLengths
    extends PTransform<PCollection<String>, PCollection<Integer>> {
    ...
  }

apply メソッドのオーバーライド

PTransform サブクラス内で、apply メソッドをオーバーライドする必要があります。PTransform の処理ロジックを追加する作業は、apply の中で行います。apply のオーバーライドでは、パラメータとして適切な型の入力 PCollection を受け取り、結果値として出力 PCollection を指定する必要があります。

Java

次のサンプルコードは、前述のサンプルで宣言された ComputeWordLengths クラスの apply をオーバーライドする方法を示したものです。

  static class ComputeWordLengths
      extends PTransform<PCollection<String>, PCollection<Integer>> {
    @Override
    public PCollection<Integer> apply(PCollection<String>) {
      ...
      // transform logic goes here
      ...
    }

PTransform サブクラス内で apply メソッドをオーバーライドして適切な入力 PCollection を受け取り、対応する出力 PCollection を返す限り、変換は必要な数だけ含めることができます。これらの変換には、Dataflow SDK のライブラリに含まれるコア変換、複合変換、または変換を含めることができます。

Java

PTransformapply メソッドは、変換のユーザーによって直接呼び出されるものではありません。代わりに、変換を引数として使用して、PCollection 自体に対して apply メソッドを呼び出す必要があります。これにより、パイプラインの構造内で変換をネスト化することが可能になります。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。