Correlating Thousands of Financial Time Series Streams in Real Time

This article describes how to use Google Cloud Dataflow, Google's distributed stream-processing technology, to build a near real-time analytics system that can scale from a few simultaneous data streams of financial instruments to thousands, with no code changes and no additional administration or infrastructure work.

The solution provides strongly consistent, accurate, and complete data with no need for batch or micro-batch processing. Although this implementation uses correlation analysis, you can generalize the processing patterns it demonstrates to other uses, such as IoT devices sending signals from equipment in a factory.

Security is always important when working with financial data. Google Cloud Platform helps to keep your data safe, secure, and private in several ways. For example, all data is encrypted during transmission and when at rest, and the Cloud Platform is ISO 27001, SOC3, FINRA, and PCI compliant.

The bulk of data processing today is batch processing, which is performed on a snapshot of data frozen in time. Snapshots are processed all at once and results are generated in cycles, which works well for many use cases. Other use cases, however, require accurate but low-latency results from stream-based sources, where data points and their corresponding event times become a tuple to be used in the computation. Tools like Cloud Dataflow make it easier to produce fast and accurate analysis, so you can concentrate on the business logic, instead of implementing the primitives of stream processing.

This solution illustrates a use case that addresses challenges faced by a leading bank. The use case brings together several thousand live streams of financial data and applies Pearson correlation computations to them. Correlations let you explore the relationship between two time series. A common example is the relationship between umbrella sales and ice cream sales: plotted on a graph against each other, they generally move in opposite directions. Mass correlation calculations let traders identify unusual correlations or inverse correlations, which they can trade against.

Find the code for the solution on GitHub at https://github.com/GoogleCloudPlatform/data-timeseries-java.

Definitions

This article introduces the following terms:

Batching. How this term is defined depends on the implementation. Some systems expose their underlying batching and call it windowing, in which case it means processing-time windowing, a semantically less useful variant than the event-time windowing that this article discusses.

Given a sufficiently flexible system, you can combine the semantic power of event-time windowing with the low latency of a true streaming engine, as this solution demonstrates with Cloud Dataflow.

For a detailed explanation, refer to the excellent blog posts by Tyler Akidau: The World Beyond Batch: Streaming 101 and The World Beyond Batch: Streaming 102.

Windowing. This term refers to aggregating elements in the context of their event times, that is, when they actually occurred. Both true streaming and micro-batch systems can perform windowing, although micro-batching introduces additional latency.

Operating Window. This term refers to a single pane within a sliding window. In Figure 1, the x axis contains references to three time series, TS-1, TS-2, and TS-3. The y axis shows the discrete time slices, defined by t0 to t5. Operating Window 01 and Operating Window 02 cover different time slices in sliding-window mode. The data points from t2 to t3 are shared between Operating Window 01 and Operating Window 02.

Figure 1. The term operating window refers to a single pane within a sliding window.

Candle. This term is derived from the Candlestick charts that are often used to represent financial time series. Each candle holds information for the following aggregations within a given time slice:

  • The opening value within a time slice, or the close value of the preceding time slice.
  • The closing value within a time slice.
  • The minimum value within a time slice.
  • The maximum value within a time slice.

This solution does not set a fixed length of the time slice used for each candle. You can set the length to days or seconds, depending on the computations you need downstream.
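
As an illustration, a candle can be modeled as a simple value class. The following sketch is hypothetical; the field names are illustrative and the classes in the GitHub repository may differ.

// Hypothetical value class for one candle, covering a single key and time slice.
// Field names are illustrative; the solution code on GitHub may differ.
public class Candle implements java.io.Serializable {
  private String key;        // for example, "GBP:EUR"
  private long windowStart;  // start of the time slice, in epoch milliseconds
  private double open;       // the close value of the preceding time slice
  private double close;      // the value with the latest timestamp in the slice
  private double min;        // the minimum value in the slice
  private double max;        // the maximum value in the slice
  // Getters, setters, and a coder are omitted for brevity.
}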

Key phases of the solution

This solution constructs a pipeline in two key phases, each with its own sub-processing stages.

Figure 2. In Phase 1, you create perfect data rectangles. In Phase 2, you compute the correlations.

Phase 1: Create perfect data rectangles

If you only wanted to create candles using the data in the stream, it would be easy. The Cloud Dataflow windowing functions, which you can set up with a couple of lines of code, can deal with the associative and commutative aggregations of finding the open, close, min, and max values for each candle within a time slice.
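
For example, windowing a stream of prices keyed by instrument into one-minute candles takes roughly the following code. This is a sketch using the Dataflow Java SDK; the element type KV<String, Double> and the one-minute candle length are simplifying assumptions.

// Assign each (key, price) element to a fixed, event-time window one candle long.
// Window and FixedWindows come from the Dataflow Java SDK; Duration is from Joda-Time.
PCollection<KV<String, Double>> windowedPrices = prices.apply(
    Window.<KV<String, Double>>into(FixedWindows.of(Duration.standardMinutes(1))));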

The difficulty is that the time series have different liquidity rates, or ticks, in each time slice. As shown in Figure 3, some streams have data in each time slice, and others have missing data points. When data points are missing from a time slice, you cannot reliably build correlations. The absence of a value does not mean that the asset does not have a price, just that the price was not updated during this time slice.

Figure 3. Data that was not updated during the time slice shows as missing data points.

When you start working through the solution, first you create perfect rectangles of data, that is, rectangles with all the missing data points filled in. A value is computed for each of the data points in the time slice, as shown in Figure 4.

Figure 4. A perfect rectangle of data has no missing data points.

Perfect rectangles are useful for many types of computations, so although the sample in this solution requires both phases within a single dataflow, you can use each phase in different pipelines.

Phase 2: Compute correlations

Using the complete data rectangles from Phase 1, you can feed the data into a Pearson correlation library to produce the results you want. The challenge in this phase is not the correlation computations, but the large fan-in and fan-out, which forces shuffling of a large amount of data.

Given thousands of time series, the number of unique pairs is (n^2 - n)/2, where n is the number of time series being processed.

The following figure shows the number of pairs created when you compare four currency values. GBP:AUD indicates the time series that provides the price of UK sterling versus the Australian dollar over time. (The central diagonal is ignored because the correlation of a currency pair against itself is not valuable. The green blocks are ignored because there is no need to compute the correlation of the same pair of currency pairs twice.)

Figure 5. Numbers of pairs created when you compare four currency values.

Picking up the cells marked X generates the following six pairings of currency pairs:

[{GBP:EUR VS GBP:BRL},
{GBP:EUR VS EUR:AUD},
{GBP:EUR VS GBP:JPY},
{GBP:BRL VS EUR:AUD},
{GBP:BRL VS GBP:JPY},
{EUR:AUD VS GBP:JPY}]

Because Cloud Dataflow is a distributed processing system, you now confront a shuffle problem: a fan-in must occur before the fan-out, because the data for each operating window is spread across many machines.

It might seem like a good idea to use Cloud Pub/Sub to connect the two dataflows at this point. While that is technically and functionally possible, the RPC calls would introduce cost and latency into the computations. This solution uses a few advanced Cloud Dataflow patterns to perform all this activity with very low latency.

For more information about the size of the shuffle problem outlined above, see Appendix A: Calculating bytes shuffled.

Review how the solution works

This section digs into the details of each phase and running the solution code.

Creating perfect data rectangles

Creating perfect data rectangles is a two-step process:

  • Transport and merge the data streams.
  • Create the candles.

Transporting and merging the data streams

Many sources of data, both internal and from third-party providers, must be funneled through a single system into the data-processing stages.

The following table describes the simplified data structure used in this article. Real tick data will contain more properties.

Type    Property
Long    timestamp
String  key
Double  price

The data streams are consumed through Cloud Pub/Sub, which can process millions of messages per second in a single topic, transferring the data downstream to Cloud Dataflow. Cloud Pub/Sub retains data for up to 7 days, only removing data from an active subscription after a subscriber acknowledges receipt of the message. Cloud Pub/Sub acts as the system's shock absorber, dealing with any unexpected massive spikes in the market.
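
As a sketch, reading the tick stream from Cloud Pub/Sub and parsing it into keyed tick objects might look like the following. The topic name, the timestamp attribute, the comma-separated payload format, and the TickData constructor are assumptions for this example, not the format used by the solution code.

// Read tick messages from Cloud Pub/Sub and parse them into KV<key, TickData>.
// The topic name and the "timestamp,key,price" payload format are assumptions.
PCollection<KV<String, TickData>> ticks = pipeline
    .apply(PubsubIO.Read
        .topic("projects/my-project/topics/market-data")
        .timestampLabel("ts"))                        // event time taken from a message attribute
    .apply(ParDo.of(new DoFn<String, KV<String, TickData>>() {
      @Override
      public void processElement(ProcessContext c) {
        String[] parts = c.element().split(",");      // timestamp,key,price
        TickData tick = new TickData(
            Long.parseLong(parts[0]), parts[1], Double.parseDouble(parts[2]));
        c.output(KV.of(tick.getKey(), tick));
      }
    }));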

Using identity and access management controls, you can first create a topic in a project owned by a data provider and then grant the ability to create subscriptions against that topic to individuals, groups, or service accounts.

Creating the candles

Creating the candles is a three-step process:

  • Create the initial aggregation objects, which ignore missing values.
  • Generate placeholder ticks for any streams that are missing values in the time slice.
  • Populate placeholder ticks with values from the previous time slice.

Create initial aggregation objects

Here are the high-level steps in this transform:

  • Read from all topics.
  • Window into candle-sized time slices based on event time.
  • Group by key, for example, GBP:EUR, GBP:BRL, EUR:AUD, GBP:JPY.
  • Aggregate for Close and Min/Max.

The first three steps require relatively few lines of code because Cloud Dataflow takes care of all the plumbing work for you, using primitives such as PubsubIO for reading, Window for event-time windowing, and GroupByKey for grouping.

There are a few methods you can use for the fourth step of the transform. When performing aggregations, it is a best practice to use out-of-the-box functions, unless you need to apply custom business logic during the aggregation, which is the case here. The aggregation includes obtaining the Min and Max values as well as the element with the highest timestamp, which determines the Close value.
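
One way to express that custom aggregation is a Combine.CombineFn that tracks the minimum, maximum, and latest-timestamped value for each key. The following is a hedged sketch, not the transform used in the repository; the TickData and Candle classes and the accumulator fields are hypothetical, and a production version would also declare a coder for the accumulator.

// Hypothetical combiner that builds a partial candle (Close, Min, Max) per key.
public class CandleCombineFn
    extends Combine.CombineFn<TickData, CandleCombineFn.CandleAccum, Candle> {

  // Mutable accumulator holding the running Min/Max and the latest tick seen so far.
  public static class CandleAccum implements java.io.Serializable {
    double min = Double.MAX_VALUE;
    double max = -Double.MAX_VALUE;
    double close;
    long closeTimestamp = Long.MIN_VALUE;
  }

  @Override
  public CandleAccum createAccumulator() {
    return new CandleAccum();
  }

  @Override
  public CandleAccum addInput(CandleAccum accum, TickData tick) {
    accum.min = Math.min(accum.min, tick.getPrice());
    accum.max = Math.max(accum.max, tick.getPrice());
    if (tick.getTimestamp() > accum.closeTimestamp) {  // the latest tick becomes the Close
      accum.close = tick.getPrice();
      accum.closeTimestamp = tick.getTimestamp();
    }
    return accum;
  }

  @Override
  public CandleAccum mergeAccumulators(Iterable<CandleAccum> accums) {
    CandleAccum merged = createAccumulator();
    for (CandleAccum a : accums) {
      merged.min = Math.min(merged.min, a.min);
      merged.max = Math.max(merged.max, a.max);
      if (a.closeTimestamp > merged.closeTimestamp) {
        merged.close = a.close;
        merged.closeTimestamp = a.closeTimestamp;
      }
    }
    return merged;
  }

  @Override
  public Candle extractOutput(CandleAccum accum) {
    Candle candle = new Candle();
    candle.setMin(accum.min);
    candle.setMax(accum.max);
    candle.setClose(accum.close);
    return candle;
  }
}

Applied with Combine.perKey after windowing, for example windowedTicks.apply(Combine.perKey(new CandleCombineFn())), this fuses the grouping and aggregation steps and lets Cloud Dataflow lift the combine ahead of the shuffle, which reduces the data moved between workers.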

Using Cloud Dataflow watermarks, you can be confident that you have correct candles with Close, Min, and Max values for each time slice, based on the data that was pushed into Cloud Pub/Sub. In the next step you'll see how to compute the Open value and deal with time series that have not ticked in this time slice.

Detect missing values and generate placeholder data points

In this step you fill the gaps shown in Figure 3 by splitting your Cloud Dataflow code into two branches. Branch 1 contains the candles of the live values. Branch 2 combines all the live values to find the unique list of keys in this time slice. You compare Branch 2 with a list of all expected values to get a set that contains all the missing data points for this time slice. Using this list, you inject dummy candles into the stream.
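
The following sketch shows one possible shape for the missing-key branch. It assumes that liveCandles is the keyed output of the candle aggregation, that EXPECTED_KEYS is a static list of every instrument, that SetUnionFn is a hypothetical CombineFn that collects the live keys of a window into a Set<String>, and that Candle.placeholder is a hypothetical factory for dummy candles. It is illustrative only; the repository implements this differently in detail.

// Branch 2: find the keys with no live ticks in this time slice and inject a
// dummy (timestamp-only) candle for each of them. Names are illustrative.
PCollection<KV<String, Candle>> dummyCandles = liveCandles
    .apply(Keys.<String>create())
    .apply(Combine.globally(new SetUnionFn()).withoutDefaults())  // one Set<String> per window
    .apply(ParDo.of(new DoFn<Set<String>, KV<String, Candle>>() {
      @Override
      public void processElement(ProcessContext c) {
        for (String key : EXPECTED_KEYS) {
          if (!c.element().contains(key)) {
            c.output(KV.of(key, Candle.placeholder(key)));  // dummy candle for a silent key
          }
        }
      }
    }));

// Merge the live branch and the dummy branch back into a single stream.
PCollection<KV<String, Candle>> allCandles =
    PCollectionList.of(liveCandles).and(dummyCandles)
        .apply(Flatten.<KV<String, Candle>>pCollections());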

Populate placeholder data points with data

Now you have a perfect rectangle of data that contains both live and dummy candles. The live candles do not yet have the Open value populated, while the dummy candles contain nothing but the timestamp. In this step, the Close value of the previous time slice populates the Open value of the current live and dummy candles. Because the dummy candles now have an Open value, the next action is to propagate that Open value to the Close and Min/Max of the dummy candles.

Figure 6. All but TS-4 are now complete.

As shown in Figure 6, this completes the perfect rectangles, with the exception of TS-4, which did not have a value when the dataflow started in time slice t0 to t1. This is a bootstrap issue; depending on where the data is stored, the solution must call out to an external storage system, for example Cloud Bigtable, to obtain the last value before the dataflow started.

Now that you have perfect rectangles of data, processing moves to Phase 2.

Creating these data rectangles is useful for more than correlation analysis. Whenever you need a complete set of values based on the perfect rectangles, you can branch the pipeline in Cloud Dataflow and push the computed values into storage, for example BigQuery or Cloud Bigtable.
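
For example, branching the completed candles into BigQuery might look like the following sketch, where allCandles stands for the PCollection of completed candles. The table name, the candleSchema variable, and the FormatCandleFn transform (which turns a Candle into a TableRow) are assumptions for this example.

// Hypothetical branch that also writes every completed candle to BigQuery.
allCandles
    .apply(ParDo.of(new FormatCandleFn()))   // Candle -> TableRow
    .apply(BigQueryIO.Write
        .to("my-project:market_data.candles")
        .withSchema(candleSchema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));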

Computing correlations

Computing your correlations is a three-step process:

  • Create the operating windows.
  • Compute the correlations.
  • Publish and store the results.

Creating the operating windows

Phase 1 placed the data on many hosts. To build the data arrays from the candles, you must first gather all the candles from all of the keys for each operating window in a single location. Next, you must copy each operating window's data package to multiple locations before the large-scale data fan-out.

First you create a moving window of the values generated in Phase 1 using the Cloud Dataflow windowing primitives. This takes only a single line of code:

 Window.into(SlidingWindows.of(Duration.standardMinutes(10))   // for example, a 10-minute operating window
     .every(Duration.standardMinutes(1)))                      // that advances one 1-minute candle at a time

The values for this operating window are spread across many hosts, but for your correlations you must fan-in all this data to a single location.

The data volume and administrative overhead generated during the fan-out are orders of magnitude larger than the data contained in the operating window itself. You should distribute multiple copies of each operating window before the fan-out, allowing multiple hosts to do the processing. For this you use Cloud Dataflow's windowed SideInputs to send multiple copies of the data to the machines that are processing a subset of the key pairs.
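
The following sketch shows the general shape of that distribution. It assumes that slidingWindowCandles is the sliding-windowed candle stream, that keyPairSubsets is a hypothetical collection in which each element lists the key pairs one worker should handle, and that CorrelationWorkPacket is a hypothetical class; it is not the code from the repository.

// Package each operating window as a windowed side input so that every worker
// processing a subset of key pairs sees the full window of candles.
PCollectionView<Iterable<KV<String, Candle>>> windowView =
    slidingWindowCandles.apply(View.<KV<String, Candle>>asIterable());

// The main input must use the same sliding windows as the side input.
PCollection<CorrelationWorkPacket> workPackets = keyPairSubsets.apply(
    ParDo.withSideInputs(windowView).of(
        new DoFn<List<KV<String, String>>, CorrelationWorkPacket>() {
          @Override
          public void processElement(ProcessContext c) {
            Iterable<KV<String, Candle>> window = c.sideInput(windowView);
            // Build one work packet per assigned key pair from the full window copy.
            for (KV<String, String> pair : c.element()) {
              c.output(CorrelationWorkPacket.from(pair, window));
            }
          }
        }));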

Now you have a full copy of all the data you need for the fan-out on multiple hosts, as shown in Figure 7.

Figure 7. Processing fans out to multiple hosts.

Next, you write a transform that generates the data tuples for all of the correlations.
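
As a minimal, standalone illustration of that pairing step, the unique pairings of four keys can be generated with a nested loop. The key names repeat the Figure 5 example; the transform in the repository is more involved.

import java.util.Arrays;
import java.util.List;

public class PairGenerator {
  public static void main(String[] args) {
    // Emit each unordered pair of distinct keys exactly once: (n^2 - n) / 2 pairs.
    List<String> keys = Arrays.asList("GBP:EUR", "GBP:BRL", "EUR:AUD", "GBP:JPY");
    for (int i = 0; i < keys.size(); i++) {
      for (int j = i + 1; j < keys.size(); j++) {
        System.out.println("{" + keys.get(i) + " VS " + keys.get(j) + "}");
      }
    }
  }
}

For four keys this prints the six pairings listed earlier in this article.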

Computing the correlations

At this stage the hard work is done. The work packets contain all the data needed to calculate the correlations for this operating window, and each work packet is a perfect rectangle. Both sides of the correlation computation contain an equal number of values, which can be passed to the correlation library to carry out the computation.

Next you must decide which computations are interesting for your business. If you want to use this data for visual representation, you might want to reduce the amount of data pushed forward to contain only correlations whose absolute value, ABS(correlation), is above a threshold that you specify. Or you can push all the values into the publishing and storage systems. This is a simple function of the cost of accommodating the (n^2 - n)/2 computations that can be generated.
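
The correlation itself can be delegated to an existing library. The following standalone example uses the Apache Commons Math PearsonsCorrelation class with made-up values for one 10-candle operating window; the library choice and the sample data are illustrative, and the repository may use a different implementation.

import org.apache.commons.math3.stat.correlation.PearsonsCorrelation;

public class CorrelationExample {
  public static void main(String[] args) {
    // Close values for two instruments over one 10-candle operating window
    // (illustrative numbers, not real market data).
    double[] gbpEur = {1.17, 1.18, 1.18, 1.19, 1.20, 1.19, 1.21, 1.22, 1.21, 1.23};
    double[] gbpBrl = {7.05, 7.08, 7.10, 7.12, 7.15, 7.13, 7.18, 7.20, 7.19, 7.22};

    double correlation = new PearsonsCorrelation().correlation(gbpEur, gbpBrl);

    // Forward only results whose absolute value exceeds a chosen threshold.
    if (Math.abs(correlation) >= 0.2) {
      System.out.println("GBP:EUR VS GBP:BRL correlation = " + correlation);
    }
  }
}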

Publishing and storing the results

The requirements for this stage depend on the use you'll make of the data downstream. You can send the results to three locations, all of which are optional and not mutually exclusive:

  • Cloud Pub/Sub. As well as performing ingestion, Cloud Pub/Sub can act as the glue between loosely coupled systems. You can send the processed data to other systems to consume; for example, you might send only correlations with ABS(correlation) greater than 0.2 to other systems, as sketched after this list.
  • BigQuery. Place any data that you want to process or access later using a SQL interface into BigQuery.
  • Cloud Bigtable. Place any data that you want to use for low-latency storage, or where you might need to retrieve a very small subset of a larger dataset quickly (key lookups as well as range scans), in Cloud Bigtable.
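
The Cloud Pub/Sub option might look like the following sketch, which filters on ABS(correlation) and publishes the remaining results. The correlationResults collection, the CorrelationResult class, its toJson method, and the topic name are assumptions for this example.

// Keep only correlations with ABS(correlation) > 0.2 and publish them downstream.
correlationResults
    .apply(ParDo.of(new DoFn<CorrelationResult, String>() {
      @Override
      public void processElement(ProcessContext c) {
        CorrelationResult result = c.element();
        if (Math.abs(result.getCorrelation()) > 0.2) {
          c.output(result.toJson());   // serialize however downstream systems expect
        }
      }
    }))
    .apply(PubsubIO.Write.topic("projects/my-project/topics/correlations"));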

Appendix A: Calculating bytes shuffled

For simplicity of illustration, this appendix uses five time series.

Assume that the operating window is 10 minutes and you want to use candles that are 1 minute long. In this case, for each correlation you need 10 data points per time series. For simplicity, assume that each data point is 1 byte, which gives you 10 * 1 = 10 bytes per key. There are 5 time series, which gives you 5 * 10 = 50 bytes per operating window.

Scenario 1: Fan-out directly after the fan-in

How many values are created during the fan-out? Because there are only 5 time series, the pairs look like the following:

{1-2, 1-3, 1-4, 1-5, 2-3, 2-4, 2-5, 3-4, 3-5, 4-5} = 10 pairs

Or (n^2 - n)/2, where n is the number of time series: (5^2 - 5)/2 = 10.

Each pair has 10 * 2 = 20 bytes of data and there are 10 pairs, so you have 200 bytes of data to shuffle around the system.

Not bad, but if you do the same calculation for 1000 time series, you have (1000^2 - 1000)/2 = 499,500 pairs, which gives you 499,500 * 10 * 2 = 9,990,000 bytes to shuffle around.

Looking at the data in an operating window without the pairwise fan-out, you have only 1000 * 10 = 10,000 bytes to move around. It's more efficient to copy the operating window to multiple partitions and then do the work of creating correlations on those partitions.
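
The arithmetic above is easy to restate in code; the following standalone example simply reproduces the appendix numbers.

public class ShuffleBytes {
  public static void main(String[] args) {
    long n = 1000;              // number of time series
    long points = 10;           // 10-minute operating window with 1-minute candles
    long bytesPerPoint = 1;     // simplifying assumption from the text

    long pairs = (n * n - n) / 2;                             // 499,500 pairs
    long pairwiseBytes = pairs * points * 2 * bytesPerPoint;  // 9,990,000 bytes shuffled
    long windowBytes = n * points * bytesPerPoint;            // 10,000 bytes per window copy

    System.out.println(pairs + " pairs, " + pairwiseBytes
        + " bytes pairwise vs " + windowBytes + " bytes per window copy");
  }
}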
