使用 AI Platform、Dataflow 和 BigQuery 检测金融交易中的异常

;

本教程介绍如何实现使用提升树模型识别欺诈性交易的异常检测应用。

本教程适用于开发者、数据工程师和数据科学家,并假定您具备以下基础知识:

  • 使用 TensorFlow 和 Python 开发机器学习模型
  • 标准 SQL
  • 使用 Apache Beam Java SDK 构建的 Dataflow 流水线

架构

示例应用包含以下组件:

  • 使用 TensorFlow 开发并部署到 AI Platform 的提升树模型。
  • 可完成以下任务的 Dataflow 流水线:

    • 将交易数据从 Cloud Storage 存储分区发布到 Pub/Sub 主题,然后从该主题的 Pub/Sub 订阅流式读取该数据。
    • 使用 Apache Beam Timer API 对 AI Platform Prediction API 进行微批量调用,以获取每个交易的欺诈可能性估算值。
    • 将交易数据和欺诈可能性数据写入 BigQuery 表以进行分析。

下图展示了异常检测解决方案的架构:

显示异常检测解决方案架构的图表。

数据集

本教程中使用的提升树模型基于 Kaggle 的用于欺诈检测的综合金融数据集进行训练。此数据集使用 PaySimim 模拟器生成。

我们使用综合数据集是因为适合进行欺诈检测的金融数据集很少,而那些适合的数据集通常包含需进行匿名化处理的个人身份信息 (PII)。

目标

  • 创建一个估算金融交易欺诈概率的提升树模型。
  • 将模型部署到 AI Platform 以进行在线预测。
  • 使用 Dataflow 流水线完成以下任务:
    • 将示例数据集中的交易数据写入 BigQuery 中的 transactions 表。
    • 向托管模型发送微批处理请求以检索欺诈概率预测,并将结果写入 BigQuery 中的 fraud_detection 表。
  • 运行联接这些表的 BigQuery 查询以查看每笔交易的欺诈概率。

费用

本教程使用 Google Cloud 的以下收费组件:

  • AI Platform
  • BigQuery
  • Cloud Storage
  • Compute Engine
  • Dataflow
  • Pub/Sub

请使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

  1. 登录您的 Google 帐号。

    如果您还没有 Google 帐号,请注册一个新帐号

  2. 在 Cloud Console 的项目选择器页面上,选择或创建 Cloud 项目。

    转到项目选择器页面

  3. 确保您的 Google Cloud 项目已启用结算功能。 了解如何确认您的项目已启用结算功能

  4. 启用 AI Platform Training and Prediction, Cloud Storage, Compute Engine, Dataflow, and AI Platform Notebooks API。

    启用 API

检查配额可用性

  1. 打开“IAM 配额”页面
  2. 检查您是否在 us-central1 地区中拥有以下可用 Compute Engine API 配额;您需要这些配额才能运行本教程中使用的 Dataflow 作业。如果配额不足,请申请增加配额

    限制名称 配额
    CPU 241
    使用中的 IP 地址数 30
    实例组 1
    实例模板 1
    代管实例组 1
    永久性磁盘(标准,单位为 GB) 12900

创建和部署模型

按照本部分中的说明创建一个预测金融交易欺诈的提升树模型。

创建笔记本

  1. 打开 AI Platform Notebooks 控制台
  2. 点击新建实例
  3. 选择 TensorFlow Enterprise 1.15 without GPUs

    显示要选择的实例类型。

  4. 对于实例名称,输入 boosted-trees

  5. 点击创建。创建笔记本实例可能需要几分钟时间。

  6. 当实例可用时,点击打开 JupyterLab

  7. 在 JupyterLab 启动器的笔记本部分中,点击 Python 3

下载示例数据

下载示例数据库的副本:

  1. 将以下代码复制到笔记本的第一个单元中:

    !gsutil cp gs://financial_fraud_detection/fraud_data_kaggle.csv .

  2. 点击菜单栏中的运行

准备要在训练中使用的数据

示例数据是不均衡的,可能会导致模型不准确。以下代码通过使用降采样纠正不均衡,然后将数据拆分为训练集和测试集。

将以下代码复制到笔记本的第二个单元中,然后运行代码,以准备数据:

import uuid
import itertools
import numpy as np
import pandas as pd
import os
import tensorflow as tf
import json
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from sklearn.metrics import confusion_matrix

os.environ['TF_CPP_MIN_LOG_LEVEL']='3'

data = pd.read_csv('fraud_data_kaggle.csv')

# Split the data into 2 DataFrames
fraud = data[data['isFraud'] == 1]
not_fraud = data[data['isFraud'] == 0]

