何千もの財務時系列ストリームのリアルタイムでの相関

この記事では、Google Cloud Dataflow、Google の分散ストリーム処理テクノロジーを使用して、ほぼリアルタイムの分析システムを構築することについて説明します。このシステムでは、支払方法の少数の同時データ ストリームから何千もの同時データ ストリームまで、変更、管理、インフラストラクチャの作業なしに処理できます。

このソリューションでは、バッチ処理またはマイクロバッチ処理を必要とせずに、強整合性を持つ、正確で完全なデータを提供します。この実装では相関分析が使用されますが、工場で装置から信号を送信する IOT 端末など、他の用途に対して示す処理パターンを一般化することができます。

財務データを扱う際には、セキュリティは常に重要です。Google Cloud Platform は、データを安全かつプライバシーに配慮したいくつかの方法で保持するのに役立ちます。たとえば、伝送時および保存時にはすべてのデータが暗号化されます。Cloud Platform は ISO 27001SOC3、FINRA、PCI に準拠しています。

現在のデータ処理の大部分はバッチ処理であり、ある時点でフリーズされたデータのスナップショットに対して実行されます。スナップショットは一度にすべてが処理され、周期的に結果が生成され、多くのユースケースでうまく機能します。ただし、他のユースケースでは、データポイントとその対応するイベント時刻が計算に使用されるタプルになるストリームベースのソースによる正確ではあるもののレイテンシが低い結果が必要です。Cloud Dataflow などのツールを使用すると、迅速で正確な分析を簡単に行うことができるため、ストリーム処理のプリミティブを実装することなく、ビジネスロジックに集中できます。

このソリューションは、主要銀行が直面している課題に対処するユースケースを示しています。このユースケースでは、数千もの財務データのライブ ストリームをまとめて、ピアソン相関の計算を適用します。相関によって、2 つの時系列の関係を調べることができます。よくある例は、傘の販売とアイスクリームの販売との関係です。これらは互いにグラフ上にプロットされ、一般的に反対方向に移動します。大量の相関計算により、トレーダーは異常な相関や逆相関を特定できます。

ソリューションのコードは、GitHub https://github.com/GoogleCloudPlatform/data-timeseries-java で検索してください。

定義

この記事では以下の用語について説明します。

バッチ処理。 この用語の定義方法は、実装の詳細によって決まります。一部のシステムでは、基礎となるバッチ処理を公開しています。その場合、処理時間のウィンドウ処理を意味します。これは、この記事で説明しているイベント時間のウィンドウ処理ほど有益なバリアントではありません。

十分に柔軟なシステムがあれば、このソリューションで Cloud Dataflow を使用して実証されているように、イベント時間のウィンドウ処理の能力と真のストリーミング エンジンの低レイテンシを組み合わせることができます。

詳細な説明については、Tyler Akidau による優れたブログ「The World Beyond Batch Streaming 101」と「The World Beyond Batch Streaming 102」をご覧ください。

ウィンドウ処理。 この用語は、イベント時間、つまり実際に発生した時間のコンテキストで、要素を集約することを指します。真のストリーミング システムとマイクロバッチ システムの両方でウィンドウ処理を実行できますが、マイクロバッチ処理ではさらにレイテンシが発生します。

オペレーティング ウィンドウ。 この用語は、スライディング ウィンドウ内の単一のペインを指します。図 1 では、x 軸に 3 つの時系列、TS-1、TS-2、TS-3 への参照があります。y 軸は t0 から t5 で定義される個別の時間スライスを示します。オペレーティング ウィンドウ 01 およびオペレーティング ウィンドウ 02 は、スライディング ウィンドウ モードで異なるタイムスライスを対象とします。データ点 t2 から t3 は、オペレーティング ウィンドウ 01 とオペレーティング ウィンドウ 02 で共有されます。

図 1. オペレーティング ウィンドウ」という用語は、スライディング ウィンドウ内の単一のペインを指します。

ローソク。 この用語は、財務時系列を表すために使われるローソク足チャートから派生しました。各ローソクは、特定のタイムスライス内の次の集約に関する情報を保持します。

  • タイムスライス内の開始値、または直前のタイムスライスの終了値。
  • タイムスライス内の終了値。
  • タイムスライス内の最小値。
  • タイムスライス内の最大値。

このソリューションでは、各ローソクに使用されるタイムスライスの固定長を設定しません。ダウンストリームが必要な計算に応じて、長さを日または秒に設定できます。

