ストリーミング パイプライン

制限なし PCollection、または制限なしコレクションにより、ストリーミング パイプラインにおけるデータが表現されます。制限なしコレクションには、Pub/Sub など、継続的に更新されるデータソースのデータが含まれます。

鍵のみを使用して、制限なしコレクション内の要素をグループ化することはできません。データソースには常に新しい要素が追加されるため、ストリーミング データのキーは無限に多くなる可能性があります。ウインドウウォーターマークトリガーを使用して制限なしコレクションの要素を集約できます。

ウィンドウとウィンドウ処理関数

ウィンドウ関数は、制限なしコレクションを論理的な要素、つまりウィンドウに分割します。ウィンドウ関数は、個々の要素のタイムスタンプで制限なしコレクションをグループ化します。各ウィンドウには一定数の要素が入ります。

Apache Beam SDK または Dataflow SQL ストリーミング拡張機能で次のウィンドウを設定します。

タンブリング ウィンドウ

タンブリング ウィンドウとは、データ ストリームを重なりなく分ける一定の時間間隔を表します。

たとえば、30 秒のタンブリング ウィンドウに設定すると、タイムスタンプ値が [0:00:00-0:00:30] の要素が最初のウィンドウに表示されます。2 番目のウィンドウには、[0:00:30-0:01:00] のタイムスタンプ値を持つ要素が表示されます。

次の図は、30 秒のタンブリング ウィンドウに分割された要素を表しています。

期間が 30 秒のタンブリング ウィンドウを表す画像

ホッピング ウィンドウ

ホッピング ウィンドウとは、データ ストリーム内の一定の時間間隔を表します。タンブリング ウインドウは重なりませんが、ホッピング ウインドウは重なることがあります。

たとえば、ホッピング ウィンドウが 10 秒ごとに開始し、1 分間のデータとウィンドウを持つ場合があります。ホッピング ウィンドウの開始間隔はピリオドといいます。この例では、1 分間のウィンドウと 30 秒のピリオドが設定されています。

次の図は、30 秒のピリオドを持つ 1 分間のホッピング ウインドウに分割された要素を表しています。

ウィンドウ期間が 1 分、ウィンドウ ピリオドが 30 秒に設定されたホッピング ウィンドウを表す画像

平均データを取得するには、ホッピング ウィンドウを使用します。30 秒の期間と 1 分のホッピング ウィンドウを使用して、30 秒ごとに平均 1 分間の実行を計算できます。

セッション ウィンドウ

セッション ウィンドウには、別の要素とのギャップ期間に存在する複数の要素が含まれます。ギャップ期間とは、データ ストリームの新しいデータの間隔を表します。ギャップ期間の後にデータを取得すると、そのデータには新しいウィンドウが割り当てられます。

たとえば、セッション ウィンドウでは、ユーザーのマウスの操作を表すデータ ストリームを分割できます。このデータ ストリームでは、長時間アイドル状態が続き、クリックが多い期間が点在します。セッション ウィンドウには、クリックで生成されたデータを含めることができます。

セッション ウィンドウは、各データキーに異なるウィンドウを割り当てます。タンブル ウィンドウとホッピング ウィンドウには、データキーに関係なく、指定された期間内のすべての要素が含まれます。

次の図は、セッション ウィンドウに分割された要素を表しています。

最小ギャップ期間が設定されたセッション ウィンドウを表す画像

ウォーターマーク

ウォーターマークとは、Dataflow でウィンドウのすべてのデータが必要になるしきい値を表します。新しく受信したデータのタイムスタンプがウォーターマークより古い場合、データは遅延データとみなされます。

Dataflow では、次の理由からウォーターマークを追跡します。

  • データを時間順や予測可能な間隔で受信できるとは限りません。
  • データイベントは、生成時と同じ順序でパイプラインに提供されるとは限りません。

データソースによってウォーターマークが決まります。Apache Beam SDK では、遅延データを許可できます。Dataflow SQL は遅延データを処理しません。

トリガー

トリガーは、データが到着したときに集計結果をいつ出力するかを決定します。デフォルトでは、ウォーターマークがウィンドウ末尾を過ぎると結果が出力されます。

Apache Beam SDK を使用すると、ストリーミング パイプラインの各コレクションのトリガーを作成または変更できます。Dataflow SQL でトリガーを設定することはできません。

Apache Beam SDK では、次の条件を組み合わせて動作するトリガーを設定できます。

  • 各データ要素のタイムスタンプで表されるイベントの時刻。
  • パイプラインの任意のステージでデータ要素が処理される時間。
  • コレクション内のデータ要素の数。