ParDo による並列処理

ParDo は Dataflow SDK のコア並列処理オペレーションです。ParDo をジェネリック並列処理に使用します。ParDo の処理スタイルは Map/Shuffle/Reduce スタイル アルゴリズムの Mapper クラスでの処理と似ています。ParDo は、PCollection 入力の各要素を受け取り、その要素に対してなんらかの処理を行い、1 つまたは複数の要素を PCollection 出力に排出します。何も排出しないこともあります。

PCollection 入力の各要素に対して ParDo 実施する関数を指定します。指定した関数は、Dataflow ジョブの複数のワーカー インスタンスで、独立して並列で呼び出されます。

ParDo は次のようなさまざまなデータ処理オペレーションに役立ちます。

  • データセットをフィルタリング。 ParDo を使用して PCollection の各要素を検討し、その要素を新しいコレクションに出力するか、破棄することができます。
  • データセット内の各要素の型を書式設定または変換。 ParDoを使用して PCollection 内の要素をフォーマットできます。例えばキーと値のペアを印刷可能な文字列に変換できます。
  • データセット内の各要素の部分を抽出。 ParDo を使用して、PCollection 内の各要素の一部のみを抽出できます。これは、BigQuery テーブルの行から個々のフィールドを抽出する場合に特に役立ちます。
  • データセット内の各要素の計算を実施。 ParDo を使用して、PCollection のすべての要素または特定の要素に対して、簡単または複雑な計算を行えます。

ParDo はパイプラインの一般的な中間ステップでもあります。たとえば、ParDoキーを使用して PCollection の各要素にキーと値のペアを割り当てられます。後から GroupByKey 変換を使用してペアをグループ化することができます。

ParDo 変換を適用する

ParDoを使用するには、それを変換したい PCollection に適用して戻り値を適切な型の PCollection として保存します。

ParDo に指定した引数は、DoFn と呼ばれるDataflow SDK によって提供される特定の型のサブクラスである必要があります。DoFn の詳細については、このセクション後半の処理ロジックの作成と指定をご覧ください。

次のコード例は、文字列の PCollection に適用したベーシック ParDo で、DoFn ベースの関数を渡して各文字列の長さを計算し、整数の PCollection に文字列の長さを出力します。

Java

      // The input PCollection of Strings.
      PCollection<String> words = ...;

      // The DoFn to perform on each element in the input PCollection.
      static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

      // Apply a ParDo to the PCollection "words" to compute lengths for each word.
      PCollection<Integer> wordLengths = words.apply(
          ParDo
          .of(new ComputeWordLengthFn()));        // The DoFn to perform on each element, which
                                                  // we define above.
    

この例で、コードは入力コレクション("words")に対して apply を呼び出します。ParDoPTransform の引数です。.of オペレーションでは、この例で ComputeWordLengthFn() と呼ばれる各要素に対して実施する DoFn を指定します。

処理ロジックを作成して指定する

ParDo に提供する処理ロジックは、パイプラインの作成に使用する Dataflow SDK に必要な特定の型である必要があります。

Java

SDK クラスDoFnのサブクラスをビルドする必要があります。

指定する関数は、複数の Google Compute Engine インスタンスから、独立して呼び出されます。

また、DoFn が、複数の呼び出し間で不変な永続的状態に依存しないようにしてください。Cloud Platform の処理関数のインスタンスは、その関数の他のインスタンスの状態情報にアクセスできない可能性があります。

注: Dataflow SDK ParDoのバリアントを提供します。それを使用すれば、ユーザーコードのそれぞれの呼び出しに際して、副入力として変更できない持続データを渡すことができます。

Java

DoFn は、PCollection 入力の要素を一度に 1 つずつ処理します。DoFn のサブクラスを作成する際、入力要素の型と出力要素の型を型パラメータとして指定します。次のサンプルコードは、以前の例を基に入力Stringを受け付け、出力Integerを発生するComputeWordLengthFn()関数をどのように定義するかを示しています。

      static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }
    