ソリューションの主要段階

このソリューションは、2 つの主要段階でパイプラインを構築し、それぞれの段階に独自のサブ処理段階があります。

図 2. 段階 1 では、完全なデータ矩形を作成します。段階 2 では、相関関係を計算します。

段階 1: 完全なデータ矩形を作成する

ストリーム内のデータを使ってローソクを作りたいだけなら、簡単にできます。2 行のコードで設定できる Cloud Dataflow のウィンドウ処理機能では、タイムスライス内の各ローソクの openclosemin,max の値を検索を、推移集約と結合集約で処理できます。

時系列の難点は、タイムスライスごとに流動率、またはティックが異なることです。図 3 に示すように、一部のストリームには各タイムスライスにデータがあり、他のストリームにはデータポイントがありません。データポイントがタイムスライスから欠落している場合、相関関係を確実に構築することはできません。値がないということは資産に価値がないことを意味するものではなく、このタイムスライスで価格が更新されなかったということだけを意味します。

図 3.タイムスライスで更新されなかったデータは、欠落しているデータポイントとして表示されます。

ソリューションの処理を開始するには、まず完全な矩形のデータ、つまり欠落しているすべてのデータポイントが書き込まれた長方形を作成します。図 4 に示すように、タイムスライス内の各データポイントに対して値が計算されます。

図 4. 完全な矩形のデータには欠けているデータポイントはありません。

完全な矩形は多くのタイプの計算に役立ちます。このソリューションのサンプルでは、単一のデータフローで両方の段階が必要ですが、複数のパイプラインで各段階を使用できます。

段階 2: 相関を計算する

段階 1 の完全なデータ矩形を使用して、データを Pearson の相関ライブラリに読み込んで、目的の結果を得ることができます。この段階での課題は相関計算ではなく、大量のデータのシャッフルを強制する大規模なファンインとファンアウトです。

何千もの時系列がある場合、一意のペアの番号が ((n^2-n)/2) の順序になり、n は処理される時系列の数です。

次の図は、4 つの通貨値を比較したときに作成されるペアの数を示しています。GBP:AUD は、時間の経過にともなったイギリス スターリングとオーストラリア ドルの価格を提供する時系列を示します(中央の行は、通貨自体に対する修正には価値がないため、無視されます。同じ通貨ペアの相関を 2 回計算する必要はないため、緑色のブロックは無視されます)。

図 5. 4 つの通貨値を比較したときに作成されるペアの数。

X 値を受け取ると、次の 6 つの通貨ペアが生成されます。

[{GBP:EUR VS GBP:BRL},
{GBP:EUR VS EUR:AUD},
{GBP:EUR VS GBP:JPY},
{GBP:BRL VS EUR:AUD},
{GBP:BRL VS GBP:JPY},
{EUR:AUD VS GBP:JPY}]

Cloud Dataflow は分散処理システムであるため、シャッフルの問題に直面します。つまり、ファンアウトはファンアウトの前に行わなければなりません。オペレーティング システムごとのデータは多くのマシンに分散されています。

この時点で Cloud Pub/Sub を使用して 2 つのデータフローを接続することは良いアイデアです。これは技術的にも機能的にも可能ですが、RPC コールではコストとレイテンシを計算に組み込みます。このソリューションでは、いくつかの先進的な Cloud Dataflow パターンを使用して、このアクティビティのすべてを非常に低いレイテンシで実行します。

上記で概要を説明したシャッフルの問題の大きさについて詳しくは、付録 A: シャッフルされたバイトの計算をご覧ください。

ソリューションのしくみを確認する

このセクションでは、各段階とソリューション コードの実行について詳しく説明します。

完全なデータ矩形を作成する

完全なデータ矩形を作成するには、次の 2 段階のプロセスを実行します。

  • データストリームを転送してマージする。
  • ローソクを作成する。

データストリームを転送してマージする

社内外およびサードパーティのプロバイダの多くのデータソースは、単一のシステムを介してデータ処理段階に入れる必要があります。

次の表は、この記事で使用される単純化されたデータ構造を示しています。リアル ティック データにはさらに多くのプロパティが含まれています。

タイプ特性
Longタイムスタンプ
文字列キー
Double価格

