This tutorial shows you how to implement an anomaly detection application that identifies fraudulent transactions by using a boosted tree model.
This tutorial is intended for developers, data engineers and data scientists, and assumes that you have basic knowledge of the following:
- Machine learning model development with TensorFlow and Python
- Standard SQL
- Dataflow pipelines built using the Apache Beam Java SDK
Architecture
The sample application consists of the following components:
- A boosted tree model developed using TensorFlow and deployed to AI Platform.
- A Dataflow pipeline that completes the following tasks:
- Publishes transaction data from a Cloud Storage bucket to a Pub/Sub topic, then reads that data as a stream from a Pub/Sub subscription to that topic.
- Gets fraud likelihood estimates for each transaction by using the Apache Beam Timer API to micro-batch calls to the AI Platform prediction API; a sketch of this micro-batching pattern appears at the end of this section.
- Writes transaction data and fraud likelihood data to BigQuery tables for analysis.
The following diagram illustrates the architecture of the anomaly detection solution:
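The micro-batching step relies on Apache Beam's stateful processing and timers. The tutorial's pipeline is implemented with the Beam Java SDK, so the following is only a minimal illustrative sketch of the same buffer-and-flush pattern in the Beam Python SDK; the MicroBatchPredictFn name, the batch size, and the omitted AI Platform call are assumptions, not the pipeline's actual code. It assumes the incoming transactions are keyed and windowed (for example, into fixed windows).

import apache_beam as beam
from apache_beam.coders import StrUtf8Coder, VarIntCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    BagStateSpec, CombiningValueStateSpec, TimerSpec, on_timer)

class MicroBatchPredictFn(beam.DoFn):
  """Buffers keyed, windowed transactions and emits them in micro-batches."""
  MAX_BUFFER_SIZE = 500  # hypothetical batch size; the real pipeline exposes batchSize
  BUFFER = BagStateSpec('buffer', StrUtf8Coder())
  COUNT = CombiningValueStateSpec('count', VarIntCoder(), combine_fn=sum)
  EXPIRY_TIMER = TimerSpec('expiry', TimeDomain.WATERMARK)

  def process(self, element,
              window=beam.DoFn.WindowParam,
              buffer=beam.DoFn.StateParam(BUFFER),
              count=beam.DoFn.StateParam(COUNT),
              expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER)):
    _, transaction_json = element        # state and timers require (key, value) input
    expiry_timer.set(window.end)         # flush any leftovers when the window closes
    buffer.add(transaction_json)
    count.add(1)
    if count.read() >= self.MAX_BUFFER_SIZE:
      yield from self._flush(buffer, count)

  @on_timer(EXPIRY_TIMER)
  def on_expiry(self,
                buffer=beam.DoFn.StateParam(BUFFER),
                count=beam.DoFn.StateParam(COUNT)):
    yield from self._flush(buffer, count)

  def _flush(self, buffer, count):
    batch = list(buffer.read())
    buffer.clear()
    count.clear()
    if batch:
      # One AI Platform online prediction request would be sent per batch here.
      yield batch

Each emitted batch corresponds to a single prediction request, which is what keeps the pipeline's call volume to the AI Platform prediction API within quota.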
Dataset
The boosted tree model used in this tutorial is trained on the Synthetic Financial Dataset For Fraud Detection from Kaggle. This dataset was generated using the PaySim simulator.
We use a synthetic dataset because there are few financial datasets appropriate for fraud detection, and those that exist often contain personally identifiable information (PII) that needs to be anonymized.
Objectives
- Create a boosted tree model that estimates the probability of fraud in financial transactions.
- Deploy the model to AI Platform for online prediction.
- Use a Dataflow pipeline to:
  - Write transaction data from the sample dataset to a transactions table in BigQuery.
  - Send microbatched requests to the hosted model to retrieve fraud probability predictions, and write the results to a fraud_prediction table in BigQuery.
- Run a BigQuery query that joins these tables to see the probability of fraud for each transaction.
Costs
This tutorial uses the following billable components of Google Cloud:
- AI Platform
- BigQuery
- Cloud Storage
- Compute Engine
- Dataflow
- Pub/Sub
To generate a cost estimate based on your projected usage, use the pricing calculator.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
- Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.
- Enable the AI Platform Training and Prediction, Cloud Storage, Compute Engine, Dataflow, and Notebooks APIs.
Check quota availability
- Open the IAM Quotas page.
- Check that you have the following Compute Engine API quotas available in the us-central1 region; you need these quotas in order to run the Dataflow job used in this tutorial. If you don't, request a quota increase.

| Limit name | Quota |
| --- | --- |
| CPUs | 241 |
| In-use IP addresses | 30 |
| Instance groups | 1 |
| Instance templates | 1 |
| Managed instance groups | 1 |
| Persistent Disk Standard (GB) | 12900 |
Create and deploy the model
Follow the instructions in this section to create a boosted tree model to predict fraud in financial transactions.
Create a notebook
- In the console, go to the Notebooks page.
- On the User-managed notebooks tab, click New notebook, and then choose TensorFlow Enterprise 1.15 without GPUs.
- For Instance name, type boosted-trees.
- Click Create. It takes a few minutes for the notebook instance to be created.
- When the instance is available, click Open JupyterLab.
- In the Notebook section of the JupyterLab Launcher, click Python 3.
Download the sample data
Download a copy of the sample dataset:
- Copy the following code into the first cell of the notebook:
  !gsutil cp gs://financial_fraud_detection/fraud_data_kaggle.csv .
- Click Run in the menu bar.
Prepare the data for use in training
The sample data is imbalanced, which can lead to an inaccurate model. The following code corrects the imbalance by downsampling the majority (non-fraud) class, then splits the data into a training set and a testing set.
Prepare the data by copying the following code into the second cell of the notebook and then running it:
import uuid
import itertools
import numpy as np
import pandas as pd
import os
import tensorflow as tf
import json
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from sklearn.metrics import confusion_matrix
os.environ['TF_CPP_MIN_LOG_LEVEL']='3'
data = pd.read_csv('fraud_data_kaggle.csv')
# Split the data into 2 DataFrames
fraud = data[data['isFraud'] == 1]
not_fraud = data[data['isFraud'] == 0]
# Take a random sample of non fraud rows
not_fraud_sample = not_fraud.sample(random_state=2, frac=.005)
# Put it back together and shuffle
df = pd.concat([not_fraud_sample,fraud])
df = shuffle(df, random_state=2)
# Remove a few columns (isFraud is the label column we'll use, not isFlaggedFraud)
df = df.drop(columns=['nameOrig', 'nameDest', 'isFlaggedFraud'])
# Add transaction id to make it possible to map predictions to transactions
df['transactionId'] = [str(uuid.uuid4()) for _ in range(len(df.index))]
train_test_split = int(len(df) * .8)
# Split the dataset for training and testing
train_set = df[:train_test_split]
test_set = df[train_test_split:]
train_labels = train_set.pop('isFraud')
test_labels = test_set.pop('isFraud')
train_set.head()
After the code completes, it outputs several example rows of the processed data. You should see results similar to the following:
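Optionally, you can confirm the effect of downsampling by comparing the label distribution before and after; this quick check assumes the data and df DataFrames from the cell above:

# Compare the share of fraud vs. non-fraud rows before and after downsampling.
print(data['isFraud'].value_counts(normalize=True))  # original data: heavily imbalanced
print(df['isFraud'].value_counts(normalize=True))    # downsampled data: far less imbalanced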
Create and train the model
Create and train the model by copying the following code into the third cell of the notebook and then running it:
# Define features
fc = tf.feature_column
CATEGORICAL_COLUMNS = ['type']
NUMERIC_COLUMNS = ['step', 'amount', 'oldbalanceOrg', 'newbalanceOrig', 'oldbalanceDest', 'newbalanceDest']
KEY_COLUMN = 'transactionId'
def one_hot_cat_column(feature_name, vocab):
  return tf.feature_column.indicator_column(tf.feature_column.categorical_column_with_vocabulary_list(feature_name, vocab))
feature_columns = []
for feature_name in CATEGORICAL_COLUMNS:
  vocabulary = train_set[feature_name].unique()
  feature_columns.append(one_hot_cat_column(feature_name, vocabulary))
for feature_name in NUMERIC_COLUMNS:
  feature_columns.append(tf.feature_column.numeric_column(feature_name,
                                                          dtype=tf.float32))
# Define training and evaluation input functions
NUM_EXAMPLES = len(train_labels)
def make_input_fn(X, y, n_epochs=None, shuffle=True):
  def input_fn():
    dataset = tf.data.Dataset.from_tensor_slices((dict(X), y))
    if shuffle:
      dataset = dataset.shuffle(NUM_EXAMPLES)
    dataset = dataset.repeat(n_epochs)
    dataset = dataset.batch(NUM_EXAMPLES)
    return dataset
  return input_fn
train_input_fn = make_input_fn(train_set, train_labels)
eval_input_fn = make_input_fn(test_set, test_labels, shuffle=False, n_epochs=1)
# Define the model
n_batches = 1
model = tf.estimator.BoostedTreesClassifier(feature_columns,
                                            n_batches_per_layer=n_batches)
model = tf.contrib.estimator.forward_features(model,KEY_COLUMN)
# Train the model
model.train(train_input_fn, max_steps=100)
# Get metrics to evaluate the model's performance
result = model.evaluate(eval_input_fn)
print(pd.Series(result))
After the code completes, it outputs a set of metrics that describe the model's performance. You should see results similar to the following, with the accuracy and auc values around 99%:
Test the model
Test the model to verify that it labels the fraudulent transactions correctly by copying the following code into the fourth cell of the notebook and then running it:
pred_dicts = list(model.predict(eval_input_fn))
probabilities = pd.Series([pred['logistic'][0] for pred in pred_dicts])
for i,val in enumerate(probabilities[:30]):
  print('Predicted: ', round(val), 'Actual: ', test_labels.iloc[i])
  print()
After the code completes, it outputs the predicted and actual fraud likelihood for the test data. You should see results similar to the following:
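The second cell imports itertools, matplotlib, and scikit-learn's confusion_matrix, but the cells above don't use them. If you want a summary over the whole test set rather than just the first 30 rows, the following optional sketch plots a confusion matrix; it assumes the probabilities and test_labels variables from the previous cells and a 0.5 decision threshold:

import itertools
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix  # already imported in the second cell

# Turn probabilities into 0/1 predictions and compare them with the true labels.
y_pred = (probabilities >= 0.5).astype(int)
cm = confusion_matrix(test_labels.values, y_pred)

plt.figure(figsize=(4, 4))
plt.imshow(cm, cmap='Blues')
plt.title('Fraud detection confusion matrix')
plt.xlabel('Predicted label')
plt.ylabel('Actual label')
plt.xticks([0, 1], ['not fraud', 'fraud'])
plt.yticks([0, 1], ['not fraud', 'fraud'])
for row, col in itertools.product(range(2), range(2)):
  plt.text(col, row, cm[row, col], ha='center', va='center')
plt.show()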
Export the model
Create a SavedModel from the trained model and export it to Cloud Storage by copying the following code into the fifth cell of the notebook and then running it. Replace myProject with the ID of the project you are using to complete this tutorial.
GCP_PROJECT = 'myProject'
MODEL_BUCKET = 'gs://myProject-bucket'
!gsutil mb $MODEL_BUCKET
def json_serving_input_fn():
  feature_placeholders = {
      'type': tf.placeholder(tf.string, [None]),
      'step': tf.placeholder(tf.float32, [None]),
      'amount': tf.placeholder(tf.float32, [None]),
      'oldbalanceOrg': tf.placeholder(tf.float32, [None]),
      'newbalanceOrig': tf.placeholder(tf.float32, [None]),
      'oldbalanceDest': tf.placeholder(tf.float32, [None]),
      'newbalanceDest': tf.placeholder(tf.float32, [None]),
      KEY_COLUMN: tf.placeholder_with_default(tf.constant(['nokey']), [None])
  }
  features = {key: tf.expand_dims(tensor, -1)
              for key, tensor in feature_placeholders.items()}
  return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)
export_path = model.export_saved_model(
    MODEL_BUCKET + '/explanations-with-key',
    serving_input_receiver_fn=json_serving_input_fn
).decode('utf-8')
!saved_model_cli show --dir $export_path --all
After the code completes, it returns a SignatureDef that describes the inputs and outputs of the model. You should see results similar to the following:
Deploy the model to AI Platform
Deploy the model for predictions by copying the following code into the sixth cell of the notebook and then running it. It takes a few minutes for the model version to be created.
MODEL = 'fraud_detection_with_key'
!gcloud ai-platform models create $MODEL
VERSION = 'v1'
!gcloud beta ai-platform versions create $VERSION \
--model $MODEL \
--origin $export_path \
--runtime-version 1.15 \
--framework TENSORFLOW \
--python-version 3.7 \
--machine-type n1-standard-4 \
--num-paths 10
!gcloud ai-platform versions describe $VERSION --model $MODEL
Get predictions from the deployed model
Get predictions for the test data set by copying the following code into the seventh cell of the notebook and then running it:
fraud_indices = []
for i,val in enumerate(test_labels):
  if val == 1:
    fraud_indices.append(i)
num_test_examples = 10
import numpy as np
def convert(o):
  if isinstance(o, np.generic): return o.item()
  raise TypeError
for i in range(num_test_examples):
  test_json = {}
  ex = test_set.iloc[fraud_indices[i]]
  keys = ex.keys().tolist()
  vals = ex.values.tolist()
  for idx in range(len(keys)):
    test_json[keys[idx]] = vals[idx]
  print(test_json)
  with open('data.txt', 'a') as outfile:
    json.dump(test_json, outfile, default=convert)
    outfile.write('\n')
!gcloud ai-platform predict --model $MODEL \
--version $VERSION \
--json-instances='data.txt' \
--signature-name='predict'
After the code completes, it returns predictions for the test data. You should see results similar to the following:
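If you prefer to call the prediction service from Python rather than the gcloud CLI, the following minimal sketch uses the google-api-python-client library. It assumes the GCP_PROJECT, MODEL, VERSION, and test_json variables defined in the earlier cells (test_json holds the last example built in the loop above), and it uses the model's default serving signature rather than the predict signature selected by the gcloud flag:

import googleapiclient.discovery

service = googleapiclient.discovery.build('ml', 'v1')
name = 'projects/{}/models/{}/versions/{}'.format(GCP_PROJECT, MODEL, VERSION)

# Send a single transaction to the deployed model for online prediction.
response = service.projects().predict(
    name=name,
    body={'instances': [test_json]}
).execute()

if 'error' in response:
  raise RuntimeError(response['error'])
print(response['predictions'])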
Create and run the pipeline
Create a Dataflow pipeline that reads financial transaction data, requests fraud prediction information for each transaction from the AI Platform model, and then writes both transaction and fraud prediction data to BigQuery for analysis.
Create the BigQuery dataset and tables
- Open the BigQuery console
- In the Resources section, select the project in which you are completing this tutorial.
- Click Create dataset.
- On the Create dataset page:
  - For Dataset ID, type fraud_detection.
  - For Data location, choose United States (US).
  - Click Create dataset.
- In the Query editor pane, run the following SQL statements to create the transactions and fraud_prediction tables:

  CREATE OR REPLACE TABLE fraud_detection.transactions (
    step INT64,
    nameOrig STRING,
    nameDest STRING,
    isFlaggedFraud INT64,
    isFraud INT64,
    type STRING,
    amount FLOAT64,
    oldbalanceOrg FLOAT64,
    newbalanceOrig FLOAT64,
    oldbalanceDest FLOAT64,
    newbalanceDest FLOAT64,
    transactionId STRING
  );

  CREATE OR REPLACE TABLE fraud_detection.fraud_prediction (
    transactionId STRING,
    logistic FLOAT64,
    json_response STRING
  );
Create the Pub/Sub topic and subscription
- Open the Pub/Sub console
- Click Create Topic.
- For Topic ID, type sample_data.
- Click Create Topic.
- Click Subscriptions.
- Click Create Subscription.
- For Subscription ID, type sample_data.
- For Select a Cloud Pub/Sub topic, select projects/<myProject>/topics/sample_data.
- Scroll down to the bottom of the page and click Create.
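If you prefer to script this step instead of using the console, the following sketch uses the google-cloud-pubsub client library (the 2.x request-style API is an assumption); replace myProject with your project ID:

from google.cloud import pubsub_v1

project_id = 'myProject'  # replace with your project ID

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

topic_path = publisher.topic_path(project_id, 'sample_data')
subscription_path = subscriber.subscription_path(project_id, 'sample_data')

# Create the sample_data topic and a pull subscription attached to it.
publisher.create_topic(request={'name': topic_path})
subscriber.create_subscription(
    request={'name': subscription_path, 'topic': topic_path})
print('Created', topic_path, 'and', subscription_path)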
Run the Dataflow pipeline
- Activate Cloud Shell
In Cloud Shell, run the following command to start the Dataflow pipeline, replacing myProject with the ID of the project you are using to complete this tutorial:
gcloud beta dataflow flex-template run "anomaly-detection" \
  --project=myProject \
  --region=us-central1 \
  --template-file-gcs-location=gs://df-ml-anomaly-detection-mock-data/dataflow-flex-template/dynamic_template_finserv_fraud_detection.json \
  --parameters=autoscalingAlgorithm="NONE",\
numWorkers=30,\
maxNumWorkers=30,\
workerMachineType=n1-highmem-8,\
subscriberId=projects/myProject/subscriptions/sample_data,\
tableSpec=myProject:fraud_detection.transactions,\
outlierTableSpec=myProject:fraud_detection.fraud_prediction,\
inputFilePattern=gs://df-ml-anomaly-detection-mock-data/finserv_fraud_detection/fraud_data_kaggle.json,\
modelId=fraud_detection_with_key,\
versionId=v1,\
keyRange=1024,\
batchSize=500000
To adapt this pipeline to production, you can alter the batchSize and keyRange parameter values to control the size and timing of the prediction request batches. Consider that:
- Using a small batch size and high key range value results in faster processing, but also might exceed quota limitations and require you to request additional quota.
- Using a larger batch size and low key range value will be slower but more likely to keep the operations within quota.
In the jobs list, click anomaly-detection.
Wait until the job graph appears and the StreamFraudData element of the graph shows a run time greater than 0 seconds.
Verify the data in BigQuery
Verify the data was written to BigQuery by running a query to see the transactions that have been identified as fraudulent.
- Open the BigQuery console
In the Query Editor pane, run the following query:
SELECT DISTINCT
  outlier.logistic AS fraud_probability,
  outlier.transactionId,
  transactions.* EXCEPT (transactionId, isFraud, isFlaggedFraud)
FROM `fraud_detection.fraud_prediction` AS outlier
JOIN fraud_detection.transactions AS transactions
  ON transactions.transactionId = outlier.transactionId
WHERE logistic > 0.99
ORDER BY fraud_probability DESC;
You should see results similar to the following:
Clean up
To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project containing the resources, or keep the project but delete just those resources.
Either way, you should remove those resources so you won't be billed for them in the future. The following sections describe how to delete these resources.
Delete the project
The easiest way to eliminate billing is to delete the project you created for the tutorial.
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Delete the components
If you don't want to delete the project, use the following sections to delete the billable components of this tutorial.
Stop the Dataflow job
- Open the Dataflow Jobs page
- In the jobs list, click anomaly-detection.
- On the job details page, click Stop.
- Select Cancel.
- Click Stop job.
Delete the Cloud Storage buckets
- Open the Cloud Storage browser
- Select the checkboxes of the <myProject>-bucket and dataflow-staging-us-central1-<projectNumber> buckets.
- Click Delete.
- In the overlay window that appears, type DELETE and then click Confirm.
Delete the Pub/Sub topic and subscription
- Open the Pub/Sub Subscriptions page
- Select the checkbox of the sample_data subscription.
- Click Delete.
- In the overlay window that appears, confirm you want to delete the subscription and its contents by clicking Delete.
- Click Topics.
- Select the checkbox of the sample_data topic.
- Click Delete.
- In the overlay window that appears, type delete and then click Delete.
Delete the BigQuery dataset and tables
- Open the BigQuery console
- In the Resources section, expand the project in which you are completing this tutorial and select the fraud_detection dataset.
- Click Delete dataset in the header of the dataset pane.
- In the overlay window that appears, type fraud_detection and then click Delete.
Delete the AI Platform model
- Open the AI Platform Models page
- In the models list, click fraud_detection_with_key.
- On the Model Details page, select the checkbox for the v1 (default) version.
- Click More, and then click Delete.
- When the version has finished deleting, click Back to return to the models list.
- Select the checkbox for the fraud_detection_with_key model.
- Click More, and then click Delete.
Delete the notebook
- Open the User-managed notebooks tab
- Select the checkbox for the boosted-trees notebook instance.
- Click Delete.
- In the overlay window that appears, click Delete.
What's next
- Review the sample code in the Anomaly Detection in Financial Transactions repo on GitHub.
- Learn about other anomaly detection solutions.
- Learn more about Dataflow.
- Learn more about AI Platform.