コレクションと値の結合

パイプラインの中では、データ内の値のコレクションを結合したり、マージする必要が生じたりすることがよくあります。たとえば、特定の月の受注額(ドル値)で構成された売上データのコレクションがあるとします。その場合、パイプライン内ですべての受注値を 1 つの値に結合して、その月の合計受注額、最大受注額、または平均受注額を表す必要が生じることもあるでしょう。このようなデータを取得するには、コレクション内の値を結合します。

Dataflow SDK には、パイプラインの PCollection オブジェクト内の値を結合したり、キーでグループ化された値を結合したりするために使用できるオペレーションが複数用意されています。

Combine コア変換は、PCollection 内の要素や値を結合するために使用できるさまざまな汎用メソッドをカプセル化します。Combine には、PCollection 全体に対して機能するバリアントや、Key-Value ペアの PCollection に含まれる個別の値ストリームを結合するバリアントがあります。また Combine には、特定の数値結合演算(合計値、最小値、最大値、平均値など)に対応するサブクラスもあります。

Combine 変換を適用する際には、要素や値を結合するための実際のロジックを含んだ関数を提供する必要があります。詳細については、このセクションで後述する結合関数の作成と指定を参照してください。

この意味で、Combine 変換はデベロッパーが提供した処理関数内のロジックを各要素に適用する ParDo 変換に似ています。

PCollection を単一値に結合する

特定の PCollection に含まれるすべての要素を 1 つの値に結合して、1 つの要素を含んだ新しい PCollection としてパイプライン内で表すことができます。PCollection の要素を結合するには、グローバル結合変換を使用します。

Combine は、入力 PCollectionウィンドウ処理を使用して分割されている場合、通常と異なる動作をします。その場合、グローバル結合はウィンドウごとに単一の要素を返します。

グローバル結合は、デベロッパーが指定した結合関数を使用して各要素値を結合します。詳細については、このセクションで後述する結合関数の作成と指定を参照してください。

Dataflow SDK には、一般的な数値結合オペレーション(合計値、最小値、最大値、平均値など)に対応する、事前構築済みの結合関数が用意されています。これらの関数をグローバル結合で使用すると、独自の結合関数を作成する手間が省けます。詳細については、付属の統計結合オペレーションの使用を参照してください。

Java

次のサンプルコードは、Combine.globally 変換を apply して、Integer 型の PCollection の単一合計値を生成する方法を示したものです。

  PCollection<Integer> pc = ...;
  PCollection<Integer> sum = pc.apply(
    Combine.globally(new Sum.SumIntegerFn()));

この例での Sum.SumIntegerFn() は、変換で入力 PCollection 内の要素を結合するために使用する CombineFn です。結果の PCollectionsum)には 1 つの値が格納されます。これは、入力 PCollection 内のすべての要素の合計値です。

グローバルなウィンドウ処理

入力 PCollection でデフォルトのグローバル ウィンドウ処理を使用する場合、Dataflow はデフォルトの動作として、1 つの項目を含んだ PCollection を返します。その項目の値は、Combine の適用時に結合関数内に指定したアキュムレータから取得されます。たとえば、Dataflow の Sum 結合関数はゼロ値(空値の合計)を返し、Min は最大値または無限値を返します。

入力が空の場合に Combine が空の PCollection を返すようにするには、Combine 変換を適用する際に .withoutDefaults を指定します。以下はそのサンプルコードです。

  PCollection<Integer> pc = ...;
  PCollection<Integer> sum = pc.apply(
    Combine.globally(new Sum.SumIntegerFn()).withoutDefaults());

非グローバルのウィンドウ処理

PCollection非グローバルのウィンドウ処理関数を使用する場合、Dataflow はデフォルトの動作を行いません。したがって、Combine を適用する際に、次のいずれかのオプションを指定することが必要となります。

  • .withoutDefaults を指定する。この場合、入力 PCollection 内の空のウィンドウは、出力コレクションでも空になります。
  • .asSingletonView を指定する。この場合、出力は直ちに PCollectionView へと変換されます。これは、それぞれの空ウィンドウが副入力として使用される場合のデフォルト値になります。通常、このオプションは、パイプラインの Combine の結果が後にパイプライン内で副入力として使用される場合にのみ、使用する必要があります。