# Take a random sample of non fraud rows
not_fraud_sample = not_fraud.sample(random_state=2, frac=.005)

# Put it back together and shuffle
df = pd.concat([not_fraud_sample,fraud])
df = shuffle(df, random_state=2)

# Remove a few columns (isFraud is the label column we'll use, not isFlaggedFraud)
df = df.drop(columns=['nameOrig', 'nameDest', 'isFlaggedFraud'])

# Add transaction id to make it possible to map predictions to transactions
df['transactionId'] = [str(uuid.uuid4()) for _ in range(len(df.index))]

train_test_split = int(len(df) * .8)

# Split the dataset for training and testing
train_set = df[:train_test_split]
test_set = df[train_test_split:]

train_labels = train_set.pop('isFraud')
test_labels = test_set.pop('isFraud')

train_set.head()

代码完成后会输出已处理数据的几个示例行。您应该会看到如下所示的结果:

已处理的训练数据的前 5 行。

创建和训练模型

将以下代码复制到笔记本的第三个单元中,然后运行代码,从而创建和训练模型:

# Define features
fc = tf.feature_column
CATEGORICAL_COLUMNS = ['type']
NUMERIC_COLUMNS = ['step', 'amount', 'oldbalanceOrg', 'newbalanceOrig', 'oldbalanceDest', 'newbalanceDest']
KEY_COLUMN = 'transactionId'
def one_hot_cat_column(feature_name, vocab):
    return tf.feature_column.indicator_column(tf.feature_column.categorical_column_with_vocabulary_list(feature_name, vocab))

feature_columns = []

for feature_name in CATEGORICAL_COLUMNS:
    vocabulary = train_set[feature_name].unique()
    feature_columns.append(one_hot_cat_column(feature_name, vocabulary))

for feature_name in NUMERIC_COLUMNS:
  feature_columns.append(tf.feature_column.numeric_column(feature_name,
                                           dtype=tf.float32))

# Define training and evaluation input functions
NUM_EXAMPLES = len(train_labels)
def make_input_fn(X, y, n_epochs=None, shuffle=True):
  def input_fn():
    dataset = tf.data.Dataset.from_tensor_slices((dict(X), y))
    if shuffle:
      dataset = dataset.shuffle(NUM_EXAMPLES)
    dataset = dataset.repeat(n_epochs)
    dataset = dataset.batch(NUM_EXAMPLES)
    return dataset
  return input_fn

train_input_fn = make_input_fn(train_set, train_labels)
eval_input_fn = make_input_fn(test_set, test_labels, shuffle=False, n_epochs=1)

# Define the model
n_batches = 1
model = tf.estimator.BoostedTreesClassifier(feature_columns,
                                          n_batches_per_layer=n_batches)
model = tf.contrib.estimator.forward_features(model,KEY_COLUMN)

# Train the model
model.train(train_input_fn, max_steps=100)

# Get metrics to evaluate the model's performance
result = model.evaluate(eval_input_fn)
print(pd.Series(result))

代码完成后会输出一组描述模型性能的指标。您应该会看到如下所示的结果,其中 accuracyauc 值约为 99%:

提升树模型的性能指标。

测试模型

将以下代码复制到笔记本的第四个单元中,然后运行代码,从而测试模型以验证它是否正确标记了欺诈性交易:

pred_dicts = list(model.predict(eval_input_fn))
probabilities = pd.Series([pred['logistic'][0] for pred in pred_dicts])

for i,val in enumerate(probabilities[:30]):
  print('Predicted: ', round(val), 'Actual: ', test_labels.iloc[i])
  print()

代码完成后会输出测试数据的预测和实际欺诈可能性。您应该会看到如下所示的结果:

测试数据的预测和实际欺诈可能性结果。

导出模型

将以下代码复制到笔记本的第五个单元中,然后运行代码,从而从经过训练的模型创建 SavedModel 并导出到 Cloud Storage。将 myProject 替换为您用于完成本教程的项目的 ID。

GCP_PROJECT = 'myProject'
MODEL_BUCKET = 'gs://myProject-bucket'

!gsutil mb $MODEL_BUCKET

def json_serving_input_fn():
    feature_placeholders = {
        'type': tf.placeholder(tf.string, [None]),
        'step': tf.placeholder(tf.float32, [None]),
        'amount': tf.placeholder(tf.float32, [None]),
        'oldbalanceOrg': tf.placeholder(tf.float32, [None]),
        'newbalanceOrig': tf.placeholder(tf.float32, [None]),
        'oldbalanceDest': tf.placeholder(tf.float32, [None]),
        'newbalanceDest': tf.placeholder(tf.float32, [None]),
         KEY_COLUMN: tf.placeholder_with_default(tf.constant(['nokey']), [None])
    }
    features = {key: tf.expand_dims(tensor, -1)
                for key, tensor in feature_placeholders.items()}
    return tf.estimator.export.ServingInputReceiver(features,feature_placeholders)