データストリームは Cloud Pub/Sub で使用され、1 つのトピックで毎秒何百万というメッセージを処理し、データを Cloud Dataflow にダウンストリームで転送できます。Cloud Pub/Sub では、最大 7 日間のデータを保持し、サブスクライバーがメッセージの受信を確認した後にアクティブなサブスクリプションからのデータだけを削除します。Cloud Pub/Sub はシステムの緩衝器として機能し、市場の予期しない大規模なスパイクに対処します。

ID とアクセス管理コントロールを使用すると、まずデータ プロバイダが所有するプロジェクトにトピックを作成し、そのトピックに対するサブスクリプションを個人、グループ、またはサービスアカウントに対して作成する機能を付与できます。

ローソクを作成する

最初にローソクを作成します。これは次の 3 段階のプロセスです。

  • 不足している値を無視する初期集約オブジェクトを作成する。
  • タイムスライスの値が欠落しているストリームのプレースホルダ ティックを生成する。
  • プレースホルダ ティックに前のタイムスライスの値を入力する。
初期集約オブジェクトを作成する

この変換の高度な手順は次のとおりです。

  • すべてのトピックから読み取る。
  • イベント時間に基づいてローソクサイズにウィンドウ処理する。
  • キーごとにグループ化する。たとえば、GBP:EUR, GBP:BRL, EUR:AUD, GBP:JPY
  • CloseMin/Max を集約する。

最初の 3 段階では、Cloud Dataflow ですべての実装作業が行われるため、比較的少数のコード行しか必要とされません。このコードでは、次のデータフロー プリミティブを使用しています。

変換の段階 4 に使用できるメソッドがいくつかあります。集約を実行するときは、集約中にカスタム ビジネス ロジックを適用する必要がない限り、すぐに使える機能を使用することをおすすめします。集約には、MinMax に加えて、Close 値を特定する最高のタイムスタンプがある要素が含まれます。

Cloud Dataflow ウォーターマークを使用すると、Cloud Pub/Sub に push されたデータに基づいて、各タイムスライスの CloseMin、および Max の値でローソクを修正したことを確信できます。次のステップでは、Open 値を計算して、このタイムスライスでオンになっていない時系列を処理する方法を説明します。

欠損値を検出し、プレースホルダ データ ポイントを生成する

このステップでは、Cloud Dataflow コードを 2 つのブランチに分割して、図 3 に示すギャップを埋めます。ブランチ 1 には、ライブ値のローソクが含まれています。ブランチ 2 では、すべてのライブ値を結合して、このタイムスライス内の一意のキーのリストを見つけます。ブランチ 2 をすべての期待値のリストと比較して、このタイムスライスのすべての欠落データポイントが含まれるセットを取得します。このリストを使用して、ダミーローソクをストリームに挿入します。

プレースホルダのデータポイントにデータを入力する

これまでに、ライブ値とダミー値の両方が含まれる完全な矩形のデータが得られました。ライブローソクには Open 値が入力されていませんが、ダミー値にはタイムスタンプ値以外の値は含まれません。このステップでは、前回のタイムスライスの Close 値によって、現在のライブローソクおよびダミーローソクの Open 値が入力されます。この時点では、ダミーローソクに Open 値があるため、次のアクションは Open 値をダミーローソクの CloseMin/Max に伝搬することです。

BigQuery テーブルとしてのデータソース
図 6. TS-4 を除くすべてが完了しました。

図 6 に示すように、完全な矩形が完成しました。タイムスライス t0 から t1 でデータフローが開始されたときに値がなかった TS-4 は例外です。これはブート ストラップの問題です。データが格納されている場所に応じて、外部のストレージ システム(Cloud Bigtable など)を呼び出して、データフローを開始する前に最後の値を取得する必要があります。

データの矩形が完成したので、処理は段階 2 に移ります。

このようなデータの矩形の作成は、相関分析に役立つだけでなく、完全な矩形に基づく完全な値のセットが必要な場合はいつでも、Cloud Dataflow で計算値をストレージ(たとえば、BigQuery や Cloud Bigtable)に push して分岐することによって値を格納できます。

相関関係を計算する

相関関係の計算は、3つ のステップからなるプロセスです。

  • オペレーティング ウィンドウを作成する。
  • 相関関係を計算する。
  • 結果を公開して保存する。

オペレーティング ウィンドウを作成する

段階 1 では、多くのホストにデータを配置しました。ローソクからデータ配列を作成するには、最初に各オペレーティング ウィンドウのすべてのキーからすべてのローソクを 1 つの場所に集める必要があります。次に、大規模なデータがファンアウトする前に、各オペレーティング ウィンドウのデータ パッケージを複数の場所にコピーする必要があります。

