ストリーミング時系列データの処理: 概要

このドキュメントでは、Apache Beam の使用時にストリーミング時系列データを処理する際の主な課題について概説し、このような課題に対処するために、Timeseries Streaming ソリューションの Java ライブラリで使用されているメソッドについて説明します。また、Timelines Streaming で処理されるデータが、機械学習(ML)モデルのトレーニングと予測の一般的なユースケースにどのように適しているかについても説明します。

このドキュメントはシリーズの一部です。

  • ストリーミング時系列データの処理: 概要(このドキュメント)。
  • ストリーミング時系列データの処理: チュートリアル。Timeseries Streaming ソリューションの使い方を説明します。まず、Timeseries Streaming Java ライブラリを使用して時系列データを処理し、次に Timeseries Streaming Python ライブラリを使用してそのデータの予測を取得します。

このドキュメントは、デベロッパーとデータ エンジニアを対象としており、次の知識があることを前提としています。

  • Java プログラミングの中級以上の知識
  • Apache Beam に関する高度な知識
  • ML モデルの開発と使用に関する基本的な知識

時系列データ

時系列は、時間順に並んだ一連のデータポイントです。たとえば、株式取引やモーション カメラのスナップショットなどです。各データポイントは、キーと 1 つ以上の値のペアで表されます。たとえば、銘柄記号がキーで、その時点の株価が値になります。

データポイントが発生すると、データポイントを一定の幅の時間間隔に分割し、それによってその間隔でイベントを表す統計情報を作成できます。このプロセスをウィンドウ処理といいます。たとえば株価であれば、1 分などの時間枠で始値、終値、安値、高値、平均などの分析値や合計値を取得できます。このような指標の計算結果を複雑な指標の構成要素として使用することもできます。

時系列には、複数の関連データ(多変量)または 1 つのデータ(単変量)を含めることができます。たとえば、呼び値と入札価格の両方を含む時系列の株式トランザクションは多変量であり、モノのインターネット(IoT)のセンサーから読み込む温度の時系列データは単変量です。

ストリーミング時系列データの処理における課題

Apache Beam を使用したストリーミング時系列データの処理は、次の制限事項があるため難易度が高くなる可能性があります。

  • ウィンドウ処理を使用する場合、Apache Beam には時間枠間で状態を引き継ぐためのデフォルトのメカニズムはありません。
  • Apache Beam などの分散処理環境では、次に挙げる理由により複数の時間枠でデータの順序を維持することは困難です。

    • 注文する必要があるデータにはすべてアクセスする必要があります。そのデータはワーカー間で分散されている可能性があります。
    • 順序付けを行うには、通常は大規模なデータセットを並べ替える必要があり、リソースを大量に消費する可能性があります。

このような制限は、次のセクションで説明する 2 つの問題の原因となります。

データの欠落を埋める

Apache Beam でストリーミングの時系列データを処理する際に現在の制限によって発生する最初の問題は、最新の既知の値を使用して自動的にデータの欠落を埋められないことです。

次の IoT センサーデータの図に示すように、理想的にはアプリケーションに渡される時系列データには、各時間枠の各キーに存在する値に欠落はありません。

欠損値のない理想的な時系列データを示す図。

ただし、実際には、接続の中断や、センサーの故障によって、欠落が生じる場合があります。この場合、次の図に示すようなデータになります。

欠損値のある典型的な時系列データを示す図。

その結果、機械学習やその他の多くのユースケースでのデータ準備で重要な部分は、指定されたキーに対して null 値を適切な値に置き換えて、このキーの計算などのオペレーションが失敗しないようにすることです。

複数の期間での指標の計算

ストリーミングの時系列データを処理する際に現在の制限によって生じる 2 番目の問題は、Apache Beam で複数の期間をカバーするローリング ウィンドウに依存する指標を計算できないことです。

時系列データから生成する複雑な指標の多くでは、複数の時間枠のデータが必要となるため、データの順序が重要になります。たとえば、1 つ以上の前の期間の状態に依存している計算を行う場合(ある期間と次の期間の値で絶対的な相違を判定することやローリング移動平均の計算など)です。

Timeseries Streaming 参照アプリケーション