export_path = model.export_saved_model(
    MODEL_BUCKET + '/explanations-with-key',
    serving_input_receiver_fn=json_serving_input_fn
).decode('utf-8')

!saved_model_cli show --dir $export_path --all

代码完成后会返回一个 SignatureDef,它描述模型的输入和输出。您应该会看到如下所示的结果:

描述模型的输入和输出的 SignatureDef。

将模型部署到 AI Platform

将以下代码复制到笔记本的第六个单元中,然后运行代码,以部署预测模型。创建模型版本需要几分钟时间。

MODEL = 'fraud_detection_with_key'

!gcloud ai-platform models create $MODEL

VERSION = 'v1'
!gcloud beta ai-platform versions create $VERSION \
--model $MODEL \
--origin $export_path \
--runtime-version 1.15 \
--framework TENSORFLOW \
--python-version 3.7 \
--machine-type n1-standard-4 \
--num-paths 10

!gcloud ai-platform versions describe $VERSION --model $MODEL

从已部署的模型获取预测

将以下代码复制到笔记本的第七个单元中,然后运行代码,以获取测试数据集的预测:

fraud_indices = []

for i,val in enumerate(test_labels):
    if val == 1:
        fraud_indices.append(i)

num_test_examples = 10
import numpy as np

def convert(o):
    if isinstance(o, np.generic): return o.item()
    raise TypeError

for i in range(num_test_examples):
    test_json = {}
    ex = test_set.iloc[fraud_indices[i]]
    keys = ex.keys().tolist()
    vals = ex.values.tolist()
    for idx in range(len(keys)):
        test_json[keys[idx]] = vals[idx]

    print(test_json)
    with open('data.txt', 'a') as outfile:
        json.dump(test_json, outfile, default=convert)
        outfile.write('\n')

!gcloud ai-platform predict --model $MODEL \
--version $VERSION \
--json-instances='data.txt' \
--signature-name='predict'

代码完成后会返回测试数据的预测。您应该会看到如下所示的结果:

已部署的模型对测试数据的预测。

创建并运行流水线

创建一个 Dataflow 流水线,该流水线会读取金融交易数据,向 AI Platform 模型请求每笔交易的欺诈预测信息,然后将交易和欺诈预测数据写入 BigQuery 以进行分析。

创建 BigQuery 数据集和表

  1. 打开 BigQuery 控制台
  2. 资源部分,选择要在其中完成本教程的项目。
  3. 点击创建数据集

    显示“创建数据集”按钮的位置。

  4. 创建数据集页面中执行以下操作:

    • 对于数据集 ID,请输入 fraud_detection
    • 选择美国 (US) 作为数据位置
    • 点击创建数据集
  5. 查询编辑器窗格中,运行以下 SQL 语句以创建 transactionsfraud_prediction 表:

    CREATE OR REPLACE TABLE fraud_detection.transactions (
       step INT64,
       nameOrig STRING,
       nameDest STRING,
       isFlaggedFraud INT64,
       isFraud INT64,
       type STRING,
       amount FLOAT64,
       oldbalanceOrg FLOAT64,
       newbalanceOrig FLOAT64,
       oldbalanceDest FLOAT64,
       newbalanceDest FLOAT64,
       transactionId STRING
     );
    
    CREATE OR REPLACE TABLE fraud_detection.fraud_prediction (
        transactionId STRING,
        logistic FLOAT64,
        json_response STRING
    );
    

创建 Pub/Sub 主题和订阅

  1. 打开 Pub/Sub 控制台
  2. 点击创建主题
  3. 对于主题 ID,输入 sample_data
  4. 点击创建主题
  5. 点击订阅
  6. 点击创建订阅
  7. 对于订阅 ID,输入 sample_data
  8. 对于选择 Cloud Pub/Sub 主题,选择 projects/<myProject>/topics/sample_data
  9. 向下滚动到页面底部,然后点击创建

