Preparing ML-ready data for personalization

This tutorial provides a set of Dataflow pipelines that you run in a specific order to create machine learning (ML) datasets from data stored in business tools such as Google Analytics 360 (GA360) or customer relationship management (CRM) systems.

Businesses are increasingly aware of the value of sophisticated ML models. Propensity models, customer lifetime value models, recommender systems, response/uplift models, and next best action models can help businesses acquire and retain customers through personalization.

However, extra work is often required to build such models from data stored in tools such as GA360 or CRM systems. Although these tools can help businesses organize and transform their data to meet business objectives at the aggregate level, usually the data must be re-engineered for ML modeling.

The system that this tutorial demonstrates can help you do the following:

  • Structure data into a generic format for ML processing.
  • Analyze data to ensure that it's ready for ML processing.
  • Create a dataset that you can use to build binary-classification ML models such as propensity models, response/uplift models, and recommender systems.

In this tutorial, you set up Dataflow to create ML-ready data snapshots for dates that slide at set intervals. Each data snapshot consists of the following:

  • ML-ready training features that reflect what happened in the past (relative to the snapshot date) over a predefined historical window (called the lookback window).
  • An ML-ready binary training label that reflects what might happen in the future (relative to the snapshot date) over a predefined prediction window (see the example after this list).
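
For intuition, the following sketch (illustration only) shows how the two windows relate to a single snapshot date, using the example values that appear later in this tutorial: a 90-day lookback window and a prediction window that runs from 1 to 14 days after the snapshot date (the optional lookback gap is ignored here). You can run it in Cloud Shell, which provides GNU date:

    # Illustration only: windows for one hypothetical snapshot date.
    SNAPSHOT_DATE="2017-06-01"

    # The 90-day lookback window ends at the snapshot date.
    echo "Lookback window:   $(date -I -d "${SNAPSHOT_DATE} -90 days") to ${SNAPSHOT_DATE}"

    # The prediction window runs from 1 day to 14 days after the snapshot date.
    echo "Prediction window: $(date -I -d "${SNAPSHOT_DATE} +1 day") to $(date -I -d "${SNAPSHOT_DATE} +14 days")"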

This tutorial is intended for data engineers and data scientists who have experience running Dataflow pipelines and a fundamental knowledge of ML principles.

Architecture

The following diagram illustrates the architecture for the setup that this tutorial describes.

Architecture for pipelines that output ML datasets.

This process includes the following steps:

  • Step 1: Read GA360 or CRM data into Cloud Storage.
  • Step 2 (optional): Explore the dataset in BigQuery.
  • Step 3: Window the dataset into Cloud Storage.
  • Step 4: Generate features to output an ML-ready dataset in BigQuery.

These steps produce an ML-ready dataset that you can use to train an ML model with tools such as AutoML Tables, TensorFlow, or a preferred ML modeling tool.

Objectives

  • Create AVRO files from GA360 data in BigQuery.
  • (Optional) Store data in BigQuery for exploration.
  • Create windows of data in Cloud Storage.
  • Aggregate windowed data to create an ML-ready features table in BigQuery.

Costs

For this tutorial, the cost of using the Google Analytics 360 Sample Dataset is minimal. The cost of implementing this solution in production depends on your dataset size. To keep the pipelines in this tutorial as efficient as possible, they use standard Dataflow worker VMs. For more information, see Dataflow pricing.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Cloud project. Learn how to confirm that billing is enabled for your project.

  4. Enable the Dataflow and Cloud Storage APIs.

    Enable the APIs

Preparing your environment

In this tutorial, you run commands in Cloud Shell.

  1. In the Cloud Console, open Cloud Shell.

    Open Cloud Shell

    Cloud Shell gives you access to the command line in Google Cloud, and it includes the Cloud SDK and other tools that you need for Google Cloud development. Cloud Shell can take several minutes to initialize.

  2. Set default variables:

    export PROJECT_ID=$(gcloud config get-value project)
    export BUCKET_NAME=${PROJECT_ID}-mldwp
    export REGION=YOUR_REGION
    export ZONE=YOUR_ZONE
    export DATASET=mldwp_dataset
    

    Replace the following:

    • YOUR_REGION: a region of your choosing, for example, us-west1
    • YOUR_ZONE: a zone of your choosing, for example, us-west1-b

      For more information, see Geography, regions, and zones.

  3. Create a working bucket:

    gsutil mb -p ${PROJECT_ID} -l ${REGION} gs://${BUCKET_NAME}
    
  4. Create the relevant BigQuery dataset:

    bq mk ${DATASET}
    
  5. Clone the ML Data Windowing Pipeline open source project:

    git clone https://github.com/GoogleCloudPlatform/cloud-for-marketing
    
  6. Go to the pipeline directory in the cloned repository:

    cd cloud-for-marketing/marketing-analytics/predicting/ml-data-windowing-pipeline

Install Apache Maven

  1. Download and install the Java Development Kit (JDK) version 9 or later. Verify that the JAVA_HOME environment variable is set and points to your JDK installation.
  2. Download and install the Cloud SDK.
  3. Following Maven's installation guide for your specific operating system, download and install Apache Maven.
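
To confirm that the toolchain is ready before you build the pipelines, you can run a quick check of the installed versions (optional):

    # Verify that JAVA_HOME points to your JDK installation.
    echo "${JAVA_HOME}"

    # Print the Maven version; mvn -v also reports the Java version that Maven uses.
    mvn -v

    # Confirm that the Cloud SDK is available.
    gcloud version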

Step 1: Create AVRO files from BigQuery tables

In the ML Data Windowing Pipeline project, the UserSessionPipeline.java pipeline is optimized to read data directly from the Google Analytics 360 BigQuery output table between a set of specified dates. In this step, you run the UserSessionPipeline.java pipeline to convert the GA360 data into AVRO files that you then use in Step 2 (optional) and Step 3.

When you call the UserSessionPipeline.java pipeline in Cloud Shell, use the following arguments:

  • --inputBigQuerySQL
    The SQL query used to select data from the Google Analytics table in BigQuery. Ensure that the pipeline can access the dataset and tables.
    Example: SELECT * FROM `bigquery-public-data.google_analytics_sample.ga_sessions_*`
  • --predictionFactName
    The name of the BigQuery column that contains the value (label of the ML dataset) that you want to predict.
    Example: hits.eventInfo.eventAction
  • --predictionFactValues
    The value in the BigQuery column that represents a positive label of what you want to predict. You can define the positive label with multiple values that are separated by commas.
    Example: Add to Cart
  • --outputSessionsAvroPrefix
    The location on Cloud Storage to output the AVRO files to. You created the parent bucket in Preparing your environment earlier in this tutorial.
    Example: gs://${BUCKET_NAME}/usersession-output/

  1. In Cloud Shell, run the UserSessionPipeline.java pipeline on Dataflow:

    export RUN_ID=`date -I`/`date +%s`
    
    mvn -Pdataflow-runner compile exec:java \
    -D_MAIN_CLASS=UserSessionPipeline \
    -Dexec.args="--runner=DataflowRunner \
        --project='${PROJECT_ID}' \
        --inputBigQuerySQL='SELECT * FROM \`bigquery-public-data.google_analytics_sample.ga_sessions_*\`' \
        --predictionFactName='hits.eventInfo.eventAction' \
        --predictionFactValues='Add to Cart' \
        --outputSessionsAvroPrefix='gs://${BUCKET_NAME}/usersession-output/${RUN_ID}'"
    

    This command creates a unique RUN_ID folder for the output in order to prevent the pipeline from overwriting files from previous runs.

    The output is similar to the following:

    [INFO] --------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] --------------------------------------------------------------------
    [INFO] Total time:   19.760 s
    [INFO] Finished at:  2020-02-19T10:50:57+11:00
    [INFO] --------------------------------------------------------------------
    
  2. Verify that your job created output files in the Cloud Storage folder gs://${BUCKET_NAME}/usersession-output/:

    gsutil ls gs://${BUCKET_NAME}/usersession-output/${RUN_ID}*
    

    The output is similar to the following:

    gs://mldatawindowingpipeline/usersession-output/
    gs://mldatawindowingpipeline/usersession-output/2020-10-27/1603838232-00000-of-00025.avro
    gs://mldatawindowingpipeline/usersession-output/2020-10-27/1603838232-00001-of-00025.avro
    gs://mldatawindowingpipeline/usersession-output/2020-10-27/1603838232-00002-of-00025.avro
    gs://mldatawindowingpipeline/usersession-output/2020-10-27/1603838232-00003-of-00025.avro
    gs://mldatawindowingpipeline/usersession-output/2020-10-27/1603838232-00004-of-00025.avro
    gs://mldatawindowingpipeline/usersession-output/2020-10-27/1603838232-00005-of-00025.avro
    gs://mldatawindowingpipeline/usersession-output/2020-10-27/1603838232-00006-of-00025.avro
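
Optionally, you can also summarize how much data the run produced before moving on (a convenience check only):

    # Total size of the AVRO output for this run, in human-readable units.
    gsutil du -sh gs://${BUCKET_NAME}/usersession-output/${RUN_ID}*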
    

Step 2 (optional): Store data to BigQuery for exploration

Although this next pipeline (DataVisualizationPipeline.java) is optional, it outputs your data so that you can explore it before processing it further. Using data from Step 1, this pipeline generates the following BigQuery tables (if they already exist, they are updated). These tables let you plot and check your data for consistency over time.

  • A facts table. This table includes a list of variables and values for each activity time for each user (based on the original data).
  • An instance table. This table includes selected ML instances for each snapshot date (also called the effective date). That is, for each snapshot date, the pipeline outputs data on all existing users who performed an action before that date, along with the following information:
    • A label within a predefined prediction window (hasPositiveLabel column)
    • Days between the first available activity and the snapshot date (daysSinceFirstActivity column)
    • Days between the latest available activity and the snapshot date (daysSinceLatestActivity column)

The combination of a user and snapshot date defines an instance snapshot. The snapshot dates are moved by a predefined sliding time, as the following diagram shows.

A snapshot instance that shows the lookback and prediction windows for a given date.

The snapshotStartDate and snapshotEndDate define the first and the last snapshot dates to extract, and maximumLookaheadTime and minimumLookaheadTime define the prediction window with respect to the current snapshot date. You supply arguments for these parameters as inputs in this step. These arguments are further explained in the following table.

When you call the DataVisualizationPipeline.java pipeline in Cloud Shell, use the following arguments:

  • --inputAvroSessionsLocation
    The location of the AVRO files from Step 1.
    Example: gs://${BUCKET_NAME}/usersession-output/
  • --snapshotStartDate
    The first snapshot date to consider, in dd/mm/yyyy format.
    Example: 31/01/2017
  • --snapshotEndDate
    The last snapshot date to consider, in dd/mm/yyyy format.
    Example: 31/01/2018
  • --slideTimeInSeconds
    The time interval (in seconds) to slide snapshot dates by.
    Example: 604800 (for 7 days)
  • --minimumLookaheadTimeInSeconds
    The time (in seconds) after the current snapshot date at which the prediction window starts.
    Example: 86400 (for a prediction window that starts 86400 seconds after the current snapshot date, that is, the next day)
  • --maximumLookaheadTimeInSeconds
    The time (in seconds) after the current snapshot date at which the prediction window ends. The length of the prediction window is the difference between maximumLookaheadTimeInSeconds and minimumLookaheadTimeInSeconds.
    Example: 1209600 (for a prediction window of 14 days)
  • --stopOnFirstPositiveLabel
    Whether to stop considering a user in subsequent snapshots after they have a positive label.
    Example: true
  • --outputBigQueryFactsTable
    The location of the BigQuery facts table, in the format ${PROJECT_ID}.${DATASET}.TABLE_ID, where ${DATASET} is the ID of your BigQuery dataset and TABLE_ID is the ID of your BigQuery facts table.
    Example: ${PROJECT_ID}.${DATASET}.ga_facts_table
  • --outputBigQueryUserActivityTable
    The location of your BigQuery instance table, in the format ${PROJECT_ID}.${DATASET}.INSTANCE_ID, where ${DATASET} is the ID of your BigQuery dataset and INSTANCE_ID is the ID of your BigQuery instance table.
    Example: ${PROJECT_ID}.${DATASET}.ga_instance_table

  1. In Cloud Shell, run the DataVisualizationPipeline.java pipeline on Dataflow:

    mvn -Pdataflow-runner compile exec:java \
    -D_MAIN_CLASS=DataVisualizationPipeline \
    -Dexec.args="--runner=DataflowRunner \
        --project=${PROJECT_ID} \
        --inputAvroSessionsLocation='gs://${BUCKET_NAME}/usersession-output/${RUN_ID}*' \
        --snapshotStartDate='31/01/2017' \
        --snapshotEndDate='31/01/2018' \
        --slideTimeInSeconds='604800' \
        --minimumLookaheadTimeInSeconds='86400' \
        --maximumLookaheadTimeInSeconds='1209600' \
        --stopOnFirstPositiveLabel='true' \
        --outputBigQueryFactsTable='${PROJECT_ID}:${DATASET}.ga_facts_table' \
        --outputBigQueryUserActivityTable='${PROJECT_ID}:${DATASET}.ga_instance_table'"
    
  2. Verify that Dataflow created the two BigQuery tables:

    bq ls ${PROJECT_ID}:${DATASET}
    

    The output is similar to the following:

          tableID         Type     Labels      Time Partitioning      Clustered Fields
    ------------------- -------- ----------  -------------------  --------------------
     ga_facts_table      TABLE
     ga_instance_table   TABLE
    

After Dataflow creates the tables, you can analyze the consistency and integrity of your data. The following analyses can help you check for random spikes in the data over time, or for inconsistent data trends and data points:

  • Analyze the size and the prediction label distribution of instances over time (see the example query after this list).
  • Analyze the daysSinceFirstActivity and daysSinceLatestActivity data to understand and define a meaningful lookback window period.
  • Filter the instances based on prediction label imbalance or label values, or by daysSinceLatestActivity to select active instances for model building.
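
For example, a query like the following summarizes the number of instances and the positive-label rate per snapshot date. This is a sketch only: it assumes that the instance table exposes the snapshot date in a column named snapshotDate and that hasPositiveLabel is a BOOL column. Check the actual schema first (for example, with bq show --schema) and adjust the column names and types as needed.

    # Sketch only: summarize instance counts and label balance per snapshot date.
    # Assumes a snapshot-date column named snapshotDate and a BOOL hasPositiveLabel column;
    # verify the real names with: bq show --schema ${PROJECT_ID}:${DATASET}.ga_instance_table
    bq query --use_legacy_sql=false \
    "SELECT
       snapshotDate,
       COUNT(*) AS num_instances,
       COUNTIF(hasPositiveLabel) AS num_positive,
       ROUND(SAFE_DIVIDE(COUNTIF(hasPositiveLabel), COUNT(*)), 4) AS positive_rate,
       ROUND(AVG(daysSinceFirstActivity), 1) AS avg_days_since_first_activity
     FROM ${DATASET}.ga_instance_table
     GROUP BY snapshotDate
     ORDER BY snapshotDate"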

Step 3: Create windows of data

Using the AVRO files from Step 1, you run the WindowingPipeline.java pipeline to build data snapshots for snapshot dates that slide at set intervals. This pipeline windows the data based on the specified lookback and prediction windows, which slide along the timeline of snapshot dates, as the following diagram shows.

Snapshot dates sliding at set intervals.

The terms used in the diagram are defined in the list of arguments for this step. After you run this pipeline, the dataset is ready to be aggregated into features in Step 4.

The WindowingPipeline.java pipeline also outputs windowed data for every GA session instead of a snapshot date. This data can be useful if you want to generate features at the GA session level.

When you call the WindowingPipeline.java pipeline in Cloud Shell, use the following arguments:

  • --inputAvroSessionsLocation
    The location of the AVRO files from Step 1.
    Example: gs://${BUCKET_NAME}/usersession-output/
  • --snapshotStartDate
    The first snapshot date to consider, in dd/mm/yyyy format.
    Example: 31/01/2017
  • --snapshotEndDate
    The last snapshot date to consider, in dd/mm/yyyy format.
    Example: 31/01/2018
  • --slideTimeInSeconds
    The time interval (in seconds) to slide snapshot dates by.
    Example: 604800 (for 7 days)
  • --minimumLookaheadTimeInSeconds
    The time (in seconds) after the current snapshot date at which the prediction window starts.
    Example: 86400 (for a prediction window that starts 86400 seconds after the current snapshot date, that is, the next day)
  • --maximumLookaheadTimeInSeconds
    The time (in seconds) after the current snapshot date at which the prediction window ends. The length of the prediction window is the difference between maximumLookaheadTimeInSeconds and minimumLookaheadTimeInSeconds.
    Example: 1209600 (for a prediction window of 14 days)
  • --stopOnFirstPositiveLabel
    Whether to stop considering a user in subsequent snapshots after they have a positive label.
    Example: true
  • --lookbackGapInSeconds
    The gap (in seconds) to add between the current snapshot date and the lookback window. To simulate real-life scoring conditions such as data lags, this setting is typically set to one day.
    Example: 86400 (for 1 day)
  • --windowTimeInSeconds
    The size (in seconds) of the lookback window.
    Example: 7776000 (for 90 days)
  • --outputSlidingWindowAvroPrefix
    The location on Cloud Storage to output the AVRO files to, for data windowed at date intervals that are based on the specified sliding amount.
    Example: gs://${BUCKET_NAME}/windowing-output/
  • --outputSessionBasedWindowAvroPrefix
    The location on Cloud Storage to output the AVRO files to, for data windowed at date intervals that are based on Google Analytics sessions. In this data, one window is output for every session.
    Example: gs://${BUCKET_NAME}/windowing-session-output/
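
If you prefer to derive the second-based values instead of hard-coding them, the following sketch shows how the example arguments above are computed (illustration only):

    # All window arguments are expressed in seconds.
    DAY=$((24 * 60 * 60))                                  # 86400 seconds in one day

    echo "slideTimeInSeconds            = $((7 * DAY))"    # 604800: slide snapshot dates by 7 days
    echo "minimumLookaheadTimeInSeconds = ${DAY}"          # 86400: prediction window starts the next day
    echo "maximumLookaheadTimeInSeconds = $((14 * DAY))"   # 1209600: prediction window ends 14 days out
    echo "lookbackGapInSeconds          = ${DAY}"          # 86400: 1-day gap before the lookback window
    echo "windowTimeInSeconds           = $((90 * DAY))"   # 7776000: 90-day lookback window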

  1. In Cloud Shell, run the WindowingPipeline.java pipeline on Dataflow:

    mvn -Pdataflow-runner compile exec:java \
    -D_MAIN_CLASS=SlidingWindowPipeline \
    -Dexec.args="--runner=DataflowRunner \
        --project=${PROJECT_ID} \
        --inputAvroSessionsLocation='gs://${BUCKET_NAME}/usersession-output/${RUN_ID}*' \
        --snapshotStartDate='31/01/2017' \
        --snapshotEndDate='31/01/2018' \
        --slideTimeInSeconds='604800' \
        --minimumLookaheadTimeInSeconds='86400' \
        --maximumLookaheadTimeInSeconds='1209600' \
        --stopOnFirstPositiveLabel='true' \
        --lookbackGapInSeconds='86400' \
        --windowTimeInSeconds='7776000' \
        --outputSlidingWindowAvroPrefix='gs://${BUCKET_NAME}/windowing-output/${RUN_ID}'"
    
  2. Verify that your job created output files in the Cloud Storage folder gs://${BUCKET_NAME}/windowing-output/:

    gsutil ls gs://${BUCKET_NAME}/windowing-output/${RUN_ID}*
    

    The output is similar to the following:

    gs://mldatawindowingpipeline/windowing-output/
    gs://mldatawindowingpipeline/windowing-output/-00000-of-00052.avro
    gs://mldatawindowingpipeline/windowing-output/-00001-of-00052.avro
    gs://mldatawindowingpipeline/windowing-output/-00002-of-00052.avro
    gs://mldatawindowingpipeline/windowing-output/-00003-of-00052.avro
    gs://mldatawindowingpipeline/windowing-output/-00004-of-00052.avro
    

Step 4: Aggregate windowed data to create features

Using the AVRO files from Step 3, the GenerateFeaturesPipeline.java pipeline outputs a BigQuery table that contains features for all snapshots of users and is ready for ML processing. The features are based on different types of aggregated events that occurred in the lookback windows that you created earlier.

You can create two types of features:

  • Numerical. These features have meaning as a measurement, such as sums, counts, and averages.
  • Categorical. These features represent characteristics, such as the most frequent or the most recent value.

Whether you create numerical or categorical features, the GenerateFeaturesPipeline.java pipeline requires the following arguments, which you pass when you call the pipeline in Cloud Shell:

  • --windowedAvroLocation
    The location of the sliding or session window AVRO files from Step 3.
    Example: gs://${BUCKET_NAME}/windowing-output/
  • --featureDestinationTable
    The location of the BigQuery features table.
    Example: ${PROJECT_ID}:${DATASET}.ga_features_table
  • --trainMode
    To include predictionLabel in the output, set to true.
    Example: true
  • --showEffectiveDate
    To include the effective date (snapshot date) in the output, set to true.
    Example: true
  • --showStartTime
    To include in the output the start time from which data is considered, set to true.
    Example: true
  • --showEndTime
    To include in the output the end time up to which data is considered, set to true.
    Example: true
  • --showEffectiveDateWeekOfYear
    To include the week of the year of the effective date (snapshot date) in the output, set to true.
    Example: true
  • --showEffectiveDateMonthOfYear
    To include the month of the year of the effective date (snapshot date) in the output, set to true.
    Example: true
  • --showUserTenure
    To include userTenure (daysSinceFirstActivity) in the output, set to true.
    Example: true

Depending on whether you create numerical or categorical features, the GenerateFeaturesPipeline.java pipeline requires additional arguments, which are described in the following sections.

Arguments for numerical features

To create numerical features, the GenerateFeaturesPipeline.java pipeline also requires the following arguments:

  • --sumValueFromVariables
    Calculates the sum of all numeric values of a given variable within the lookback window. The input expects a single item or a comma-separated list of items.
    Example: totals.hits,totals.pageviews,totals.timeOnSite
  • --averageByTenureValueFromVariables
    Calculates the average per day of all numeric values of a given variable within the lookback window (or within the user's tenure, if it's shorter than the lookback window). The input expects a single item or a comma-separated list of items.
    Example: totals.hits,totals.pageviews,totals.timeOnSite
  • --averageValueFromVariables
    Calculates the average of the values of a given variable within the lookback window. The input expects a single item or a comma-separated list of items.
    Example: totals.hits,totals.pageviews,totals.timeOnSite

Arguments for categorical features

To create categorical features, the GenerateFeaturesPipeline.java pipeline requires the following arguments:

  • --countValueFromVariables
    Counts the number of occurrences of each value in the lookback window. The input expects a comma-separated list of column names, each with a list of values to count. Any value that is not in the submitted list is counted under the default value [Others].
    Example: channelGrouping:[Organic Search,Social,Direct,Referral,Paid Search,Affiliates,Others]:[Others],trafficSource.source:[direct,yahoo,google,Others]:[Others]
  • --mostFreqValueFromVariables
    Calculates the most frequent value active within the lookback window. The input expects a comma-separated list of column names, each with a list of values from which to find the most frequent. Any value that is not in the submitted list is considered under the default value [Others].
    Example: channelGrouping:[Organic Search,Social,Direct,Referral,Paid Search,Affiliates,Others]:[Others],trafficSource.source:[direct,yahoo,google,Others]:[Others]
  • --proportionsValueFromVariables
    Calculates the proportions of a set of values that are active within the lookback window. The input expects a comma-separated list of column names, each with a list of values to find proportions for. Any value that is not in the submitted list is considered under the default value [Others].
    Example: channelGrouping:[Organic Search,Social,Direct,Referral,Paid Search,Affiliates,Others]:[Others],trafficSource.source:[direct,yahoo,google,Others]:[Others]
  • --recentValueFromVariables
    Extracts the most recent value used in the lookback window. The input expects a comma-separated list of column names, each with a list of values. Any value that is not in the submitted list is considered under the default value [Others].
    Example: channelGrouping:[Organic Search,Social,Direct,Referral,Paid Search,Affiliates,Others]:[Others],trafficSource.source:[direct,yahoo,google,Others]:[Others]
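
Because the categorical-feature arguments share the same COLUMN:[value list]:[default] shape, it can help to assemble them in shell variables before passing them to Maven. The following is a minimal sketch that uses the example values above (the variable names are only for illustration):

    # Illustration only: build a categorical-feature argument string for two columns.
    # Shape: COLUMN:[VALUE1,VALUE2,...]:[DEFAULT]
    CHANNEL_SPEC='channelGrouping:[Organic Search,Social,Direct,Referral,Paid Search,Affiliates,Others]:[Others]'
    SOURCE_SPEC='trafficSource.source:[direct,yahoo,google,Others]:[Others]'
    CATEGORICAL_SPEC="${CHANNEL_SPEC},${SOURCE_SPEC}"

    # The combined string can then be passed to, for example, --proportionsValueFromVariables.
    echo "${CATEGORICAL_SPEC}"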

  1. In Cloud Shell, run the GenerateFeaturesPipeline.java pipeline on Dataflow:

    mvn -Pdataflow-runner compile exec:java \
    -D_MAIN_CLASS=GenerateFeaturesPipeline \
    -Dexec.args="--runner=DataflowRunner \
        --project='${PROJECT_ID}' \
        --windowedAvroLocation='gs://${BUCKET_NAME}/windowing-output/${RUN_ID}*.avro' \
        --featureDestinationTable='${PROJECT_ID}:${DATASET}.ga_features_table' \
        --recentValueFromVariables='' \
        --averageByTenureValueFromVariables='' \
        --sumValueFromVariables='totals.hits,totals.pageviews,totals.timeOnSite' \
        --countValueFromVariables='' \
        --proportionsValueFromVariables='channelGrouping:[Organic Search,Social,Direct,Referral,Paid Search,Affiliates,Others]:[Others],trafficSource.source:[direct,yahoo,google,Others]:[Others]' \
        --averageValueFromVariables='' \
        --mostFreqValueFromVariables=''"
    
  2. In the Cloud Console, go to the Dataflow page.

    Go to the Dataflow page

  3. Verify that the job status is Running at first and then Succeeded.

    Job status displays as succeeded.

  4. In the Cloud Console, go to the BigQuery page.

    Go to the BigQuery page

  5. Check that the ga_features_table is listed.

    Listings in BigQuery features table.
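
Optionally, you can also inspect the generated table from Cloud Shell before you move on to model training:

    # Print the schema of the features table.
    bq show --schema --format=prettyjson ${PROJECT_ID}:${DATASET}.ga_features_table

    # Preview the first few rows.
    bq head --max_rows=5 ${PROJECT_ID}:${DATASET}.ga_features_table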

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, you can delete the Google Cloud project that you created for this tutorial, or delete the resources associated with this tutorial.

Delete the Google Cloud project

The easiest way to eliminate billing is to delete the project you created for the tutorial.

  1. In the Cloud Console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete the resources

If you want to keep the Google Cloud project you used in this tutorial, delete the individual resources.

Delete Cloud Storage resources

  1. In the Cloud Console, go to the Cloud Storage page.

    Go to the Cloud Storage page

  2. For the bucket that you want to delete, select the checkbox.

  3. Click Delete.
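
Alternatively, you can delete the bucket and all of its contents from Cloud Shell:

    # Recursively delete the tutorial bucket and everything in it.
    gsutil rm -r gs://${BUCKET_NAME}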

Delete BigQuery resources

  1. In the Cloud Console, go to the BigQuery page.

    Go to the BigQuery page

  2. Select the tables that you want to delete.

  3. Click Delete.
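
Alternatively, you can delete the individual tables, or the whole dataset, from Cloud Shell:

    # Delete the individual tables created in this tutorial.
    bq rm -f -t ${PROJECT_ID}:${DATASET}.ga_facts_table
    bq rm -f -t ${PROJECT_ID}:${DATASET}.ga_instance_table
    bq rm -f -t ${PROJECT_ID}:${DATASET}.ga_features_table

    # Or delete the whole dataset, including every table in it.
    bq rm -r -f ${PROJECT_ID}:${DATASET}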

What's next