DoFn のサブクラスは、要素を処理するメソッドである processElement を上書きする必要があります。入力要素を実際に処理するコードをここに指定してください。次のコードサンプルは完全な ComputeWordLengthFn() です。

      static class ComputeWordLengthFn extends DoFn<String, Integer> {
        @Override
        public void processElement(ProcessContext c) {
          String word = c.element();
          c.output(word.length());
        }
      }
    

入力コレクションから要素を手動で抽出する必要はありません。Dataflow Java SDK が各要素の抽出を処理し、DoFn サブクラスに渡します。processElement のオーバーライド時のオーバーライドメソッドは、ProcessContext タイプのオブジェクトを受け入れる必要があります。これにより処理する要素にアクセスできます。ProcessContext.element() メソッドを使用して、DoFn に渡した要素にアクセスします。

PCollection の要素がキーと値のペアである場合、キーは ProcessContext.element().getKey() を使用して、値は ProcessContext.element().getValue() を使用してそれぞれアクセスできます。

Java

Dataflow SDK for Java は、出力要素を結果の PCollection に自動的にまとめます。ProcessContext オブジェクトを使用して、結果要素を processElement から出力コレクションに出力します。結果コレクションの要素を出力するには、メソッド ProcessContext.output() を使用します。

軽量 DoFn

Dataflow SDK では、DoFn の実装方法を単純化する言語固有の方法が提供されます。

Java

多くの場合、匿名の内部クラス インスタンスとして ParDo に渡すシンプルな DoFn 引数を作成することができます。DoFn が数行しかない場合は、インラインに指定する方がわかりやすくなります。次のコード例では、ParDoComputeWordLengthFn 関数を匿名 DoFn として適用しています。

      // The input PCollection.
      PCollection<String> words = ...;

      // Apply a ParDo with an anonymous DoFn to the PCollection words.
      // Save the result as the PCollection wordLengths.
      PCollection<Integer> wordLengths = words.apply(
        ParDo
          .named("ComputeWordLengths")            // the transform name
          .of(new DoFn<String, Integer>() {       // a DoFn as an anonymous inner class instance
            @Override
            public void processElement(ProcessContext c) {
              c.output(c.element().length());
            }
          }));
    

