トリガー

PCollection でデータを収集し、制限付きウィンドウでグルーピングする際には、Dataflow は各ウィンドウの集約結果をいつ排出するかを、なんらかの方法で決定する必要があります。タイムスキューとデータラグがある場合、Dataflow はトリガーと呼ばれるメカニズムを使用して、ウィンドウで「十分な」データがいつ収集されるかを判別した後、ウィンドウの集約結果を排出します。この排出されたそれぞれの結果を「ペイン」と呼びます。

Dataflow のトリガー システムでは、システムのデータ処理ニーズに応じて、所定のウィンドウの集計結果をいつ排出するかを決定する方法がいくつか提供されます。たとえば、即時または時間を慎重に考慮して更新する必要があるシステムでは、データの完全性の即時性を重視し、N 秒ごとにウィンドウを排出する厳密な時間ベースのトリガーを使用します。結果の正確なタイミングよりもデータの完全性を重視するシステムでは、ウィンドウを閉じる前に一定数のデータレコードが蓄積されるのを待つデータベースのトリガーを使用します。

トリガーは、パイプラインで次の 2 種類の条件を扱う際に特に役立ちます。

  • トリガーを使用して、遅延データのインスタンスを扱うことができます。
  • トリガーを使用して、所定ウィンドウのすべてのデータが届く前に、早期に結果を排出できます。

注: ウィンドウ処理機能で単一グローバル ウィンドウを使用する、制限なしの PCollection に対してトリガーを設定できます。これは、パイプラインが制限なしのデータセットの定期的な更新を行う場合に便利です。たとえば、N 秒ごとまたは N 個の要素ごとに更新されて、現在までに提供されたすべてのデータの移動平均などです。

トリガーの種類

Dataflow には、PCollection の設定に使用できる既成のトリガーが数多く用意されています。主に 3 つの種類のトリガーがあります。

  • 時間ベースのトリガー。このトリガーは時間基準に対して作動します。時間基準には、イベント時間(各データ要素のタイムスタンプによって示される時間)と処理時間(パイプライン内の所定のステージでデータ要素が処理される時間)があります。
  • データ駆動型のトリガー。このトリガーは、データが各ウィンドウに届いたときにデータを調べて、指定したデータ条件を満たすと起動されます。たとえば、ウィンドウのデータ要素が特定の数に到達したときにウィンドウの結果を排出するトリガーを設定できます。
  • 複合トリガー。このトリガーは、複数の時間ベースのトリガーとデータ駆動トリガーを論理的な方法で組み合わせたものです。すべてのトリガーの条件が満たされるとき(論理 AND)や、いずれかのトリガーの条件が満たされるとき(論理 OR)などに起動する複合トリガーを設定できます。

時間ベースのトリガー

Dataflow の時間ベースのトリガーには、AfterWatermarkAfterProcessingTime があります。これらのトリガーは、イベント時間または処理時間を時間基準として取り、その時間基準に基づくタイマーを設定します。

AfterWatermark

AfterWatermark トリガーは、イベント時間に対して作動します。AfterWatermark トリガーは、データ要素に付加されたタイムスタンプに基づき、ウィンドウの境界をウォーターマークが通過した後に、ウィンドウのコンテンツを排出します。ウォーターマークはグローバルな進捗メトリックであり、任意の時点でのパイプライン内の入力の完了レベルを示す Dataflow の概念です。

AfterWatermark トリガーはウォーターマークがウィンドウの境界を通過した場合のみ起動します。これは、所定の時間ベース ウィンドウにすべてのデータが含まれるとシステムで推測されたときに、Dataflow が結果を排出するために使用するプライマリ トリガーです。

AfterProcessingTime

AfterProcessingTime トリガーは、処理時間に対して作動します。AfterProcessingTime トリガーは、ウィンドウの開始などの時間参照点から一定の処理時間が経過した後にウィンドウを排出します。処理時間は、データ要素のタイムスタンプではなくシステム クロックによって決定されます。

AfterProcessingTime トリガーは、特に 1 つのグローバル ウィンドウなどの時間枠が長いウィンドウの初期の結果をトリガーするのに便利です。

データドリブンのトリガー

現在、Dataflow では 1 つのデータドリブンのトリガー AfterPane.elementCountAtLeast のみが提供されます。このトリガーは、連続した要素のカウントに対して作動します。つまり、現在のペインで N 個以上の要素が収集されると起動します。

