データを PCollection
に収集して、そのデータを有限のウィンドウごとにグループ化するとき、Dataflow はなんらかの方法で、各ウィンドウの集計結果をいつ排出するかを決める必要があります。タイムスキューとデータラグがある場合、Dataflow はトリガーと呼ばれるメカニズムを使用して、ウィンドウで「十分な」データがいつ収集されるかを判別した後、ウィンドウの集約結果を排出します。この排出された結果を「ペイン」と呼びます。
Dataflow のトリガー システムでは、システムのデータ処理ニーズに応じて、所定のウィンドウの集計結果をいつ排出するかを決定する方法がいくつか提供されます。たとえば、迅速な更新や一刻を争う更新が必要なシステムでは、N 秒ごとにウィンドウを排出する厳格な時間ベースのトリガーを使用して、データの完全性よりも迅速性を優先します。結果の正確なタイミングよりもデータの完全性が重視されるシステムでは、データに基づくトリガーを使用し、特定の数のデータレコードが収集されるまで待ってからウィンドウを閉じます。
トリガーは、パイプラインで次の 2 種類の条件を扱う際に特に役立ちます。
- トリガーを使用して、遅延データのインスタンスを扱うことができます。
- トリガーを使用して、所定ウィンドウのすべてのデータが届く前に、早期結果を排出することができます。
注: ウィンドウ処理機能で単一グローバル ウィンドウを使用する、制限なしの PCollection
に対してトリガーを設定できます。これが役立つのは、パイプラインで制限なしデータセットについて定期的な更新を生成する場合です(たとえば、現在までに提供されたすべてのデータの、N 秒ごとまたは要素 N 個ごとの移動平均)。
トリガーの種類
Dataflow ではいくつもの事前作成済みトリガーが用意されており、PCollection
に対して設定できます。主に 3 つの種類のトリガーがあります。
- 時間ベースのトリガー。このトリガーは時間基準に対して作動します。時間基準には、イベント時間(各データ要素のタイムスタンプによって示される時間)と処理時間(パイプライン内の所定のステージでデータ要素が処理される時間)があります。
- データ駆動型のトリガー。このトリガーは、データが各ウィンドウに届いたときにデータを調べて、指定したデータ条件を満たすと起動されます。たとえば、ウィンドウのデータ要素が特定の数に到達したときにウィンドウの結果を排出するトリガーを設定できます。
- 複合トリガー。このトリガーは、複数の時間ベースのトリガーとデータ駆動トリガーを論理的な方法で組み合わせたものです。すべてのトリガーの条件が満たされるとき(論理 AND)や、いずれかのトリガーの条件が満たされるとき(論理 OR)などに起動する複合トリガーを設定できます。
時間ベースのトリガー
Dataflow の時間ベーストリガーには AfterWatermark
と AfterProcessingTime
があります。これらのトリガーは、イベント時間または処理時間を時間基準として取り、その時間基準に基づくタイマーを設定します。
AfterWatermark
AfterWatermark
トリガーは「イベント時間」に対して作動します。AfterWatermark
トリガーがウィンドウの内容を排出するのは、データ要素に付けられたタイムスタンプに基づいて、ウォーターマークがウィンドウの境界を通過した後です。ウォーターマークはグローバルな進捗メトリックであり、任意の時点でのパイプライン内の入力の完了レベルを示す Dataflow の概念です。
AfterWatermark
トリガーはウォーターマークがウィンドウの境界を通過した場合「のみ」起動します。これは、所定の時間ベース ウィンドウにすべてのデータが含まれるとシステムで推測されたときに、Dataflow が結果を排出するために使用するプライマリ トリガーです。
AfterProcessingTime
AfterProcessingTime
トリガーは「処理時間」に対して作動します。AfterProcessingTime
トリガーがウィンドウを排出するのは、時間基準(ウィンドウの開始時など)から特定の処理時間が経過したときです。処理時間は、データ要素のタイムスタンプではなくシステム クロックによって決定されます。
AfterProcessingTime
トリガーは、特に単一グローバル ウィンドウなど時間枠が長いウィンドウで、ウィンドウから早期結果をトリガーする場合に役立ちます。
データ駆動型のトリガー
現在、Dataflow では 1 つのデータ駆動型トリガー AfterPane.elementCountAtLeast
のみが提供されます。このトリガーは、連続した要素のカウントに対して作動します。つまり、現在のペインで N 個以上の要素が収集されると起動します。
AfterPane.elementCountAtLeast()
は、単一グローバル ウィンドウの場合に、すべてのデータが収集される前にウィンドウから早期結果を排出するために適した方法です。
デフォルトのトリガー
PCollection
のデフォルト トリガーはイベント時間に基づきます。システムのウォーターマーク(すべてのデータがいつ収集されたかを示す Dataflow の概念)がウィンドウの境界を通過したときに、ウィンドウの結果を排出します。デフォルトのトリガー構成では、排出は 1 回のみ行われ、遅延データは破棄されます。デフォルトのウィンドウ処理とトリガー構成で許可遅延の値は 0 であるためです。この動作を変更する方法について詳しくは、遅延データを扱うをご覧ください。
ウォーターマークはデータソースによって異なり、推定の場合もあります。または、システム割り当てのタイムスタンプがある Pub/Sub などでは、ウォーターマークが、パイプラインによって処理されたデータの正確な境界を示すことができます。
遅延データを扱う
パイプラインが遅延データ(ウォーターマークがウィンドウの境界を通過した後で届いたデータ)を扱う必要がある場合、ウィンドウ処理またはトリガー構成を設定するときに、許可遅延を適用できます。これにより、トリガーが遅延データに反応できるようになります。デフォルトのトリガー構成では、遅延データが届くとすぐに新しい結果が排出されます。
ウィンドウとトリガーを設定するときに .withAllowedLateness()
を使用して、次のように許可遅延を設定します。
PCollection<String> pc = ...; pc.apply(Window<String>.into(FixedWindows.of(1, TimeUnit.MINUTES)) .triggering(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(1))) .withAllowedLateness(Duration.standardMinutes(30));
この許可遅延は、変換を元の PCollection
に適用した結果として導出されるすべての PCollection
に伝播されます。パイプラインで後から許可遅延を変更する場合は、Window.withAllowedLateness()
を再び明示的に適用します。
トリガーを設定する
Window
変換を使用して PCollection
のウィンドウ処理機能を設定するときに、トリガーも指定できます。
次のようにメソッド .triggering()
を Window.into()
変換の結果に対して呼び出すことで、PCollection
のトリガーを設定します。
Java
PCollection<String> pc = ...; pc.apply(Window<String>.into(FixedWindows.of(1, TimeUnit.MINUTES)) .triggering(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(1))) .discardingFiredPanes());
このコードサンプルによって PCollection
のトリガーが設定されます。このトリガーは時間ベースで、ウィンドウの最初の要素が処理された 1 分後に各ウィンドウを排出します。コードサンプルの最後の行 .discardingFiredPanes()
は、ウィンドウの累積モードです。
ウィンドウの累積モード
トリガーを指定するときに、ウィンドウの累積モードも設定する必要があります。トリガーが起動すると、ウィンドウの現在の内容をペインとして排出します。トリガーは複数回起動できるため、累積モードによって、トリガーの起動時にシステムがウィンドウ ペインを累積するか破棄するかを決定します。
トリガーの起動時にペインを累積するようにウィンドウを設定するには、トリガーを設定するときに .accumulatingFiredPanes()
を呼び出します。起動対象のペインを破棄するようにウィンドウを設定するには、.discardingFiredPanes()
を呼び出します。
固定時間ウィンドウ処理とデータに基づくトリガーが設定された PCollection
を使用する例について考えてみます(たとえば、各ウィンドウが 10 分間の移動平均を表しているが、それよりも短い間隔で現在の平均値を UI に表示しようとする場合に、この方法を使用できます)。次の条件があるとします。
PCollection
は 10 分の固定時間ウィンドウを使用します。PCollection
の繰り返しトリガーは、要素が 3 個届くたびに起動します。
次の図は、データイベントが PCollection
に届いて、各ウィンドウに割り当てられる様子を示します。

