ParDo による並列処理

ParDo は、Dataflow SDK のコア並列処理オペレーションです。一般的な並列処理で ParDo を使用します。ParDo の処理スタイルは Map/Shuffle/Reduce スタイル アルゴリズムの Mapper クラスでの処理と似ています。ParDo は、入力 PCollection の各要素を受け取り、その要素に対してなんらかの処理を実行し、1 つまたは複数の要素を出力 PCollection に排出します。何も排出しないこともあります。

ParDo が入力 PCollection の各要素に対して実行する関数を指定してください。指定した関数は、Dataflow ジョブの複数のワーカー インスタンスで、独立して並列で呼び出されます。

ParDo は、次のようなさまざまなデータ処理操作で役立ちます。

  • データセットをフィルタリング。 ParDo を使用して、PCollection の各要素を検討し、その要素の新しいコレクションへの出力またはその要素の廃棄を行うことができます。
  • データセット内の各要素の型を書式設定または変換。 ParDo を使用して、PCollection の各要素を書式設定できます。たとえば、Key-Value ペアを印刷可能な文字列に書式設定します。
  • データセット内の各要素の部分を抽出。 ParDo を使用して、PCollection の各要素の一部のみを抽出できます。これは、BigQuery テーブルの行から個々のフィールドを抽出する場合に特に役立ちます。
  • データセット内の各要素の計算を実行。 ParDo を使用して、PCollection のすべての要素または特定の要素に対して、単純な計算や複雑な計算を実行できます。

ParDo は、パイプラインの一般的な中間ステップでもあります。たとえば、ParDo を使用して PCollection の各要素にキーを割り当てて、Key-Value ペアを作成できます。後から GroupByKey 変換を使用してペアをグループ化することができます。

ParDo 変換を適用する

ParDo を使用するには、変換する PCollection に適用し、戻り値を適切な型の PCollection として保存します。

ParDo に指定する引数は、Dataflow SDK によって提供される、DoFn という特定の型のサブクラスであることが必要です。DoFn の詳細については、このセクションの後半にある処理ロジックを作成して指定するをご覧ください。

次のコードサンプルは、文字列の PCollection に基本の ParDo を適用しています。DoFn ベースの関数を渡して各文字列の長さを計算し、文字列の長さを整数の PCollection に出力します。

Java

  // The input PCollection of Strings.
  PCollection<String> words = ...;

  // The DoFn to perform on each element in the input PCollection.
  static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

  // Apply a ParDo to the PCollection "words" to compute lengths for each word.
  PCollection<Integer> wordLengths = words.apply(
      ParDo
      .of(new ComputeWordLengthFn()));        // The DoFn to perform on each element, which
                                              // we define above.

この例で、コードは入力コレクション("words")に対して apply を呼び出します。 ParDoPTransform の引数です。.of オペレーションは、各要素に対して実行する DoFn(このケースでは ComputeWordLengthFn())を指定する場所です。

処理ロジックを作成して指定する

ParDo に指定する処理ロジックは、パイプラインの作成に使用している Dataflow SDK で求められる特定のタイプでなければなりません。

Java

SDK クラス DoFn のサブクラスを構築する必要があります。

指定する関数は、複数の Google Compute Engine インスタンスから、独立して呼び出されます。

また、DoFn が、複数の呼び出し間で不変な永続的状態に依存しないようにしてください。Cloud Platform の処理関数のインスタンスは、その関数の他のインスタンスの状態情報にアクセスできない可能性があります。

注: Dataflow SDK では ParDo のバリアントが提供されます。これを使用すると、不変の永続的データを、ユーザーコードの各呼び出しに副入力として渡せます。

Java

DoFn は、入力 PCollection の要素を一度に 1 つずつ処理します。DoFn のサブクラスを作成するとき、入力要素の型と出力要素の型を型パラメータとして指定します。次のコードサンプルは、入力 String を受け取って出力 Integer を生成する、前の例の ComputeWordLengthFn() 関数の定義方法を示しています。

  static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