キーでグループ化されたコレクション内の値を結合する

キーでグループ化されたコレクションを(GroupByKey 変換を使用するなどして)作成した後には、各キーに関連付けられた値のコレクションを 1 つの値に結合するのが一般的なパターンです。

前述の GroupByKey の例で言うと、キーでグループ化された PCollectiongroupedWords)は次のような内容でした。

  cat, [1,5,9]
  dog, [5,2]
  and, [1,2,6]
  jump, [3]
  tree, [2]
  ...

上記の PCollection では、各要素に文字列キー(「cat」など)と、その値の整数のイテラブル(1 つ目の要素では [1, 5, 9])が含まれており、各整数は、元のテキスト内でのキーの出現回数を表しています。パイプラインの次の処理ステップで値が(個別に考慮されるのではなく)結合される場合は、整数のイテラブルを結合して、各キーとペアになる単一の結合値を作成できます。

値の結合方法を示す実際のロジックは、デベロッパーが指定します。たとえば、キーと整数値をキーでグループ化したコレクションの場合は、各キーに関連付けられた整数のコレクションを合計するよう指定することもできます(これは、値が出現回数を表す場合などに便利です)。このサンプルでは値が行番号を表しているので、最小値を使用して、初回出現箇所を示すことができます。

Java

  PCollection<KV<String, Integer>> occurrences = ...;
  PCollection<KV<String, Iterable<Integer>>> grouped = pc.apply(GroupByKey.create());
  PCollection<KV<String, Integer>> firstOccurrences = pc.apply(
    Combine.groupedValues(new Min.MinIntegerFn()));

結合 PerKey の使用

Dataflow SDK では、上記のパターン全体を GroupByKey で表し、値のコレクションをマージすることを、Combine PerKey 変換として表します。

Combine PerKey では、このパターンの両方のステップを実行します。これを GroupByKey の場合と同様に、Key-Value ペアの PCollectionapply します。Combine PerKey 変換はその後、デベロッパーが指定した結合関数を、各キーに関連付けられたすべての値のセットに適用します。結合オペレーションでは、Key-Value ペアの新しい PCollection が生成され、一意のキーと、各キーに対応する結合値が格納されます。

Combine PerKey は、オペレーションの一部として GroupByKey 変換を実行します。そのため、PCollectionウィンドウ処理を使用して分割される場合には、GroupByKey と同様の動作をします。つまり、Combine PerKey は per key とウィンドウを結合するものと言えます。

この種の関数(つまり、コレクションをキーでグループ化し、値と結合する関数)は、ParDo を作成して実行することもできますが、CombineFn 変換ではより構造化されたセマンティクスが使用されるため、Cloud Dataflow サービスでの結合オペレーションをより効率的に実行できます。

Combine PerKey に提供する結合関数は、結合的減少関数か、CombineFn のサブクラスである必要があります。CombineFn の作成方法について詳しくは、このセクションで後述する結合関数の作成と指定をご覧ください。

Java

Combine.perKey を使用する際には、入力 PCollection 内のキーと値の型に応じて型パラメータを渡し、出力 PCollection 内の結合値が入力 PCollection 内の値と異なる型である場合には、追加の型パラメータを渡します。

また、各値コレクションに適用する結合ロジックを含んだ結合関数を、Combine.perKey に渡す必要があります。通常、結合関数はアウト オブ ラインで定義します。

次のサンプルコードは、Combine.perKey 変換の apply 方法を示したものです。このサンプルでは、入力 PCollection がキーによってグループ化され、各キーに関連付けられた Double 値のコレクションが単一の合計値に結合されます。

  PCollection<KV<String, Double>> salesRecords = ...;
  PCollection<KV<String, Double>> totalSalesPerPerson =
    salesRecords.apply(Combine.<String, Double, Double>perKey(
      new Sum.SumDoubleFn()));

Combine.perKey は 3 つの型パラメータを取っています。String はキーのタイプ用で、Double は値用です。これは、各キーの値のコレクションが Double 型で、結果の結合値も Double 型であるためです。