AfterPane.elementCountAtLeast() は、特にグローバル ウィンドウが 1 つの場合に、すべてのデータが累積される前に、初期の結果をウィンドウに排出させるのに適しています。

デフォルトのトリガー

PCollection のデフォルトのトリガーはイベントの時間ベースであり、システムのウォーターマーク(すべてのデータが「そろった」ときの Dataflow の通知)がウィンドウの境界を通過するときにウィンドウの結果を排出します。デフォルトのトリガー構成では、排出は 1 回のみ行われ、遅延データは破棄されます。デフォルトのウィンドウ処理とトリガー構成で許可遅延の値は 0 であるためです。この動作を変更する方法について詳しくは、遅延データを扱うをご覧ください。

ウォーターマークはデータソースによって異なり、推定の場合もあります。または、システム割り当てのタイムスタンプがある Pub/Sub などでは、ウォーターマークが、パイプラインによって処理されたデータの正確な境界を示すことができます。

遅延データを扱う

パイプラインが遅延データ(ウォーターマークがウィンドウの境界を通過した後で届いたデータ)を扱う必要がある場合、ウィンドウ処理またはトリガー構成を設定するときに、許可遅延を適用できます。これにより、トリガーが遅延データに反応できるようになります。デフォルトのトリガー構成では、遅延データが届くとすぐに新しい結果が排出されます。