DoFn のサブクラスは、要素を処理するメソッド processElement を上書きする必要があります。入力要素を実際に処理するコードをここに指定してください。次のコードサンプルは完全な ComputeWordLengthFn() です。

  static class ComputeWordLengthFn extends DoFn<String, Integer> {
    @Override
    public void processElement(ProcessContext c) {
      String word = c.element();
      c.output(word.length());
    }
  }

入力コレクションの要素を手動で抽出する必要はありません。Dataflow Java SDK が、各要素を抽出して DoFn サブクラスに渡します。processElement を上書きするとき、上書きメソッドは型 ProcessContext のオブジェクトを受け取る必要があります。これによって、処理しようとする要素にアクセスすることができます。メソッド ProcessContext.element() を使用して、DoFn に渡された要素にアクセスします。

PCollection の要素が Key-Value ペアの場合は、ProcessContext.element().getKey() を使用してキーに、ProcessContext.element().getValue() を使用して値にアクセスできます。

Java

Dataflow SDK for Java は、出力要素を結果の PCollection に自動的にまとめます。ProcessContext オブジェクトを使用して、processElement の結果要素を出力コレクションに出力します。結果コレクションの要素を出力するには、メソッド ProcessContext.output() を使用します。

軽量 DoFn

Dataflow SDK では、DoFn の実装方法を単純化する言語固有の方法が提供されます。

Java

多くの場合、ParDo に対する単純な DoFn 引数を匿名内部クラス インスタンスとして作成できます。DoFn が数行しかない場合は、インラインに指定する方がわかりやすくなります。次のコードサンプルは、ComputeWordLengthFn 関数を含む ParDo を匿名 DoFn として適用する方法を示しています。

  // The input PCollection.
  PCollection<String> words = ...;

  // Apply a ParDo with an anonymous DoFn to the PCollection words.
  // Save the result as the PCollection wordLengths.
  PCollection<Integer> wordLengths = words.apply(
    ParDo
      .named("ComputeWordLengths")            // the transform name
      .of(new DoFn<String, Integer>() {       // a DoFn as an anonymous inner class instance
        @Override
        public void processElement(ProcessContext c) {
          c.output(c.element().length());
        }
      }));