次のサンプルコードは、結合値の型がキーごとの値の当初のコレクションとは異なる場合に、Combine.perKey 変換を apply する方法を示しています。この例では、入力 PCollectionString 型のキーと Integer 型の値が含まれ、結合値は、キーごとの値全体の平均値を表す Double です。

  PCollection<KV<String, Integer>> playerAccuracy = ...;
  PCollection<KV<String, Double>> avgAccuracyPerPlayer =
    playerAccuracy.apply(Combine.<String, Integer, Double>perKey(
      new MeanInts())));

Java では、CombineFn の型からこれらの型パラメータを推定できることが少なくありません。Java 8 では、以前のバージョンの Java よりもこの推定機能が改善されています。

結合関数の作成と指定

Combine 変換を使用する際には、バリアントにかかわらず、複数の要素を 1 つの値に結合する方法を指定した処理ロジックを提供する必要があります。

結合関数を設計する際には、関数が特定のキーのすべての値に対して必ずしも 1 回だけ呼び出されるとは限らないことに注意してください。入力データ(値コレクションを含む)は複数のワーカー インスタンスに分散される可能性があるため、結合関数は、値コレクションのサブセットに対して部分結合を実行するために、複数回呼び出される可能性があります。一般的に、結合はツリー構造内で繰り返し適用される可能性があります。ツリー構造は明示されないため、結合関数は可換かつ結合的である必要があります

結合関数の作成時には、いくつかのオプションを指定できます。単純な結合オペレーション(合計値など)は通常、一連の値を同じ型の単一値に変換するシンプルな関数として実装できます。作成する関数は、可換かつ結合的である必要があります。つまり、結合関数 f と一連の値 v1, ..., vn の場合、論理的には、f(v1, v2, ... , vn) = f(v1, v2, ... , f(vk, ... , vn)) = f(f(v1, v2), ... , f(vk, ... , vn)) = f(f(vk, ... , vn), ... , f(v2, v1)) のようになります。

より複雑な結合オペレーションでは、累積型が入力 / 出力型と異なる CombineFn のサブクラスを作成しなければならない場合もあります。最後に、Dataflow SDK では、各種の統計的結合を処理する一般的な関数も提供されています(SumMinMaxMean など)。詳細については、付属の統計結合オペレーションの使用を参照してください。

シンプルな関数を使用したシンプルな結合

Java

シンプルな結合関数(Integer 値のコレクションを単一の Integer に合計するなど)の場合は、SerializableFunction インターフェースを実装するシンプルな関数を作成できます。具体的には、SerializableFunctionIterable<V>V 型の単一値に変換します。

次のサンプルコードは、Integer 値のコレクションを合計するシンプルな結合関数を示したものです。関数 SumInts がインターフェース SerializableFunction を実装しています。

  public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
    @Override
    public Integer apply(Iterable<Integer> input) {
      int sum = 0;
      for (int item : input) {
        sum += item;
      }
      return sum;
    }
  }

CombineFn を使用した高度な結合

より複雑な結合関数では、SDK クラス CombineFn のサブクラスを定義できます。結合関数で追加の前処理や後処理を実行する必要があり、それによって出力型が変わる場合は、より高度なアキュムレータを使用したり、キーへの考慮が必要になる可能性があるため、CombineFn を使用する必要があります。

計算の分散性について考慮してください。各要素の完全なキー / 値コレクションが、同じ Compute Engine インスタンスにいつでも存在するとは限りません。すべての値を考慮しなければ適切に実行できない計算については、CombineFn を使用する必要があります。

たとえば、特定のキーに関連付けられた値コレクションの平均値を計算する必要があるとします。平均値を計算するには、結合関数ですべての値を合計した後、加算した値の数でその合計を除算する必要があります。ただし、除算が複数の場所で実行される場合、結果の平均値は誤ったものになります。CombineFn を使用すれば、Cloud Dataflow サービスで合計の実行と要素数の両方を累積し、すべての要素が累積されるまで、最終的な計算(このケースでは除算)を保留できます。

一般的な結合オペレーションは 4 つのオペレーションで構成されます。CombineFn のサブクラスを作成する場合は、対応するメソッドをオーバーライドして 4 つのオペレーションを行う必要があります。

  • アキュムレータを作成する
  • 入力を追加する
  • アキュムレータをマージする
  • 出力を抽出する