入力の各要素に関数を適用して要素ごとに 1 つの出力のみを生成する上記のような変換の場合は、高度の MapElements 変換を使用できます。MapElements はラムダ関数を受け取るため、この方法は Java 8 では特に簡潔です。

      // The input PCollection.
      PCollection<String> words = ...;

      // Apply a MapElements with an anonymous lambda function to the PCollection words.
      // Save the result as the PCollection wordLengths.
      PCollection<Integer> wordLengths = words.apply(
        MapElements.via((String word) -> word.length())
            .withOutputType(new TypeDescriptor<Integer>() {});
    

同様に、Java 8 ラムダ関数を FilterFlatMapElements、およびPartition 変換に使用できます。これらの変換について詳しくは、Dataflow SDK の作成済み変換をご覧ください。

変換の名前

パイプラインを Dataflow Monitoring Interface で確認すると、変換の名前が実行グラフに表示されます。グラフで変換を認識できるように、わかりやすい名前を変換に付けることが非常に重要です。

Java

.named オペレーションで、パイプラインのこのステップに対応する変換名を指定します。パイプラインを Dataflow Monitoring Interface で確認すると、変換の名前が実行グラフに表示されます。ParDo に匿名 DoFn インスタンスを使用する場合、モニタリング インターフェースでのステップで読みやすい名前となるよう。わかりやすい名前を変換に付けることが非常に重要です。

副入力

主入力 PCollection に加え、ParDo 変換にその他の入力を副入力として指定できます。副入力は、DoFn が入力 PCollection の要素を処理するたびにアクセスできる、追加の入力です。副入力を指定すると他のデータのビューが作成され、それは各要素の処理中に ParDo 変換の DoFn の中から読むことができます。

副入力が役立つのは、ParDo が入力 PCollection の処理中に追加データを注入しなくてはならないが、追加データを実行時に決定する必要がある場合(ハードコーディングできない場合)です。このような値は、入力データによって決まることもあれば、パイプラインの別のブランチに依存することもあります。たとえば、パイプラインが走行しているときにリモート サービスの値を取得して、それを副入力として使用できます。または、パイプラインの別のブランチで計算された値を、別のブランチの ParDo への副入力として追加することもできます。

副入力を表す

Java

副入力の型は常に PCollectionView です。PCollectionViewは、PCollection を単一のエンティティとして表す1つの方法で、次に ParDo に副入力として渡すことができます。PCollectionView を作成して、PCollection を次のタイプのいずれかとして表すことができます。

ビューの型 使用量
View.asSingleton PCollectionを個別の値として表します。指定します。通常、Combine.globallyを使用して PCollection を結合した後で使用します。副入力が 1 つの計算値である場合に使用してください。通常は Combine.globally(...).asSingletonView() を使用して、シングルトン ビューを作成する必要があります。
View.asList PCollectionList として表します。このビューは、副入力が個別値のコレクションである場合に使用してください。
View.asMap PCollectionMap として表します。このビューは、副入力がキーと値のペア(PCollection<K, V>)で構成され、キーごとに 1 つの値を持つ場合に使用してください。
View.asMultimap PCollectionMultiMap として表します。このビューは、副入力がキーと値のペア(PCollection<K, V>)で構成され、キーごとに複数の値を持つ場合に使用してください。

注: 他のパイプラインデータと同様に、PCollectionView は一度作成すると変更できません。

ParDo に副入力を渡す

Java

.withSideInputs を呼び出して副入力を ParDo 変換に渡します。DoFn の内部で、DoFn.ProcessContext.sideInput メソッドを使用して副入力にアクセスします。

次のサンプルコードは、PCollection<Integer> からのシングルトン側入力を作成し、後続の ParDo に渡します。

この例では、個々の単語の集合を表し words と呼ばれる PCollection<String>、および単語長を表す PCollection<Integer> があります。後者を使用してシングルトン値として最大ワード長カットオフを計算し、その計算値を副入力として、カットオに基づいて words をフィルタルタする ParDo に渡します。

      // The input PCollection to ParDo.
      PCollection<String> words = ...;

      // A PCollection of word lengths that we'll combine into a single value.
      PCollection<Integer> wordLengths = ...; // Singleton PCollection

      // Create a singleton PCollectionView from wordLengths using Combine.globally and View.asSingleton.
      final PCollectionView<Integer> maxWordLengthCutOffView =
         wordLengths.apply(Combine.globally(new Max.MaxIntFn()).asSingletonView());

      // Apply a ParDo that takes maxWordLengthCutOffView as a side input.
        PCollection<String> wordsBelowCutOff =
        words.apply(ParDo.withSideInputs(maxWordLengthCutOffView)
                          .of(new DoFn<String, String>() {
            public void processElement(ProcessContext c) {
              String word = c.element();
              // In our DoFn, access the side input.
              int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
              if (word.length() <= lengthCutOff) {
                c.output(word);
              }
        }}));
    }
    

副入力とウィンドウ処理

ウィンドウ処理された PCollectionPCollectionView を作成し、そのサイズが無制限であるため 1 つの値(または 1 つのコレクション クラス)に圧縮できない場合、PCollectionViewウィンドウごとに 1 つのエンティティを表します。つまり、PCollectionView は、ウィンドウごとに 1 つのシングルトン、ウィンドウごとに 1 つのリストなどを表します。

Dataflow は主入力要素のウィンドウを使用して、副入力要素の適切なウィンドウを探します。Dataflow は、主入力要素のウィンドウを副入力のウィンドウ セットに投影し、これによって生成されるウィンドウから副入力を使用します。主入力と副入力のウィンドウが同一の場合、投影によって対応する正確なウィンドウが提供されます。ただし、それぞれの入力のウィンドウが異なる場合、Dataflow は投影を使用して、最も適切な副入力ウィンドウを選択します。

Java

たとえば、主入力が 1 分間の固定時間ウィンドウを使用してウィンドウ処理され、副入力が 1 時間の固定時間ウィンドウを使用してウィンドウ処理されている場合、Dataflow は主入力のウィンドウを副入力のウィンドウ セットに対して投影し、適切な 1 時間の副入力ウィンドウから副入力の値を選択します。

