ParDo による並列処理

ParDo は、Dataflow SDK のコア並列処理オペレーションです。一般的な並列処理で ParDo を使用します。ParDo の処理スタイルは Map/Shuffle/Reduce スタイル アルゴリズムの Mapper クラスでの処理と似ています。ParDo は、PCollection 入力の各要素を受け取り、その要素に対してなんらかの処理を行い、1 つまたは複数の要素を PCollection 出力に排出します。何も排出しないこともあります。

ParDo が入力 PCollection の各要素に対して実行する関数を指定します。指定した関数は、Dataflow ジョブの複数のワーカー インスタンスで、独立して並列で呼び出されます。

ParDo は次のようなさまざまなデータ処理オペレーションに役立ちます。

  • データセットをフィルタリングするParDo を使用して PCollection の各要素を評価し、その要素を新しいコレクションに出力するか、破棄できます。
  • データセット内の各要素の型をフォーマットまたは変換するParDoを使用して PCollection 内の要素をフォーマットできます。たとえば、Key-Value ペアを印刷可能な文字列に変換できます。
  • データセット内の各要素の部分を抽出するParDo を使用して、PCollection 内の各要素の一部のみを抽出できます。これは、BigQuery テーブルの行から個々のフィールドを抽出する場合に特に役立ちます。
  • データセット内の各要素の計算を実行するParDo を使用して、PCollection のすべての要素または特定の要素に対して、簡単または複雑な計算を行えます。

ParDo は、パイプラインの一般的な中間ステップでもあります。たとえば、ParDo キーを使用して PCollection の各要素に Key-Value ペアを割り当てられます。後から GroupByKey 変換を使用してペアをグループ化することができます。

ParDo 変換を適用する

ParDo を使用するには、それを変換する PCollection に適用し、戻り値を適切な型の PCollection として保存します。

ParDo に指定する引数は、Dataflow SDK の特定の型のサブクラス(DoFn)にする必要があります。DoFn の詳細については、このセクション後半の処理ロジックを作成して指定するをご覧ください。

次のサンプルコードは、文字列の PCollection に基本の ParDo を適用しています。DoFn ベースの関数を渡して各文字列の長さを計算し、文字列の長さを整数の PCollection に出力します。

Java

  // The input PCollection of Strings.
  PCollection<String> words = ...;

  // The DoFn to perform on each element in the input PCollection.
  static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

  // Apply a ParDo to the PCollection "words" to compute lengths for each word.
  PCollection<Integer> wordLengths = words.apply(
      ParDo
      .of(new ComputeWordLengthFn()));        // The DoFn to perform on each element, which
                                              // we define above.

この例で、コードは入力コレクション("words")に対して apply を呼び出します。ParDoPTransform の引数です。.of オペレーションでは、この例で ComputeWordLengthFn() と呼ばれる各要素に対して実施する DoFn を指定します。

処理ロジックを作成して指定する

ParDo に提供する処理ロジックは、パイプラインの作成に使用する Dataflow SDK に必要な特定の型である必要があります。

Java

SDK クラス DoFn のサブクラスを作成する必要があります。

指定する関数は、複数の Google Compute Engine インスタンスから、独立して呼び出されます。

また、DoFn が、複数の呼び出し間で不変な永続的状態に依存しないようにしてください。Cloud Platform の処理関数のインスタンスは、その関数の他のインスタンスの状態情報にアクセスできない可能性があります。

注: Dataflow SDK では ParDo のバリアントが提供されます。これを使用すると、不変の永続的データを、ユーザーコードの各呼び出しに副入力として渡せます。

Java

DoFn は、PCollection 入力の要素を一度に 1 つずつ処理します。DoFn のサブクラスを作成する際、入力要素の型と出力要素の型を型パラメータとして指定します。次のサンプルコードは前の例の ComputeWordLengthFn() 関数の定義方法を示しています。この関数は、入力 String を受け取って出力 Integer を生成します。

  static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