ウィンドウとトリガーを設定するときに .withAllowedLateness() を使用して、次のように許可遅延を設定します。

  PCollection<String> pc = ...;
  pc.apply(Window<String>.into(FixedWindows.of(1, TimeUnit.MINUTES))
                               .triggering(AfterProcessingTime.pastFirstElementInPane()
                                                              .plusDelayOf(Duration.standardMinutes(1)))
                               .withAllowedLateness(Duration.standardMinutes(30));

この許可遅延は、変換を元の PCollection に適用した結果として導出されるすべての PCollection に伝播されます。後でパイプラインの許可遅延を変更する場合は、Window.withAllowedLateness() を明示的に再適用できます。

トリガーを設定する

Window の使用によって、PCollection のウィンドウ機能を設定する際は、トリガーを指定することもできます。

次のように、Window.into() 変換の結果に .triggering() メソッドを起動することで PCollection のトリガーを設定します。

Java

  PCollection<String> pc = ...;
  pc.apply(Window<String>.into(FixedWindows.of(1, TimeUnit.MINUTES))
                               .triggering(AfterProcessingTime.pastFirstElementInPane()
                                                              .plusDelayOf(Duration.standardMinutes(1)))
                               .discardingFiredPanes());

このコードサンプルによって PCollection のトリガーが設定されます。このトリガーは時間ベースで、ウィンドウの最初の要素が処理された 1 分後に各ウィンドウを排出します。コードサンプルの最後の行 .discardingFiredPanes() は、ウィンドウの累積モードです。

ウィンドウの累積モード

トリガーを指定するときに、ウィンドウの累積モードも設定する必要があります。トリガーが起動すると、ウィンドウの現在の内容をペインとして排出します。トリガーは複数回起動できるため、累積モードによって、トリガーの起動時にシステムがウィンドウ ペインを累積するか破棄するかを決定します。

トリガーの起動時にペインを累積するようにウィンドウを設定するには、トリガーを設定するときに .accumulatingFiredPanes() を呼び出します。起動対象のペインを破棄するようにウィンドウを設定するには、.discardingFiredPanes() を呼び出します。

固定時間ウィンドウ処理とデータに基づくトリガーが設定された PCollection を使用する例について考えてみます(たとえば、各ウィンドウが 10 分間の移動平均を表しているが、それよりも短い間隔で現在の平均値を UI に表示しようとする場合に、この方法を使用できます)。次の条件があるとします。

  • PCollection は 10 分の固定時間ウィンドウを使用します。
  • PCollection の繰り返しトリガーは、要素が 3 個届くたびに起動します。

次の図に、PCollection に到着してウィンドウに割り当てられるデータイベントを示します *。

固定時間ウィンドウが設定された PCollection に届くキーごとのデータの図
図 1: 固定時間ウィンドウが設定されている PCollection 内のデータイベント。

注: 図を単純にするために、すべてのイベントが順序どおりにパイプラインに届くと仮定します。

便宜上、キー X が関連付けられた値のみを取り上げます。

累積モード

トリガーを .accumulatingFiredPanes に設定すると、トリガーは起動するたびに次の値を排出します(トリガーは、要素が 3 個届くたびに起動します)。

  Key X:
    First trigger firing:  [5, 8, 3]
    Second trigger firing: [5, 8, 3, 15, 19, 23]
    Third trigger firing:  [5, 8, 3, 15, 19, 23, 9, 13, 10]

破棄モード

トリガーを .discardingFiredPanes に設定すると、起動するたびに次の値を排出します。

  Key X:
    First trigger firing:  [5, 8, 3]
    Second trigger firing: [15, 19, 23]
    Third trigger firing:  [9, 13, 10]

累積と破棄の影響

ここで、キーごとの計算をパイプラインに追加します。トリガーが起動するたびに、パイプラインは、ウィンドウ内の各キーに関連付けられているすべての値の平均値を計算する Combine.perKey を適用します。

ここでも、キー X を取り上げます。

トリガーを .accumulatingFiredPanes に設定すると:

  Key X:
    First trigger firing:  [5, 8, 3]
      Average after first trigger firing: 5.3
    Second trigger firing: [5, 8, 3, 15, 19, 23]
      Average after second trigger firing: 12.167
    Third trigger firing:  [5, 8, 3, 15, 19, 23, 9, 13, 10]
      Average after third trigger firing: 11.667

トリガーを .discardingFiredPanes に設定すると:

  Key X:
    First trigger firing:  [5, 8, 3]
      Average after first trigger firing: 5.3
    Second trigger firing: [15, 19, 23]
      Average after second trigger firing: 19
    Third trigger firing:  [9, 13, 10]
      Average after third trigger firing: 10.667

平均値を計算する Combine.perKey は、それぞれのケースで異なる結果を作成します。

一般的に、.accumulatingFiredPanes に設定されたトリガーは、すでにトリガーで処理された要素も含めて、所定のウィンドウのすべてのデータを出力します。.discardingFiredPanes に設定されたトリガーは、トリガーの最後の起動時以降の増分変更を出力します。累積モードが最も適しているのは、要素の結合または更新を行うグループ化オペレーションの前です。それ以外の場合は、破棄モードを使用してください。

トリガーの継続

トリガーを指定した PCollection に集約変換(GroupByKeyCombine.perKey など)を適用する場合には、GroupByKey または Combine.perKey によって新しい出力 PCollection が生成されることに注意してください。入力コレクションに設定したトリガーはこの新しい出力コレクションには伝播しません

代わりに、Dataflow SDK は、入力コレクションに指定したトリガーに基づいて、出力 PCollection に対して同等のトリガーを作成します。新しいトリガーは、入力 PCollection に対するオリジナルのトリガーの指定とほぼ同じ速度で、できるだけ迅速に要素を排出しようとします。ユーザーが入力トリガーに指定したパラメータに基づいて、Dataflow によって新しいトリガーのプロパティが決定されます。

  • AfterWatermark の継続トリガーは、デフォルトではオリジナルのトリガーと同一です。AfterWatermark トリガーに、早期起動または遅延起動が指定されている場合、継続の早期起動または遅延起動は、オリジナル トリガーの早期起動または遅延起動を継続します。
  • AfterProcessingTime のデフォルトの継続トリガーは、結合された要素の同期処理時間の後で起動し、遅延をさらに伝播することはありません。たとえば、AfterProcessingTime.pastFirstElementInPane().alignedTo(15 min).plusDelayOf(1 hour) のようなトリガーがあるとします。GroupByKey の後で、出力コレクションのために Dataflow が用意するトリガーは、キーごとに同一の調整済み時間に同期されますが、1 時間の遅延を保持することはありません。
  • AfterCount のデフォルト継続トリガーは、要素ごとに起動します。たとえば、入力コレクションに対する AfterCount(n) は出力コレクションに対する AfterCount(1) になります。

GroupByKey または Combine.perKeyPCollection 出力のために Dataflow が生成するトリガーが十分でないと思われる場合は、そのコレクションに対して新しいトリガーを明示的に設定する必要があります。

トリガーを組み合わせる

Dataflow では、複数のトリガーを組み合わせて複合トリガーを作成できます。Dataflow の複合トリガー システムを使用すると、複数のトリガーを論理的に組み合わせることができます。結果を繰り返し排出するトリガー、最大で 1 回だけ排出するトリガー、または他のカスタム条件に応じて排出するトリガーを指定することもできます。

複合トリガーのタイプ

Dataflow には次の複合トリガーが含まれています。

  • 早い起動または遅い起動を AfterWatermark.pastEndOfWindow に追加できます。
  • Repeatedly.forever は、永続的に実行するトリガーを指定します。トリガーの条件が満たされると常にウィンドウの結果を排出し、その後リセットして最初から開始します。Repeatedly.forever.orFinally を組み合わせると、反復トリガーを停止させる条件の指定に役立つことがあります。
  • AfterEach.inOrder は、複数のトリガーを特定のシーケンスで起動するように組み合わせます。シーケンスの 1 つのトリガーが起動するとウィンドウが排出され、シーケンス内の次のトリガーに進みます。
  • AfterFirst は複数のトリガーを取り、引数であるトリガーのいずれかが最初に満たされたときに排出します。これは、複数トリガーの論理 OR 演算と同じです。
  • AfterAll は複数のトリガーを取り、引数であるトリガーすべてが満たされたときに排出します。これは、複数トリガーの論理 AND 演算と同じです。
  • orFinally は、最後の条件として使用できます。どのトリガーも最後に 1 回だけ起動し、再び起動することはありません。

AfterWatermark.pastEndOfWindow との組み合わせ

特に有効な複合トリガーは、すべてのデータが届いたとシステムが推定したとき(つまり、ウォーターマークがウィンドウの境界を通過したとき)に 1 回起動するトリガーと、次のいずれかまたは両方を組み合わせたものです。

  • 予測起動。ウォーターマークがウィンドウの境界を通過する前に起動して、部分的な結果を早く処理できます。
  • 遅延起動。ウォーターマークがウィンドウの境界を通過した後に発生して、遅れて届いたデータを処理できます。

AfterWatermark.pastEndOfWindow を使用して、このパターンを表すことができます。たとえば、この後に示すトリガーコードの例は次の条件で起動します。

  • すべてのデータが届いたとシステムが推測したとき(ウォーターマークがウィンドウの境界を通過するとき)。
  • 10 分の遅延後、遅れたデータが届くたび。
  • 2 日後。対象のデータがこれ以上届くことはないと仮定して、トリガーが実行を停止します。

Java

  .apply(Window
      .triggering(AfterWatermark
           .pastEndOfWindow()
           .withLateFirings(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(10))))
      .withAllowedLateness(Duration.standardDays(2)));