Timeseries Streaming 参照アプリケーションでは、ストリーミング時系列データを処理する際に現在の Apache Beam の制限を回避する方法を示し、時間枠全体でデータの欠落を埋め、指標計算を実行できます。Timeseries Streaming 参照アプリケーションには、一連の変換用の Java ライブラリが含まれています。このライブラリを使用すると、時系列データをこうした方法で処理する Apache Beam パイプラインの開発を簡素化できます。

Timeseries Streaming には、ML モデルからデータの予測を取得できるようにする Python ライブラリも含まれています。ML モデルは、処理済みの時系列データの一般的なユースケースです。具体的には、Keras ソースコードと長短期記憶(LSTM)モデル(異常検出に特に適したモデル)のトレーニング済みバージョンの両方を提供します。

参照を実装する際に ML モデルを選択するかどうかは重要ではありません。ML モデルは、主としてストリーミングの時系列データの処理を最適化することに焦点を合わせているからです。LSTM 異常検出モデルは、結果を解釈するために特定のビジネス知識を必要としないため選ばれました。

データ処理の設計

Timeseries Streaming ソリューションでは、Apache Beam の 3 つの機能を使用して、時系列データを処理する際の課題で説明したデータ処理の課題に対処する方法を紹介します。

まず、Timeseries Streaming はトリガーを使用して、一定の時間枠ごとに集計結果を取得し、それらを単一のグローバル ウィンドウで処理できるようにします。単一グローバル ウィンドウはほぼ無限であるため、そこで集計された結果は一定の時間枠全体でずっと利用可能です。これにより、データの欠落を埋めると同時に、複数の時間枠のデータを必要とする指標を計算することも可能となります。

ただし、集計結果をグローバル ウィンドウに移動すると、データの順序は保証されません。データは通常の状況では受信した順に処理されますが、システムの中断(たとえば、ウォーターマークを阻害するもの)によって順序がばらばらになる可能性があります。これに対処するため、このソリューションではループタイマーを使用します。Timer API コントラクトにより、タイマーが順番に起動することが保証され、集計結果の処理方法の順序を指定できます。

TimeSeries Streaming 用のデータ処理機能は、TimeSeriesPipeline ライブラリに実装されています。

検討した別のアプローチ

代替値を提供するパイプラインの外部にハートビート メッセージを生成してデータの欠落を埋める、という別のアプローチもあります。しかし、この方法には適用が複雑になる問題がいくつかあります。

  • ハートビート メッセージの値は、処理中のすべての時系列について、すべてのパイプライン ワーカーにファンアウトされる必要がありますが、その数が多くなる可能性があります。アプリケーションでは、このようなメッセージの取り込みと送信を処理する必要がありますが、そのデータをワーカーに配布する際にボトルネックが生じる可能性があり、その場合はそれに対処することも必要です。
  • データで使用されるキーは変わる可能性があります。ストリーム内のデータの変化に合わせて新しいキーが表示され、既存のキーが削除されることがあります。アプリケーションは、時系列データを更新するためにキーリストを維持します。たとえば、各時間枠のデータから DISTINCT(Key) 値を取得します。
  • ハートビート メッセージの要件は、開発、モニタリング、管理が必要な別のコンポーネントをアプリケーションに追加する必要があることを意味します。

パイプライン内部のハートビート メッセージを作成すると、これらの問題の最初と 3 番目を回避できます。これは、GenerateSequence を使用することで可能となります。ただし、データにおけるキーの経時的な変化を処理する必要もあります。

対処されたデータ欠落のタイプ

ストリーム データの連続性には 4 つの一般的なパターンがあり、そのうち 3 つは Timeseries Streaming ソリューションで処理します。パターンには次のものがあります。

  • パターン 1: すべての時間枠で特定のキーのデータがストリーム内に存在する。欠落を埋める必要はありません。

    欠損値のない時系列データ。

  • パターン 2: 特定のキーについて一部の時間枠にデータポイントがない。あるいは、そのキーに対してデータが断続的に供給され続ける。

    断続的な値を持つ時系列データ。

    Timeseries Streaming ではこのシナリオを処理するために、ループタイマーを使用してその時間枠のエントリ ポイントを作成し、置換値を提供します。

  • パターン 3: システムでデータ ストリームの読み取りが開始されるブートストラップの段階で、特定のキーのデータがない。

    システム起動時にデータがない時系列。

    Timeseries Streaming ではこのシナリオを扱いません。ストリーミング パイプラインの開始時にキーにデータがない場合、システムはキーの存在を認識できず、ループタイマーは開始されません。

  • パターン 4: ある時点から、データが特定のキーに到着しなくなる。たとえば、信号を送信していた IoT デバイスが取り外された場合など。

    キーに対する値が到着しない時系列。

    Timeseries Streaming では、構成可能な有効期間を設定することで、このシナリオに対応します。有効期間を設定することで、指定した時間内にデータを受信しないキーのループタイマーを停止するようにアプリケーションに指示します。

