複数の PCollection を処理する

一部の Dataflow SDK 変換では、複数の PCollection オブジェクトを入力として受け取ったり、複数の PCollection オブジェクトを出力として生成したりできます。Dataflow SDK では、複数の PCollection オブジェクトをバンドルするいくつかの方法が用意されています。

同じデータ型を格納する PCollection オブジェクト用に、Dataflow SDK には Flatten 変換または Partition 変換も用意されています。 Flatten は複数の PCollection オブジェクトをマージして 1 つの論理的な PCollection にし、Partition は 1 つの PCollection を固定数の小さなコレクションに分割します。

同じ型を格納する PCollection

Java

すべて同じデータ型を格納する複数の PCollection オブジェクト(Java の PCollection<String> など)は、クラス PCollectionList を使用してカプセル化することができます。

PCollectionList は、同じデータ型(String または Integer など)を格納する PCollection オブジェクトのコレクションです。たとえば、Flatten 変換は PCollectionList を受け取り、すべてのコレクション内のすべての要素を 1 つの論理的な PCollection に結合します。

同様に、Partition 変換は、パーティショニング機能(入力のパーセンタイル グループへの分割など)に基づき、1 つの PCollection を分割して複数の PCollection オブジェクトを含む PCollectionList を生成します。

次のコードサンプルは、String 要素を含む個々の PCollection オブジェクトから PCollectionList を作成する方法を示します。

  PCollection<String> pc1 = ...;
  PCollection<String> pc2 = ...;
  PCollection<String> pc3 = ...;

  // Create a PCollectionList with three PCollections:
  PCollectionList<String> pcs = PCollectionList.of(pc1).and(pc2).and(pc3);

PCollectionList 内の個々の PCollection オブジェクトにアクセスするには、次のコードサンプルに示すようにインデックス(0 から始まる)を使用します。

  PCollectionList<String> pcs = ...;

  // Get the first PCollection from the PCollection List.
  PCollection<String> firstPc = pcs.get(0);

PCollectionList のすべての PCollection オブジェクトは同じデータ型を含む必要がありますが、データ エンコーディングは同じでなくてもかまいません。PCollectionList に含まれる 2 つの PCollection<Integer> オブジェクトが、異なるコーディング方法(ビッグ エンディアンとリトル エンディアンなど)を使ってもかまいません。

異なる型を格納する PCollection

一部の高度な変換は、複数の入力を受け取って、さまざまな型の複数の出力を生成します。たとえば、副出力を含む ParDo 変換は、さまざまなデータ型を含む複数の PCollection オブジェクトを生成することができます。

Java

Dataflow Java SDK は、タグ付きタプルシステムを使用して PCollection オブジェクトのコレクションを表します。このようなタグ付きタプルは型の安全性を維持するために役立ちます。PCollection オブジェクトは PCollectionTuple クラスに含まれます。タプル内の PCollection ごとに、関連する TupleTag を作成する必要があります。TupleTag を使用して、PCollectionTuple 内の各 PCollection のインデックス付け、取得、特定を行います。

注: 異なる型(PCollection<String>PCollection<Integer> など)が格納されている可能性のある一連の PCollection オブジェクトを表す場合、PCollectionTuple を使用する必要があります。PCollection オブジェクトすべてに同じデータ型が含まれる場合は、PCollectionList の使用を検討してください。

Dataflow Java SDK では各 PCollectionTupleTupleTag によってキーが指定されます。 TupleTag は、異なる型を含むタプルをインデックス付けするために Dataflow Java SDK に含まれるクラスです。異なる型を含むタプルを変換の入力と出力として使用するとき、タプルクラスと TupleTag オブジェクトの組み合わせによって、適切な型の安全性が提供されます。

PCollectionTuple を作成するとき、タプルに含まれる PCollection ごとに TupleTag も作成する必要があります。各 TupleTag の型は、タプル内の各 PCollection の型と一致する必要があります。

TupleTag 型を使用すると、タプル内の各 PCollection の静的な型を追跡できます。

次のコードサンプルでは、3 つの PCollection オブジェクトを含む PCollectionTuple の作成方法を示しています。オブジェクトにそれぞれ StringIntegerIterable<String> の値が含まれます。

  // The PCollections to be contained in the tuple.
  PCollection<String> pc1 = ...;
  PCollection<Integer> pc2 = ...;
  PCollection<Iterable<String>> pc3 = ...;

  // Create TupleTags for each of the PCollections to put in the PCollectionTuple.
  TupleTag<String> tag1 = new TupleTag<>();
  TupleTag<Integer> tag2 = new TupleTag<>();
  TupleTag<Iterable<String>> tag3 = new TupleTag<>();

  // Create a PCollectionTuple with the three PCollections and their associated tags.
  PCollectionTuple pcs =
      PCollectionTuple.of(tag1, pc1)
                      .and(tag2, pc2)
                      .and(tag3, pc3);

タプルから特定の PCollection を抽出するには、メソッド PCollectionTuple.get を使用して、タプルを作成したときにそのコレクションで使用した TupleTag を渡します。

  // Get PCollections out of a PCollectionTuple, using the tags
  // that were used to put them in.

  PCollection<Integer> pcX = pcs.get(tag2);
  PCollection<String> pcY = pcs.get(tag1);
  PCollection<Iterable<String>> pcZ = pcs.get(tag3);

