Detecting anomalies in financial transactions by using AI Platform, Dataflow, and BigQuery


This tutorial shows you how to implement an anomaly detection application that identifies fraudulent transactions by using a boosted tree model.

This tutorial is intended for developers, data engineers, and data scientists, and assumes that you have basic knowledge of the following:

  • Machine learning model development with TensorFlow and Python
  • Standard SQL
  • Dataflow pipelines built using the Apache Beam Java SDK

Architecture

The sample application consists of the following components:

  • A boosted tree model developed using TensorFlow and deployed to AI Platform.
  • A Dataflow pipeline that completes the following tasks:

    • Publishes transaction data from a Cloud Storage bucket to a Pub/Sub topic, then reads that data as a stream from a Pub/Sub subscription to that topic.
    • Gets fraud likelihood estimates for each transaction by using the Apache Beam Timer API to micro-batch calls to the AI Platform prediction API.
    • Writes transaction data and fraud likelihood data to BigQuery tables for analysis.
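
The micro-batching idea in the second task can be sketched in plain Python. This is an illustration only, not the pipeline's Apache Beam code: the class name `MicroBatcher` and its parameters are hypothetical, and the caller passes the clock value in explicitly. Elements are buffered and released as one batch when the buffer is full or when a deadline set by the first buffered element has passed, which is roughly the role a timer plays in a streaming pipeline.

```python
from typing import Any, List, Optional


class MicroBatcher:
    """Buffers elements and releases them together as one batch.

    A batch is released when the buffer reaches max_size, or when
    flush_if_due() is called at or after the deadline that was set
    when the first element of the current batch arrived.
    """

    def __init__(self, max_size: int, max_wait_seconds: float) -> None:
        self.max_size = max_size
        self.max_wait_seconds = max_wait_seconds
        self._buffer: List[Any] = []
        self._deadline: Optional[float] = None

    def add(self, element: Any, now: float) -> Optional[List[Any]]:
        """Adds an element; returns a full batch if the size limit is hit."""
        if not self._buffer:
            # The first element of a new batch starts the timer.
            self._deadline = now + self.max_wait_seconds
        self._buffer.append(element)
        if len(self._buffer) >= self.max_size:
            return self._flush()
        return None

    def flush_if_due(self, now: float) -> Optional[List[Any]]:
        """Returns the partial batch if its deadline has passed."""
        if self._buffer and self._deadline is not None and now >= self._deadline:
            return self._flush()
        return None

    def _flush(self) -> List[Any]:
        batch, self._buffer = self._buffer, []
        self._deadline = None
        return batch


batcher = MicroBatcher(max_size=3, max_wait_seconds=5.0)
for second, txn in enumerate(["t1", "t2", "t3", "t4"]):
    batch = batcher.add(txn, now=float(second))
    if batch:
        print("size-triggered batch:", batch)
print("timer-triggered batch:", batcher.flush_if_due(now=10.0))
```

Each released batch would correspond to one prediction request, so fewer, larger batches mean fewer calls to the prediction API.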

The following diagram illustrates the architecture of the anomaly detection solution:

Diagram showing the architecture of the anomaly detection solution.

Dataset

The boosted tree model used in this tutorial is trained on the Synthetic Financial Dataset For Fraud Detection from Kaggle. This dataset was generated using the PaySim simulator.

We use a synthetic dataset because there are few financial datasets appropriate for fraud detection, and those that exist often contain personally identifiable information (PII) that needs to be anonymized.

Objectives

  • Create a boosted tree model that estimates the probability of fraud in financial transactions.
  • Deploy the model to AI Platform for online prediction.
  • Use a Dataflow pipeline to:
    • Write transaction data from the sample dataset to a transactions table in BigQuery.
    • Send micro-batched requests to the hosted model to retrieve fraud probability predictions, and write the results to a fraud_prediction table in BigQuery.
  • Run a BigQuery query that joins these tables to see the probability of fraud for each transaction.

Costs

This tutorial uses the following billable components of Google Cloud:

  • AI Platform
  • BigQuery
  • Cloud Storage
  • Compute Engine
  • Dataflow
  • Pub/Sub

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.

  4. Enable the AI Platform Training and Prediction, Cloud Storage, Compute Engine, Dataflow, and Notebooks APIs.

    Enable the APIs

Check quota availability

  1. Open the IAM Quotas page
  2. Check that you have the following Compute Engine API quotas available in the us-central1 region; you need these quotas in order to run the Dataflow job used in this tutorial. If you don't, request a quota increase.

    Limit name                       Quota
    CPUs                             241
    In-use IP addresses              30
    Instance groups                  1
    Instance templates               1
    Managed instance groups          1
    Persistent Disk Standard (GB)    12900

Create and deploy the model

Follow the instructions in this section to create a boosted tree model to predict fraud in financial transactions.

Create a notebook

  1. In the console, go to the Notebooks page.

    Go to Notebooks

  2. On the User-managed notebooks tab, click New notebook.

  3. Choose TensorFlow Enterprise 1.15 without GPUs.

    Show instance type to select.

  4. For Instance name, type boosted-trees.

  5. Click Create. It takes a few minutes for the notebook instance to be created.

  6. When the instance is available, click Open JupyterLab.

  7. In the Notebook section of the JupyterLab Launcher, click Python 3.

Download the sample data

Download a copy of the sample dataset:

  1. Copy the following code into the first cell of the notebook:

    !gsutil cp gs://financial_fraud_detection/fraud_data_kaggle.csv .

  2. Click Run in the menu bar.

Prepare the data for use in training

The sample data is imbalanced, which can lead to an inaccurate model. The following code corrects the imbalance through the use of downsampling, then splits the data into a training set and a testing set.

Prepare the data by copying the following code into the second cell of the notebook and then running it:

import uuid
import itertools
import numpy as np
import pandas as pd
import os
import tensorflow as tf
import json
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from sklearn.metrics import confusion_matrix

os.environ['TF_CPP_MIN_LOG_LEVEL']='3'

data = pd.read_csv('fraud_data_kaggle.csv')

# Split the data into 2 DataFrames
fraud = data[data['isFraud'] == 1]
not_fraud = data[data['isFraud'] == 0]

# Take a random sample of non fraud rows
not_fraud_sample = not_fraud.sample(random_state=2, frac=.005)

# Put it back together and shuffle
df = pd.concat([not_fraud_sample,fraud])
df = shuffle(df, random_state=2)

# Remove a few columns (isFraud is the label column we'll use, not isFlaggedFraud)
df = df.drop(columns=['nameOrig', 'nameDest', 'isFlaggedFraud'])

# Add transaction id to make it possible to map predictions to transactions
df['transactionId'] = [str(uuid.uuid4()) for _ in range(len(df.index))]

train_test_split = int(len(df) * .8)

# Split the dataset for training and testing
# (copy the slices so that pop() below modifies independent DataFrames)
train_set = df[:train_test_split].copy()
test_set = df[train_test_split:].copy()

train_labels = train_set.pop('isFraud')
test_labels = test_set.pop('isFraud')

train_set.head()

After the code completes, it outputs several example rows of the processed data. You should see results similar to the following:

First 5 rows of processed training data.
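
To see what the downsampling step does to the class balance, the following minimal sketch applies the same idea to a toy list of rows using only the standard library. The row counts are hypothetical and are not the counts in the Kaggle dataset; the helper names `downsample_majority` and `positive_share` are also made up for this illustration.

```python
import random
from typing import Dict, List


def downsample_majority(rows: List[Dict], label_key: str,
                        keep_frac: float, seed: int = 2) -> List[Dict]:
    """Keeps every positive row and a random fraction of the negative rows."""
    rng = random.Random(seed)
    minority = [r for r in rows if r[label_key] == 1]
    majority = [r for r in rows if r[label_key] == 0]
    kept = rng.sample(majority, round(len(majority) * keep_frac))
    combined = minority + kept
    rng.shuffle(combined)  # mix the classes before splitting into train/test
    return combined


def positive_share(rows: List[Dict], label_key: str) -> float:
    """Returns the fraction of rows whose label is 1."""
    return sum(r[label_key] for r in rows) / len(rows)


# Hypothetical toy data: 10 fraud rows and 10,000 non-fraud rows.
rows = [{"isFraud": 1} for _ in range(10)] + [{"isFraud": 0} for _ in range(10000)]
balanced = downsample_majority(rows, "isFraud", keep_frac=0.005)

print(f"fraud share before: {positive_share(rows, 'isFraud'):.4f}")
print(f"fraud share after:  {positive_share(balanced, 'isFraud'):.4f}")
```

With these toy counts, keeping 0.5% of the non-fraud rows leaves 50 of them next to the 10 fraud rows, so the fraud class goes from a tiny fraction of the data to a share the model can learn from.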

Create and train the model

Create and train the model by copying the following code into the third cell of the notebook and then running it:

# Define features
fc = tf.feature_column
CATEGORICAL_COLUMNS = ['type']
NUMERIC_COLUMNS = ['step', 'amount', 'oldbalanceOrg', 'newbalanceOrig', 'oldbalanceDest', 'newbalanceDest']
KEY_COLUMN = 'transactionId'
def one_hot_cat_column(feature_name, vocab):
    return tf.feature_column.indicator_column(tf.feature_column.categorical_column_with_vocabulary_list(feature_name, vocab))

feature_columns = []

for feature_name in CATEGORICAL_COLUMNS:
    vocabulary = train_set[feature_name].unique()
    feature_columns.append(one_hot_cat_column(feature_name, vocabulary))

for feature_name in NUMERIC_COLUMNS:
  feature_columns.append(tf.feature_column.numeric_column(feature_name,
                                           dtype=tf.float32))

# Define training and evaluation input functions
NUM_EXAMPLES = len(train_labels)
def make_input_fn(X, y, n_epochs=None, shuffle=True):
  def input_fn():
    dataset = tf.data.Dataset.from_tensor_slices((dict(X), y))
    if shuffle:
      dataset = dataset.shuffle(NUM_EXAMPLES)
    dataset = dataset.repeat(n_epochs)
    dataset = dataset.batch(NUM_EXAMPLES)
    return dataset
  return input_fn

train_input_fn = make_input_fn(train_set, train_labels)
eval_input_fn = make_input_fn(test_set, test_labels, shuffle=False, n_epochs=1)

# Define the model
n_batches = 1
model = tf.estimator.BoostedTreesClassifier(feature_columns,
                                          n_batches_per_layer=n_batches)
model = tf.contrib.estimator.forward_features(model,KEY_COLUMN)

# Train the model
model.train(train_input_fn, max_steps=100)

# Get metrics to evaluate the model's performance
result = model.evaluate(eval_input_fn)
print(pd.Series(result))

After the code completes, it outputs a set of metrics that describe the model's performance. You should see results similar to the following, with the accuracy and auc values around 99%:

Performance metrics for the boosted tree model.

Test the model

Test the model to verify that it labels the fraudulent transactions correctly by copying the following code into the fourth cell of the notebook and then running it:

pred_dicts = list(model.predict(eval_input_fn))
probabilities = pd.Series([pred['logistic'][0] for pred in pred_dicts])

for i,val in enumerate(probabilities[:30]):
  print('Predicted: ', round(val), 'Actual: ', test_labels.iloc[i])
  print()

After the code completes, it prints the predicted label (the fraud probability rounded to 0 or 1) next to the actual label for the first 30 transactions in the test data. You should see results similar to the following:

Predicted and actual fraud likelihood results for the test data.
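
Comparing predictions row by row is a quick check, but on imbalanced data it can help to also count the kinds of errors. The sketch below is an optional addition that is not part of the original notebook: it derives confusion-matrix counts, precision, and recall from a list of probabilities and labels. The probabilities and labels shown are hypothetical, and the 0.5 threshold is an assumption.

```python
from typing import Dict, Sequence, Tuple


def confusion_counts(probabilities: Sequence[float], labels: Sequence[int],
                     threshold: float = 0.5) -> Dict[str, int]:
    """Counts true/false positives and negatives at the given threshold."""
    counts = {"tp": 0, "fp": 0, "tn": 0, "fn": 0}
    for p, y in zip(probabilities, labels):
        predicted = 1 if p >= threshold else 0
        if predicted == 1 and y == 1:
            counts["tp"] += 1
        elif predicted == 1 and y == 0:
            counts["fp"] += 1
        elif predicted == 0 and y == 0:
            counts["tn"] += 1
        else:
            counts["fn"] += 1
    return counts


def precision_recall(counts: Dict[str, int]) -> Tuple[float, float]:
    """Returns (precision, recall); a ratio is 0.0 when its denominator is 0."""
    predicted_pos = counts["tp"] + counts["fp"]
    actual_pos = counts["tp"] + counts["fn"]
    precision = counts["tp"] / predicted_pos if predicted_pos else 0.0
    recall = counts["tp"] / actual_pos if actual_pos else 0.0
    return precision, recall


# Hypothetical model outputs and true labels.
example_probs = [0.9, 0.8, 0.2, 0.6, 0.1]
example_labels = [1, 0, 0, 1, 1]

counts = confusion_counts(example_probs, example_labels)
print(counts)
print(precision_recall(counts))
```

In the notebook, the same functions could be called with the `probabilities` series and `test_labels` from the cell above.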

Export the model

Create a SavedModel from the trained model and export it to Cloud Storage by copying the following code into the fifth cell of the notebook and then running it. Replace myProject with the ID of the project you are using to complete this tutorial.

GCP_PROJECT = 'myProject'
MODEL_BUCKET = 'gs://myProject-bucket'

!gsutil mb $MODEL_BUCKET

def json_serving_input_fn():
    feature_placeholders = {
        'type': tf.placeholder(tf.string, [None]),
        'step': tf.placeholder(tf.float32, [None]),
        'amount': tf.placeholder(tf.float32, [None]),
        'oldbalanceOrg': tf.placeholder(tf.float32, [None]),
        'newbalanceOrig': tf.placeholder(tf.float32, [None]),
        'oldbalanceDest': tf.placeholder(tf.float32, [None]),
        'newbalanceDest': tf.placeholder(tf.float32, [None]),
         KEY_COLUMN: tf.placeholder_with_default(tf.constant(['nokey']), [None])
    }
    features = {key: tf.expand_dims(tensor, -1)
                for key, tensor in feature_placeholders.items()}
    return tf.estimator.export.ServingInputReceiver(features,feature_placeholders)

export_path = model.export_saved_model(
    MODEL_BUCKET + '/explanations-with-key',
    serving_input_receiver_fn=json_serving_input_fn
).decode('utf-8')

!saved_model_cli show --dir $export_path --all

After the code completes, it returns a SignatureDef that describes the inputs and outputs of the model. You should see results similar to the following:

SignatureDef that describes the inputs and outputs of the model.

Deploy the model to AI Platform

Deploy the model for predictions by copying the following code into the sixth cell of the notebook and then running it. It takes a few minutes for the model version to be created.

MODEL = 'fraud_detection_with_key'

!gcloud ai-platform models create $MODEL

VERSION = 'v1'
!gcloud beta ai-platform versions create $VERSION \
--model $MODEL \
--origin $export_path \
--runtime-version 1.15 \
--framework TENSORFLOW \
--python-version 3.7 \
--machine-type n1-standard-4 \
--num-paths 10

!gcloud ai-platform versions describe $VERSION --model $MODEL

Get predictions from the deployed model

Get predictions for the test data set by copying the following code into the seventh cell of the notebook and then running it:

fraud_indices = []

for i,val in enumerate(test_labels):
    if val == 1:
        fraud_indices.append(i)

num_test_examples = 10
import numpy as np

def convert(o):
    if isinstance(o, np.generic): return o.item()
    raise TypeError

for i in range(num_test_examples):
    test_json = {}
    ex = test_set.iloc[fraud_indices[i]]
    keys = ex.keys().tolist()
    vals = ex.values.tolist()
    for idx in range(len(keys)):
        test_json[keys[idx]] = vals[idx]

    print(test_json)
    with open('data.txt', 'a') as outfile:
        json.dump(test_json, outfile, default=convert)
        outfile.write('\n')

!gcloud ai-platform predict --model $MODEL \
--version $VERSION \
--json-instances='data.txt' \
--signature-name='predict'

After the code completes, it returns predictions for the test data. You should see results similar to the following:

Predictions from the deployed model for test data.
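
Each line of data.txt is one JSON object whose keys match the placeholders defined in json_serving_input_fn. As a minimal sketch, the helper below (the name `to_instance_line` is hypothetical, and the sample values are made up) checks a record for the expected feature keys before serializing it, which can make a malformed request easier to diagnose than an error returned by the prediction service.

```python
import json
from typing import Any, Dict

# Feature names taken from the serving input function in this tutorial.
FEATURE_KEYS = [
    "type", "step", "amount",
    "oldbalanceOrg", "newbalanceOrig",
    "oldbalanceDest", "newbalanceDest",
]
KEY_COLUMN = "transactionId"


def to_instance_line(record: Dict[str, Any]) -> str:
    """Serializes one transaction as a single JSON line for prediction."""
    missing = [k for k in FEATURE_KEYS if k not in record]
    if missing:
        raise ValueError(f"record is missing features: {missing}")
    instance = {k: record[k] for k in FEATURE_KEYS}
    # The serving function defaults the key to 'nokey' when it is absent.
    instance[KEY_COLUMN] = record.get(KEY_COLUMN, "nokey")
    return json.dumps(instance)


# Hypothetical transaction values, for illustration only.
sample = {
    "type": "TRANSFER", "step": 1, "amount": 181.0,
    "oldbalanceOrg": 181.0, "newbalanceOrig": 0.0,
    "oldbalanceDest": 0.0, "newbalanceDest": 0.0,
    "transactionId": "example-id-1",
}
print(to_instance_line(sample))
```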

Create and run the pipeline

Create a Dataflow pipeline that reads financial transaction data, requests fraud prediction information for each transaction from the AI Platform model, and then writes both transaction and fraud prediction data to BigQuery for analysis.

Create the BigQuery dataset and tables

  1. Open the BigQuery console
  2. In the Resources section, select the project in which you are completing this tutorial.
  3. Click Create dataset.

    Show location of Create Dataset button.

  4. On the Create dataset page:

    • For Dataset ID, type fraud_detection.
    • For Data location, choose United States (US).
    • Click Create dataset.
  5. In the Query editor pane, run the following SQL statements to create the transactions and fraud_prediction tables:

    CREATE OR REPLACE TABLE fraud_detection.transactions (
       step INT64,
       nameOrig STRING,
       nameDest STRING,
       isFlaggedFraud INT64,
       isFraud INT64,
       type STRING,
       amount FLOAT64,
       oldbalanceOrg FLOAT64,
       newbalanceOrig FLOAT64,
       oldbalanceDest FLOAT64,
       newbalanceDest FLOAT64,
       transactionId STRING
     );
    
    CREATE OR REPLACE TABLE fraud_detection.fraud_prediction (
        transactionId STRING,
        logistic FLOAT64,
        json_response STRING
    );
    

Create the Pub/Sub topic and subscription

  1. Open the Pub/Sub console
  2. Click Create Topic.
  3. For Topic ID, type sample_data.
  4. Click Create Topic.
  5. Click Subscriptions.
  6. Click Create Subscription.
  7. For Subscription ID, type sample_data.
  8. For Select a Cloud Pub/Sub topic, select projects/<myProject>/topics/sample_data.
  9. Scroll down to the bottom of the page and click Create.

Run the Dataflow pipeline

  1. Activate Cloud Shell
  2. In Cloud Shell, run the command below to run the Dataflow pipeline, replacing myProject with the ID of the project you are using to complete this tutorial:

    gcloud beta dataflow flex-template run "anomaly-detection" \
    --project=myProject \
    --region=us-central1 \
    --template-file-gcs-location=gs://df-ml-anomaly-detection-mock-data/dataflow-flex-template/dynamic_template_finserv_fraud_detection.json \
    --parameters=autoscalingAlgorithm="NONE",\
    numWorkers=30,\
    maxNumWorkers=30,\
    workerMachineType=n1-highmem-8,\
    subscriberId=projects/myProject/subscriptions/sample_data,\
    tableSpec=myProject:fraud_detection.transactions,\
    outlierTableSpec=myProject:fraud_detection.fraud_prediction,\
    inputFilePattern=gs://df-ml-anomaly-detection-mock-data/finserv_fraud_detection/fraud_data_kaggle.json,\
    modelId=fraud_detection_with_key,\
    versionId=v1,\
    keyRange=1024,\
    batchSize=500000
    

    To adapt this pipeline to production, you can alter the batchSize and keyRange parameter values to control the size and timing of the prediction request batches. Consider that:

    • A small batch size and a high key range value result in faster processing, but might exceed quota limits and require you to request additional quota.
    • A larger batch size and a low key range value result in slower processing, but are more likely to keep the operations within quota.
  3. Open the Dataflow Jobs page

  4. In the jobs list, click anomaly-detection.

  5. Wait until the job graph appears and the StreamFraudData element of the graph shows a run time greater than 0 seconds.
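
The batchSize and keyRange trade-off described in step 2 can be made concrete with a small calculation. The sketch below is not taken from the pipeline's source code. It assumes that each transaction is assigned to one of keyRange keys and that each key sends its buffered transactions in chunks of at most batchSize. The function names and the CRC32-based key assignment are hypothetical.

```python
import math
import zlib
from collections import Counter
from typing import Iterable


def assign_key(transaction_id: str, key_range: int) -> int:
    """Maps a transaction ID to one of key_range buckets (assumed scheme)."""
    return zlib.crc32(transaction_id.encode("utf-8")) % key_range


def count_requests(transaction_ids: Iterable[str], key_range: int,
                   batch_size: int) -> int:
    """Counts prediction requests if every key batches independently."""
    per_key = Counter(assign_key(t, key_range) for t in transaction_ids)
    return sum(math.ceil(n / batch_size) for n in per_key.values())


ids = [f"txn-{i}" for i in range(1000)]

# One key: all 1000 transactions share a buffer and go out in two batches.
print(count_requests(ids, key_range=1, batch_size=500))

# Many keys: each key holds only a few transactions, so there are more,
# smaller requests that can be issued in parallel.
print(count_requests(ids, key_range=64, batch_size=500))
```

Under these assumptions, raising the key range or lowering the batch size increases the number of requests sent to the model, which is why those settings are more likely to run into prediction quota.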

Verify the data in BigQuery

Verify the data was written to BigQuery by running a query to see the transactions that have been identified as fraudulent.

  1. Open the BigQuery console
  2. In the Query Editor pane, run the following query:

    SELECT DISTINCT
      outlier.logistic AS fraud_probability,
      outlier.transactionId,
      transactions.* EXCEPT (transactionId,isFraud,isFlaggedFraud)
    FROM `fraud_detection.fraud_prediction` AS outlier
    JOIN fraud_detection.transactions AS transactions
    ON transactions.transactionId = outlier.transactionId
    WHERE logistic > 0.99
    ORDER BY fraud_probability DESC;
    

    You should see results similar to the following:

    First 9 rows of fraud probability results.
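
To see how the join and filter behave without running anything in BigQuery, the following sketch reproduces the shape of the query on a toy in-memory SQLite database. SQLite is a stand-in here and not BigQuery, the tables are reduced to a few columns, and every row is hypothetical. It only illustrates that DISTINCT collapses a duplicated prediction and that the filter keeps probabilities above 0.99.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE transactions (transactionId TEXT, type TEXT, amount REAL);
    CREATE TABLE fraud_prediction (transactionId TEXT, logistic REAL);

    INSERT INTO transactions VALUES
      ('t1', 'TRANSFER', 1000.0),
      ('t2', 'PAYMENT', 25.0),
      ('t3', 'CASH_OUT', 500.0);

    -- 't1' appears twice to mimic a prediction that was written twice.
    INSERT INTO fraud_prediction VALUES
      ('t1', 0.999),
      ('t2', 0.02),
      ('t3', 0.995),
      ('t1', 0.999);
    """
)

rows = conn.execute(
    """
    SELECT DISTINCT
      p.logistic AS fraud_probability,
      p.transactionId,
      t.type,
      t.amount
    FROM fraud_prediction AS p
    JOIN transactions AS t
      ON t.transactionId = p.transactionId
    WHERE p.logistic > 0.99
    ORDER BY fraud_probability DESC
    """
).fetchall()

for row in rows:
    print(row)
conn.close()
```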

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project containing the resources, or keep the project but delete just those resources.

Either way, you should remove those resources so you won't be billed for them in the future. The following sections describe how to delete these resources.

Delete the project

The easiest way to eliminate billing is to delete the project you created for the tutorial.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete the components

If you don't want to delete the project, use the following sections to delete the billable components of this tutorial.

Stop the Dataflow job

  1. Open the Dataflow Jobs page
  2. In the jobs list, click anomaly-detection.
  3. On the job details page, click Stop.
  4. Select Cancel.
  5. Click Stop job.

Delete the Cloud Storage buckets

  1. Open the Cloud Storage browser
  2. Select the checkboxes of the <myProject>-bucket and dataflow-staging-us-central1-<projectNumber> buckets.
  3. Click Delete.
  4. In the overlay window that appears, type DELETE and then click Confirm.

Delete the Pub/Sub topic and subscription

  1. Open the Pub/Sub Subscriptions page
  2. Select the checkbox of the sample_data subscription.
  3. Click Delete.
  4. In the overlay window that appears, confirm you want to delete the subscription and its contents by clicking Delete.
  5. Click Topics.
  6. Select the checkbox of the sample_data topic.
  7. Click Delete.
  8. In the overlay window that appears, type delete and then click Delete.

Delete the BigQuery dataset and tables

  1. Open the BigQuery console
  2. In the Resources section, expand the project in which you are completing this tutorial and select the fraud_detection dataset.
  3. Click Delete dataset in the header of the dataset pane.
  4. In the overlay window that appears, type fraud_detection and then click Delete.

Delete the AI Platform model

  1. Open the AI Platform Models page
  2. In the models list, click fraud_detection_with_key.
  3. On the Model Details page, select the checkbox for the v1 (default) version.
  4. Click More, then click Delete.
  5. When the version has finished deleting, click Back to return to the models list.
  6. Select the checkbox for the fraud_detection_with_key model.
  7. Click More, then click Delete.

Delete the notebook

  1. Open the User-managed notebooks tab
  2. Select the checkbox for the boosted-trees notebook instance.
  3. Click Delete.
  4. In the overlay window that appears, click Delete.

What's next