複数の PCollection を処理する

一部の Dataflow SDK 変換では、複数の PCollection オブジェクトを入力として受け取ることや、複数の PCollection オブジェクトを出力として生成することが可能です。Dataflow SDK では、複数の PCollection オブジェクトをバンドルするいくつかの方法が用意されています。

同じデータ型を格納する PCollection オブジェクト用に、Dataflow SDK には Flatten 変換または Partition 変換も用意されています。Flatten は複数の PCollection オブジェクトをマージして 1 つの論理的な PCollection にし、Partition は 1 つの PCollection を固定数の小さなコレクションに分割します。

同じ型を格納する PCollection

Java

すべて同じデータ型を格納する複数の PCollection オブジェクト(Java の PCollection<String> など)は、クラス PCollectionList を使用してカプセル化できます。

PCollectionList は同じデータ型(StringIntegerなど)を格納するオブジェクト PCollectionのコレクションを表します。たとえば、Flatten 変換は PCollectionList を受け取り、すべてのコレクション内のすべての要素を 1 つの論理的な PCollection に結合します。

同様に、Partition 変換は、パーティショニング機能(入力のパーセンタイル グループへの分割など)に基づき、1 つの PCollection を分割して複数の PCollection オブジェクトを含む PCollectionList を生成します。

次のサンプルコードは、String 要素を含む個別の PCollection オブジェクトから PCollectionList を生成する方法を示したものです。

  PCollection<String> pc1 = ...;
  PCollection<String> pc2 = ...;
  PCollection<String> pc3 = ...;

  // Create a PCollectionList with three PCollections:
  PCollectionList<String> pcs = PCollectionList.of(pc1).and(pc2).and(pc3);

次のコードサンプルのように、PCollectionList 内の個別の PCollection オブジェクトにインデックスごとにアクセスできます。インデックスはゼロ始まりです。

  PCollectionList<String> pcs = ...;

  // Get the first PCollection from the PCollection List.
  PCollection<String> firstPc = pcs.get(0);

PCollectionList のすべての PCollection オブジェクトは同じデータ型を含む必要がありますが、データ エンコーディングは同じでなくてもかまいません。PCollectionList に含まれる 2 つの PCollection<Integer> オブジェクトが、異なるコーディング方法(ビッグ エンディアンとリトル エンディアンなど)を使ってもかまいません。

異なる型を格納する PCollection

一部の高度な変換は、複数の入力を受け取って、さまざまな型の複数の出力を生成します。たとえば、副出力を含む ParDo 変換は、さまざまなデータ型を含む複数の PCollection オブジェクトを生成できます。

Java

Dataflow Java SDK は、タグ付きタプルシステムを使用して PCollection オブジェクトのコレクションを表します。このようなタグ付きタプルは型の安全性維持に役立ちます。PCollection オブジェクトは PCollectionTuple クラスに含まれます。タプル内の PCollection ごとに、関連する TupleTag を作成する必要があります。TupleTag によって、PCollectionTuple の中の PCollection のそれぞれをインデックス登録、取得、識別できます。

注: 異なる型(PCollection<String>PCollection<Integer> など)が格納されている可能性のある一連の PCollection オブジェクトを表す場合、PCollectionTuple を使用する必要があります。PCollection オブジェクトすべてに同じデータ型が含まれる場合は、PCollectionList の使用を検討してください。

Dataflow Java SDK では各 PCollectionTupleTupleTag によってキーが指定されます。TupleTag は、異なる型を含むタプルをインデックス付けするために Dataflow Java SDK に含まれるクラスです。異なる型を含むタプルを変換の入力と出力として使用するとき、タプルクラスと TupleTag オブジェクトの組み合わせによって、適切な型の安全性が提供されます。

PCollectionTuple を作成する際は、タプル内の各 PCollectionTupleTag も作成する必要があります。各 TupleTag の型は、タプル内の各 PCollection の型と一致する必要があります。

TupleTag 型を使用すると、タプル内の各 PCollection の静的な型を追跡できます。

次のサンプルコードは、それぞれが StringIntegerIterable<String> の値を含む 3 つの PCollection オブジェクトを含む PCollectionTuple を生成する方法を示したものです。

  // The PCollections to be contained in the tuple.
  PCollection<String> pc1 = ...;
  PCollection<Integer> pc2 = ...;
  PCollection<Iterable<String>> pc3 = ...;

  // Create TupleTags for each of the PCollections to put in the PCollectionTuple.
  TupleTag<String> tag1 = new TupleTag<>();
  TupleTag<Integer> tag2 = new TupleTag<>();
  TupleTag<Iterable<String>> tag3 = new TupleTag<>();

  // Create a PCollectionTuple with the three PCollections and their associated tags.
  PCollectionTuple pcs =
      PCollectionTuple.of(tag1, pc1)
                      .and(tag2, pc2)
                      .and(tag3, pc3);

特定の PCollection をタプルから抽出するには、タプルを生成したときにコレクションに使用した TupleTag を渡して、メソッド PCollectionTuple.get を使用します。

  // Get PCollections out of a PCollectionTuple, using the tags
  // that were used to put them in.

  PCollection<Integer> pcX = pcs.get(tag2);
  PCollection<String> pcY = pcs.get(tag1);
  PCollection<Iterable<String>> pcZ = pcs.get(tag3);

