ウィンドウ処理

Dataflow SDK は、ウィンドウ処理というコンセプトを使用し、個々の要素のタイムスタンプに基づいて PCollection を再分割します。複数の要素を集計する Dataflow の変換GroupByKeyCombine など)は、暗黙的にウィンドウ単位で動作します。つまり、コレクション全体のサイズが無制限であっても、複数の有限なウィンドウが連続しているものとして各 PCollection を処理します。

Dataflow SDK はトリガーという関連の概念を使用して、制限のないデータが届くときに各有限ウィンドウをいつ「クローズ」するかを決定します。トリガーを使用すると、遅れて届くデータを処理したり、早期結果を提供したりするように、PCollection のウィンドウ処理戦略の高度な設定を行うことができます。詳しくは、トリガーをご覧ください。

ウィンドウ処理の基本

ウィンドウ処理が最も役立つのは制限なし PCollection です。これは、更新され続ける、サイズが不明または無制限のデータセット(ストリーミング データなど)を表します。Dataflow の一部の変換(GroupByKeyCombine など)は共通のキーを基準に複数の要素をグループ化します。通常、このようなグループ化オペレーションでは、データセット全体で同じキーを持つすべての要素がグループ化されます。制限なしデータセットでは、新しい要素が常に追加されているため、すべての要素を収集するのは不可能です。

Dataflow モデルでは、どの PCollection でも論理的なウィンドウに再分割できます。PCollection の各要素は、PCollectionウィンドウ処理機能に応じて 1 つ以上のウィンドウに割り当てられ、個々のウィンドウには有限数の要素が含まれます。この後、グループ化変換は、各 PCollection の要素をウィンドウごとに処理します。たとえば、GroupByKeyPCollection の要素を、キーとウィンドウを基準に暗黙的にグループ化します。Dataflow では同一ウィンドウ内のデータのみがグループ化されます。他のウィンドウのデータはグループ化されません。

ウィンドウ処理の制約

PCollection に対してウィンドウ処理機能を設定すると、その PCollection に次回グループ化変換を適用する際は、要素のウィンドウが使用されます。Dataflow では、ウィンドウの実際のグループ化はニーズに応じて実行されます。Window 変換を使用してウィンドウ処理機能を設定すると、各要素がウィンドウに割り当てられますが、ウィンドウと見なされるのは PCollectionGroupByKey または Combine でグループ化してからです。このためにさまざまな影響がパイプラインに生じることがあります。

次の図 1 のパイプラインの例をご覧ください。

ウィンドウ処理、ParDo、GroupByKey を順に適用するパイプライン
図 1: ウィンドウ処理を適用するパイプライン

このパイプラインでは、PubsubIO を使用して Key-Value ペアを読み取り、制限なし PCollection を作成しています。その後、Window 変換を使用して、ウィンドウ処理機能をこのコレクションに適用します。さらに、ParDo をコレクションに適用してから、その ParDo の結果を GroupByKey を使用してグループ化します。ウィンドウ処理機能は ParDo 変換に影響しません。ウィンドウは、GroupByKey で必要とされるまでは実際に使用されないためです。

ただし、後続の変換は、キーとウィンドウの両方でグループ化されたデータである GroupByKey の結果に対して行われます。

制限付き PCollection でウィンドウ処理を使用する

ウィンドウ処理は、制限付き PCollection のサイズが固定されたデータセットに対して使用することもできます。ただし、ウィンドウ処理では、PCollection の各要素に付けられている暗黙のタイムスタンプのみが考慮されることと、固定データセット(TextIOBigQueryIO など)を作成するデータソースは同じタイムスタンプをすべての要素に割り当てることに注意してください。つまり、デフォルトではすべての要素が単一グローバル ウィンドウに含まれます。同じウィンドウにすべての要素を割り当てた場合、パイプラインは従来の MapReduce バッチ形式で実行されます。

固定データセットに対してウィンドウ処理を使用するために、各要素に独自のタイムスタンプを割り当てることができます。タイムスタンプを要素に割り当てるには、新しいタイムスタンプを付けて各要素を出力する DoFn を含む ParDo 変換を使用します。

制限付き PCollection に対してウィンドウ処理を使用すると、パイプラインがデータを処理する方法に影響することがあります。たとえば、次のパイプラインについて考えてみます。

制限付きコレクションに対して GroupByKey を適用してから ParDo を適用するパイプライン
図 2: GroupByKey および ParDo(ウィンドウ処理なしの制限付きコレクション)