副入力に複数のトリガー呼び出しが含まれる場合、Dataflow は最新のトリガー呼び出しの値を使用します。これが特に役立つのは、グローバル ウィンドウが 1 つの副入力を使用し、トリガーを指定する場合です。

副出力

ParDo は常に主出力 PCollection を生成する(apply からの戻り値として)一方で、ParDo に任意の数の追加出力 PCollection を生成させることができます。複数の出力を生成するように選択した場合は、ParDo により出力 PCollection のすべて(主出力を含む)がバンドルされて返されます。たとえば、Java では出力 PCollection が型安全の PCollectionTuple としてバンドルされます。

副出力用のタグ

Java

要素を副出力 PCollection に出力するには、ParDo が生成する各コレクションを識別する TupleTag オブジェクトを作成する必要があります。たとえば、ParDo が 3 つの出力 PCollection(主出力と 2 つの副出力)を生成する場合、3 つの関連する TupleTag を作成する必要があります。

次のサンプルコードに、主出力と 2 つの副出力を持つ ParDo のための TupleTag の作成方法を示します。

      // Input PCollection to our ParDo.
      PCollection<String> words = ...;

      // The ParDo will filter words whose length is below a cutoff and add them to
      // the main output PCollection<String>.
      // If a word is above the cutoff, the ParDo will add the word length to a side output
      // PCollection<Integer>.
      // If a word starts with the string "MARKER", the ParDo will add that word to a different
      // side output PCollection<String>.
      final int wordLengthCutOff = 10;

      // Create the TupleTags for the main and side outputs.
      // Main output.
      final TupleTag<String> wordsBelowCutOffTag =
          new TupleTag<String>(){};
      // Word lengths side output.
      final TupleTag<Integer> wordLengthsAboveCutOffTag =
          new TupleTag<Integer>(){};
      // "MARKER" words side output.
      final TupleTag<String> markedWordsTag =
          new TupleTag<String>(){};
    

出力タグを ParDo に渡す

それぞれの ParDo 出力のために TupleTag 指定した後は、.withOutputTags を呼び出して ParDo にタグを渡す必要があります。最初に主出力のタグを渡し、次に TupleTagList 内の任意の副出力のタグを渡します。

次に示すのは、前の例に対して 3 つの TupleTag(主出力に 1 つと副出力に 2 つ)を ParDo に渡す方法です。

      PCollectionTuple results =
          words.apply(
              ParDo
              // Specify the tag for the main output, wordsBelowCutoffTag.
              .withOutputTags(wordsBelowCutOffTag,
              // Specify the tags for the two side outputs as a TupleTagList.
                              TupleTagList.of(wordLengthsAboveCutOffTag)
                                          .and(markedWordsTag))
              .of(new DoFn<String, String>() {
                // DoFn continues here.
                ...
              }
    

すべての出力(主出力 PCollection を含む)は、results. という PCollectionTuple にバンドルされて返されることに注意してください。

DoFn で副出力に排出する

Java

ParDoDoFn の内部では、メソッド ProcessContext.sideOutput を使用して要素を副出力に出力できます。ProcessContext.sideOutput を呼び出す際、ターゲット側の出力コレクションのための適切な TupleTag を渡す必要があります。

前の例に基づいて、主出力と副出力を排出する DoFn を次に示します。

      .of(new DoFn<String, String>() {
         public void processElement(ProcessContext c) {
           String word = c.element();
           if (word.length() <= wordLengthCutOff) {
             // Emit this short word to the main output.
             c.output(word);
           } else {
             // Emit this long word's length to a side output.
             c.sideOutput(wordLengthsAboveCutOffTag, word.length());
           }
           if (word.startsWith("MARKER")) {
             // Emit this word to a different side output.
             c.sideOutput(markedWordsTag, word);
           }
         }}));
    

ParDo の後、戻された PCollectionTuple から主出力と副出力 PCollection を抽出する必要があります。タプルから個々の PCollection を抽出する方法の例は、PCollectionTupleに関するセクションをご覧ください。