なんらかの理由により、結合関数からキー / 値コレクション ペア内のキーにアクセスする必要が生じた場合は、代わりに KeyedCombineFn を使用できます。 KeyedCombineFn は、キーを追加の型パラメータとして渡します。

アキュムレータを作成する: 新しい「ローカル」アキュムレータを作成します。サンプルのケースでは、平均値を求めるために、ローカル アキュムレータが値の合計オペレーションの実行回数(最終的な平均値を求める除算処理の分子の値)と、現在までに合計された値の数(分母の値)を追跡します。このオペレーションは、分散的な場所から何度でも呼び出される可能性があります。

入力を追加する: 入力要素をアキュムレータに追加し、アキュムレータ値を返します。サンプルでは、このオペレーションによって合計値を更新し、カウント数を増分しています。また、このオペレーションは並行的に呼び出されることがあります。

アキュムレータをマージする: 複数のアキュムレータを 1 つのアキュムレータにマージします。これにより、複数のアキュムレータで保持されているデータが、最終計算の前に結合されます。平均値の計算の場合、除算の各部を表すアキュムレータがすべてマージされます。このオペレーションも、出力時に何度でも呼び出される可能性があります。

出力を抽出する: 最終計算を実行します。平均値を計算する場合は、すべての値を結合した合計値を、合計された値の数で除算します。このオペレーションは、最終的にマージされたアキュムレータで 1 回だけ呼び出されます。

次のサンプルコードは、平均値を計算する CombineFn の定義方法を示したものです。

Java

 public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
   public static class Accum {
     int sum = 0;
     int count = 0;
   }

   @Override
   public Accum createAccumulator() { return new Accum(); }

   @Override
   public Accum addInput(Accum accum, Integer input) {
       accum.sum += input;
       accum.count++;
       return accum;
   }

   @Override
   public Accum mergeAccumulators(Iterable<Accum> accums) {
     Accum merged = createAccumulator();
     for (Accum accum : accums) {
       merged.sum += accum.sum;
       merged.count += accum.count;
     }
     return merged;
   }

   @Override
   public Double extractOutput(Accum accum) {
     return ((double) accum.sum) / accum.count;
   }
 }
 PCollection<Integer> pc = ...;
 PCollection<Double> average = pc.apply(Combine.globally(new AverageFn()));

CombineFn をサブクラス化するために必要な型パラメータとジェネリクスの詳細については、CombineFn の API for Java リファレンス ドキュメントをご覧ください。

アキュムレータ型 AccumT は、エンコード可能である必要があります。データ型のデフォルト コーダーを指定する方法については、データ エンコーディングを参照してください。

付属の統計的結合関数の使用

Java

Dataflow SDK には、基本的な数学的結合関数を提供するクラスが複数用意されています。デベロッパーは、Combine.globallyCombine.perKey を使用するときの結合関数のように、これらのクラスの結合関数を使用できます。使用できる関数には、次が含まれます。

  • Sum: すべての値を加算して、単一の合計値を生成します。
  • Min: すべての利用可能値から最小値だけを保持します。
  • Max: すべての利用可能値から最大値だけを保持します。
  • Mean: すべての利用可能値から単一の平均値を計算します。

各クラスでは、IntegerLong、および Double 型の結合関数が定義されています(Mean クラスを除く。このクラスでは、汎用の累積結合関数が定義されています。この関数には型パラメータを渡す必要があります)。

たとえば、Sum.SumIntegerFn を使用すると、Integer 値(この例ではキーごとの値)のコレクションを合計できます。

  PCollection<KV<String, Integer>> playerScores = ...;
  PCollection<KV<String, Integer>> totalPointsPerPlayer =
    playerScores.apply(Combine.<String, Integer, Integer>perKey(
      new Sum.SumIntegerFn()));

また、Max.MaxLongFn を使用すると、PCollection<Long> 内の最大 Long 値を計算できます。以下はその例です。

  PCollection<Long> waitTimes = ...;
  PCollection<Long> longestWaitTime = waitTimes.apply(
    Combine.globally(new Max.MaxLongFn()));

SumMinMaxMean クラスはすべて、com.google.cloud.dataflow.sdk.transforms パッケージで定義されています。詳細については、Cloud Dataflow Java API リファレンスを参照してください。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。