上記のパイプラインでは、TextIO を使用して Key-Value ペアを読み取り、制限付き PCollection を作成します。次に、GroupByKey を使用してコレクションをグループ化し、グループ化された PCollectionParDo 変換を適用します。この例では、GroupByKey によって一意キーのコレクションが作成され、ParDo がキーごとに 1 回のみ適用されます。

ここで、ウィンドウ処理を使用する同じパイプラインについて考えてみます。

制限付きコレクションに対してウィンドウ処理を適用し、さらに GroupByKey と ParDo を順に適用するパイプライン
図 3: GroupByKey と ParDo(ウィンドウ処理を行った制限付きコレクション)

前と同様に、このパイプラインは Key-Value ペアの制限付き PCollection を作成します。次に、その PCollection に対してウィンドウ処理機能を設定します。このとき、GroupByKey 変換は PCollection の要素をキーとウィンドウの両方でグループ化します。後続の ParDo 変換は、キーごとに複数回、ウィンドウごとに 1 回適用されます。

ウィンドウ処理機能

Dataflow SDK では、PCollection の要素を分割するためにさまざまな種類のウィンドウを定義できます。SDK では次のようないくつかのウィンドウ処理機能が提供されます。

  • 固定時間ウィンドウ
  • スライディング タイム ウィンドウ
  • セッション単位ウィンドウ
  • 単一グローバル ウィンドウ

使用するウィンドウ処理機能によって異なりますが、各要素は論理的には複数のウィンドウに所属できることに注意してください。たとえば、スライディング タイム ウィンドウでは、部分的に重なるウィンドウが作成されるため、1 つの要素が複数のウィンドウに割り当てられることがあります。

固定時間ウィンドウ

ウィンドウ処理の最も単純な形は固定時間ウィンドウです。更新され続ける、タイムスタンプ付きの PCollection に対して、各ウィンドウがたとえば 5 分間分の要素を保持するとします。

固定時間ウィンドウはデータ ストリームの時間間隔を表し、処理対象のデータのバンドルを定義します。5 分間隔で作動するウィンドウについて考えてみてください。制限なし PCollection でタイムスタンプ値が 0:00:00~0:04:59 のすべての要素は最初のウィンドウ、タイムスタンプ値が 0:05:00~0:09:59 のすべての要素は 2 番目のウィンドウのように、順に各ウィンドウに属します。

固定時間ウィンドウ処理を表す図
図 4: サイズ 30 秒の固定時間ウィンドウ

スライディング タイム ウィンドウ

スライディング タイム ウィンドウもデータ ストリームの時間間隔を使用してデータのバンドルを定義しますが、スライディング タイム ウィンドウではウィンドウの一部が重なります。各ウィンドウが 5 分間分のデータを保持するとしても、新しいウィンドウは 10 秒ごとに開始します。スライディング ウィンドウの開始間隔はピリオドと呼ばれます。したがって、この例のウィンドウ サイズは 5 分、ピリオドは 10 秒です。

複数のウィンドウがずれて重なり合うため、データセットのほとんどの要素は複数のウィンドウに属すことになります。このようなウィンドウ処理はデータの移動平均を求めるときに役立ちます。この例では、スライディング タイム ウィンドウを使用し、過去 5 分間分のデータの移動平均を計算して、10 秒ごとに更新します。

スライディングタイムウィンドウ処理を表す図
図 5: スライディング タイム ウィンドウ(ウィンドウ サイズ 1 分、ウィンドウ ピリオド 30 秒)。

セッション ウィンドウ

セッション ウィンドウ機能では、データが集中する部分にウィンドウが定義されます。セッション ウィンドウ処理が役立つのは、時間的に不規則に分散しているデータです。たとえば、ユーザーのマウス操作を表すデータ ストリームでは、アイドル状態の時間帯が長く続き、クリックが特に集中する期間が点在します。セッション ウィンドウ処理では、高度に集中するデータを別のウィンドウにグループ化して、データ ストリームのアイドル状態の部分を除外します。

セッション ウィンドウ処理はキーごとに適用されることに注意してください。つまり、セッションのグループ化では、同じキーを持つデータのみが処理されます。データ コレクションの各キーが、さまざまなサイズの、互いに素であるウィンドウにグループ化されます。

最も単純なセッション ウィンドウ処理では、最小ギャップ時間が指定されます。最小ギャップ時間よりも前に届くすべてのデータは同じウィンドウにグループ化されます。最小ギャップ時間が経過してからデータが届くと、新しいウィンドウが開始されます。

セッション ウィンドウ処理を表す図
図 5: 最小ギャップ時間が設定されたセッション ウィンドウ。各データキーのウィンドウがデータ分布に応じて異なることに注意してください。

