Building a secure anomaly detection solution using Dataflow, BigQuery ML, and Cloud Data Loss Prevention

This tutorial shows you how to build a secure ML-based network anomaly detection solution for telecommunication networks. This type of solution is used to help identify cybersecurity threats.

This tutorial is intended for data engineers and data scientists, and assumes that you have basic knowledge of the Google Cloud services shown in the following reference architecture.

Reference architecture

The following diagram shows the components used to build an ML-based network anomaly detection system. Pub/Sub and Cloud Storage serve as data sources. Dataflow aggregates this data and extracts features from it, tokenizing sensitive fields with the DLP API. BigQuery ML creates a k-means clustering model from those features, and Dataflow uses the model to identify outliers.

Anomaly detection reference architecture using Dataflow and BigQuery ML.

Objectives

  • Create a Pub/Sub topic and a subscription to generate synthetic NetFlow log data.
  • Aggregate and extract features from NetFlow log data using Dataflow.
  • Create a BigQuery ML k-means clustering model.
  • Tokenize sensitive data with the DLP API.
  • Create a Dataflow pipeline for real-time outlier detection, using normalized and trained data.

Costs

This tutorial uses the following billable components of Google Cloud: BigQuery, Dataflow, Cloud Storage, Pub/Sub, and Cloud Data Loss Prevention.

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Cloud project. Learn how to confirm that billing is enabled for your project.

  4. In the Cloud Console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Cloud Console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Cloud SDK already installed, including the gcloud command-line tool, and with values already set for your current project. It can take a few seconds for the session to initialize.

  5. You run all commands in this tutorial from Cloud Shell.
  6. In Cloud Shell, enable the BigQuery, Dataflow, Cloud Storage, and DLP APIs.

    gcloud services enable dlp.googleapis.com bigquery.googleapis.com \
      dataflow.googleapis.com storage-component.googleapis.com \
      pubsub.googleapis.com cloudbuild.googleapis.com
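
    To confirm that the services are enabled, you can optionally list them (the grep pattern is an illustrative filter):

    gcloud services list --enabled | grep -E 'dlp|bigquery|dataflow|storage|pubsub|cloudbuild'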
    

Generating synthetic data using Dataflow and Pub/Sub

In this section, you create a Pub/Sub topic and a subscription to generate synthetic NetFlow log data by triggering an automated Dataflow pipeline.

Create a Pub/Sub topic and a subscription

  • In Cloud Shell, create a Pub/Sub topic and a subscription:

    export PROJECT_ID=$(gcloud config get-value project)
    export TOPIC_ID=TOPIC_ID
    export SUBSCRIPTION_ID=SUBSCRIPTION_ID
    gcloud pubsub topics create $TOPIC_ID
    gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC_ID

    Replace the following:

    • TOPIC_ID: the name of the Pub/Sub topic
    • SUBSCRIPTION_ID: the name of the Pub/Sub subscription
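
    To confirm that the topic and subscription were created, you can optionally describe them; each command prints the full resource name on success:

    gcloud pubsub topics describe ${TOPIC_ID}
    gcloud pubsub subscriptions describe ${SUBSCRIPTION_ID}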

Trigger the synthetic data generation pipeline

  1. In Cloud Shell, clone the GitHub repository:

    git clone https://github.com/GoogleCloudPlatform/df-ml-anomaly-detection.git
    cd df-ml-anomaly-detection
    
  2. To enable submitting a job automatically, grant Dataflow permissions to your Cloud Build service account:

    export PROJECT_NUMBER=$(gcloud projects list --filter=${PROJECT_ID} \
      --format="value(PROJECT_NUMBER)")
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
      --member serviceAccount:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com \
      --role roles/dataflow.admin
    
  3. Start the synthetic data generation pipeline:

    gcloud builds submit . --machine-type=n1-highcpu-8 \
      --config scripts/cloud-build-data-generator.yaml \
      --substitutions _TOPIC_ID=${TOPIC_ID}
    

    Because of the large code package, you must use a machine type with more resources than the default. For this tutorial, use machine-type=n1-highcpu-8.

  4. Validate that the log data is published in the subscription:

    gcloud pubsub subscriptions pull ${SUBSCRIPTION_ID} --auto-ack --limit 1 >> raw_log.txt
    cat raw_log.txt
    

    The output contains a subset of NetFlow log schema fields populated with random values, similar to the following:

    {
     \"subscriberId\": \"mharper\",
     \"srcIP\": \"12.0.9.4",
     \"dstIP\": \"12.0.1.2\",
     \"srcPort\": 5000,
     \"dstPort\": 3000,
     \"txBytes\": 15,
     \"rxBytes\": 40,
     \"startTime\": 1570276550,
     \"endTime\": 1570276559,
     \"tcpFlag\": 0,
     \"protocolName\": \"tcp\",
     \"protocolNumber\": 0
    }
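
    The escape characters appear because the pull command writes the message payload as an escaped string. To view the payload as pretty-printed JSON instead, here is a sketch that decodes it directly (jq is preinstalled in Cloud Shell):

    gcloud pubsub subscriptions pull ${SUBSCRIPTION_ID} --auto-ack --limit=1 \
      --format="value(message.data)" | base64 -d | jq .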
    

Extracting features and finding outlier data

In this section, you create BigQuery tables to store feature and outlier data processed by the anomaly detection pipeline.

Create BigQuery tables for storing feature and outlier data

  1. In Cloud Shell, create a BigQuery dataset:

    export DATASET_NAME=DATASET_NAME
    bq --location=US mk -d \
      --description "Network Logs Dataset" \
      ${DATASET_NAME}

    Replace DATASET_NAME with a name for the dataset. BigQuery dataset names can contain only letters, numbers, and underscores.

  2. Create BigQuery tables:

    bq mk -t --schema src/main/resources/aggr_log_table_schema.json \
      --time_partitioning_type=DAY \
      --clustering_fields="dst_subnet,subscriber_id" \
      --description "Network Log Feature Table" \
      ${PROJECT_ID}:${DATASET_NAME}.cluster_model_data
    
    bq mk -t --schema src/main/resources/outlier_table_schema.json \
      --description "Network Log Outlier Table" \
      ${PROJECT_ID}:${DATASET_NAME}.outlier_data
    
    bq mk -t --schema src/main/resources/normalized_centroid_data_schema.json \
      --description "Sample Normalized Data" \
      ${PROJECT_ID}:${DATASET_NAME}.normalized_centroid_data
    

    The following tables are generated:

    • cluster_model_data: a clustered partition table that stores feature values for model creation.
    • outlier_data: an outlier table that stores anomalies.
    • normalized_centroid_data: a table pre-populated with normalized data created from a sample model.
  3. Load the sample centroid data into the normalized_centroid_data table:

    bq load \
      --source_format=NEWLINE_DELIMITED_JSON \
      ${PROJECT_ID}:${DATASET_NAME}.normalized_centroid_data \
      gs://df-ml-anomaly-detection-mock-data/sample_model/normalized_centroid_data.json src/main/resources/normalized_centroid_data_schema.json
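
    To verify the load, you can optionally count the rows; the sample file should load a small number of rows, one per centroid of the sample model:

    bq query --nouse_legacy_sql \
      "SELECT COUNT(*) AS row_count FROM \`${PROJECT_ID}.${DATASET_NAME}.normalized_centroid_data\`"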
    

Create and trigger a Dataflow Flex Template

In this section, you create a Dataflow Flex Template to trigger the anomaly detection pipeline.

  1. In Cloud Shell, create a Docker image in your project:

    gcloud auth configure-docker
    gradle jib --image=gcr.io/${PROJECT_ID}/df-ml-anomaly-detection:latest \
      -DmainClass=com.google.solutions.df.log.aggregations.SecureLogAggregationPipeline
    
  2. Create a Cloud Storage bucket and upload the Flex Template configuration file to it:

    export DF_TEMPLATE_CONFIG_BUCKET=${PROJECT_ID}-DF_TEMPLATE_CONFIG
    gsutil mb -c standard -l REGION gs://${DF_TEMPLATE_CONFIG_BUCKET}
    cat << EOF | gsutil cp - gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json
    {"image": "gcr.io/${PROJECT_ID}/df-ml-anomaly-detection",
    "sdk_info": {"language": "JAVA"}
    }
    EOF
    

    Replace the following:

    • DF_TEMPLATE_CONFIG: a suffix for the name of the Cloud Storage bucket that stores your Dataflow Flex Template configuration file; bucket names must be lowercase
    • REGION: the region in which to create the Cloud Storage bucket
  3. Create a SQL file to pass the normalized model data as a pipeline parameter:

    echo "SELECT * FROM \`${PROJECT_ID}.${DATASET_NAME}.normalized_centroid_data\`" > normalized_cluster_data.sql
    gsutil cp normalized_cluster_data.sql gs://${DF_TEMPLATE_CONFIG_BUCKET}/
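
    Optionally, confirm that the file is in the bucket:

    gsutil cat gs://${DF_TEMPLATE_CONFIG_BUCKET}/normalized_cluster_data.sql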
    
  4. Run the anomaly detection pipeline:

    gcloud beta dataflow flex-template run "anomaly-detection" \
    --project=${PROJECT_ID} \
    --region=us-central1 \
    --template-file-gcs-location=gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json \
    --parameters=autoscalingAlgorithm="NONE",\
    numWorkers=5,\
    maxNumWorkers=5,\
    workerMachineType=n1-highmem-4,\
    subscriberId=projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_ID},\
    tableSpec=${PROJECT_ID}:${DATASET_NAME}.cluster_model_data,\
    batchFrequency=2,\
    customGcsTempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
    tempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
    clusterQuery=gs://${DF_TEMPLATE_CONFIG_BUCKET}/normalized_cluster_data.sql,\
    outlierTableSpec=${PROJECT_ID}:${DATASET_NAME}.outlier_data,\
    inputFilePattern=gs://df-ml-anomaly-detection-mock-data/flow_log*.json,\
    workerDiskType=compute.googleapis.com/projects/${PROJECT_ID}/zones/us-central1-b/diskTypes/pd-ssd,\
    diskSizeGb=5,\
    windowInterval=10,\
    writeMethod=FILE_LOADS,\
    streaming=true
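
    The job can take a few minutes to start. To check its status from Cloud Shell instead of the console, you can use the standard jobs list command:

    gcloud dataflow jobs list --region=us-central1 --status=active \
      --filter="name=anomaly-detection"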
    
  5. In the Cloud Console, go to the Dataflow page.

    Go to the Dataflow page

  6. Click the anomaly-detection job. A representation of the Dataflow pipeline that's similar to the following appears:

Anomaly detection pipeline job view in the Dataflow monitoring interface.

Publish an outlier message for testing

Publish a test message to validate that the pipeline correctly detects it as an outlier.

  1. In Cloud Shell, publish the following message:

    gcloud pubsub topics publish ${TOPIC_ID} --message \
    "{\"subscriberId\": \"00000000000000000\",  \
    \"srcIP\": \"12.0.9.4\", \
    \"dstIP\": \"12.0.1.3\", \
    \"srcPort\": 5000, \
    \"dstPort\": 3000, \
    \"txBytes\": 150000, \
    \"rxBytes\": 40000, \
    \"startTime\": 1570276550, \
    \"endTime\": 1570276550, \
    \"tcpFlag\": 0, \
    \"protocolName\": \"tcp\", \
    \"protocolNumber\": 0}"
    

    Notice the unusually high transmitted (txBytes) and received (rxBytes) byte counts compared to the 100-to-500-byte range configured for the synthetic data. This message might indicate a security risk that warrants validation.

  2. After a minute or so, validate that the anomaly is identified and stored in the BigQuery table:

    export OUTLIER_TABLE_QUERY='SELECT subscriber_id,dst_subnet,transaction_time
    FROM `'${PROJECT_ID}.${DATASET_NAME}'.outlier_data`
    WHERE subscriber_id like "0%" limit 1'
    bq query --nouse_legacy_sql "$OUTLIER_TABLE_QUERY" >> outlier_orig.txt
    cat outlier_orig.txt
    

    The output is similar to the following:

    +---------------+--------------+----------------------------+
    | subscriber_id |  dst_subnet  |      transaction_time      |
    +---------------+--------------+----------------------------+
    |  00000000000  | 12.0.1.3/22  | 2020-07-09 21:29:36.571000 |
    +---------------+--------------+----------------------------+
    

Creating a k-means clustering model using BigQuery ML

  1. In the Cloud Console, go to the BigQuery Query editor page.

    Go to Query editor

  2. Select training data from the feature table and create a k-means clustering model using BigQuery ML:

    -- temp table for training data
    #standardSQL
    CREATE OR REPLACE TABLE DATASET_ID.train_data AS
    (SELECT * FROM DATASET_ID.cluster_model_data
    WHERE _PARTITIONDATE BETWEEN 'START_DATE' AND 'END_DATE'
    AND NOT IS_NAN(avg_tx_bytes)
    AND NOT IS_NAN(avg_rx_bytes)
    AND NOT IS_NAN(avg_duration))
    LIMIT 100000;

    -- create a model using BigQuery ML
    #standardSQL
    CREATE OR REPLACE MODEL DATASET_ID.log_cluster
    OPTIONS(model_type='kmeans', standardize_features = true) AS
    SELECT * EXCEPT (transaction_time, subscriber_id, number_of_unique_ips, number_of_unique_ports, dst_subnet)
    FROM DATASET_ID.train_data;
    

    Replace the following:

    • START_DATE and END_DATE: the current date, in YYYY-MM-DD format
    • DATASET_ID: the ID of the dataset that you created
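
    Optionally, you can sanity-check the model with ML.EVALUATE, which for k-means models returns the Davies-Bouldin index and the mean squared distance. The following is a sketch using the bq CLI from Cloud Shell (it assumes DATASET_NAME is still exported and that model training has finished):

    bq query --nouse_legacy_sql \
      "SELECT davies_bouldin_index, mean_squared_distance
       FROM ML.EVALUATE(MODEL \`${PROJECT_ID}.${DATASET_NAME}.log_cluster\`)"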
  3. Normalize the data for each cluster:

    -- create a normalized table for each centroid
    #standardSQL
    CREATE OR REPLACE TABLE DATASET_ID.normalized_centroid_data AS (
    WITH centroid_details AS (
      SELECT centroid_id,
        ARRAY_AGG(STRUCT(feature AS name, ROUND(numerical_value, 1) AS value)
          ORDER BY centroid_id) AS cluster
      FROM ML.CENTROIDS(MODEL DATASET_ID.log_cluster)
      GROUP BY centroid_id
    ),
    cluster AS (
      SELECT centroid_details.centroid_id AS centroid_id,
        (SELECT value FROM UNNEST(cluster) WHERE name = 'number_of_records') AS number_of_records,
        (SELECT value FROM UNNEST(cluster) WHERE name = 'max_tx_bytes') AS max_tx_bytes,
        (SELECT value FROM UNNEST(cluster) WHERE name = 'min_tx_bytes') AS min_tx_bytes,
        (SELECT value FROM UNNEST(cluster) WHERE name = 'avg_tx_bytes') AS avg_tx_bytes,
        (SELECT value FROM UNNEST(cluster) WHERE name = 'max_rx_bytes') AS max_rx_bytes,
        (SELECT value FROM UNNEST(cluster) WHERE name = 'min_rx_bytes') AS min_rx_bytes,
        (SELECT value FROM UNNEST(cluster) WHERE name = 'avg_rx_bytes') AS avg_rx_bytes,
        (SELECT value FROM UNNEST(cluster) WHERE name = 'max_duration') AS max_duration,
        (SELECT value FROM UNNEST(cluster) WHERE name = 'min_duration') AS min_duration,
        (SELECT value FROM UNNEST(cluster) WHERE name = 'avg_duration') AS avg_duration
      FROM centroid_details
      ORDER BY centroid_id ASC
    ),
    predict AS (
      SELECT * FROM ML.PREDICT(MODEL DATASET_ID.log_cluster,
        (SELECT * FROM DATASET_ID.train_data))
    )
    SELECT c.centroid_id AS centroid_id,
      STDDEV((p.number_of_records - c.number_of_records)
        + (p.max_tx_bytes - c.max_tx_bytes)
        + (p.min_tx_bytes - c.min_tx_bytes)
        + (p.avg_tx_bytes - c.avg_tx_bytes)
        + (p.max_rx_bytes - c.max_rx_bytes)
        + (p.min_rx_bytes - c.min_rx_bytes)
        + (p.avg_rx_bytes - c.avg_rx_bytes)
        + (p.max_duration - c.max_duration)
        + (p.min_duration - c.min_duration)
        + (p.avg_duration - c.avg_duration)) AS normalized_dest,
      ANY_VALUE(c.number_of_records) AS number_of_records,
      ANY_VALUE(c.max_tx_bytes) AS max_tx_bytes,
      ANY_VALUE(c.min_tx_bytes) AS min_tx_bytes,
      ANY_VALUE(c.avg_tx_bytes) AS avg_tx_bytes,
      ANY_VALUE(c.max_rx_bytes) AS max_rx_bytes,
      ANY_VALUE(c.min_rx_bytes) AS min_rx_bytes,
      ANY_VALUE(c.avg_rx_bytes) AS avg_rx_bytes,
      ANY_VALUE(c.max_duration) AS max_duration,
      ANY_VALUE(c.min_duration) AS min_duration,
      ANY_VALUE(c.avg_duration) AS avg_duration
    FROM predict AS p
    INNER JOIN cluster AS c ON c.centroid_id = p.centroid_id
    GROUP BY c.centroid_id);
    

    This query calculates a normalized distance for each cluster by applying the standard deviation function to the sum of differences between each input vector and its centroid vector. In other words, it implements the following formula:

    stddev((input_value_x - centroid_value_x) + (input_value_y - centroid_value_y) + (...))

  4. Validate the normalized_centroid_data table:

    #standardSQL
    SELECT * from DATASET_ID.normalized_centroid_data
    

    Replace DATASET_ID with the ID of your created dataset.

    The result from this statement is a table of calculated normalized distances for each centroid ID:

    Normalized data for each k-means cluster.

De-identifying data using Cloud DLP

In this section, you reuse the pipeline by passing an additional parameter to de-identify the international mobile subscriber identity (IMSI) number in the subscriber_id column.

  1. In Cloud Shell, create a crypto key:

    export TEK=$(openssl rand -base64 32); echo ${TEK}

    The output is a base64-encoded 256-bit token encryption key (TEK), similar to the following:

    a3ecrQAQJJ8oxVO8TZ/odlfjcujhWXjU/Xg5lEFiw5M=
    
  2. To launch the code editor, on the toolbar of the Cloud Shell window, click Open Editor .

  3. Click File > New File and create a file named deid_template.json.

  4. Copy the following JSON block into the new file:

    {
      "deidentifyTemplate": {
        "displayName": "Config to de-identify IMEI Number",
        "description": "IMEI Number masking transformation",
        "deidentifyConfig": {
          "recordTransformations": {
            "fieldTransformations": [
              {
                "fields": [
                  {
                    "name": "subscriber_id"
                  }
                ],
                "primitiveTransformation": {
                  "cryptoDeterministicConfig": {
                    "cryptoKey": {
                      "unwrapped": {
                        "key": "CRYPTO_KEY"
                      }
                    },
                    "surrogateInfoType": {
                      "name": "IMSI_TOKEN"
                    }
                  }
                }
              }
            ]
          }
        }
      },
      "templateId": "dlp-deid-subid"
    }
    

    Replace CRYPTO_KEY with the crypto key that you previously created. It's a best practice to use a Cloud KMS wrapped key for production workloads. Save the file.
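
    For production, the following is a sketch of wrapping the key with Cloud KMS (the key ring and key names are illustrative); the resulting ciphertext would go in a kmsWrapped block, with cryptoKeyName pointing to the KMS key, in place of the unwrapped block:

    # Illustrative resource names; adjust to your environment.
    gcloud kms keyrings create dlp-keyring --location=global
    gcloud kms keys create dlp-key --location=global \
      --keyring=dlp-keyring --purpose=encryption
    # Decode the base64 TEK back to raw bytes, then encrypt it with KMS.
    echo ${TEK} | base64 -d > tek.bin
    gcloud kms encrypt --location=global --keyring=dlp-keyring --key=dlp-key \
      --plaintext-file=tek.bin --ciphertext-file=tek-wrapped.bin
    # Base64-encode the wrapped key for use in the template.
    base64 -w 0 tek-wrapped.bin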

  5. In the Cloud Shell toolbar, click Open Terminal.

  6. In the Cloud Shell terminal, create a Cloud DLP de-identify template:

    export DLP_API_ROOT_URL="https://dlp.googleapis.com"
    export DEID_TEMPLATE_API="${DLP_API_ROOT_URL}/v2/projects/${PROJECT_ID}/deidentifyTemplates"
    export DEID_CONFIG="@deid_template.json"
    
    export ACCESS_TOKEN=$(gcloud auth print-access-token)
    curl -X POST -H "Content-Type: application/json" \
       -H "Authorization: Bearer ${ACCESS_TOKEN}" \
       "${DEID_TEMPLATE_API}" \
       -d "${DEID_CONFIG}"
    

    This creates a template with the following name in your Cloud project:

    "name": "projects/${PROJECT_ID}/deidentifyTemplates/dlp-deid-sub-id"

  7. Stop the pipeline that you triggered in an earlier step. First, find the ID of the running job:

    gcloud dataflow jobs list --filter="name=anomaly-detection" --status=active --region=us-central1

    Then cancel it:

    gcloud dataflow jobs cancel JOB_ID --region=us-central1

    Replace JOB_ID with the job ID returned by the list command.

  8. Trigger the anomaly detection pipeline again, this time passing the Cloud DLP de-identify template name:

    gcloud beta dataflow flex-template run "anomaly-detection-with-dlp" \
    --project=${PROJECT_ID} \
    --region=us-central1 \
    --template-file-gcs-location=gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json \
    --parameters=autoscalingAlgorithm="NONE",\
    numWorkers=5,\
    maxNumWorkers=5,\
    workerMachineType=n1-highmem-4,\
    subscriberId=projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_ID},\
    tableSpec=${PROJECT_ID}:${DATASET_NAME}.cluster_model_data,\
    batchFrequency=2,\
    customGcsTempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
    tempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
    clusterQuery=gs://${DF_TEMPLATE_CONFIG_BUCKET}/normalized_cluster_data.sql,\
    outlierTableSpec=${PROJECT_ID}:${DATASET_NAME}.outlier_data,\
    inputFilePattern=gs://df-ml-anomaly-detection-mock-data/flow_log*.json,\
    workerDiskType=compute.googleapis.com/projects/${PROJECT_ID}/zones/us-central1-b/diskTypes/pd-ssd,\
    diskSizeGb=5,\
    windowInterval=10,\
    writeMethod=FILE_LOADS,\
    streaming=true,\
    deidTemplateName=projects/${PROJECT_ID}/deidentifyTemplates/dlp-deid-subid
    
  9. Query the outlier table to validate that the subscriber ID is successfully de-identified:

    export DLP_OUTLIER_TABLE_QUERY='SELECT subscriber_id,dst_subnet,transaction_time
    FROM `'${PROJECT_ID}.${DATASET_NAME}'.outlier_data`
    ORDER BY transaction_time DESC'
    
    bq query --nouse_legacy_sql "$DLP_OUTLIER_TABLE_QUERY" >> outlier_deid.txt
    
    cat outlier_deid.txt
    

    The output is similar to the following:

    +---------------+--------------+----------------------------+
    | subscriber_id |  dst_subnet  |      transaction_time      |
    +---------------+--------------+----------------------------+
    | IMSI_TOKEN(64):AcZD2U2v//QiKkGzbFCm29pv5cqVi3Db09Z6CNt5cQSevBKRQvgdDfacPQIRY1dc| 12.0.1.3/22 | 2020-07-09 21:29:36.571000 |
    +---------------+--------------+----------------------------+
    

    If the de-identification succeeded, the subscriber_id column contains a tokenized surrogate value (prefixed with IMSI_TOKEN) instead of the original all-zeros subscriber ID.

Cleaning up

If you don't intend to continue with the tutorials in this series, the easiest way to avoid further billing is to delete the Cloud project that you created for the tutorial. Alternatively, you can delete the individual resources.

Delete the project

  1. In the Cloud Console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

What's next