运行 Dataflow 流水线

  1. 激活 Cloud Shell
  2. 在 Cloud Shell 中,运行以下命令来运行 Dataflow 流水线,将 myProject 替换为您用于完成本教程的项目的 ID:

    gcloud beta dataflow flex-template run "anomaly-detection" \
    --project=myProject \
    --region=us-central1 \
    --template-file-gcs-location=gs://df-ml-anomaly-detection-mock-data/dataflow-flex-template/dynamic_template_finserv_fraud_detection.json \
    --parameters=autoscalingAlgorithm="NONE",\
    numWorkers=30,\
    maxNumWorkers=30,\
    workerMachineType=n1-highmem-8,\
    subscriberId=projects/myProject/subscriptions/sample_data,\
    tableSpec=myProject:fraud_detection.transactions,\
    outlierTableSpec=myProject:fraud_detection.fraud_prediction,\
    inputFilePattern=gs://df-ml-anomaly-detection-mock-data/finserv_fraud_detection/fraud_data_kaggle.json,\
    modelId=fraud_detection_with_key,\
    versionId=v1,\
    keyRange=1024,\
    batchSize=500000
    

    要调整此流水线以使其适应生产环境,您可以更改 batchSizekeyRange 参数值,以控制预测请求批次的大小和时间。请注意以下事项:

    • 使用较小的批次大小和较高的键范围值可以加快处理速度,但也可能会超出配额上限并需要您申请更多配额。
    • 如果使用较大的批次大小和较低的键范围值,处理速度会比较慢,但更有可能将操作维持在配额范围内。
  3. 打开 Dataflow 作业页面

  4. 在作业列表中,点击 anomaly-detection

  5. 等待作业图显示,并且图的 StreamFraudData 元素显示超过 0 秒的运行时间。

验证 BigQuery 中的数据

通过运行查询查看已确认为欺诈的交易来验证数据是否已写入 BigQuery。

  1. 打开 BigQuery 控制台
  2. 查询编辑器窗格中,运行以下查询:

    SELECT DISTINCT
      outlier.logistic as fraud_probablity,
      outlier.transactionId,
      transactions.* EXCEPT (transactionId,isFraud,isFlaggedFraud)
    FROM `fraud_detection.fraud_prediction` AS outlier
    JOIN fraud_detection.transactions AS transactions
    ON transactions.transactionId = outlier.transactionId
    WHERE logistic >0.99
    ORDER BY fraud_probablity DESC;
    

    您应该会看到如下所示的结果:

    欺诈概率结果的前 9 行。

清理

为避免系统因本教程中使用的资源而向您的 Google Cloud 帐号收取费用,请删除包含资源的项目,或者保留项目但仅删除这些资源。

无论采用哪种方式,您都应移除这些资源,以免日后再为这些资源付费。以下部分介绍如何删除这些资源。

删除项目

为避免支付费用,最简单的方法是删除您为本教程创建的项目。

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”页面

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除组件

如果您不想删除项目,请使用以下几个部分删除本教程的可计费组件。

停止 Dataflow 作业

  1. 打开 Dataflow 作业页面
  2. 在作业列表中,点击 anomaly-detection
  3. 在作业详情页面上,点击停止
  4. 选择取消
  5. 点击停止作业

删除 Cloud Storage 存储分区

  1. 打开 Cloud Storage 浏览器
  2. 选中 <myProject>-bucketdataflow-staging-us-central1-<projectNumber> 存储分区的复选框。
  3. 点击删除
  4. 在出现的叠加窗口中,输入 DELETE,然后点击确认

删除 Pub/Sub 主题和订阅

  1. 打开 Pub/Sub 订阅页面
  2. 选中 sample_data 订阅的复选框。
  3. 点击删除
  4. 在出现的叠加窗口中,点击删除以确认要删除此订阅及其内容。
  5. 点击主题
  6. 选中 sample_data 主题的复选框。
  7. 点击删除
  8. 在出现的叠加窗口中,输入 delete,然后点击删除

删除 BigQuery 数据集和表

  1. 打开 BigQuery 控制台
  2. 资源部分中,展开在其中完成本教程的项目,然后选择 fraud_detection 数据集。
  3. 点击数据集窗格标题中的删除数据集
  4. 在出现的叠加窗口中,输入 fraud_detection,然后点击删除

删除 AI Platform 模型

  1. 打开 AI Platform 模型页面
  2. 在模型列表中,点击 fraud_detection_with_key
  3. 模型详情页面上,选中 v1(默认)版本对应的复选框。
  4. 点击更多 ,然后点击删除
  5. 版本删除完成后,点击返回 返回到模型列表。
  6. 选中 fraud_detection_with_key 模型的复选框。
  7. 点击更多 ,然后点击删除

删除 AI Platform 笔记本

  1. 打开 AI Platform Notebooks 页面
  2. 选中 boosted-trees 笔记本实例的复选框。
  3. 点击删除
  4. 在出现的叠加窗口中,点击删除

后续步骤