Build and visualize demand forecast predictions using Datastream, Dataflow, BigQuery ML, and Looker

This document shows you how to replicate and process operational data from an Oracle database into Google Cloud in real time. The tutorial also demonstrates how to forecast future demand, and how to visualize this forecast data as it arrives.

This tutorial is intended for data engineers and analysts who want to use their operational data. It assumes that you're familiar with writing both SQL queries and user-defined functions (UDFs).

This tutorial uses a fictitious retail store named FastFresh to help demonstrate the concepts that it describes. FastFresh specializes in selling fresh produce, and wants to minimize food waste and optimize stock levels across all stores. You use mock sales transactions from FastFresh as the operational data in this tutorial.

The following diagram shows the flow of operational data through Google Cloud.

Flow of operational data in FastFresh (explained in following text).

The operational flow shown in the preceding diagram is as follows:

  • Incoming data from an Oracle source is captured and replicated into Cloud Storage through Datastream.

  • This data is processed and enriched by Dataflow templates, and is then sent to BigQuery.

  • BigQuery ML is used to forecast demand for your data, which is then visualized in Looker.

Google does not provide licenses for Oracle workloads. You are responsible for procuring licenses for the Oracle workloads that you choose to run on Google Cloud, and you are responsible for complying with the terms of these licenses.

Objectives

  • Replicate and process data from Oracle into BigQuery in real time.

  • Run demand forecasting against data that has been replicated and processed from Oracle in BigQuery.

  • Visualize forecasted demand and operational data in Looker in real time.

Costs

This tutorial uses the following billable components of Google Cloud:

  • BigQuery
  • Cloud Storage
  • Compute Engine
  • Datastream
  • Dataflow
  • Pub/Sub

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

When you finish this tutorial, you can avoid continued billing by deleting the resources you created. For more information, see Clean up.

Before you begin

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.


  2. Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.

  3. Enable the Compute Engine, Datastream, Dataflow, and Pub/Sub APIs.


  4. Make sure that you have developer access to a Looker instance. You need this access to set up your own Looker model and dashboards.

    To request a trial, see the Looker free trial demo.

    You must also have the role of Project owner or Editor.

Prepare your environment

  1. In Cloud Shell, define the following environment variables:

    export PROJECT_NAME="YOUR_PROJECT_NAME"
    export PROJECT_ID="YOUR_PROJECT_ID"
    export PROJECT_NUMBER="YOUR_PROJECT_NUMBER"
    export BUCKET_NAME="${PROJECT_ID}-oracle_retail"
    

    Replace the following:

    • YOUR_PROJECT_NAME: the name of your project
    • YOUR_PROJECT_ID: the ID of your project
    • YOUR_PROJECT_NUMBER: the number of your project
  2. Set the default project for the gcloud CLI:

    gcloud config set project ${PROJECT_ID}
    
  3. Clone the GitHub tutorial repository which contains the scripts and utilities that you use in this tutorial:

    git clone \
    https://github.com/caugusto/datastream-bqml-looker-tutorial.git
    
  4. Extract the comma-delimited file containing sample transactions to be loaded into Oracle:

    bunzip2 \
    datastream-bqml-looker-tutorial/sample_data/oracle_data.csv.bz2
    
  5. Create a sample Oracle XE 11g Docker container on a Compute Engine instance by doing the following:

    1. In Cloud Shell, change the directory to build_docker:

        cd datastream-bqml-looker-tutorial/build_docker
      
    2. Run the following build_orcl.sh script:

       ./build_orcl.sh \
       -p YOUR_PROJECT_ID \
       -z GCP_ZONE \
       -n GCP_NETWORK_NAME \
       -s GCP_SUBNET_NAME \
       -f Y \
       -d Y
      

      Replace the following:

      • YOUR_PROJECT_ID: your Cloud project ID
      • GCP_ZONE: the zone where the Compute Engine instance is created
      • GCP_NETWORK_NAME: the name of the network where the VM and firewall entries are created
      • GCP_SUBNET_NAME: the name of the subnet where the VM and firewall entries are created
      • -f (Y or N): whether to create the FastFresh schema and ORDERS table. Use Y for this tutorial.
      • -d (Y or N): whether to configure the Oracle database for Datastream usage. Use Y for this tutorial.

      The script does the following:

      • Creates a new Compute Engine instance.
      • Configures an Oracle 11g XE Docker container.
      • Pre-loads the FastFresh schema and the Datastream prerequisites.

    After the script finishes, it prints a summary of the connection details and credentials (DB Host, DB Port, and SID). Make a copy of these details because you use them later in this tutorial.

  6. Create a Cloud Storage bucket to store your replicated data:

    gsutil mb gs://${BUCKET_NAME}
    

    Make a copy of the bucket name because you use it in a later step.

  7. Configure your bucket to send notifications about object changes to a Pub/Sub topic. This configuration is required by the Dataflow template. Do the following:

    1. Create a notification configuration that publishes object changes to a Pub/Sub topic called oracle_retail:

      gsutil notification create -t projects/${PROJECT_ID}/topics/oracle_retail \
      -f json gs://${BUCKET_NAME}
      

      This command creates the oracle_retail topic if it doesn't already exist, and configures the bucket to send notifications about object changes to that topic.

    2. Create a Pub/Sub subscription to receive messages which are sent to the oracle_retail topic:

      gcloud pubsub subscriptions create oracle_retail_sub \
      --topic=projects/${PROJECT_ID}/topics/oracle_retail
      
  8. Create a BigQuery dataset named retail:

    bq mk --dataset ${PROJECT_ID}:retail
    
  9. Assign the BigQuery Admin role to your Compute Engine service account:

    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member=serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
    --role='roles/bigquery.admin'
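
The bucket notification configured in step 7 publishes one Pub/Sub message per object change. As a rough illustration of what a subscriber sees, the following sketch turns such a message into a gs:// path. The attribute names eventType, bucketId, and objectId follow the Cloud Storage notification format; the helper name objectUriFromNotification and the sample values are hypothetical, and this is not how the Dataflow template itself is implemented.

```javascript
// Returns the gs:// URI of a newly written object, or null for any other event.
// Assumption: `message` is a simplified Pub/Sub message with an `attributes` map.
function objectUriFromNotification(message) {
  var attrs = message.attributes || {};
  if (attrs.eventType !== 'OBJECT_FINALIZE') {
    return null; // ignore deletes, archives, and metadata updates
  }
  return 'gs://' + attrs.bucketId + '/' + attrs.objectId;
}

// Hypothetical message; the bucket and object names are made up.
var exampleMessage = {
  attributes: {
    eventType: 'OBJECT_FINALIZE',
    bucketId: 'my-project-oracle_retail',
    objectId: 'some/prefix/orders-file.jsonl'
  }
};

console.log(objectUriFromNotification(exampleMessage));
// gs://my-project-oracle_retail/some/prefix/orders-file.jsonl
```

Filtering on OBJECT_FINALIZE keeps the sketch focused on newly written files, which is the event a file-processing pipeline typically cares about.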
    

Replicate Oracle data to Google Cloud with Datastream

Datastream supports the synchronization of data to Google Cloud databases and storage solutions from sources such as MySQL and Oracle.

In this section, you use Datastream to backfill the Oracle FastFresh schema and to replicate updates from the Oracle database to Cloud Storage in real time.

Create a stream

  1. In the Google Cloud console, go to Datastream and click Create Stream. A form appears.

    Fill in the form as follows, and then click Continue:

    • Stream name: oracle-cdc

    • Stream ID: oracle-cdc

    • Source type: Oracle

    • Destination type: Cloud Storage

    • All other fields: Retain the default value

  2. In the Define & Test Source section, select Create new connection profile. A form appears.

    Fill in the form as follows, and then click Continue:

    • Connection profile name: orcl-retail-source

    • Connection profile ID: orcl-retail-source

    • Hostname: <db_host>

    • Port: 1521

    • Username: datastream

    • Password: tutorial_datastream

    • System Identifier (SID): XE

    • Connectivity method: Select IP allowlisting.

  3. Click Run Test to verify that the source database and Datastream can communicate with each other, and then click Create & Continue.

    The Select Objects to Include page appears. On this page, you define the objects to replicate by including or excluding specific schemas, tables, and columns.

    If the test fails, make the necessary changes to the form parameters and then retest.

  4. Select the following: FastFresh > Orders, as shown in the following image:

    Checkboxes with **FastFresh** and **Orders** selected.

  5. To load existing records, set the Backfill mode to Automatic, and then click Continue.

  6. In the Define Destination section, select Create new connection profile. A form appears.

    Fill in the form as follows, and then click Create & Continue:

    • Connection Profile Name: oracle-retail-gcs

    • Connection Profile ID: oracle-retail-gcs

    • Bucket Name: The name of the bucket that you created in Prepare your environment.

  7. Keep the Stream path prefix blank, and for Output format, select JSON. Click Continue.

  8. On the Create new connection profile page, click Run Validation, and then click Create.

    The output is similar to the following:

    Output validating details.

Create a Dataflow job using the Datastream to BigQuery template

In this section, you deploy the Dataflow Datastream to BigQuery streaming template to replicate the changes captured by Datastream into BigQuery.

You also extend the functionality of this template by creating and using UDFs.

Create a UDF for processing incoming data

You create a UDF to perform the following operations on both the backfilled data and all new incoming data:

  • Redact sensitive information such as the customer payment method.

  • Add the name of the Oracle source table to each record in BigQuery for data lineage and discovery purposes.

This logic is captured in a JavaScript file that takes the JSON files generated by Datastream as an input parameter.

  1. In the Cloud Shell session, copy and save the following code to a file named retail_transform.js:

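    function process(inJson) {

       // The input is either a raw Datastream record or a Pub/Sub message
       // that wraps the record in a `data` field.
       var obj = JSON.parse(inJson),
       includePubsubMessage = obj.data && obj.attributes,
       data = includePubsubMessage ? obj.data : obj;

       // Redact: keep only the text before the first ':' and append "XXX".
       data.PAYMENT_METHOD = data.PAYMENT_METHOD.split(':')[0].concat("XXX");

       // Lineage: record the source as SCHEMA.TABLE.
       data.ORACLE_SOURCE = data._metadata_schema.concat('.', data._metadata_table);

       return JSON.stringify(obj);
    }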
    
  2. Create a Cloud Storage bucket to store the retail_transform.js file and then upload the JavaScript file to the newly created bucket:

    gsutil mb gs://js-${BUCKET_NAME}
    
    gsutil cp retail_transform.js \
    gs://js-${BUCKET_NAME}/utils/retail_transform.js
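
To check the UDF before you upload it, you can run it locally, for example with Node.js. The following sketch repeats the process function so that it is self-contained and applies it to a hypothetical record. The ORDER_ID field and all sample values are made up for illustration; only PAYMENT_METHOD, _metadata_schema, and _metadata_table are read by the UDF.

```javascript
// Same logic as retail_transform.js, repeated so the sketch runs on its own.
function process(inJson) {
  var obj = JSON.parse(inJson),
    includePubsubMessage = obj.data && obj.attributes,
    data = includePubsubMessage ? obj.data : obj;

  data.PAYMENT_METHOD = data.PAYMENT_METHOD.split(':')[0].concat('XXX');
  data.ORACLE_SOURCE = data._metadata_schema.concat('.', data._metadata_table);

  return JSON.stringify(obj);
}

// Hypothetical input record (values are illustrative, not real tutorial data).
var sampleRecord = {
  ORDER_ID: 1,
  PRODUCT_NAME: 'Bag of Organic Bananas',
  PAYMENT_METHOD: 'CARD:1234',
  _metadata_schema: 'FASTFRESH',
  _metadata_table: 'ORDERS'
};

var transformed = JSON.parse(process(JSON.stringify(sampleRecord)));
console.log(transformed.PAYMENT_METHOD); // CARDXXX
console.log(transformed.ORACLE_SOURCE);  // FASTFRESH.ORDERS
```

Note that the function throws if a record has no PAYMENT_METHOD string, so a record like that would likely be routed to the dead-letter queue rather than to BigQuery.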
    

Create a Dataflow job

  1. In Cloud Shell, create a dead-letter queue (DLQ) bucket:

    gsutil mb gs://dlq-${BUCKET_NAME}
    

    This bucket is used by Dataflow.

  2. Create a service account for the Dataflow execution and assign the account the following roles: Dataflow Admin, Dataflow Worker, Pub/Sub Admin, BigQuery Data Editor, BigQuery Job User, Datastream Admin, and Storage Admin:

    gcloud iam service-accounts create df-tutorial
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:df-tutorial@${PROJECT_ID}.iam.gserviceaccount.com" \
    --role="roles/dataflow.admin"
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:df-tutorial@${PROJECT_ID}.iam.gserviceaccount.com" \
    --role="roles/dataflow.worker"
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:df-tutorial@${PROJECT_ID}.iam.gserviceaccount.com" \
    --role="roles/pubsub.admin"
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:df-tutorial@${PROJECT_ID}.iam.gserviceaccount.com" \
    --role="roles/bigquery.dataEditor"
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:df-tutorial@${PROJECT_ID}.iam.gserviceaccount.com" \
    --role="roles/bigquery.jobUser"
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:df-tutorial@${PROJECT_ID}.iam.gserviceaccount.com" \
    --role="roles/datastream.admin"
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:df-tutorial@${PROJECT_ID}.iam.gserviceaccount.com" \
    --role="roles/storage.admin"
    
  3. Create a firewall ingress rule to let Dataflow VMs send and receive network traffic among themselves on TCP ports 12345 and 12346 when autoscaling is enabled:

    gcloud compute firewall-rules create fw-allow-inter-dataflow-comm \
    --action=allow \
    --direction=ingress \
    --network=GCP_NETWORK_NAME  \
    --target-tags=dataflow \
    --source-tags=dataflow \
    --priority=0 \
    --rules tcp:12345-12346
    
  4. Create and run a Dataflow job:

    export REGION=us-central1
    gcloud dataflow flex-template run orders-cdc-template --region ${REGION} \
    --template-file-gcs-location "gs://dataflow-templates/latest/flex/Cloud_Datastream_to_BigQuery" \
    --service-account-email "df-tutorial@${PROJECT_ID}.iam.gserviceaccount.com" \
    --parameters \
    inputFilePattern="gs://${BUCKET_NAME}/",\
    gcsPubSubSubscription="projects/${PROJECT_ID}/subscriptions/oracle_retail_sub",\
    inputFileFormat="json",\
    outputStagingDatasetTemplate="retail",\
    outputDatasetTemplate="retail",\
    deadLetterQueueDirectory="gs://dlq-${BUCKET_NAME}",\
    autoscalingAlgorithm="THROUGHPUT_BASED",\
    mergeFrequencyMinutes=1,\
    javascriptTextTransformGcsPath="gs://js-${BUCKET_NAME}/utils/retail_transform.js",\
    javascriptTextTransformFunctionName="process"
    

    Check the Dataflow console to verify that a new streaming job has started.

  5. In Cloud Shell, run the following command to start your Datastream stream:

    gcloud datastream streams update oracle-cdc \
    --location=us-central1 --state=RUNNING --update-mask=state
    
  6. Check the Datastream stream status:

    gcloud datastream streams list \
    --location=us-central1
    

    Validate that the state shows as Running. It may take a few seconds for the new state value to be reflected.

    Check the Datastream console to validate the progress of the ORDERS table backfill.

    The output is similar to the following:

    Object status with **Running** icon showing.

    Because this task is an initial load, Datastream reads from the ORDERS object. It writes all records to the JSON files located in the Cloud Storage bucket that you specified during the stream creation. It will take about 10 minutes for the backfill task to complete.

Analyze your data in BigQuery

After a few minutes, your backfilled data replicates into BigQuery. Any new incoming data is streamed into your datasets in (near) real time. Each record is processed by the UDF logic that you defined as part of the Dataflow template.

The Dataflow job creates the following two tables in the retail dataset:

  • ORDERS: This output table is a replica of the Oracle table and includes the transformations applied to the data as part of the Dataflow template.

    The output is similar to the following:

    Transformations applied to data.

  • ORDERS_log: This staging table records all the changes from your Oracle source. The table is partitioned, and stores the updated record alongside some metadata change information, such as whether the change is an update, insert, or delete.

    The output is similar to the following:

    Transformations applied to data.

BigQuery lets you see a real-time view of the operational data. You can also run queries in real time, such as comparing the sales of a particular product across stores, or combining sales and customer data to analyze the spending habits of customers in particular stores.
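
To make the relationship between the ORDERS_log staging table and the ORDERS replica more concrete, the following conceptual sketch replays a list of changes into a final set of rows. It is only an illustration of the idea of a change log, not the template's actual merge logic. The field names id, sequence, changeType, and row are hypothetical and don't correspond to the real Datastream metadata columns.

```javascript
// Collapses an ordered change log into the latest state per key.
// Assumption: each change has a unique, increasing `sequence` number.
function replayChangeLog(changes) {
  var latest = new Map();
  changes
    .slice() // copy so the caller's array is not reordered
    .sort(function (a, b) { return a.sequence - b.sequence; })
    .forEach(function (change) {
      if (change.changeType === 'DELETE') {
        latest.delete(change.id);
      } else {
        // INSERT and UPDATE both overwrite the current version of the row.
        latest.set(change.id, change.row);
      }
    });
  return Array.from(latest.values());
}

// Hypothetical change log with two orders; the second one is deleted.
var changeLog = [
  { id: 1, sequence: 1, changeType: 'INSERT', row: { id: 1, quantity: 2 } },
  { id: 2, sequence: 2, changeType: 'INSERT', row: { id: 2, quantity: 7 } },
  { id: 1, sequence: 3, changeType: 'UPDATE', row: { id: 1, quantity: 5 } },
  { id: 2, sequence: 4, changeType: 'DELETE', row: null }
];

var replica = replayChangeLog(changeLog);
console.log(replica); // [ { id: 1, quantity: 5 } ]
```

The log keeps all four changes, while the replica keeps only the one surviving row.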

Run queries against your operational data

  1. In BigQuery, run the following SQL to query the top three selling products:

    SELECT product_name, SUM(quantity) as total_sales
    FROM `retail.ORDERS`
    GROUP BY product_name
    ORDER BY total_sales desc
    LIMIT 3
    

    The output is similar to the following:

    Total sales for the three top-selling products, arranged from highest to lowest total sales.

  2. In BigQuery, run the following SQL statements to query the number of rows in both the ORDERS and ORDERS_log tables:

    SELECT COUNT(*) FROM `retail.ORDERS_log`;
    SELECT COUNT(*) FROM `retail.ORDERS`;
    

    With the backfill completed, both statements return the number 520217.

Build a demand forecasting model in BigQuery ML

BigQuery ML can be used to build and deploy demand forecasting models using the ARIMA_PLUS algorithm. In this section, you use BigQuery ML to build a model to forecast the demand for products in the store.

Prepare your training data

You use a sample of the data that you backfilled to train the model. In this case, you use data from the period between November 22 and November 28, 2021. The training data shows the following:

  • The name of the product (product_name)
  • The hour in which the sales occurred (hourly_timestamp)
  • How many units of each product were sold in that hour (total_sold)

Do the following:

  1. In BigQuery, run the following SQL to create and save the training data to a new table named training_data:

    CREATE OR REPLACE TABLE `retail.training_data`
    AS
       SELECT
           TIMESTAMP_TRUNC(time_of_sale, HOUR) AS hourly_timestamp,
           product_name,
           SUM(quantity) AS total_sold
       FROM `retail.ORDERS`
       GROUP BY hourly_timestamp, product_name
       HAVING hourly_timestamp BETWEEN TIMESTAMP_TRUNC('2021-11-22', HOUR)
           AND TIMESTAMP_TRUNC('2021-11-28', HOUR)
       ORDER BY hourly_timestamp
    
  2. Run the following SQL to verify the training_data table:

    SELECT * FROM `retail.training_data` LIMIT 10;
    

    The output is similar to the following:

    Output of query showing hourly timestamp, product names, and total sold.
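
The training query buckets each sale into the hour in which it occurred and sums the quantities per product. The following sketch mirrors that logic outside of BigQuery, as a way to reason about what TIMESTAMP_TRUNC and GROUP BY produce. The helper names and the sample orders are hypothetical, and the sketch assumes that product names don't contain the '|' character that it uses as a key separator.

```javascript
// Truncates an ISO 8601 timestamp to the start of its hour (UTC).
function truncateToHour(isoTimestamp) {
  var d = new Date(isoTimestamp);
  d.setUTCMinutes(0, 0, 0); // zero out minutes, seconds, and milliseconds
  return d.toISOString();
}

// Sums quantity per (hour, product), like GROUP BY hourly_timestamp, product_name.
function aggregateHourly(orders) {
  var totals = new Map();
  orders.forEach(function (order) {
    var key = truncateToHour(order.time_of_sale) + '|' + order.product_name;
    totals.set(key, (totals.get(key) || 0) + order.quantity);
  });
  return Array.from(totals, function (entry) {
    var parts = entry[0].split('|');
    return { hourly_timestamp: parts[0], product_name: parts[1], total_sold: entry[1] };
  });
}

// Hypothetical orders: two sales in the 10:00 hour and one in the 11:00 hour.
var sampleOrders = [
  { time_of_sale: '2021-11-22T10:05:00Z', product_name: 'Bag of Organic Bananas', quantity: 2 },
  { time_of_sale: '2021-11-22T10:45:00Z', product_name: 'Bag of Organic Bananas', quantity: 3 },
  { time_of_sale: '2021-11-22T11:10:00Z', product_name: 'Bag of Organic Bananas', quantity: 1 }
];

var hourlyTotals = aggregateHourly(sampleOrders);
console.log(hourlyTotals);
```

With the sample orders above, the result contains two rows: a total of 5 for the 10:00 hour and a total of 1 for the 11:00 hour.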

Forecast demand

  1. In BigQuery, run the following SQL to create a time-series model that uses the ARIMA_PLUS algorithm:

    CREATE OR REPLACE MODEL `retail.arima_plus_model`
           OPTIONS(
             MODEL_TYPE='ARIMA_PLUS',
             TIME_SERIES_TIMESTAMP_COL='hourly_timestamp',
             TIME_SERIES_DATA_COL='total_sold',
             TIME_SERIES_ID_COL='product_name'
           ) AS
    SELECT
       hourly_timestamp,
       product_name,
       total_sold
    FROM
     `retail.training_data`
    

    The ML.FORECAST function is used to forecast the expected demand over a horizon of n hours.

  2. Run the following SQL to forecast demand over the next 30 days. The query returns forecasts for every product in the model, including organic bananas:

    SELECT * FROM ML.FORECAST(MODEL `retail.arima_plus_model`,  STRUCT(720 AS horizon))
    

    The output is similar to the following:

    Output from ML.FORECAST query, with forecast values for bags of bananas broken down by timestamp.

    Because the training data is hourly, the horizon value will use the same unit of time when forecasting (hours). A horizon value of 720 hours will return forecast results over the next 30 days.

    Because this tutorial uses a small sample dataset, further investigation into the accuracy of the model is out of scope for this tutorial.
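
The horizon arithmetic can be written out as a small helper. This is only a sketch of the unit conversion; the function name horizonForDays is hypothetical, and the default of 24 steps per day assumes hourly training data as in this tutorial.

```javascript
// Converts a forecast window in days into a number of time steps.
// stepsPerDay defaults to 24 because the training data is hourly.
function horizonForDays(days, stepsPerDay) {
  var steps = stepsPerDay === undefined ? 24 : stepsPerDay;
  return days * steps;
}

console.log(horizonForDays(30));   // 720
console.log(horizonForDays(7));    // 168
console.log(horizonForDays(30, 1)); // 30 (if the training data were daily)
```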

Create a view for visualization in Looker

  1. In BigQuery, run the following SQL query to create a view that combines the actual and forecasted sales for each product:

    CREATE OR REPLACE VIEW `retail.orders_forecast` AS (
    SELECT
       timestamp,
       product_name,
       SUM(forecast_value) AS forecast,
       SUM(actual_value) AS actual
    FROM
    (
       SELECT
           TIMESTAMP_TRUNC(TIME_OF_SALE, HOUR) AS timestamp,
           product_name,
           SUM(QUANTITY) AS actual_value,
           NULL AS forecast_value
       FROM `retail.ORDERS`
       GROUP BY timestamp, product_name
       UNION ALL
       SELECT
           forecast_timestamp AS timestamp,
           product_name,
           NULL AS actual_value,
           forecast_value
       FROM ML.FORECAST(MODEL `retail.arima_plus_model`,
           STRUCT(720 AS horizon))
    )
    GROUP BY timestamp, product_name
    ORDER BY timestamp
    )
    

    This view lets Looker query the relevant data when you explore the actual and forecasted data.

  2. Run the following SQL to validate the view:

    SELECT * FROM `retail.orders_forecast`
    WHERE PRODUCT_NAME='Bag of Organic Bananas'
    AND TIMESTAMP_TRUNC(timestamp, HOUR) BETWEEN TIMESTAMP_TRUNC('2021-11-28', HOUR) AND TIMESTAMP_TRUNC('2021-11-30', HOUR)
    LIMIT 100;
    

    The output is similar to the following:

    Looker query output.

    As an alternative to BigQuery views, you can also use the derived table capabilities in Looker. These include native derived tables and SQL-based derived tables. For more information, see Derived tables in Looker.
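
The orders_forecast view stacks actual and forecasted rows and then groups them by timestamp and product, so each output row has an actual value, a forecast value, or both. The following sketch reproduces that behavior on a few hypothetical rows, including the SQL rule that SUM ignores NULL values and returns NULL when every input is NULL. The function names and sample numbers are made up for illustration.

```javascript
// Mimics SQL SUM over two values: NULLs are ignored, all-NULL stays NULL.
function sumIgnoringNulls(a, b) {
  if (a === null) return b;
  if (b === null) return a;
  return a + b;
}

// Combines actual and forecast rows per (timestamp, product_name).
function combineActualAndForecast(actualRows, forecastRows) {
  var merged = new Map();
  function add(row, actual, forecast) {
    var key = row.timestamp + '|' + row.product_name;
    var current = merged.get(key) || {
      timestamp: row.timestamp,
      product_name: row.product_name,
      forecast: null,
      actual: null
    };
    current.actual = sumIgnoringNulls(current.actual, actual);
    current.forecast = sumIgnoringNulls(current.forecast, forecast);
    merged.set(key, current);
  }
  actualRows.forEach(function (r) { add(r, r.actual_value, null); });
  forecastRows.forEach(function (r) { add(r, null, r.forecast_value); });
  // ISO 8601 strings in the same format sort chronologically as text.
  return Array.from(merged.values()).sort(function (a, b) {
    return a.timestamp < b.timestamp ? -1 : a.timestamp > b.timestamp ? 1 : 0;
  });
}

// Hypothetical rows: one hour with both values, one hour with a forecast only.
var actualSample = [
  { timestamp: '2021-11-28T00:00:00Z', product_name: 'Bag of Organic Bananas', actual_value: 12 }
];
var forecastSample = [
  { timestamp: '2021-11-28T01:00:00Z', product_name: 'Bag of Organic Bananas', forecast_value: 11 },
  { timestamp: '2021-11-28T00:00:00Z', product_name: 'Bag of Organic Bananas', forecast_value: 10.5 }
];

var combined = combineActualAndForecast(actualSample, forecastSample);
console.log(combined);
```

In the result, the 00:00 row has both an actual and a forecast value, while the 01:00 row has a forecast and a null actual, which is how future hours appear in the view.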

Visualize operational and forecasted data in Looker

In this section, you visualize the contents of the orders_forecast view by creating a graph in Looker.

Set up a BigQuery connection

  1. In Cloud Shell, create a service account with the BigQuery Data Editor and BigQuery Job User roles, and then create a key for the account:

    gcloud iam service-accounts create looker-sa
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:looker-sa@${PROJECT_ID}.iam.gserviceaccount.com" \
    --role="roles/bigquery.dataEditor"
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:looker-sa@${PROJECT_ID}.iam.gserviceaccount.com" \
    --role="roles/bigquery.jobUser"
    
    gcloud iam service-accounts keys create looker_sa_key.json \
    --iam-account=looker-sa@${PROJECT_ID}.iam.gserviceaccount.com
    
  2. Download the looker_sa_key.json key file from Cloud Shell to your local machine. The preceding command creates the file in your current Cloud Shell directory.

  3. Open Looker and navigate to Admin > Database > Connections > Add Connections.

  4. Fill in the Connection Settings form as follows:

    • Name: rds_cdc_retail
    • Dialect: Google BigQuery Standard SQL
    • Project Name: your_project_name
    • Dataset: retail
    • Service account email: The email address of the looker-sa service account that you created in an earlier step.
    • Persistent Derived Tables: Select the checkbox to enable this feature.
  5. Upload the service account key that you downloaded to the Service Account JSON/P12 File field.

  6. Click Test These Settings to validate the connection and then click Create Connection.

Create a new LookML project

LookML is a lightweight modeling language that you can use to build models of your data. You use these models to tell Looker how to query your data. In this section, you build a LookML model to reflect the BigQuery dataset schema that you use in this tutorial. Looker constructs SQL queries against this BigQuery dataset to extract the relevant data for your model.

  1. In the Looker console, click Go to Develop > Manage LookML Projects > New LookML Project.

  2. Fill in the form as follows:

    • Starting point: Select Generate Model from Database Schema

    • Connection: rds_cdc_retail

    • Build views from: Select All Tables

    This project will generate queries using the BigQuery connection that you created in the previous section.

  3. Click Create Project. You are directed to the Explore page. As part of the project creation, model and view files are created that represent the BigQuery tables.

    The output is similar to the following:

    Output for `orders_forecast.view`.

    The model files describe which tables to use and the relationships between them.

    The view files describe how to access the fields in your table and let you create customized dimensions, aggregates, and relationships across your data.

  4. Add the following measures to the orders_forecast.view file:

     measure: actual_sum {
       type: sum
       sql: ${TABLE}.actual ;;
     }
     measure: forecast_sum {
       type: sum
       sql: ${TABLE}.forecast ;;
     }
    

    The actual_sum and forecast_sum measures sum the actual and forecast fields of the orders_forecast view, respectively.

  5. Click Save Changes.

Create a Looker Explore

In this section, you select the visualization type for your LookML project.

Explores can be used to run queries and build visualizations against the dimensions and measures defined in the data model. To learn more about Looker Explores, see Explore parameters.

  1. In the Looker console, select Explore Orders Forecast from the orders_forecast.view drop-down menu.

    This selection lets you explore and compare the forecasted and actual sales of organic bananas over a period of time.

  2. Make the following selections in the Explore (under the All Fields tab):

    1. Select the following measures: Actual Sum and Forecast Sum.

    2. Select the filter icon next to Product Name and set the following:

      • Orders Forecast Product Name: is equal to: Bag of Organic Bananas

      The filter that you see is similar to the following:

      Filter for products.

    3. Expand the Timestamp field and select the Time dimension.

    4. In the Timestamp field, select the filter icon beside Date and set the following filters in the Orders Forecast Timestamp Date field:

      • is in range: 2021-11-28
      • until (before): 2021-11-30
  3. Click Run. The results from BigQuery are displayed in columnar format.

    The output is similar to the following:

    BigQuery results output.

  4. To visualize these results, click Visualization and then Line Chart.

  5. On the top right, click the settings icon and then click Save to Dashboard.

  6. Name the tile Forecasted Data and then select New Dashboard.

    The output is similar to the following:

    Line chart comparing actual and forecast number of orders for `Bag of organic bananas`.

  7. When you're prompted, name the dashboard FastFresh Retail Dashboard.

  8. Navigate to the new dashboard by clicking the following link text: FastFresh Retail Dashboard.

    You see the following output:

    The newly created dashboard in Looker.

You can continue to explore Looker by adding more tiles to the FastFresh dashboard.

Looker dashboards support filtering and alerts. They can be shared across your organization or scheduled to be sent periodically to selected teams. To let others in your organization use the dashboard, you need to push the LookML model to production. However, that task is out of scope for this tutorial.

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

  1. In the Google Cloud console, go to the Manage resources page.


  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

What's next