Korrelation Tausender Zeitreihen-Finanzdatenströme in Echtzeit

In diesem Artikel wird die Verwendung von Dataflow, der verteilten Streamverarbeitungstechnologie von Google, zum Aufbau eines echtzeitnahen Analysesystems beschrieben, das von wenigen simultanen Datenströmen bis zu Tausenden simultanen Datenströmen von Finanzinstrumenten ohne Änderung, Verwaltung oder Infrastrukturaufwand skaliert werden kann.

Die Lösung bietet sehr konsistente, genaue und vollständige Daten, ohne dass eine Batch- oder Mikro-Batchverarbeitung notwendig ist. Obwohl diese Implementierung Korrelationsanalysen nutzt, können Sie die Verarbeitungsmuster, die für andere Anwendungen demonstriert werden, verallgemeinern, wie etwa IOT-Geräte, die Signale von den Geräten in einem Werk senden.

Sicherheit ist immer ein wichtiger Aspekt bei der Arbeit mit Finanzdaten. Google Cloud hilft Ihnen dabei, Ihre Daten auf verschiedene Arten sicher, geschützt und privat zu halten. So sind beispielsweise alle Daten bei der Übertragung und bei Inaktivität verschlüsselt. Google Cloud erfüllt alle Anforderungen von ISO 27001, SOC3, FINRA und PCI.

Beim Großteil der heutigen Datenverarbeitung handelt es sich um eine Batchverarbeitung, die für einen Snapshot von Daten zu einem bestimmten Zeitpunkt ausgeführt wird. Snapshots werden auf einmal verarbeitet und die Ergebnisse in Zyklen generiert, was bei vielen Anwendungsfällen sehr gut funktioniert. Andere Anwendungsfälle erfordern jedoch genaue Ergebnisse aus streambasierten Quellen, aber mit geringer Latenz, wobei Datenpunkte und ihre entsprechenden Ereigniszeiten zu einem in einer Berechnung zu nutzenden Tupel werden. Tools wie Dataflow ermöglichen eine schnelle und korrekte Analyse, wodurch Sie sich auf Ihre Geschäftslogik konzentrieren können, anstatt die Grundfunktionen der Streamverarbeitung zu implementieren.

Diese Lösung illustriert einen Anwendungsfall, der sich mit den Herausforderungen einer führenden Bank beschäftigt. Der Anwendungsfall führt mehrere tausend Live-Streams an Finanzdaten zusammen, um Pearson-Korrelationsberechnungen anzuwenden. Korrelationen ermöglichen es Ihnen, die Beziehung zwischen zwei Zeitreihen zu erkunden. Ein häufig genanntes Beispiel ist die Beziehung zwischen dem Verkauf von Regenschirmen und dem Verkauf von Speiseeis. Stellt man beide in einer Grafik dar, bewegen sie sich für gewöhnlich in entgegengesetzte Richtungen. Massenkorrelationsberechnungen erlauben es Händlern, ungewöhnliche oder umgekehrte Korrelationen zu erkennen, gegen die sie handeln können.

Den Code für die Lösung finden Sie auf GitHub unter https://github.com/GoogleCloudPlatform/data-timeseries-java.

Definitionen

Dieser Artikel stellt die folgenden Begriffe vor:

Batching

Wie dieser Begriff definiert wird, ist abhängig von den Implementierungsdetails. Einige Systeme geben Batchanfragen frei, was als Windowing bezeichnet wird. Hierbei handelt es sich um Verarbeitungszeit-Windowing, eine semantisch weniger nützliche Variante als das Ereigniszeit-Windowing, das in diesem Artikel vorgestellt wird.

Bei einem ausreichend flexiblen System können Sie die semantische Wirkung des Ereigniszeit-Windowings mit der geringen Latenz einer echten Streaming Engine kombinieren, so wie es in dieser Lösung anhand von Dataflow demonstriert wird.

Eine detaillierte Erläuterung finden Sie in den ausgezeichneten Blogs von Tyler Akidau: The World Beyond Batch Streaming 101 und The World Beyond Batch Streaming 102.