関数を入力の各要素に適用して要素ごとに正確に 1 つの出力を生成する、上記のような変換の場合は、高度な MapElements 変換を使用できます。MapElements はラムダ関数を受け取るため、この方法は Java 8 では特に簡潔です。

  // The input PCollection.
  PCollection<String> words = ...;

  // Apply a MapElements with an anonymous lambda function to the PCollection words.
  // Save the result as the PCollection wordLengths.
  PCollection<Integer> wordLengths = words.apply(
    MapElements.via((String word) -> word.length())
        .withOutputType(new TypeDescriptor<Integer>() {});

同様に、FilterFlatMapElementsPartition の各変換でも Java 8 ラムダ関数を使用できます。これらの変換について詳しくは、Dataflow SDK の作成済み変換をご覧ください。

変換の名前

パイプラインを Dataflow Monitoring Interface で確認すると、変換の名前が実行グラフに表示されます。グラフで変換を認識できるように、わかりやすい名前を変換に付けることが非常に重要です。

Java

.named オペレーションで、パイプラインのこのステップに対応する変換名を指定します。パイプラインを Dataflow Monitoring Interface で確認すると、変換の名前が実行グラフに表示されます。匿名の DoFn インスタンスを ParDo で使用している場合は明示的な名前を指定することが特に重要です。わかりやすい名前はモニタリング インターフェースのステップですぐに見つけられるためです。

副入力

主入力 PCollection に加え、ParDo 変換にその他の入力を副入力として指定できます。副入力は、DoFn が入力 PCollection の要素を処理するたびにアクセスできる、追加の入力です。副入力を指定すると、他のデータのビューが作成され、各要素の処理中に ParDo 変換の DoFn から読み取ることができます。

副入力が役立つのは、ParDo が入力 PCollection の処理中に追加データを注入しなくてはならないが、追加データを実行時に決定する必要がある場合(ハードコーディングできない場合)です。このような値は、入力データによって決まることもあれば、パイプラインの別のブランチに依存することもあります。たとえば、パイプラインが走行しているときにリモート サービスの値を取得して、それを副入力として使用できます。または、パイプラインの別のブランチで計算される値を使用し、それをもう 1 つのブランチの ParDo の副入力として追加できます。

副入力を表す

Java

副入力の型は常に PCollectionView です。 PCollectionViewPCollection を 1 つのエンティティとして表す手段で、副入力として ParDo に渡すことができます。PCollection を表す PCollectionView は、次のいずれかの型にすることができます。

ビューの型 使用状況
View.asSingleton PCollection を個別の値として表します。通常、Combine.globally を使用して PCollection を結合した後で使用します。副入力が 1 つの計算値である場合に使用してください。一般的には、Combine.globally(...).asSingletonView() を使用してシングルトン ビューを作成する必要があります。
View.asList PCollectionList として表します。このビューは、副入力が個別値のコレクションである場合に使用してください。
View.asMap PCollectionMap として表します。このビューは、副入力が Key-Value ペア(PCollection<K, V>)で構成され、各キーに 1 つの値がある場合に使用してください。
View.asMultimap PCollectionMultiMap として表します。このビューは、副入力が Key-Value ペア(PCollection<K, V>)で構成され、各キーに複数の値がある場合に使用してください。

注: 他のパイプライン データと同様に、PCollectionView は一度作成すると変更できません。

ParDo に副入力を渡す

Java

入力を ParDo 変換に渡すには .withSideInputs を呼び出します。DoFn 内では、メソッド DoFn.ProcessContext.sideInput を使用して副入力にアクセスします。

次のコードサンプルは、PCollection<Integer> からシングルトン副入力を作成して、後続の ParDo に渡します。

この例には、個々の語のコレクションを表す words という PCollection<String> と、語の長さを表す PCollection<Integer> があります。後者を使用して、語の最大長の切り捨てをシングルトン値として計算し、計算した値を副入力として ParDo に渡します。ここでは、切り捨て値に基づいて words がフィルタリングされます。

  // The input PCollection to ParDo.
  PCollection<String> words = ...;

  // A PCollection of word lengths that we'll combine into a single value.
  PCollection<Integer> wordLengths = ...; // Singleton PCollection

  // Create a singleton PCollectionView from wordLengths using Combine.globally and View.asSingleton.
  final PCollectionView<Integer> maxWordLengthCutOffView =
     wordLengths.apply(Combine.globally(new Max.MaxIntFn()).asSingletonView());

  // Apply a ParDo that takes maxWordLengthCutOffView as a side input.
    PCollection<String> wordsBelowCutOff =
    words.apply(ParDo.withSideInputs(maxWordLengthCutOffView)
                      .of(new DoFn<String, String>() {
        public void processElement(ProcessContext c) {
          String word = c.element();
          // In our DoFn, access the side input.
          int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
          if (word.length() <= lengthCutOff) {
            c.output(word);
          }
    }}));
}

副入力とウィンドウ処理

ウィンドウ処理された PCollectionPCollectionView を作成し、そのサイズが無制限であるため 1 つの値(または 1 つのコレクション クラス)に圧縮できない場合、PCollectionViewウィンドウごとに 1 つのエンティティを表します。つまり、PCollectionView は、ウィンドウごとに 1 つのシングルトン、ウィンドウごとに 1 つのリストなどを表します。

Dataflow は主入力要素のウィンドウを使用して、副入力要素の適切なウィンドウを探します。Dataflow は、主入力要素のウィンドウを副入力のウィンドウ セットに投影し、これによって生成されるウィンドウから副入力を使用します。主入力と副入力のウィンドウが同一の場合、投影によって対応する正確なウィンドウが提供されます。ただし、それぞれの入力のウィンドウが異なる場合、Dataflow は投影を使用して、最も適切な副入力ウィンドウを選択します。

Java

たとえば、主入力が 1 分間の固定時間ウィンドウを使用してウィンドウ処理され、副入力が 1 時間の固定時間ウィンドウを使用してウィンドウ処理されている場合、Dataflow は主入力のウィンドウを副入力のウィンドウ セットに対して投影し、適切な 1 時間の副入力ウィンドウから副入力の値を選択します。