DoFn のサブクラスは、要素を処理するメソッド processElement をオーバーライドする必要があります。入力要素を実際に処理するコードをここに指定してください。次のサンプルコードは完全な ComputeWordLengthFn() です。

  static class ComputeWordLengthFn extends DoFn<String, Integer> {
    @Override
    public void processElement(ProcessContext c) {
      String word = c.element();
      c.output(word.length());
    }
  }

入力コレクションから要素を手動で抽出する必要はありません。Dataflow Java SDK が各要素の抽出を処理し、DoFn サブクラスに渡します。processElement のオーバーライド時のオーバーライド メソッドは、ProcessContext 型のオブジェクトを受け入れる必要があります。これにより、処理する要素にアクセスできます。ProcessContext.element() メソッドを使用して、DoFn に渡した要素にアクセスします。

PCollection の要素が Key-Value ペアである場合、キーには ProcessContext.element().getKey() を使用してアクセスします。また、値には ProcessContext.element().getValue() を使用してアクセスできます。

Java

Dataflow SDK for Java は、出力要素を結果の PCollection に自動的にまとめます。ProcessContext オブジェクトを使用して、結果要素を processElement から出力コレクションに出力します。結果コレクションの要素を出力するには、メソッド ProcessContext.output() を使用します。

軽量 DoFn

Dataflow SDK では、DoFn の実装方法を単純化する言語固有の方法が提供されます。

Java

多くの場合、匿名の内部クラス インスタンスとして ParDo に渡すシンプルな DoFn 引数を作成できます。DoFn が数行しかない場合は、インラインに指定する方がわかりやすくなります。次のサンプルコードでは、ParDoComputeWordLengthFn 関数を匿名 DoFn として適用しています。

  // The input PCollection.
  PCollection<String> words = ...;

  // Apply a ParDo with an anonymous DoFn to the PCollection words.
  // Save the result as the PCollection wordLengths.
  PCollection<Integer> wordLengths = words.apply(
    ParDo
      .named("ComputeWordLengths")            // the transform name
      .of(new DoFn<String, Integer>() {       // a DoFn as an anonymous inner class instance
        @Override
        public void processElement(ProcessContext c) {
          c.output(c.element().length());
        }
      }));