Windowing

Dieser Begriff bezieht sich auf das Aggregieren von Elementen im Kontext ihrer Ereigniszeiten, sprich, wenn sie tatsächlich auftreten. Sowohl echte Streaming- als auch Mikro-Batchanfragesysteme können das Windowing ausführen, obwohl das Mikro-Batching eine zusätzliche Latenz mit sich bringt.

Bedienfenster

Dieser Begriff bezieht sich, wie in der folgenden Abbildung dargestellt, auf einen einzelnen Bereich innerhalb eines gleitenden Fensters.

Diagramm mit Zeitachsen und Objektfenstern
Abbildung 1. Der Begriff Bedienfenster bezieht sich auf einen einzelnen Bereich innerhalb eines gleitenden Fensters.

In Abbildung 1 enthält die x-Achse Bezüge auf drei Zeitreihen, TS-1, TS-2 und TS-3. Die y-Achse zeigt die von t0 bis t5 definierten diskreten Zeitabschnitte. Die Bedienfenster 01 und 02 decken in einem gleitenden Fenstermodus unterschiedliche Zeitabschnitte ab. Die Datenpunkte t2 bis t3 werden zwischen den Bedienfenstern 01 und 02 geteilt.

Kerze

Dieser Begriff wird aus den Kerzencharts abgeleitet, die oft zum Darstellen von Finanzzeitachsen verwendet werden. Jede Kerze enthält Informationen für die folgenden Aggregationen innerhalb eines gegebenen Zeitabschnitts:

  • Der Eröffnungswert innerhalb eines Zeitabschnitts oder der Abschlusswert des vorherigen Zeitabschnitts.
  • Der Abschlusswert innerhalb eines Zeitabschnitts.
  • Der Mindestwert innerhalb eines Zeitabschnitts.
  • Der Maximalwert innerhalb eines Zeitabschnitts.

Diese Lösung legt keine feste Länge für den Zeitabschnitt jeder Kerze fest. Sie können die Länge auf Tage oder Sekunden festlegen, abhängig von den von Ihnen benötigten nachgelagerten Berechnungen.

Schlüsselphasen der Lösung

Diese Lösung baut eine Pipeline in zwei Schlüsselphasen auf, jede mit ihren eigenen untergeordneten Verarbeitungsstufen.

Das folgende Diagramm zeigt die beiden Phasen der Pipeline.

Die Architektur der Pipeline ist in zwei Phasen mit jeweils zwei Schritten unterteilt.
Abbildung 2. In Phase 1 können Sie perfekte Datenrechtecke erstellen. In Phase 2 wird die Korrelation berechnet.

Phase 1: Perfekte Datenrechtecke erstellen

Es wäre einfach, Kerzen nur mit den Daten im Stream zu erstellen. Die Windowing-Funktionen von Dataflow, die sich mit nur wenigen Codezeilen einrichten lassen, können transitive und assoziative Aggregationen für das Auffinden von open-, close-, min- und max-Werten für die einzelnen Kerzen innerhalb eines Abschnitts verwenden.

Die Schwierigkeit besteht darin, dass die Zeitachsen, wie im folgenden Diagramm dargestellt, in jedem Zeitabschnitt unterschiedliche Liquiditätsraten (Ticks) haben.

Diagramm mit Zeitachsen unterschiedlicher Liquidität in jedem Zeitabschnitt.
Abbildung 3. Daten, die während des Zeitabschnitts nicht aktualisiert wurden, werden als fehlende Datenpunkte angezeigt.

Wie in Abbildung 3 zu sehen ist, haben einige Streams Daten in jedem Abschnitt, während bei anderen Datenpunkte fehlen. Wenn Datenpunkte aus einem Zeitabschnitt fehlen, können Sie nicht zuverlässig Korrelationen aufbauen. Das Fehlen eines Werts bedeutet nicht, dass das Asset keinen Preis hat, nur, dass der Preis während dieses Zeitabschnitts nicht aktualisiert wurde.