まず、Cloud Dataflow ウィンドウプ リミティブを使用して、段階 1 で生成された値の移動ウィンドウを作成します。これは 1 行のコードのみで実行します。

 Window.into(SlidingWindows.of())

このオペレーティング ウィンドウの値は多くのホストに分散していますが、相関関係のために、すべてのデータを 1 つの場所にファンインする必要があります。

ファンアウト中に生成されるデータ量と管理オーバーヘッドは、オペレーティング ウィンドウ自体に含まれるデータよりもはるかに大きくなります。ファンアウトの前に、各オペレーティング ウィンドウの複数のコピーを配布して、複数のホストが処理を実行できるようにする必要があります。このために、Cloud Dataflow のウィンドウ付き SideInputs を使用して、キーのペアのサブセットを処理しているマシンにデータの複数のコピーを送信します。

これで、図 7 に示すように、複数のホストでのファンアウトに必要なすべてのデータの完全コピーが作成されました。

複数のホストへのファンアウトの処理
図 7. 複数のホストへのファンアウトの処理。

次に、すべての相関関係のデータタプルを生成する変換を記述します。

相関関係を計算する

この段階では、大変な作業が行われます。作業パケットには、このオペレーティング ウィンドウの相関関係を計算するために必要なすべてのデータが含まれています。各作業パケットは完全な矩形です。相関計算の両側に等しい数の値が含まれ、計算を実行するために相関アルゴリズム ライブラリに渡すことができます。次に、自社のビジネスにとって興味深い計算を選択する必要があります。このデータを視覚表現に使用する場合は、指定した ABS(correlation) 値を超える相関関係のみが含まれるように、プッシュ転送されるデータの量を減らすことがあります。または、すべての値を配信システムおよびストレージ システムに push することもできます。これは、生成できる (n^n-n)/2 計算に対応するためのコストの単純な関数です。

結果を公開して格納する

この段階での要件は、データをダウンストリームにする使用によって異なります。結果は 3 つの場所に送信できます。これらの場所はすべてオプションですが、相互に排他的ではありません。

  • Cloud Pub/Sub。 取り込みを実行するだけでなく、Cloud Pub/Sub は疎結合システム間のような役割も果たします。処理されたデータを他のシステムに送信して使用することができます。たとえば、ABS(0.2) 値を上回るすべての相関関係を他のシステムに送信する場合があります。
  • BigQuery。 処理するデータや後で SQL インターフェースを使用してアクセスするデータを BigQuery に配置します。
  • Cloud Bigtable。低レイテンシのストレージや、Cloud Bigtable で大規模なデータセットの非常に小さなサブセット(キーのルックアップや範囲スキャン)を迅速に取得する場合に使用するデータを配置します。

次のステップ

付録: シャッフルされたバイトの計算

この付録では、説明を簡単にするために 5 つの時系列値を使用しています。

オペレーティング ウィンドウが 10 分で、長さが 1 分のローソクを使用すると仮定します。この場合、各相関関係について、時系列ごとに 10 個のデータポイントが必要です。説明を簡単にするために、各データポイントが 1 バイトであると仮定します。これにより、キーごとに 10 * 1 = 10 バイトとなります。5 つの時系列があり、オペレーティング ウィンドウごとに 5 * 10 = 50 バイトとなります。

シナリオ 1: ファンイン直後のファンアウト

ファンアウト中にいくつの値が作成されますか?5 つしかないので、次のようになります。

{1-2, 1-3, 1-4, 1-5, 2-3, 2-4, 2-5, 3-4, 3-5, 4-5} = 10 ペアまたは ((n^2-n)/2)。n は時系列の数、(5^2 - 5)/2 = 10

各ペアに 10 * 2 = 20 バイトのデータがあり、10 個のペアがあるため、システムの周囲をシャッフルする 200 バイトのデータがあります。

悪くはありませんが、1,000 個の時系列で同じ計算を行うと、(1,000^1,000-1,000)/2 = 499,500 となり、499,500 * 10 * 2 = 9,990,000 バイトをシャッフルすることになります。

ペア単位のファンアウトを使用せずにオペレーティング ウィンドウ内のデータを見ると、1,000 * 10 = 1,000 バイトしか移動しません。オペレーティング ウィンドウを複数のパーティションにコピーし、それらのパーティションで相関関係を作成する作業を行うと効率的です。