注: 図を単純にするために、すべてのイベントが順序どおりにパイプラインに届くと仮定します。
便宜上、キー X が関連付けられた値のみを取り上げます。
累積モード
トリガーが .accumulatingFiredPanes
に設定された場合、トリガーは起動するたびに次のように値を排出します(このトリガーは要素が 3 個届くごとに起動します)。
Key X: First trigger firing: [5, 8, 3] Second trigger firing: [5, 8, 3, 15, 19, 23] Third trigger firing: [5, 8, 3, 15, 19, 23, 9, 13, 10]
破棄モード
トリガーが .discardingFiredPanes
に設定された場合、トリガーは起動するたびに次のように値を排出します。
Key X: First trigger firing: [5, 8, 3] Second trigger firing: [15, 19, 23] Third trigger firing: [9, 13, 10]
累積と破棄の影響
ここで、キーごとの計算をパイプラインに追加します。トリガーが起動するたびに、パイプラインは、ウィンドウ内の各キーに関連付けられているすべての値の平均値を計算する Combine.perKey
を適用します。
ここでも、キー X を取り上げます。
トリガーが .accumulatingFiredPanes
に設定された場合:
Key X: First trigger firing: [5, 8, 3] Average after first trigger firing: 5.3 Second trigger firing: [5, 8, 3, 15, 19, 23] Average after second trigger firing: 12.167 Third trigger firing: [5, 8, 3, 15, 19, 23, 9, 13, 10] Average after third trigger firing: 11.667
トリガーが .discardingFiredPanes
に設定された場合:
Key X: First trigger firing: [5, 8, 3] Average after first trigger firing: 5.3 Second trigger firing: [15, 19, 23] Average after second trigger firing: 19 Third trigger firing: [9, 13, 10] Average after third trigger firing: 10.667
平均値を計算する Combine.perKey
では各ケースで異なる結果が生成されることに注意してください。
一般的に、.accumulatingFiredPanes
に設定されたトリガーは、「すでにトリガーで処理された要素も含めて」、所定のウィンドウのすべてのデータを出力します。.discardingFiredPanes
に設定されたトリガーは、トリガーの最後の起動時以降の増分変更を出力します。累積モードが適しているのは、要素の結合または更新を行うグループ化オペレーションの前です。それ以外の場合は、破棄モードを使用してください。
トリガーの継続
トリガーを指定した PCollection
に集約変換(GroupByKey や Combine.perKey など)を適用する場合には、GroupByKey
または Combine.perKey
によって新しい出力 PCollection
が生成されることに注意してください。入力コレクションに設定したトリガーはこの新しい出力コレクションには伝播しません。
代わりに、ユーザーが入力コレクションに指定したトリガーに基づき、Dataflow SDK によって出力 PCollection
用の同等のトリガーが作成されます。新しいトリガーは、入力 PCollection
に対するオリジナルのトリガーの指定とほぼ同じ速度で、できるだけ迅速に要素を排出しようとします。ユーザーが入力トリガーに指定したパラメータに基づいて、Dataflow によって新しいトリガーのプロパティが決定されます。
AfterWatermark
の継続トリガーは、デフォルトではオリジナルのトリガーと同一です。AfterWatermark
トリガーに、早期起動または遅延起動が指定されている場合、継続の早期起動または遅延起動は、オリジナル トリガーの早期起動または遅延起動を継続します。AfterProcessingTime
のデフォルトの継続トリガーは、結合された要素の同期処理時間の後で起動し、遅延をさらに伝播することはありません。たとえば、AfterProcessingTime.pastFirstElementInPane().alignedTo(15 min).plusDelayOf(1 hour)
というトリガーがあるとします。GroupByKey
の後で、出力コレクションのために Dataflow が用意するトリガーは、キーごとに同一の調整済み時間に同期されますが、1 時間の遅延を保持することはありません。AfterCount
のデフォルト継続トリガーは、要素ごとに起動します。たとえば、入力コレクションに対するAfterCount(n)
は出力コレクションに対するAfterCount(1)
になります。
GroupByKey
または Combine.perKey
の PCollection
出力のために Dataflow が生成するトリガーが十分でないと思われる場合は、そのコレクションに対して新しいトリガーを明示的に設定する必要があります。
トリガーを組み合わせる
Dataflow では、複数のトリガーを組み合わせて複合トリガーを作成できます。Dataflow の複合トリガー システムを使用すると、複数のトリガーを論理的に組み合わせることができます。結果を繰り返し排出するトリガー、最大で 1 回だけ排出するトリガー、または他のカスタム条件に応じて排出するトリガーを指定することもできます。
複合トリガーのタイプ
Dataflow には次の複合トリガーが含まれています。
AfterWatermark.pastEndOfWindow
には早期起動または遅延起動を追加できます。Repeatedly.forever
は、永続的に実行するトリガーを指定します。トリガーの条件が満たされると常にウィンドウの結果を排出し、その後リセットして最初から開始します。Repeatedly.forever
を.orFinally
と組み合わせて、繰り返しトリガーが停止する条件を指定すると役立ちます。AfterEach.inOrder
は、複数のトリガーを特定のシーケンスで起動するように組み合わせます。シーケンスの 1 つのトリガーが起動するとウィンドウが排出され、シーケンス内の次のトリガーに進みます。AfterFirst
は複数のトリガーを取り、引数であるトリガーの「いずれか」が最初に満たされたときに排出します。これは、複数トリガーの論理 OR 演算と同じです。AfterAll
は複数のトリガーを取り、引数であるトリガー「すべて」が満たされたときに排出します。これは、複数トリガーの論理 AND 演算と同じです。orFinally
は、最後の条件として使用できます。どのトリガーも最後に 1 回だけ起動し、再び起動することはありません。
AfterWatermark.pastEndOfWindow との組み合わせ
特に有効な複合トリガーは、すべてのデータが届いたとシステムが推定したとき(つまり、ウォーターマークがウィンドウの境界を通過したとき)に 1 回起動するトリガーと、次のいずれかまたは両方を組み合わせたものです。
- 予測起動。ウォーターマークがウィンドウの境界を通過する前に起動して、部分的な結果を早く処理できます。
- 遅延起動。ウォーターマークがウィンドウの境界を通過した後に発生して、遅れて届いたデータを処理できます。
このパターンを AfterWatermark.pastEndOfWindow
を使用して表すことができます。たとえば、この後に示す例のトリガーコードは次の条件で起動します。
- すべてのデータが届いたとシステムが推測したとき(ウォーターマークがウィンドウの境界を通過するとき)。
- 10 分の遅延後、遅れたデータが届くたび。
- 2 日後。対象のデータがこれ以上届くことはないと仮定して、トリガーが実行を停止します。
Java
.apply(Window .triggering(AfterWatermark .pastEndOfWindow() .withLateFirings(AfterProcessingTime .pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(10)))) .withAllowedLateness(Duration.standardDays(2)));
他の複合トリガー
他の種類の複合トリガーを構築することもできます。次のコードサンプルは、ペインの要素が 100 個以上になったときか 1 分経過するごとに起動する単純な複合トリガーです。
Java
Repeatedly.forever(AfterFirst.of( AfterPane.elementCountAtLeast(100), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))
トリガーの文法
次の文法は、トリガーを組み合わせて複合トリガーを作成するさまざまな方法を説明しています。
TRIGGER ::= ONCE_TRIGGER Repeatedly.forever(TRIGGER) TRIGGER.orFinally(ONCE_TRIGGER) AfterEach.inOrder(TRIGGER, TRIGGER, ...) ONCE_TRIGGER ::= TIME_TRIGGER WATERMARK_TRIGGER AfterPane.elementCountAtLeast(Integer) AfterFirst.of(ONCE_TRIGGER, ONCE_TRIGGER, ...) AfterAll.of(ONCE_TRIGGER, ONCE_TRIGGER, ...) TIME_TRIGGER ::= AfterProcessingTime.pastFirstElementInPane() TIME_TRIGGER.alignedTo(Duration) TIME_TRIGGER.alignedTo(Duration, Instant) TIME_TRIGGER.plusDelayOf(Duration) TIME_TRIGGER.mappedBy(Instant -> Instant) WATERMARK_TRIGGER ::= AfterWatermark.pastEndOfWindow() WATERMARK_TRIGGER.withEarlyFirings(ONCE_TRIGGER) WATERMARK_TRIGGER.withLateFirings(ONCE_TRIGGER) Default = Repeatedly.forever(AfterWatermark.pastEndOfWindow())