Wenn Sie mit der Arbeit an der Lösung beginnen, erstellen Sie, wie in der folgenden Abbildung dargestellt, zuerst perfekte Datenrechtecke, das heißt, Rechtecke, in denen alle fehlenden Datenpunkte ausgefüllt sind.

Datenpunkt in jedem Zeitabschnitt.
Abbildung 4. Ein perfektes Datenrechteck hat keine fehlenden Datenpunkte.

Wie in Abbildung 4 dargestellt wird für jeden der Datenpunkte im Zeitabschnitt ein Wert berechnet.

Perfekte Rechtecke eignen sich für viele Berechnungstypen. Obwohl das Beispiel in dieser Lösung beide Phasen innerhalb eines einzigen Datenflusses erfordert, kann jede Phase in verschiedenen Pipelines verwendet werden.

Phase 2: Korrelationen berechnen

Mithilfe der vollständigen Datenrechtecke aus Phase 1 können Sie die Daten in eine Pearson-Korrelationsbibliothek einspeisen, um die gewünschten Ergebnisse zu erhalten. Die Herausforderung in dieser Phase ist nicht die Korrelationsberechnung, sondern das große Fan-In und Fan-Out, welches das Verschieben großer Datenmengen erfordert.

Bei Tausenden von Zeitachsen hat die Anzahl eindeutiger Paare die Reihenfolge ((n^2-n)/2), wobei n für die Anzahl der verarbeiteten Zeitachsen steht.

Die folgende Abbildung zeigt die Anzahl der Paare, die beim Vergleichen von vier Währungswerten erstellt werden.

Diagramm, das vier Währungswerte vergleicht.
Abbildung 5. Anzahl der Paare, die beim Vergleichen von vier Währungswerten erstellt werden.

GBP:AUD gibt die Zeitachse an, die die Wechselkurse von britischem Pfund und australischem Dollar im Laufe der Zeit gegenüberstellt. (Die mittlere Zeile wird ignoriert, da eine Währungskorrektur gegen sich selbst keinen Wert hat. Die grünen Blöcke werden ignoriert, da man die Korrelation der gleichen Währungspaare nicht zweimal berechnen muss.)

Die Aufnahme von X-Werten generiert die folgenden sechs Währungspaare:

[{GBP:EUR VS GBP:BRL},
{GBP:EUR VS EUR:AUD},
{GBP:EUR VS GBP:JPY},
{GBP:BRL VS EUR:AUD},
{GBP:BRL VS GBP:JPY},
{EUR:AUD VS GBP:JPY}]

Da es sich bei Dataflow um ein verteiltes Verarbeitungssystem handelt, ergibt sich nun ein Sortierungsproblem. Das bedeutet: Ein Fan-In muss vor einem Fan-Out auftreten. Die Daten pro Betriebssystem werden auf viele Maschinen verteilt.

Es mag vorteilhaft erscheinen, Pub/Sub an dieser Stelle zum Verbinden von zwei Datenflüssen zu verwenden. Obwohl dies technisch und funktional möglich ist, würden die RPC-Anrufe Kosten und Latenz bei den Berechnungen verursachen. Diese Lösung nutzt einige erweiterte Dataflow-Muster, um alle diese Aktivitäten bei geringer Latenz durchzuführen.

Weitere Informationen zur Größe des oben dargestellten Sortierproblems finden Sie unter Anhang A: Zufällig angeordnete Bytes berechnen.

Wiederholung der Angaben zur Funktionsweise der Lösung

Dieser Abschnitt beschäftigt sich intensiv mit den Details jeder Phase und der Ausführung des lösungsspezifischen Codes.

Perfekte Datenrechtecke erstellen

Der Prozess zur Erstellung perfekter Datenrechtecke erfolgt in zwei Schritten:

  • Datenströme transportieren und zusammenführen.
  • Kerzen erstellen.

Datenströme transportieren und zusammenführen

Viele Datenquellen, sowohl interne als auch von Drittanbietern, müssen durch ein einzelnes System in die Datenverarbeitungsstufen geführt werden.