上記のような変換では、入力の各要素に関数を適用して要素ごとに 1 つの出力のみを生成していますが、このような場合は高度の MapElements 変換を使用できます。MapElements はラムダ関数を受け取るため、この方法は Java 8 では特に簡潔です。

  // The input PCollection.
  PCollection<String> words = ...;

  // Apply a MapElements with an anonymous lambda function to the PCollection words.
  // Save the result as the PCollection wordLengths.
  PCollection<Integer> wordLengths = words.apply(
    MapElements.via((String word) -> word.length())
        .withOutputType(new TypeDescriptor<Integer>() {});

同様に、Java 8 ラムダ関数を FilterFlatMapElementsPartition 変換に使用できます。これらの変換の詳細については、Dataflow SDK の作成済み変換をご覧ください。

変換の名前

パイプラインを Dataflow Monitoring Interface で確認すると、変換の名前が実行グラフに表示されます。グラフで変換を認識できるように、わかりやすい名前を変換に付けることが非常に重要です。

Java

.named オペレーションで、パイプラインのこのステップに対応する変換名を指定します。パイプラインを Dataflow Monitoring Interface で確認すると、変換の名前が実行グラフに表示されます。ParDo に匿名 DoFn インスタンスを使用する場合、モニタリング インターフェースでのステップで簡単に確認できるように、わかりやすい名前を変換に付けることが非常に重要になります。

副入力

主入力 PCollection に加え、ParDo 変換にその他の入力を副入力として指定できます。副入力は、DoFn が入力 PCollection の要素を処理するたびにアクセスできる追加の入力です。副入力を指定すると、他のデータのビューを作成し、各要素の処理中に ParDo 変換の DoFn からデータを読み込むことができます。

副入力が役立つのは、ParDo が入力 PCollection の各要素を処理している間に追加データが必要になり、追加データを実行時に決める必要がある場合です(ハードコーディングできない場合)。このような値は、入力データによって決まることもあれば、パイプラインの別のブランチに依存することもあります。たとえば、パイプラインが走行しているときにリモート サービスの値を取得して、それを副入力として使用できます。または、パイプラインの別のブランチで計算された値を、別のブランチの ParDo への副入力として追加することもできます。

副入力を表す

Java

副入力の型は常に PCollectionView です。PCollectionView は、PCollection を単一のエンティティとして表す 1 つの方法で、次に ParDo に副入力として渡すことができます。PCollectionView を作成して、PCollection を次の型のいずれかとして表すことができます。

ビューの型 用途
View.asSingleton PCollection を個別の値として表します。通常、Combine.globally を使用して PCollection を結合した後で使用します。副入力が 1 つの計算値である場合に使用してください。通常は Combine.globally(...).asSingletonView() を使用して、シングルトン ビューを作成する必要があります。
View.asList PCollectionList として表します。このビューは、副入力が個別値のコレクションである場合に使用してください。
View.asMap PCollectionMap として表します。このビューは、副入力が Key-Value ペア(PCollection<K, V>)で構成され、キーごとに 1 つの値を持つ場合に使用してください。
View.asMultimap PCollectionMultiMap として表します。このビューは、副入力が Key-Value ペア(PCollection<K, V>)で構成され、各キーに複数の値がある場合に使用してください。

注: 他のパイプライン データと同様に、PCollectionView は一度作成すると変更できません。

ParDo に副入力を渡す

Java

副入力を ParDo 変換に渡すには .withSideInputs を呼び出します。DoFn の内部で、DoFn.ProcessContext.sideInput メソッドを使用して副入力にアクセスします。

次のサンプルコードは、PCollection<Integer> からシングルトン副入力を作成し、後続の ParDo に渡します。

この例には、個々の語のコレクションを表す words という PCollection<String> と、語の長さを表す PCollection<Integer> があります。後者を使用して、語の最大長の切り捨てをシングルトン値として計算し、計算した値を副入力として ParDo に渡します。ここでは、切り捨て値に基づいて words がフィルタリングされます。

  // The input PCollection to ParDo.
  PCollection<String> words = ...;

  // A PCollection of word lengths that we'll combine into a single value.
  PCollection<Integer> wordLengths = ...; // Singleton PCollection

  // Create a singleton PCollectionView from wordLengths using Combine.globally and View.asSingleton.
  final PCollectionView<Integer> maxWordLengthCutOffView =
     wordLengths.apply(Combine.globally(new Max.MaxIntFn()).asSingletonView());

  // Apply a ParDo that takes maxWordLengthCutOffView as a side input.
    PCollection<String> wordsBelowCutOff =
    words.apply(ParDo.withSideInputs(maxWordLengthCutOffView)
                      .of(new DoFn<String, String>() {
        public void processElement(ProcessContext c) {
          String word = c.element();
          // In our DoFn, access the side input.
          int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
          if (word.length() <= lengthCutOff) {
            c.output(word);
          }
    }}));
}

副入力とウィンドウ処理

ウィンドウ処理された PCollectionPCollectionView を作成し、そのサイズが無制限であるため 1 つの値(または 1 つのコレクション クラス)に圧縮できない場合、PCollectionViewウィンドウごとに 1 つのエンティティを表します。つまり、PCollectionView は、ウィンドウごとに 1 つのシングルトン、ウィンドウごとに 1 つのリストなどを表します。

Dataflow は主入力要素のウィンドウを使用して、副入力要素の適切なウィンドウを探します。Dataflow は、主入力要素のウィンドウを副入力のウィンドウ セットに投影し、これによって生成されるウィンドウから副入力を使用します。主入力と副入力のウィンドウが同一の場合、投影によって対応する正確なウィンドウが提供されます。ただし、それぞれの入力のウィンドウが異なる場合、Dataflow は投影を使用して、最も適切な副入力ウィンドウを選択します。

Java

たとえば、主入力が 1 分間の固定時間ウィンドウを使用してウィンドウ処理され、副入力が 1 時間の固定時間ウィンドウを使用してウィンドウ処理されている場合、Dataflow は主入力のウィンドウを副入力のウィンドウ セットに対して投影し、適切な 1 時間の副入力ウィンドウから副入力の値を選択します。