含まれる指標

Timeseries Streaming ソリューションには、次の指標が含まれます。

指標 説明
単純移動平均 データの長期的な傾向を特定するために使用します。
指数移動平均 データの短期的な傾向を特定するために使用します。
標準偏差 データポイントのばらつきを把握し、リスクや変動を評価するために使用します。
ボリンジャー バンド 相対的な価格の変動を把握するために使用します。
相対強度インデックス 過去と現在の価格の傾向を把握するために使用します。

これらの指標は TimeSeriesMetricsLibrary で実装されます。

ソリューションに他の指標を追加するには、ストリームの新しい指標の作成の手順を行います。

機械学習の設計

ストリーミング時系列データに機械学習を適用すると、複雑になる場合があります。通常、データは正しい形状になっている必要があり、欠落があってはなりません。トレーニングに使用するデータと、予測のために送信されたデータを同じ方法で処理することが、データ コンテンツと形状の一貫性の維持に役立ち、モデルのパフォーマンス向上につながります。

Timeseries Streaming ソリューションでは、指定された LSTM モデルは Timeseries Streaming Java ライブラリで処理された時系列データでトレーニングされています。コードを実行して予測を取得すると、使用するサンプルデータが同じ Java ライブラリで処理されます。これによって、データの形状が正しく、欠落がないことが保証されます。

また、処理済みの出力の形状がわかっていて、特徴量の抽出がデータ処理ステップの一環として行われるため、TFX パイプラインで使用された LSTM の例は、他のモデルに置き換えることが可能です。ただしこれは、同じ形状のデータを使用している場合に限ります。必要に応じて、Java ライブラリを使用して独自のデータを処理し、それを使用して独自のモデルをトレーニングすると、それをこの方法で使用できます。

Timeseries Streaming ソリューションでは、TFX Basic 共有ライブラリRunInference 変換を利用することで、インストリーム予測を実行するのに必要なコードが簡素化されます。この変換では、model_spec.proto の構成設定を変更して、ローカルモデルではなく、リモートでホストされるモデルをターゲットにすることもできます。

ワークフロー

Timeseries Streaming ソリューションでは、次のワークフローに従います。

  1. 時系列データを読み取ります。データは、Timeseries Streaming で使用するプロトコル バッファ形式にする必要があります。プロトコル バッファ形式は TS.proto ライブラリに実装されています。

    プロトコル バッファ形式を使用すると、次のメリットがあります。

    • パイプライン内で使用される時系列オブジェクトもパイプラインの外部に存在します。これにより、時系列データを他のシステムに統合できます。
    • Integer、Float、Double、String の各データ型を任意に組み合わせた時系列データが提供されるため、Timeseries Streaming では変換せずに処理できます。
  2. データの欠落を埋め、指標を計算する処理を実行します。

  3. データを 1 つまたは複数の形式で出力して、アプリケーションで使用できるようにします。SimpleDataStreamGenerator.java ライブラリを使用すると、処理済みのデータを次の形式で出力できます。

    • ログファイル。
    • TF.Record ファイル。この形式の一般的な用途は、TensorFlow モデルからバッチ予測をリクエストする場合です。
    • BigQuery テーブルの行。
    • Pub/Sub トピックのメッセージ。この形式の一般的な用途は、モデルからインストリーム予測をリクエストする場合です。
  4. 提供された LSTM モデルを使用して、処理済みの時系列データの予測を取得します。TF.Record 形式に出力されたデータに対するバッチ予測を取得できます。また、Pub/Sub トピックに公開されたデータのインストリーム予測を取得することもできます。

次のステップ