Die folgende Tabelle beschreibt die in diesem Artikel verwendete vereinfachte Datenstruktur. Tatsächliche Tick-Daten verfügen über weitere Eigenschaften.

TypEigenschaft
Longtimestamp
StringSchlüssel
DoppeltPreis

Die Datenströme werden über Pub/Sub aufgenommen, das Millionen von Nachrichten pro Sekunde in einem einzelnen Thema verarbeiten kann und die Daten anschließend an Dataflow überträgt. Pub/Sub speichert die Daten für bis zu 7 Tage und löscht die Daten erst aus einem aktiven Abo, wenn der Abonnent den Eingang der Nachricht bestätigt hat. Pub/Sub fungiert als Stoßdämpfer des Systems und bewältigt alle unerwarteten massiven Spitzen auf dem Markt.

Mithilfe von Identitäts- und Zugriffsverwaltungssteuerelementen können Sie zuerst ein Thema in einem Projekt erstellen, das einem Datenanbieter gehört, und dann Personen, Gruppen oder Dienstkonten die Möglichkeit gewähren, Abos für dieses Thema abzuschließen.

Kerzen erstellen

Zuerst erstellen Sie in einem dreistufigen Prozess die Kerzen:

  • Erstellen Sie die ersten Aggregationsobjekte, die fehlende Werte ignorieren.
  • Erstellen Sie Platzhalter-Ticks für alle Streams, denen im Zeitabschnitt Werte fehlen.
  • Füllen Sie die Platzhalter-Ticks mit den Werten aus dem vorherigen Zeitabschnitt.
Erste Aggregationsobjekte erstellen

Folgendes sind die übergeordneten Arbeitsschritte dieser Transformation:

  • Alle Themen auslesen.
  • Fenster in Kerzengröße basierend auf der Ereigniszeit.
  • Gruppieren nach Schlüssel, zum Beispiel GBP:EUR, GBP:BRL, EUR:AUD, GBP:JPY.
  • Zusammenfassen für Close und Min/Max.

Die ersten drei Schritte erfordern relativ wenige Codezeilen, da sich Dataflow für Sie um alle groben Arbeiten kümmert. Der Code verwendet die folgenden Datenflussprimitive:

Für Schritt 4 können Sie mehrere Methoden der Transformation verwenden. Bei der Ausführung von Aggregationen wird die Nutzung von standardmäßigen Funktionen empfohlen, außer Sie müssen während der Aggregation eine benutzerdefinierte Geschäftslogik anwenden, was hier der Fall ist. Die Aggregation umfasst das Abrufen der Min- und Max-Werte sowie des Elements mit dem höchsten Zeitstempel zur Bestimmung des Close-Werts.

Mit den Dataflow-Wasserzeichen können Sie sich darauf verlassen, dass Sie die richtigen Kerzen mit Close-, Min- und Max-Werten für jeden Zeitabschnitt haben, anhand der Daten, die in Pub/Sub eingespeist wurden. Im nächsten Schritt erfahren Sie, wie man den Open-Wert berechnet und mit Zeitreihen umgeht, die in diesem Zeitabschnitt nicht gezählt wurden.

Fehlende Werte erkennen und Platzhalter-Datenpunkte erstellen

In diesem Schritt füllen Sie die in Abbildung 3 gezeigten Lücken durch eine Aufteilung Ihres Dataflow-Codes in zwei Zweige aus. Zweig 1 enthält die Kerzen der Live-Werte. Zweig 2 kombiniert alle Live-Werte, um die eindeutige Schlüsselliste in diesem Zeitabschnitt zu finden. Vergleichen Sie Zweig 2 mit einer Liste aller erwarteten Werte, um einen Datensatz zu erhalten, der alle fehlenden Datenpunkte für diesen Zeitabschnitt enthält. Mit dieser Liste setzen Sie Testkerzen in den Stream.

Platzhalter-Datenpunkte mit Daten ausfüllen