空のタプルを作成する必要がある場合は、メソッド PCollectionTuple.empty を使用できます。このメソッドによって、所定のパイプラインに関連付けられた空のタプルが作成されます。

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollectionTuple pcTuple = PCollectionTuple.empty(p);

メソッド PCollectionTuple.getAll を使用して、タプル内のすべての PCollection オブジェクトの Map と、関連付けられた TupleTag オブジェクトを一緒に取得できます。

  Map<TupleTag<?>, PCollection<?>> allCollections = pcs.getAll();

メソッド PCollectionTuple.has を使用すると、所定の TupleTag に関連付けられた PCollection がタプルに含まれるかどうかをチェックできます。

  TupleTag<String> tag1 = new TupleTag<>();
  boolean hasStringCollection = pcs.has(tag1);

Flatten を使用して PCollection をマージする

同じ型を含む複数の PCollection オブジェクトがパイプラインにある場合、Flatten 変換を使用してそれらをマージして 1 つの論理的な PCollection にすることができます。

Flatten 変換を適用する

Java

Flatten は、所定の型の PCollection を任意の数だけ含む PCollectionList を受け取り、そのリスト内の PCollection オブジェクトのすべての要素を含む 1 つの PCollection を返します。

次のコードサンプルは、Flatten 変換の apply を実行して、複数の PCollection<String> オブジェクトをマージして 1 つの PCollection<String> にする方法を示します。この例では、最初に、マージする PCollection オブジェクトすべてを含む PCollectionList を作成します。

  PCollection<String> pc1 = ...;
  PCollection<String> pc2 = ...;
  PCollection<String> pc3 = ...;

  PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);

  PCollection<String> merged = collections.apply(Flatten.<String>pCollections());

Java 7 では、汎用ファクトリ メソッド Flatten.pCollections を使用するときは、各入力 PCollection に含まれる要素の型に対応する型パラメータを指定する必要があります。

マージされたコレクションのデータ エンコーディング

デフォルトでは、出力 PCollectionコーダーは、入力 PCollectionList の最初の PCollection のコーダーと同じです。ただし、入力 PCollection オブジェクトそれぞれは異なるコーダーを使用できます(オブジェクトすべてが選択した言語で同一のデータ型を含む場合)。

ウィンドウ処理されたコレクションをマージする

Flatten を使用して、ウィンドウ処理戦略が適用された PCollection オブジェクトをマージするときは、マージ対象のすべての PCollection オブジェクトに「互換性のあるウィンドウ戦略」とウィンドウ サイズを使用する必要があります。たとえば、マージしようとするすべてのコレクションは、同一の 5 分間の固定ウィンドウか、30 秒ごとに開始する 4 分間のスライド ウィンドウを使用する必要があります。

パイプラインが Flatten を使用して、互換性のないウィンドウの PCollection オブジェクトをマージしようとすると、パイプラインが作成されたときに Dataflow によって IllegalStateException エラーが生成されます。

Partition を使用して PCollection を分割する

Partition コア変換を使用して、1 つの論理 PCollection の要素を「N 個の」パーティションに分割できます。生成される各パーティションは PCollection になり、これらのパーティションは PCollection オブジェクトのリストとしてバンドルされます。

Partition を使用して、1 つの PCollection を論理グループ(パーセンタイル グループなど)に分割することができます。これが役立つのは、異なるパーセンタイル グループごとにパイプラインが別の処理を実行する必要がある場合などです。

Partition 変換を適用する

Partition は、指定するパーティショニング機能に応じて PCollection の要素を分割します。パーティショニング機能に含まれるロジックにより、入力 PCollection の要素を、結果の各パーティション PCollection に分割する方法が決まります。

注: パーティションの数はグラフ作成時に決定する必要があります。たとえば、実行時にコマンドライン オプションとしてパーティション数を渡すことができます(これがパイプライン グラフの作成に使用されます)。ただし、パイプラインの中ではパーティション数を判別できません(たとえば、パイプライン グラフの作成後に計算されたデータに基づくため)。

Java

次のコードサンプルは、型が Student の オブジェクトの PCollection をパーセンタイル グループに分割します。

  PCollection<Student> students = ...;
  // Split students up into 10 partitions, by percentile:
  PCollectionList<Student> studentsByPercentile =
      students.apply(Partition.of(10, new PartitionFn<Student>() {
          public int partitionFor(Student student, int numPartitions) {
              return student.getPercentile()  // 0..99
                   * numPartitions / 100;
          }}));

Partition 変換を引数として apply に渡すとき、必要な結果パーティション数を含む int 値と、パーティショニング機能を表す PartitionFn を指定する必要があります。この例では、PartitionFn インラインを定義しています。

apply の戻り値は、結果パーティションそれぞれを個別の PCollection オブジェクトとして含む PCollectionList です。次のように、get メソッドを使用すると PCollectionList から各パーティションを抽出できます。

  PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。