空のタプルを作成する必要がある場合は、メソッド PCollectionTuple.empty を使用できます。このメソッドによって、所定のパイプラインに関連付けられた空のタプルが作成されます。

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollectionTuple pcTuple = PCollectionTuple.empty(p);

PCollectionTuple.getAll メソッドを使用して、タプル内のすべての PCollection オブジェクトの Map を、関連付けられた TupleTag オブジェクトと一緒に取得できます。

  Map<TupleTag<?>, PCollection<?>> allCollections = pcs.getAll();

PCollectionTuple.has メソッドを使用して、タプルが特定の TupleTag に関連付けられた PCollection を含むかを確認できます。

  TupleTag<String> tag1 = new TupleTag<>();
  boolean hasStringCollection = pcs.has(tag1);

Flatten を使用して PCollection をマージする

パイプラインに同じデータ型を含む複数の PCollection オブジェクトがある場合は、それらをFlatten 変換を使用して単一の論理的 PCollection にマージしてください。

Flatten 変換を適用する

Java

Flatten は、任意の数の特定の型の PCollection オブジェクトの PCollectionList を取得し、リストの PCollection オブジェクトのすべての要素を含む単一の PCollection を返します。

次のサンプルコードは、Flatten 変換を apply して複数の PCollection<String> オブジェクトを単一の PCollection<String> オブジェクトにマージする方法を示したものです。この例では、最初に、マージされるすべての PCollection オブジェクトを含む PCollectionList を生成します。

  PCollection<String> pc1 = ...;
  PCollection<String> pc2 = ...;
  PCollection<String> pc3 = ...;

  PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);

  PCollection<String> merged = collections.apply(Flatten.<String>pCollections());

Java 7 では、汎用ファクトリ メソッド Flatten.pCollections を使用する場合、各入力 PCollection が保持する要素の型に対応する型パラメータを指定する必要があります。

マージされたコレクションのデータ エンコーディング

デフォルトでは、出力 PCollectionコーダーは、入力 PCollectionList の最初の PCollection のコーダーと同じです。ただし、入力 PCollection オブジェクトそれぞれは異なるコーダーを使用できます(オブジェクトすべてが選択した言語で同一のデータ型を含む場合)。

ウィンドウ処理されたコレクションをマージする

Flatten を使用して、ウィンドウ処理戦略が適用された PCollection オブジェクトをマージする場合、マージ対象のすべての PCollection オブジェクトで「互換性のあるウィンドウ戦略」とウィンドウ サイズを使用する必要があります。たとえば、マージしようとするすべてのコレクションは、同一の 5 分間の固定ウィンドウか、30 秒ごとに開始する 4 分間のスライド ウィンドウを使用する必要があります。

パイプラインが Flatten を使用して PCollection オブジェクトをマージしようとする際、互換性のないウィンドウが含まれていると、Dataflow はパイプラインの作成時に IllegalStateException エラーを生成します。

Partition を使用して PCollection を分割する

Partition コア変換を使用して、1 つの論理 PCollection の要素を「N 個の」パーティションに分割できます。その結果の個々のパーティションは PCollection であり、これらのパーティションは、PCollection オブジェクトのリストとして一緒にバンドルされます。

Partitionを使用すると、単一の PCollection をパーセンタイル グループなどの論理グループに分割できます。これが役立つのは、異なるパーセンタイル グループごとにパイプラインが別の処理を実行する必要がある場合などです。

Partition 変換を適用する

Partition は、指定するパーティショニング機能に応じて PCollection の要素を分割します。パーティショニング機能に含まれるロジックにより、入力 PCollection の要素を、結果の各パーティション PCollection に分割する方法が決まります。

注: パーティションの数はグラフ作成時に決定する必要があります。たとえば、実行時にコマンドライン オプションとしてパーティション数を渡すことができます(これがパイプライン グラフの作成に使用されます)。ただし、パイプラインの中ではパーティション数を判別できません(たとえば、パイプライン グラフの作成後に計算されたデータに基づくため)。

Java

次のサンプルコードは、Student 型のオブジェクト PCollection をパーセンタイル グループに分割します。

  PCollection<Student> students = ...;
  // Split students up into 10 partitions, by percentile:
  PCollectionList<Student> studentsByPercentile =
      students.apply(Partition.of(10, new PartitionFn<Student>() {
          public int partitionFor(Student student, int numPartitions) {
              return student.getPercentile()  // 0..99
                   * numPartitions / 100;
          }}));

Partition 変換を引数として apply に渡す際には、要求した結果のパーティションの数と、パーティショニング機能を表す PartitionFn と一緒に、int の値を提供する必要があります。この例では、PartitionFn インラインを定義しています。

apply からの戻り値は、個々の PCollection オブジェクトとして、それぞれの結果のパーティションを含む PCollectionList です。次のとおり、get メソッドによって、PCollectionList から各パーティションを抽出できます。

  PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);