Nun ist ein perfektes Datenrechteck mit Live- und Testwerten entstanden. Bei den Live-Kerzen ist der Open-Wert nicht ausgefüllt. Die Testwerte enthalten hingegen nur den Zeitstempel. In diesem Schritt füllt der Close-Wert aus dem vorherigen Zeitabschnitt den Open-Wert der aktuellen Live- und Testkerzen aus. Da die Testkerzen jetzt einen Open-Wert haben, besteht der nächste Schritt darin, den Close- und Min/Max-Wert der Testkerzen mit dem Open-Wert zu befüllen.

Datenquelle als eine BigQuery-Tabelle.
Abbildung 6. Nur TS-4 ist noch unvollständig.

Wie in Abbildung 6 dargestellt, vervollständigt dies die perfekten Rechtecke mit Ausnahme von TS-4, das keinen Wert hatte, als der Datenfluss im Zeitabschnitt t0 bis t1 begann. Hierbei handelt es sich um ein Bootstrap-Problem. Je nachdem, wo die Daten gespeichert sind, muss sich die Lösung an die externen Speichersysteme wenden, zum Beispiel an Cloud Bigtable, um den letzten Wert vor Beginn des Datenflusses zu erhalten.

Jetzt, nachdem perfekte Datenrechtecke erstellt wurden, geht die Verarbeitung in Phase 2 über.

Das Erstellen dieser Datenrechtecke ist nicht nur für die Korrelationsanalyse nützlich. Wenn Sie einen vollständigen Wertesatz anhand der perfekten Rechtecke benötigen, können Sie die Werte speichern. Erstellen Sie hierzu eine Verzweigung mit Dataflow und schieben Sie die berechneten Werte in den Speicher, zum Beispiel BigQuery oder Bigtable.

Korrelationen berechnen

Die Berechnung Ihrer Korrelationen erfolgt in drei Schritten:

  • Bedienfenster erstellen.
  • Korrelationen berechnen.
  • Ergebnisse veröffentlichen und speichern.

Bedienfenster erstellen

Phase 1 platzierte die Daten auf vielen Hosts. Um aus den Kerzen Datensätze zu bilden, müssen Sie erst die Kerzen aller Schlüssel für jedes Bedienfenster an einem Ort sammeln. Anschließend müssen Sie das Datenpaket jedes Bedienfensters an mehrere Orte kopieren, bevor der großangelegte Daten-Fan-Out stattfinden kann.

Zuerst erstellen Sie mithilfe der Windowing-Primitive von Dataflow aus den in Phase 1 erzeugten Werten ein bewegliches Fenster. Dafür wird nur eine Codezeile benötigt:

 Window.into(SlidingWindows.of())

Die Werte für das Bedienfenster werden auf mehrere Hosts verteilt. Sie müssen jedoch für Ihre Korrelationen all diese Daten an einem einzigen Ort platzieren.

Das während des Fan-Outs generierte Datenvolumen und der Admin-Overhead sind größer als die im Bedienfenster selbst enthaltenen Daten. Sie sollten mehrere Kopien jedes Bedienfensters vor dem Fan-Out bereitstellen, um mehreren Hosts die Verarbeitung zu ermöglichen. Nutzen Sie dafür SideInputs im Fenstermodus von Dataflow, um mehrere Kopien von Daten an Maschinen zu senden, die die Verarbeitung für eine Untergruppe von Schlüsselpaaren übernehmen.

Jetzt haben Sie eine vollständige Kopie aller Daten, die Sie für den Fan-Out auf mehreren Hosts benötigen, wie in Abbildung 7 dargestellt.

Die Verarbeitung verteilt sich auf mehrere Hosts.
Abbildung 7. Die Verarbeitung verteilt sich auf mehrere Hosts.

Schreiben Sie als Nächstes eine Transformation, die Daten-Tupeln für alle Korrelationen generiert.

Korrelationen berechnen