単一グローバル ウィンドウ

デフォルトでは PCollection のすべてのデータは単一グローバル ウィンドウに割り当てられます。データセットが固定サイズの場合は、PCollection のグローバル ウィンドウをデフォルトのままにしておくことができます。PCollection の要素がすべて単一グローバル ウィンドウに属する場合、パイプラインはほとんどバッチ処理ジョブ(MapReduce ベースの処理での)と同様に動作します。

他のウィンドウ処理機能

Dataflow SDK では、固定、スライディング、セッション、グローバルの各ウィンドウに加えて、カレンダー ベースのウィンドウなど他のウィンドウ処理機能も提供されます。

Java

Dataflow SDK for Java で使用できるウィンドウ処理機能の一覧は、com.google.cloud.dataflow.sdk.transforms.windowing をご覧ください。

PCollection のウィンドウ処理機能を設定する

PCollection のウィンドウ処理機能を設定するには Window 変換を適用します。Window 変換を適用するときは WindowFn を指定する必要があります。WindowFn によって決定されるウィンドウ処理機能(固定時間ウィンドウやスライディング タイム ウィンドウなど)は、PCollection が後続のグループ化変換のために使用します。

Dataflow SDK には、基本的なウィンドウ処理機能として、いくつかの WindownFn が事前定義されています。高度なケースでは、独自の WindowFn を定義することもできます。

Window は、技術的にはすべての変換と同様に、入力 PCollection を取り、各要素が 1 つ以上の論理的な有限ウィンドウに割り当てられた新しい PCollection を出力します。

固定時間ウィンドウを設定する

次のコードサンプルは、Window を適用して PCollection を、長さが 1 分ずつの固定ウィンドウに分割する方法を示します。

Java

  PCollection<String> items = ...;
  PCollection<String> fixed_windowed_items = items.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));

スライディング タイム ウィンドウを設定する

次のコードサンプルは、Window を適用して PCollection をスライディング タイム ウィンドウに分割する方法を示します。各ウィンドウの長さは 30 分で、新しいウィンドウは 5 秒ごとに開始します。

Java

  PCollection<String> items = ...;
  PCollection<String> sliding_windowed_items = items.apply(
    Window.<String>into(SlidingWindows.of(Duration.standardMinutes(30)).every(Duration.standardSeconds(5))));

セッション ウィンドウを設定する

次のコードサンプルは、Window を適用して PCollection をセッション ウィンドウに分割する方法を示します。ここでは、最小 10 分の時間ギャップで各セッションを区切ります。

Java

  PCollection<String> items = ...;
  PCollection<String> session_windowed_items = items.apply(
    Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(10))));

セッションはキー別であるので注意してください。コレクションのキーごとに、データの分布に応じて独自のセッション グループが形成されます。

単一グローバル ウィンドウを設定する

PCollection が制限付き(固定サイズ)の場合、すべての要素を単一グローバル ウィンドウに割り当てることができます。次のコードサンプルは、PCollection の単一グローバル ウィンドウを設定する方法を示します。

PCollection の単一グローバル ウィンドウを設定するには、Window 変換を適用するときに new GlobalWindows()WindowFn として渡します。次のコードサンプルは、Window を適用して PCollection を単一グローバル ウィンドウに割り当てる方法を示します。

Java

  PCollection<String> items = ...;
  PCollection<String> batch_items = items.apply(
    Window.<String>into(new GlobalWindows()));

タイムスキュー、データラグ、遅延データ

どのようなデータ処理システムでも、データイベントの発生(データ要素そのもののタイムスタンプで判別される「イベント時間」)と、パイプラインの任意のステージでの実際のデータ要素の処理(要素を処理するシステムのクロックで判別される「処理時間」)には一定の時間差があります。

完璧なシステムがあるとすれば、各データ要素のイベント時間と処理時間が同一になるか、少なくとも差分が一定になるでしょう。ただし、現実にはどのようなコンピューティング システムでも、データの生成と配信はいくつもの時間的な制約の影響を受けます。大規模システムまたは分散システム(たとえば、顧客注文またはログファイルを生成するウェブ フロントエンドの分散コレクション)では、データイベントがウェブ上のさまざまな場所で生成されたのと同じ順序でパイプラインに届く保証はありません。

たとえば、固定時間ウィンドウ処理を使用する PCollection があるとします。ウィンドウの長さは 5 分間です。Dataflow は、各ウィンドウで、イベント時間のタイムスタンプが所定のウィンドウ範囲(たとえば、最初のウィンドウは 0:00~4:59)に含まれるすべてのデータを収集する必要があります。この範囲外のタイムスタンプのデータ(5:00 以降のデータ)は別のウィンドウに属します。