副入力に複数のトリガー呼び出しが含まれる場合、Dataflow は最新のトリガー呼び出しの値を使用します。これが特に役立つのは、グローバル ウィンドウが 1 つの副入力を使用し、トリガーを指定する場合です。

副出力

ParDo は常に主出力 PCollection を生成する(apply からの戻り値として)一方で、ParDo に任意の数の追加出力 PCollection を生成させることができます。複数の出力を生成するように選択した場合は、ParDo により出力 PCollection のすべて(主出力を含む)がバンドルされて返されます。たとえば、Java では出力 PCollection が型安全の PCollectionTuple としてバンドルされます。

副出力用のタグ

Java

要素を副出力 PCollection に出力するには、ParDo が生成する各コレクションを識別する TupleTag オブジェクトを作成する必要があります。たとえば、ParDo が 3 つの出力 PCollection(主出力と 2 つの副出力)を生成する場合、3 つの関連する TupleTag を作成する必要があります。

次のサンプルコードでは、主出力と 2 つの副出力を生成する ParDoTupleTag を作成する方法を示します。

  // Input PCollection to our ParDo.
  PCollection<String> words = ...;

  // The ParDo will filter words whose length is below a cutoff and add them to
  // the main output PCollection<String>.
  // If a word is above the cutoff, the ParDo will add the word length to a side output
  // PCollection<Integer>.
  // If a word starts with the string "MARKER", the ParDo will add that word to a different
  // side output PCollection<String>.
  final int wordLengthCutOff = 10;

  // Create the TupleTags for the main and side outputs.
  // Main output.
  final TupleTag<String> wordsBelowCutOffTag =
      new TupleTag<String>(){};
  // Word lengths side output.
  final TupleTag<Integer> wordLengthsAboveCutOffTag =
      new TupleTag<Integer>(){};
  // "MARKER" words side output.
  final TupleTag<String> markedWordsTag =
      new TupleTag<String>(){};

出力タグを ParDo に渡す

ParDo 出力に TupleTag をそれぞれ指定したら、.withOutputTags を呼び出して、そのタグを ParDo に渡す必要があります。最初に主出力のタグを渡して、TupleTagList 内の副出力のタグを渡します。

次に示すのは、前の例に対して 3 つの TupleTag(主出力に 1 つと副出力に 2 つ)を ParDo に渡す方法です。

  PCollectionTuple results =
      words.apply(
          ParDo
          // Specify the tag for the main output, wordsBelowCutoffTag.
          .withOutputTags(wordsBelowCutOffTag,
          // Specify the tags for the two side outputs as a TupleTagList.
                          TupleTagList.of(wordLengthsAboveCutOffTag)
                                      .and(markedWordsTag))
          .of(new DoFn<String, String>() {
            // DoFn continues here.
            ...
          }

すべての出力(主出力 PCollection を含む)は、results. という PCollectionTuple にバンドルされて返されることに注意してください。

DoFn で副出力に排出する

Java

ParDoDoFn の内部では、ProcessContext.sideOutput メソッドを使用して要素を副出力に出力できます。ProcessContext.sideOutput を呼び出す際、ターゲット側の出力コレクションに適切な TupleTag を渡す必要があります。

前の例に基づいて、主出力と副出力を排出する DoFn を次に示します。

  .of(new DoFn<String, String>() {
     public void processElement(ProcessContext c) {
       String word = c.element();
       if (word.length() <= wordLengthCutOff) {
         // Emit this short word to the main output.
         c.output(word);
       } else {
         // Emit this long word's length to a side output.
         c.sideOutput(wordLengthsAboveCutOffTag, word.length());
       }
       if (word.startsWith("MARKER")) {
         // Emit this word to a different side output.
         c.sideOutput(markedWordsTag, word);
       }
     }}));

ParDo の後、戻された PCollectionTuple から主出力と副出力 PCollection を抽出する必要があります。タプルから個々の PCollection を抽出する方法の例は、PCollectionTuple に関するセクションをご覧ください。