GroupByKey と結合

GroupByKey コア変換は、キー / 値ペアのコレクションを処理するために使用される並行リダクション オペレーションです。GroupByKey は、マルチマップを表すキー / 値ペアの入力 PCollection(キーは同じだが、値が異なるペアが複数含まれるコレクション)に対して使用します。GroupByKey 変換を使用すると、同じキーを共有するマルチマップ内のすべての値を 1 つにまとめることができます。

GroupByKey は、Map/Shuffle/Reduce スタイル アルゴリズムの Shuffle フェーズに似ています。GroupByKey は、一意のキーに関連付けられたすべての値を収集するために使用します。

GroupByKey は、なんらかの共通点を持ったデータを集約するのに適しています。たとえば、同じ郵便番号のお客様からの注文をグループ化したい場合などに使用できます(その場合、各注文の郵便番号がキーで、その他の注文情報が値になります)。あるいは、すべてのユーザーからの問合せをユーザー ID ごとにグループ化したり、問合せの発生日時ごとにグループ化する場合にも使用できます。

また、キー / 値ペアの複数のコレクションで共通のキーが使用されている場合は、(値の型が違っていても)それらのコレクションを結合できます。結合を実行するには、2 つの方法があります。1 つ目の方法は、CoGroupByKey 変換を使用する方法です。これにより、同じキーを共有する複数のコレクション内の(任意の型の)すべての値をグループ化できます。もう 1 つの方法は、ParDo と 1 つ以上副入力を使用して、キー / 値ペアの複数のコレクションを結合する方法です。場合によっては、この 2 番目の方法を使用するほうが、CoGroupByKey を使用するよりも効率的な場合があります。

結合は、相互に関連する情報を含んだ複数のデータセット(複数のソースから取得された場合を含む)がある場合に便利です。たとえば、オークション データを保存した 2 つのファイルがあり、1 つ目のファイルには、オークション ID、入札データ、および価格データが記録されていて、もう 1 つのファイルにはオークション ID と商品説明が含まれているとします。その場合、オークション ID を共通のキーとして使用し、その他の添付データを関連値として使用することで、2 つのデータセットを結合できます。結合後は、各オークション ID に関連付けられたすべての情報(入札、価格、および商品)が 1 つのデータセットにまとめられます。

GroupByKey

ここでは、シンプルなサンプルケースを使用して、GroupByKey の仕組みについて説明します。使用するデータセットは、テキスト ファイルから取得した単語と、それらが出現する行番号で構成されています。この例では、同じ単語(キー)を共有するすべての行番号(値)をグループ化して、特定の単語がテキスト内のどこで出現するかをすべて確認できるようにします。

入力はキー / 値ペアの PCollection です。各単語がキーで、値はファイル内で単語が出現する行番号です。以下は、使用する入力コレクション内のキー / 値ペアの一覧です。

  cat, 1
  dog, 5
  and, 1
  jump, 3
  tree, 2
  cat, 5
  dog, 2
  and, 2
  cat, 9
  and, 6
  ...

GroupByKey 変換は、同じキーを持つすべての値を収集して新しいペアを作成します。このペアは、一意のキーと、入力 PCollection 内でそのキーに関連付けられたすべての値のコレクションで構成されます。上記のキー / 値ペアのコレクションに対して GroupByKey を適用した場合、出力は次のようになります。

  cat, [1,5,9]
  dog, [5,2]
  and, [1,2,6]
  jump, [3]
  tree, [2]
  ...

このように、GroupByKey ではマルチマップ(複数のキーから個別の値へのマップ)をユニマップ(一意のキーから値コレクションへのマップ)に変換できます。

Java

キー / 値ペアの表現に関する注記
Dataflow Java SDK では、キー / 値ペアは KV<K, V> 型のオブジェクトを使用して表されます。KV は、K 型のキーと V 型の値を持つ特殊クラスです。

キーでグループ化されたコレクション(GroupByKey 変換の結果)に対する一般的な処理パターンは、各キーに関連付けられた値を、そのキーに関連付けられた単一のマージ値に結合することです。Dataflow SDK では、このパターン全体(キーでグループ化した後、各キーの値を結合する操作)が Combine 変換としてカプセル化されています。詳細については、コレクションと値の結合を参照してください。

GroupByKey 変換の適用

GroupByKey は、キー / 値ペアの入力 PCollection に対して適用する必要があります。GroupByKey を適用すると、キー / 値ペアの新しい PCollection が返されます。新しいコレクションでは、各キーが一意であり、それらに関連付けられた各値は(実際には)、キーに関連付けられた 1 つ以上の値を含む値ストリームになります。

Java

次のサンプルコードは、GroupByKeyKV<K, V> オブジェクトの PCollection に適用する方法を示したものです。各要素ペアは、K 型のキーと V 型の単一値を表しています。