ただし、データが常に正しい時間の順序でパイプラインに届くとは保証されません。また、常に予測可能な間隔で届くとも保証されません。Dataflow が追跡するウォーターマークは、特定のウィンドウのすべてのデータがパイプラインに届くと期待するタイミングを表すシステムの概念です。届いたデータのタイムスタンプがウォーターマーク後の場合、遅延データと見なされます。

前述の例で、データのタイムスタンプ(イベント時間)とデータがパイプラインに届く時間(処理時間)の差を約 30 秒と仮定する単純なウォーターマークがあるとします。この場合、Dataflow は最初のウィンドウを 5:30 に閉じます。5:34 に届いたデータレコードのタイムスタンプが 0:00~4:59 ウィンドウに対応している(たとえば 3:38)場合、そのレコードは遅延データです。

注: ラグタイム / タイムスキューを見積もるごく単純なウォーターマークを使用していると便宜的に仮定しました。実際には、PCollection のデータソースによってウォーターマークが決まります。さらに精密なウォーターマークや複雑なウォーターマークがあります。

タイムスキューと遅延データを管理する

遅延データを許可するには、PCollection のウィンドウ処理戦略を設定するときに .withAllowedLateness オペレーションを呼び出します。次のコードサンプルは、ウィンドウ境界から 2 日後までの遅延データを許可するウィンドウ処理戦略を示します。

Java

  PCollection<String> items = ...;
  PCollection<String> fixed_windowed_items = items.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
          .withAllowedLateness(Duration.standardDays(2)));

.withAllowedLatenessPCollection に対して設定すると、許可遅延を適用した最初の PCollection から導出される後続の PCollection にもその許可遅延が伝播されます。パイプラインの許可遅延を後で変更するには、Window.withAllowedLateness() を明示的に再適用する必要があります。

また、Dataflow の Triggers API を使用して、PCollection のウィンドウ処理方式の高度な設定を行うこともできます。トリガーを使用すると、ウィンドウによる遅延要素の排出方法も含めて、個々のウィンドウでいつ集計を行って結果をレポートするかを正確に決定できます。

注: Dataflow のウィンドウ処理とトリガーのデフォルトの方式では、遅延データは破棄されます。パイプラインが遅延データのインスタンスを確実に処理するためには、PCollection のウィンドウ処理戦略を設定し、それに応じて PCollection のトリガーを設定するときに、.withAllowedLateness を明示的に設定する必要があります。

PCollection の要素にタイムスタンプを追加する

PCollection の要素に新しいタイムスタンプを割り当てるには、自分が設定するタイムスタンプを新しい要素と一緒に出力する ParDo 変換を適用します。タイムスタンプの割り当てが役立つのは、Dataflow のウィンドウ処理機能を使用しようとする際に、データセットが暗黙のタイムスタンプなしにソースから届く場合です(TextIO のファイルなど)。

この方法が適しているのは、データセットにタイムスタンプ データが含まれるが、そのタイムスタンプが Dataflow データソースによって生成されない場合です。たとえば、パイプラインが入力ファイルのログレコードを読み取り、各ログレコードにタイムスタンプ フィールドが含まれているとします。パイプラインはファイルからレコードを読み取るため、ファイルソースによってタイムスタンプが自動的に割り当てられることはありません。各レコードのタイムスタンプ フィールドを解析し、ParDo 変換を使用して、PCollection の各要素にタイムスタンプを付けることができます。

Java

タイムスタンプを割り当てるには、ParDo 変換で使用する DoFn が、(主出力コレクションへの要素の排出に使用する通常の ProcessContext.output ではなく)ProcessContext.outputWithTimestamp を使用して要素を出力する必要があります。次のコードサンプルは、新しいタイムスタンプと一緒に要素を出力する DoFn を含む ParDo を示しています。

  PCollection<LogEntry> unstampedLogs = ...;
  PCollection<LogEntry> stampedLogs =
      unstampedLogs.apply(ParDo.of(new DoFn<LogEntry, LogEntry>() {
        public void processElement(ProcessContext c) {
          // Extract the timestamp from log entry we're currently processing.
          Instant logTimeStamp = extractTimeStampFromLogEntry(c.element());
          // Use outputWithTimestamp to emit the log entry with timestamp attached.
          c.outputWithTimestamp(c.element(), logTimeStamp);
        }
      }));
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。