Processing time series data: tutorial

Learn how to use the Timeseries Streaming solution to perform anomaly detection on time series data.

Overview

The Timeseries Streaming reference application provides Java libraries that process streaming time series data to fill gaps in the data and to calculate metrics within and across time windows. Timeseries Streaming also provides Python libraries that illustrate one common use case for the processed data: using it to get predictions from a machine learning (ML) model.

This tutorial walks you through processing generated sample data with the Java libraries and outputting the results both as TF.Record files and as messages in a Pub/Sub topic. It then shows you how to use the code in the Python libraries to get anomaly detection predictions from a long short-term memory (LSTM) model: you get batch predictions on the data in TF.Record format, and in-stream predictions on the data in the Pub/Sub topic.

This document is intended for developers and data engineers, and assumes that you have the following knowledge:

  • Basic understanding of Java and Python programming
  • Basic understanding of ML model development and use

Objectives

  • Process sample time series data and output it as TF.Record files.
  • Process sample time series data and output it as messages in a Pub/Sub topic.
  • Get batch predictions from the LSTM model on the TF.Records data.
  • Get in-stream predictions from the LSTM model on data published to Pub/Sub.

Costs

This tutorial uses billable components of Google Cloud, including:

  • Compute Engine
  • Dataflow
  • Pub/Sub

Use the pricing calculator to generate a cost estimate based on your projected usage. New Google Cloud users might be eligible for a free trial.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Cloud project. Learn how to confirm that billing is enabled for your project.

  4. Enable the Compute Engine, Dataflow, and Pub/Sub APIs.

    Enable the APIs
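
    Alternatively, if you prefer the gcloud CLI (for example, in Cloud Shell), you can enable the same APIs with a single command:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com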

Create Pub/Sub resources

Create a Pub/Sub topic and subscription. Later in the tutorial, you write processed time series data to the topic, and then reference the subscription to get in-stream predictions on that data.

Create a topic

  1. Go to the Pub/Sub topics page
  2. Click Create Topic.
  3. For Topic ID, type time-series.
  4. Click Save.
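
Alternatively, you can create the topic with the gcloud CLI:

gcloud pubsub topics create time-series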

Create a subscription

  1. Go to the Pub/Sub subscriptions page
  2. Click Create Subscription.
  3. For Subscription ID, type time-series.
  4. For Select a Cloud Pub/Sub topic, select projects/<myProject>/topics/time-series, where <myProject> is the ID of the project you are using to complete this tutorial.
  5. Scroll down to the bottom of the page and click Create.
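
Alternatively, you can create the subscription and attach it to the topic with the gcloud CLI:

gcloud pubsub subscriptions create time-series --topic=time-series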

Create a Cloud Storage bucket

Create a Cloud Storage bucket. Later in the tutorial, you use this bucket as the temp file location for a Dataflow pipeline.

  1. Open the Cloud Storage browser
  2. Click Create Bucket.
  3. For the bucket name, type time-series-myProject. Replace myProject with the ID of the project you are using to complete this tutorial.
  4. Click Create.
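
Alternatively, you can create the bucket with the gsutil tool. As before, replace myProject with the ID of your project:

gsutil mb gs://time-series-myProject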

Create the virtual machine instance

Create a Compute Engine virtual machine instance where you can install the Timeseries Streaming solution.

  1. Go to the VM instances page
  2. Click Create instance.
  3. For Name, type time-series.
  4. Under Boot disk, click Change.
  5. For Size (GB), type 20 and then click Select.
  6. For Access scopes in the Identity and API access section, choose Allow full access to all Cloud APIs.
  7. For Firewall, select Allow HTTP traffic. Selecting this option adds a network tag to your VM, which associates the firewall rule with the VM. Compute Engine then creates the corresponding ingress firewall rule that allows all incoming traffic on tcp:80 (HTTP).
  8. Click the Create button to create and start the instance.
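
If you prefer the command line, a gcloud command like the following creates a broadly equivalent instance. The us-central1-a zone is an assumption, so substitute your preferred zone; the cloud-platform scope corresponds to Allow full access to all Cloud APIs, and the http-server tag is the tag that the Allow HTTP traffic checkbox applies, assuming the corresponding ingress firewall rule exists in your project:

gcloud compute instances create time-series \
    --zone=us-central1-a \
    --boot-disk-size=20GB \
    --scopes=cloud-platform \
    --tags=http-server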

Install required packages

Install the packages needed by Timeseries Streaming, then clone the Timeseries Streaming repository.

  1. Click SSH in the Connect column for the time-series instance.
  2. In the terminal window that opens for the VM instance, update the package lists used by the apt-get tool by running the following command:

    sudo apt-get update
    
  3. Install the packages needed by Timeseries Streaming:

    sudo apt-get install -y default-jdk gradle git python3-dev python3-pip python3-venv
    
  4. Upgrade the pip tool:

    python3 -m pip install --upgrade pip
    
  5. Install the virtualenv module:

    pip3 install virtualenv
    
  6. Update the $PATH variable to include the virtualenv install location. Replace virtualenv-path with the installation path for virtualenv, which is $HOME/.local/bin by default.

    export PATH="virtualenv-path":$PATH
    

Clone the GitHub repo

Clone the Timeseries Streaming repository:

sudo git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git

Copy the example model to Cloud Storage

Copy the model artifacts for the solution to the Cloud Storage bucket you created. Later in the tutorial, you use this model to get predictions on the processed time series data. In the following commands, replace myBucket with the name of your bucket.

  1. Copy the model folder:

    gsutil cp -r dataflow-sample-applications/timeseries-streaming/timeseries-python-applications/ml_pipeline_examples/sin_wave_example/saved_model_example/serving_model_dir gs://myBucket/serving_model_dir
    
  2. Copy the transform graph folder:

    gsutil cp -r dataflow-sample-applications/timeseries-streaming/timeseries-python-applications/ml_pipeline_examples/sin_wave_example/tf_transform_graph_dir gs://myBucket/tf_transform_graph_dir
    

Build the Java libraries

Build the Timeseries Streaming Java libraries so you can use them to process data.

  1. Switch to the Timeseries Streaming Java libraries directory by running the following command:

    cd dataflow-sample-applications/timeseries-streaming/timeseries-java-applications
    
  2. Build the libraries:

    sudo ./gradlew build
    

    The build process takes approximately 5 minutes to complete.

Process time series data using the Java libraries

Use the steps in this section to process autogenerated sample data.

To use real data instead of generated data, you would need to build a data adapter that converts your data to TSDataPoint objects, as illustrated in the SimpleDataStreamGenerator library.

Generate TF.Record output

  1. Make a directory to contain the generated TF.Record files by running the following command:

    sudo mkdir /tmp/simple-data
    
  2. Generate TF.Record files by running the following command. Let the command run for a couple of minutes, and then press Ctrl+C to stop the process.

    sudo ./gradlew run_example --args='--enablePrintMetricsToLogs --interchangeLocation=/tmp/simple-data --withOutliers=true'
    
  3. List the generated files:

    ls /tmp/simple-data/data
    

    You should see results similar to the following:

    'seconds: 1602707513'$'\n''-seconds: 1602707518'$'\n''-1602707513000-1602707518000-
    2020-10-14T20:31:53.000Z-2020-10-14T20:31:58.000Z-00001-of-00005.tfrecord'
    'seconds: 1602707514'$'\n''-seconds: 1602707519'$'\n''-1602707514000-1602707519000-
    2020-10-14T20:31:54.000Z-2020-10-14T20:31:59.000Z-00001-of-00005.tfrecord'
    'seconds: 1602707515'$'\n''-seconds: 1602707520'$'\n''-1602707515000-1602707520000-
    2020-10-14T20:31:55.000Z-2020-10-14T20:32:00.000Z-00002-of-00005.tfrecord'
    'seconds: 1602707516'$'\n''-seconds: 1602707521'$'\n''-1602707516000-1602707521000-
    2020-10-14T20:31:56.000Z-2020-10-14T20:32:01.000Z-00003-of-00005.tfrecord'
    

Generate Pub/Sub output

Generate Pub/Sub messages and publish them to the time-series topic by running the following command. Let the command run for a couple of minutes, and then press Ctrl+C to stop the process. Replace myProject with the ID of the project you are using to complete this tutorial.

sudo ./gradlew run_example --args='--withOutliers=true --pubSubTopicForTSAccumOutputLocation=projects/myProject/topics/time-series'

Set up the Python environment

Create a Python virtual environment, and then install the Timeseries Streaming Python libraries that contain the sample code for requesting predictions from the LSTM model.

  1. Switch to your home directory by running the following command:

    cd ~/
    
  2. Set up the Python virtual environment:

    virtualenv -p python3.7 streaming-tf-consumer
    
  3. Activate the virtual environment:

    source streaming-tf-consumer/bin/activate
    
  4. Grant the required permissions on the dataflow-sample-applications directory, which is owned by root because you cloned the repository with sudo:

    sudo chmod -R 777 dataflow-sample-applications
    
  5. Switch to the Timeseries Streaming Python libraries directory:

    cd dataflow-sample-applications/timeseries-streaming/timeseries-python-applications
    
  6. Install the Python sample code:

    pip install -e .
    

Get predictions

Use the procedures in this section to get batch predictions on the data in the TF.Record files you generated, and to get in-stream predictions on the data you published to the time-series Pub/Sub topic.

Get batch predictions

Get batch predictions on the time series data that you output as TF.Record files by running the following command. Replace myBucket with the Cloud Storage bucket you created.

python ml_pipeline_examples/sin_wave_example/inference/batch_inference.py --saved_model_location=gs://myBucket/serving_model_dir --tf_transform_graph_dir=gs://myBucket/tf_transform_graph_dir --tfrecord_folder=/tmp/simple-data/data/*

You should see results similar to the following:

{'span_start_timestamp': datetime.datetime(2020, 11, 25, 0, 21, 26), 'span_end_timestamp': datetime.datetime(2020, 11, 25, 0, 21, 31), 'timeseries_x-value-LAST': {'input_value': 1.2381953, 'output_value': 1.2465595, 'raw_data_array': '[79.86 81.92 83.87 85.72 87.46]', 'diff': 0.008364201, 'anomaly': False}, 'timeseries_x-value-LAST-COS-MINUTE-TIMESTAMP': {'input_value': -1.407084, 'output_value': -1.4521656, 'diff': 0.045081615}, 'timeseries_x-value-LAST-SIN-MINUTE-TIMESTAMP': {'input_value': -0.077018194, 'output_value': -0.036538757, 'diff': 0.040479437}}
{'span_start_timestamp': datetime.datetime(2020, 11, 25, 0, 20, 56), 'span_end_timestamp': datetime.datetime(2020, 11, 25, 0, 21, 1), 'timeseries_x-value-LAST': {'input_value': 0.027333755, 'output_value': -0.02873822, 'raw_data_array': '[-12.19  -8.72  -5.23  -1.75   1.75]', 'diff': 0.056071974, 'anomaly': False}, 'timeseries_x-value-LAST-COS-MINUTE-TIMESTAMP': {'input_value': 1.4148479, 'output_value': 1.4649682, 'diff': 0.050120354}, 'timeseries_x-value-LAST-SIN-MINUTE-TIMESTAMP': {'input_value': 0.0711496, 'output_value': 0.00040806632, 'diff': 0.070741534}}
{'span_start_timestamp': datetime.datetime(2020, 11, 25, 0, 21, 38), 'span_end_timestamp': datetime.datetime(2020, 11, 25, 0, 21, 43), 'timeseries_x-value-LAST': {'input_value': 1.4099848, 'output_value': 1.5344787, 'raw_data_array': '[97.44 98.16 98.77 99.25 99.62]', 'diff': 0.12449384, 'anomaly': False}, 'timeseries_x-value-LAST-COS-MINUTE-TIMESTAMP': {'input_value': -0.36180413, 'output_value': -0.32122365, 'diff': 0.04058048}, 'timeseries_x-value-LAST-SIN-MINUTE-TIMESTAMP': {'input_value': -1.370246, 'output_value': -1.4247555, 'diff': 0.0545094}}

Get in-stream predictions

Get in-stream predictions on the time series data in the time-series Pub/Sub topic by running the following command. Replace myProject with the ID of the project you are using to complete this tutorial, and replace myBucket with the Cloud Storage bucket you created.

python ml_pipeline_examples/sin_wave_example/inference/stream_inference.py --saved_model_location=gs://myBucket/serving_model_dir --tf_transform_graph_dir=gs://myBucket/tf_transform_graph_dir --pubsub_subscription=projects/myProject/subscriptions/time-series --runner=DataflowRunner --project=myProject --temp_location=gs://myBucket --region=us-central1 --setup_file=./setup.py

To confirm the messages have been processed, take the following steps:

  1. Open the Dataflow Jobs page
  2. Click the running job to see the job graph.
  3. In the job graph, click the last job step, ParDo(CallableWrapperDoFn).
  4. Review the Input collections and Output Data Freshness metrics. You should see results similar to the following:

    Input and output metrics for the Dataflow pipeline processing the time series data.
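
You can also check the job status from the VM's terminal window with the gcloud CLI. The following command assumes the us-central1 region that you specified when you launched the pipeline:

gcloud dataflow jobs list --region=us-central1 --status=active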

Cleaning up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete just those resources. The following sections describe both options.

Delete the project

The easiest way to eliminate billing is to delete the project you created for the tutorial.

  1. In the Cloud Console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.
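
Alternatively, you can delete the project with the gcloud CLI. Replace myProject with the ID of your project:

gcloud projects delete myProject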

Delete the components

If you don't want to delete the project, use the following sections to delete the billable components of this tutorial.

Stop the Dataflow job

  1. Open the Dataflow Jobs page
  2. In the jobs list, click time-series.
  3. On the job details page, click Stop.
  4. Select Cancel.
  5. Click Stop job.
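
Alternatively, you can cancel the job with the gcloud CLI. Replace jobId with the job ID shown on the job details page; the region matches the one you used to launch the pipeline:

gcloud dataflow jobs cancel jobId --region=us-central1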

Delete the Pub/Sub topic and subscription

  1. Open the Pub/Sub Subscriptions page
  2. Select the checkbox of the time-series subscription.
  3. Click Delete.
  4. In the overlay window that appears, confirm you want to delete the subscription and its contents by clicking Delete.
  5. Click Topics.
  6. Select the checkbox of the time-series topic.
  7. Click Delete.
  8. In the overlay window that appears, type delete and then click Delete.
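
Alternatively, you can delete both resources with the gcloud CLI:

gcloud pubsub subscriptions delete time-series
gcloud pubsub topics delete time-series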

Delete the Compute Engine VM instance

  1. Open the Compute Engine VM instances page
  2. Select the checkbox of the time-series VM instance.
  3. Click Delete.
  4. In the overlay window that appears, confirm that you want to delete the instance by clicking Delete.
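
Alternatively, you can delete the instance with the gcloud CLI. Replace instance-zone with the zone that the instance was created in:

gcloud compute instances delete time-series --zone=instance-zone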

Delete the Cloud Storage bucket

  1. Open the Cloud Storage browser
  2. Select the checkbox of the time-series-<myProject> bucket.
  3. Click Delete.
  4. In the overlay window that appears, type DELETE and then click Confirm.
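
Alternatively, you can delete the bucket and its contents with the gsutil tool. As before, replace myProject with the ID of your project:

gsutil rm -r gs://time-series-myProject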

What's next