GroupByKey の戻り値は、KV<K, Iterable<V>> 型の新しい PCollection です。各要素ペアは、キーと値コレクション(Java Iterable)を表しています。

  // A PCollection of key/value pairs: words and line numbers.
  PCollection<KV<String, Integer>> wordsAndLines = ...;

  // Apply a GroupByKey transform to the PCollection "wordsAndLines".
  PCollection<KV<String, Iterable<Integer>>> groupedWords = wordsAndLines.apply(
    GroupByKey.<String, Integer>create());
Java 8 では GroupByKey.create のパラメータ型を推定できますが、旧バージョンの Java では、それらを明示的に指定しなければならない場合があります。

GroupByKey とウィンドウ処理

Java

入力 PCollection が単一のグローバル ウィンドウではなく、複数のウィンドウに分割されている場合、GroupByKey は通常と少し異なる動作をします。

また、GroupByKey 変換はリダクションの実行時に、各要素が属するウィンドウを考慮に含めます。ウィンドウ(各キー / 値ペアのタイムスタンプによって決定されます)は、事実上セカンダリキーとして機能します。したがって、GroupByKey とウィンドウ処理を併用した場合、キーとウィンドウの両方によってグループ化が実行されます。すべての要素が単一グローバル ウィンドウの一部である場合、GroupByKey上記のシンプルなセマンティクスで操作を実行します。

要素のウィンドウはグループ化のセカンダリキーとして機能しますが、場合によっては、より強力な機能を発揮することもあります。要素が複数のウィンドウに属していて、重複するウィンドウがマージされる場合です。その場合は、さらに複雑なグループ化を処理できます。

以下は、前述のサンプルにウィンドウ処理を適用した場合の例です。

  cat, 1 (window 0)
  dog, 5 (window 0)
  and, 1 (window 0)

  jump, 3 (window 1)
  tree, 2 (window 1)
  cat, 5  (window 1)

  dog, 2 (window 2)
  and, 2 (window 2)
  cat, 9 (window 2)
  and, 6 (window 2)
  ...

GroupByKey は、同じキーとウィンドウを持つすべての要素を収集し、次のような出力コレクションを生成します。

  cat, [1] (window 0)
  dog, [5] (window 0)
  and, [1] (window 0)

  jump, [3] (window 1)
  tree, [2] (window 1)
  cat, [5]  (window 1)

  dog, [2]   (window 2)
  and, [2,6] (window 2)
  cat, [9]   (window 2)

ウィンドウが出力グループに影響していることに注目してください。異なるウィンドウ内のキー / 値ペアは、同じグループに分類されていません

CoGroupByKey を使用した結合

CoGroupByKey 変換は、2 つ以上のデータセットのリレーショナル結合を実行します。CoGroupByKey は、入力内の各 PCollection が同じキーのタイプを持つ場合に、キー / 値ペアの複数の PCollection から値を取得し、グループ化します。

Java

Dataflow では、型安全を確保するため、各 PCollectionKeyedPCollectionTuple の一部として渡す必要があります。CoGroupByKey に渡す各入力 PCollectionTupleTag を宣言する必要があります。

CoGroupByKey は、結合後の出力を CoGbkResult オブジェクト内にバンドルします。

ここでは、シンプルなサンプルを使用して、CoGroupByKey の仕組みについて説明します。この例では、2 つの入力コレクションを KeyedPCollectionTuple 内にバンドルします。1 つ目の PCollection<K, V1> については、TupleTag<V1>tag1 として割り当てます。この PCollection 内のキー / 値ペアは次のとおりです。

  key1 &map; v1
  key2 &map; v2
  key2 &map; v3

2 つ目の PCollection<K, V2> については、TupleTag<V2>tag2 として割り当てます。このコレクションの内容は次のとおりです。

  key1 &map; x1
  key1 &map; x2
  key3 &map; x3

結果の CoGbkResult コレクションには、すべての入力コレクションから取得されたそれぞれの一意キーに関連付けられた、すべてのデータが含まれます。返されるデータ型は PCollection<KV<K, CoGbkResult>> で、内容は次のとおりです。

  key1 -> {
    tag1 &map; [v1]
    tag2 &map; [x1, x2]
  }
  key2 -> {
    tag1 &map; [v2, v3]
    tag2 &map; []
  }
  key3 -> {
    tag1 &map; []
    tag2 &map; [x3]
  }

CoGroupByKey の適用後は、適切な TupleTag を使用することで各コレクション内のデータを参照できます。

CoGroupByKey の適用

Java

CoGroupByKey は、キーの付いた PCollectionPCollection<KV<K, V>>)のタプルを入力として受け取ります。その後 CoGroupByKey は、すべての入力 PCollection 内の値を共通のキーでグループ化した、CoGbkResults という特殊な型を出力として返します。複数のコレクションの場合には、TupleTag メカニズムを使用して CoGbkResults のインデックスを作成します。 CoGbkResults オブジェクト内の特定のコレクションにアクセスするには、当初のコレクションに付けられた TupleTag を使用します。