副入力に複数のトリガー呼び出しが含まれる場合、Dataflow は最新のトリガー呼び出しの値を使用します。これが特に役立つのは、グローバル ウィンドウが 1 つの副入力を使用し、トリガーを指定する場合です。

副出力

ParDo では常に主出力の PCollection が(apply の戻り値として)生成されますが、ParDo で任意の数の追加出力 PCollection を生成することもできます。複数の出力を生成するように選択した場合は、ParDo により出力 PCollection のすべて(主出力を含む)がバンドルされて返されます。たとえば、Java では出力 PCollection が型安全の PCollectionTuple としてバンドルされます。

副出力用のタグ

Java

副出力の PCollection に要素を排出するには、TupleTag オブジェクトを作成して ParDo によって生成される各コレクションを指定する必要があります。たとえば、ParDo によって 3 つの出力 PCollection(主出力と 2 つの副出力)が生成される場合、3 つの関連した TupleTag を作成する必要があります。

次のコードサンプルでは、主出力と 2 つの副出力を生成する ParDo のために TupleTag を作成する方法を示します。

  // Input PCollection to our ParDo.
  PCollection<String> words = ...;

  // The ParDo will filter words whose length is below a cutoff and add them to
  // the main output PCollection<String>.
  // If a word is above the cutoff, the ParDo will add the word length to a side output
  // PCollection<Integer>.
  // If a word starts with the string "MARKER", the ParDo will add that word to a different
  // side output PCollection<String>.
  final int wordLengthCutOff = 10;

  // Create the TupleTags for the main and side outputs.
  // Main output.
  final TupleTag<String> wordsBelowCutOffTag =
      new TupleTag<String>(){};
  // Word lengths side output.
  final TupleTag<Integer> wordLengthsAboveCutOffTag =
      new TupleTag<Integer>(){};
  // "MARKER" words side output.
  final TupleTag<String> markedWordsTag =
      new TupleTag<String>(){};

出力タグを ParDo に渡す

ParDo 出力に TupleTag をそれぞれ指定したら、.withOutputTags を呼び出して、そのタグを ParDo に渡す必要があります。最初に主出力のタグを渡して、TupleTagList 内の副出力のタグを渡します。

次に示すのは、前の例に対して 3 つの TupleTag(主出力に 1 つと副出力に 2 つ)を ParDo に渡す方法です。

  PCollectionTuple results =
      words.apply(
          ParDo
          // Specify the tag for the main output, wordsBelowCutoffTag.
          .withOutputTags(wordsBelowCutOffTag,
          // Specify the tags for the two side outputs as a TupleTagList.
                          TupleTagList.of(wordLengthsAboveCutOffTag)
                                      .and(markedWordsTag))
          .of(new DoFn<String, String>() {
            // DoFn continues here.
            ...
          }

すべての出力(主出力 PCollection を含む)が、results. という PCollectionTuple にバンドルされて返されることに注意してください。

DoFn で副出力に排出する

Java

メソッド ProcessContext.sideOutput を使用すると、ParDoDoFn 内で要素を副出力に排出できます。ProcessContext.sideOutput を呼び出すときに、対象の副出力コレクションの適切な TupleTag を渡す必要があります。

前の例に基づいて、主出力と副出力を排出する DoFn を次に示します。

  .of(new DoFn<String, String>() {
     public void processElement(ProcessContext c) {
       String word = c.element();
       if (word.length() <= wordLengthCutOff) {
         // Emit this short word to the main output.
         c.output(word);
       } else {
         // Emit this long word's length to a side output.
         c.sideOutput(wordLengthsAboveCutOffTag, word.length());
       }
       if (word.startsWith("MARKER")) {
         // Emit this word to a different side output.
         c.sideOutput(markedWordsTag, word);
       }
     }}));

ParDo の後で、返される PCollectionTuple から、結果として生成された主出力と副出力の PCollection を抽出する必要があります。タプルから個々の PCollection を抽出する方法の例は、PCollectionTuple に関するセクションをご覧ください。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。