他の複合トリガー

他の種類の複合トリガーを構築することもできます。次のコードサンプルは、ペインの要素が 100 個以上になったときか 1 分経過するごとに起動する単純な複合トリガーです。

Java

Repeatedly.forever(AfterFirst.of(
    AfterPane.elementCountAtLeast(100),
    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))

トリガーの文法

次の文法は、トリガーを組み合わせて複合トリガーを作成するさまざまな方法を説明しています。

TRIGGER ::=
   ONCE_TRIGGER
   Repeatedly.forever(TRIGGER)
   TRIGGER.orFinally(ONCE_TRIGGER)
   AfterEach.inOrder(TRIGGER, TRIGGER, ...)

ONCE_TRIGGER ::=
  TIME_TRIGGER
  WATERMARK_TRIGGER
  AfterPane.elementCountAtLeast(Integer)
  AfterFirst.of(ONCE_TRIGGER, ONCE_TRIGGER, ...)
  AfterAll.of(ONCE_TRIGGER, ONCE_TRIGGER, ...)

TIME_TRIGGER ::=
  AfterProcessingTime.pastFirstElementInPane()
  TIME_TRIGGER.alignedTo(Duration)
  TIME_TRIGGER.alignedTo(Duration, Instant)
  TIME_TRIGGER.plusDelayOf(Duration)
  TIME_TRIGGER.mappedBy(Instant -> Instant)

WATERMARK_TRIGGER ::=
  AfterWatermark.pastEndOfWindow()
  WATERMARK_TRIGGER.withEarlyFirings(ONCE_TRIGGER)
  WATERMARK_TRIGGER.withLateFirings(ONCE_TRIGGER)

Default = Repeatedly.forever(AfterWatermark.pastEndOfWindow())