次のサンプルは、個別の PCollection に読み込まれた 2 つのデータセット(通常、異なるソースからのもの)を結合する場合の例です。

  // Each data set is represented by key-value pairs in separate PCollections.
  // Both data sets share a common key type ("K").
  PCollection<KV<K, V1>> pc1 = ...;
  PCollection<KV<K, V2>> pc2 = ...;

  // Create tuple tags for the value types in each collection.
  final TupleTag<V1> tag1 = new TupleTag<V1>();
  final TupleTag<V2> tag2 = new TupleTag<V2>();

  // Merge collection values into a CoGbkResult collection.
  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
    KeyedPCollectionTuple.of(tag1, pc1)
                         .and(tag2, pc2)
                         .apply(CoGroupByKey.<K>create());

結果の PCollection<KV<K, CoGbkResult>> では、各キー(すべて K 型)に異なる CoGbkResult が関連付けられます。つまり、CoGbkResultTupleTag<T> から Iterable<T> へのマップだと言えます。

次のサンプルは、CoGroupByKey を使用した後、結果の CoGbkResultParDo で消費する場合の例です。たとえば、ParDo ではその後の処理のためにデータをフォーマットできます。

  // Each BigQuery table of key-value pairs is read into separate PCollections.
  // Each shares a common key ("K").
  PCollection<KV<K, V1>> pt1 = ...;
  PCollection<KV<K, V2>> pt2 = ...;

  // Create tuple tags for the value types in each collection.
  final TupleTag<V1> t1 = new TupleTag<V1>();
  final TupleTag<V2> t2 = new TupleTag<V2>();

  //Merge collection values into a CoGbkResult collection
  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
    KeyedPCollectionTuple.of(t1, pt1)
                         .and(t2, pt2)
                         .apply(CoGroupByKey.<K>create());

  // Access results and do something.
  PCollection<T> finalResultCollection =
    coGbkResultCollection.apply(ParDo.of(
      new DoFn<KV<K, CoGbkResult>, T>() {
        @Override
        public void processElement(ProcessContext c) {
          KV<K, CoGbkResult> e = c.element();
          // Get all collection 1 values
          Iterable<V1> pt1Vals = e.getValue().getAll(t1);
          // Now get collection 2 values
          V2 pt2Val = e.getValue().getOnly(t2);
          ... Do Something ....
          c.output(...some T...);
        }
      }));

CoGroupByKey と無限 PCollection

Java

ウィンドウ処理戦略が適用された PCollectionCoGroupByKey でグループ化する場合、グループ化するすべての PCollection で同じウィンドウ処理戦略とウィンドウ サイズ設定が使用されている必要があります。たとえば、マージするすべてのコレクションに、5 分間の固定ウィンドウや、30 秒ごとに開始される 4 分間のスライディング ウィンドウなど、(仮定上)同一の設定を適用する必要があります。

パイプラインが CoGroupByKey を使用して PCollection をマージしようとする際、互換性のないウィンドウが含まれていると、Dataflow はパイプラインの作成時に IllegalStateException エラーを生成します。

ParDo と副入力を使用した結合

CoGroupByKey を使用する代わりに、1 つ以上の副入力を使用した ParDo を適用して結合を実行できます。副入力とは、PCollection に対して指定できる、主入力以外の追加入力のことです。DoFn は、入力 PCollection 内の要素を処理するたびに副入力にアクセスできます。

次のような場合には、この方法を使用するほうが、CoGroupByKey を使用するよりも効率的な場合があります。

  • 結合する PCollection のサイズが不均一で、小さい PCollection のほうがメモリにフィットする可能性がある。
  • パイプライン内の複数の場所で複数回結合したい、大きなテーブルがある。その場合、CoGroupByKey を複数回実行する代わりに、副入力を 1 つ作成し、それを複数の ParDo に渡すことができます。たとえば、2 つのテーブルを結合して第 3 のテーブルを生成した後、1 つ目のテーブルを第 3 のテーブルと結合し、その後同様の操作を繰り返すといったことが可能です。

この方法で結合を行うには、いずれかの PCollectionParDo を適用し、その他の PCollection を副入力として渡します。次のサンプルコードは、この種の結合の実行方法を示したものです。

Java

// Each BigQuery table of key-value pairs is read into separate PCollections.
PCollection<KV<K1, V1>> mainInput = ...
PCollection<KV<K2, V2>> sideInput = ...

// Create a view from your side input data
final PCollectionView<Map<K2, V2>> view = sideInput.apply(View.<K2, V2>asMap());

// Access side input and perform join logic
PCollection<T> joinedData =
mainInput.apply("SideInputJoin", ParDo.withSideInputs(view).of(new DoFn<KV<K1, V1>, T>() {
  @Override
  public void processElement(ProcessContext c) {
    K2 sideInputKey = ... transform c.element().getKey() into K2 ...
    V2 sideInputValue = c.sideInput(view).get(sideInputKey);
    V1 mainInputValue = c.element().getValue();
    ... Do Something ...
    c.output(...some T...);
  }
}));
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。