In diesem Stadium ist der schwierigste Teil bereits erledigt. Die Arbeitspakete enthalten alle Daten, die zur Berechnung der Korrelationen für dieses Bedienfenster benötigt werden. Jedes Arbeitspaket ist ein perfektes Rechteck. Beide Seiten der Korrelationsberechnung enthalten die gleiche Anzahl an Werten, die für die Korrelation zur Algorithmusbibliothek weitergeleitet werden können, um die Berechnung vorzunehmen.

Nun müssen Sie entscheiden, welche Berechnungen für Ihr Unternehmen interessant sind. Wenn Sie diese Daten für eine visuelle Darstellung nutzen möchten, wird empfohlen, die Datenmenge zu reduzieren, sodass nur Korrelationen enthalten sind, die über einem von Ihnen festgelegten ABS(correlation)-Wert liegen. Alternativ können Sie alle Werte in die Veröffentlichungs- und Speichersysteme schieben. Dies ist eine einfach zu erzeugende Kostenfunktion für (n^n-n)/2-Berechnungen.

Die Ergebnisse veröffentlichen und speichern

Die Anforderungen für diesen Schritt sind abhängig davon, wofür Sie die nachgelagerten Daten nutzen. Sie können die Ergebnisse an drei Orte schicken, wobei alle optional sind und sich nicht gegenseitig ausschließen:

  • Pub/Sub. Pub/Sub kann nicht nur die Datenaufnahme durchführen, sondern auch als Verbindung zwischen den lose gekoppelten Systemen fungieren. Sie können die verarbeiteten Daten für die Aufnahme an andere Systeme schicken. Beispielsweise können Sie alle Korrelationen mit mehr als einem Wert von ABS(0.2) an andere Systeme schicken.
  • BigQuery. Geben Sie alle Werte, die Sie später mit der SQL-Schnittstelle verarbeiten oder aufrufen möchten, in BigQuery ein.
  • Bigtable. Platzieren Sie in Bigtable alle Daten, die Sie für die Speicherung mit geringer Latenz verwenden möchten oder aus denen Sie schnell eine sehr kleine Teilmenge eines größeren Datasets abrufen möchten (Schlüsselsuchen sowie Bereichsüberprüfungen).

Weitere Informationen

Anhang: Zufällig angeordnete Bytes berechnen

Dieser Anhang verwendet zur Vereinfachung der Darstellung fünf Zeitreihenwerte:

Nehmen wir an, dass Sie ein 10-Minuten-Bedienfenster haben und Sie Kerzen verwenden möchten, die 1 Minute lang sind. In diesem Fall benötigen Sie für jede Korrelation 10 Datenpunkte pro Zeitreihe. Gehen wir der Einfachheit halber davon aus, dass jeder Datenpunkt 1 Byte ist, was 10 x 1 = 10 Bytes pro Schlüssel ergibt. Es gibt 5 Zeitreihen, d. h. 5 x 10 = 50 Bytes pro Bedienfenster.

Szenario 1: Fan-Out direkt nach Fan-In

Wie viele Werte werden während des Fan-Out erstellt? Da es nur 5 gibt, könnte das Ergebnis in etwa so aussehen:

{1-2, 1-3, 1-4, 1-5, 2-3, 2-4, 2-5, 3-4, 3-5, 4-5} = 10 Paare oder ((n^2-n)/2), wobei n die Anzahl der Zeitachsen ist, (5^2 - 5)÷2 = 10

Jedes Paar hat 10 x 2 = 20 Bytes Daten und es gibt 10 Paare. Es müssen also 200 Bytes Daten im System sortiert werden.

Wenn Sie jedoch die Berechnung für 1.000 Zeitreihen durchführen, ergeben sich (1.000^1.000-1.000)/2 = 499.500, d. h. 499.500 x 10 x 2 = 9.990.000 Byte zum Sortieren.

Betrachtet man die Daten im Bedienfenster ohne den paarweisen Fan-Out, haben Sie nur 1.000 x 10 = 10.000 Byte zum Verschieben. Es ist effizienter, das Bedienfenster in mehrere Partitionen zu kopieren und dann die Korrelationen für diese Partitionen zu erstellen.