AI Platform、Dataflow、BigQuery を使用して金融トランザクションの異常を検出する

;

このチュートリアルでは、ブーストツリー モデルを使用して不正なトランザクションを識別する異常検出アプリケーションを実装する方法を示します。

このチュートリアルは、デベロッパー、データ エンジニア、データ サイエンティストを対象としており、以下に関する基本的な知識があることを前提としています。

  • TensorFlow と Python を使用した機械学習モデルの開発
  • 標準 SQL
  • Apache Beam Java SDK を使用して構築された Dataflow パイプライン

アーキテクチャ

サンプル アプリケーションは、次のコンポーネントで構成されます。

  • TensorFlow を使用して開発され、AI Platform にデプロイされるブーストツリー モデル。
  • 以下のタスクを実行する Dataflow パイプライン。

    • Cloud Storage バケットから Pub/Sub トピックにトランザクション データをパブリッシュし、そのデータを Pub/Sub サブスクリプションからそのトピックへのストリームとして読み取ります。
    • Apache Beam Timer API を使用して AI Platform Prediction API にマイクロバッチを呼び出し、各トランザクションの不正行為の可能性の推定値を取得します。
    • 分析のために、トランザクション データと不正行為の可能性に関するデータを BigQuery テーブルに書き込みます。

次の図は、異常検出ソリューションのアーキテクチャを示しています。

異常検出ソリューションのアーキテクチャを示す図。

データセット

このチュートリアルで使用されるブーストツリー モデルは、Kaggle の Synthetic Financial Dataset For Fraud Detection でトレーニングされています。このデータセットは、PaySim シミュレータを使用して生成されました。

Google では合成データセットを使用しています。その理由は、不正行為の検出に適した金融データセットが少なく、あったとしても匿名化する必要がある個人情報(PII)が含まれていることが多いためです。

目標

  • 金融トランザクションにおける不正行為の可能性を推定するブーストツリー モデルを作成する。
  • オンライン予測のモデルを AI Platform にデプロイする。
  • Dataflow パイプラインを使用して以下のことを行う。
    • サンプル データセットからトランザクション データを BigQuery の transactions テーブルに書き込む。
    • マイクロバッチ リクエストをホストされているモデルに送信して、不正行為の可能性を予測し、結果を BigQuery の fraud_detection テーブルに書き込む。
  • これらのテーブルを結合する BigQuery クエリを実行して、各トランザクションの不正行為の可能性を確認する。

費用

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

  • AI Platform
  • BigQuery
  • Cloud Storage
  • Compute Engine
  • Dataflow
  • Pub/Sub

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. AI Platform Training and Prediction, Cloud Storage, Compute Engine, Dataflow, and Notebooks API を有効にします。

    API を有効にする

  5. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  6. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  7. AI Platform Training and Prediction, Cloud Storage, Compute Engine, Dataflow, and Notebooks API を有効にします。

    API を有効にする

割り当てが使用可能かどうかの確認

  1. IAM の割り当てページを開きます
  2. us-central1 リージョンで利用可能な次の Compute Engine API の割り当てがあることを確認します。このチュートリアルで使用する Dataflow ジョブを実行するためには、これらの割り当てが必要です。ない場合は、割り当ての増加をリクエストします。

    上限名 割り当て
    CPU 241
    使用中の IP アドレス 30
    インスタンス グループ 1
    インスタンス テンプレート 1
    マネージド インスタンス グループ 1
    Persistent disk standard(GB) 12900

モデルを作成してデプロイする

このセクションの手順に沿って、金融トランザクションにおける不正行為を予測するブーストツリー モデルを作成します。

ノートブックを作成する

  1. Notebooks コンソールを開く
  2. [新しいインスタンス] をクリックします。
  3. [TensorFlow Enterprise 1.15] の [Without GPUs] を選択します。

    選択するインスタンス タイプを表示します。

  4. [インスタンス名] に「boosted-trees」と入力します。

  5. [作成] をクリックします。ノートブック インスタンスが作成されるまで数分かかります。

  6. インスタンスが利用可能になったら、[JupyterLab を開く] をクリックします。

  7. JupyterLab Launcher の [Notebook] セクションで [Python 3] をクリックします。

サンプルデータをダウンロードする

サンプル データベースのコピーをダウンロードします。

  1. ノートブックの最初のセルに次のコードをコピーします。

    !gsutil cp gs://financial_fraud_detection/fraud_data_kaggle.csv .

  2. メニューバーの実行アイコン をクリックします。

トレーニングで使用するデータを準備する

サンプルデータは、不均衡であるため、モデルが不正確になる可能性があります。次のコードは、ダウンサンプリングを使用して不均衡の修正を行い、データをトレーニング セットとテストセットに分割します。

ノートブックの 2 番目のセルに以下のコードをコピーして実行し、データを準備します。

import uuid
import itertools
import numpy as np
import pandas as pd
import os
import tensorflow as tf
import json
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from sklearn.metrics import confusion_matrix

os.environ['TF_CPP_MIN_LOG_LEVEL']='3'

data = pd.read_csv('fraud_data_kaggle.csv')

# Split the data into 2 DataFrames
fraud = data[data['isFraud'] == 1]
not_fraud = data[data['isFraud'] == 0]

# Take a random sample of non fraud rows
not_fraud_sample = not_fraud.sample(random_state=2, frac=.005)

# Put it back together and shuffle
df = pd.concat([not_fraud_sample,fraud])
df = shuffle(df, random_state=2)

# Remove a few columns (isFraud is the label column we'll use, not isFlaggedFraud)
df = df.drop(columns=['nameOrig', 'nameDest', 'isFlaggedFraud'])

# Add transaction id to make it possible to map predictions to transactions
df['transactionId'] = [str(uuid.uuid4()) for _ in range(len(df.index))]

train_test_split = int(len(df) * .8)

# Split the dataset for training and testing
train_set = df[:train_test_split]
test_set = df[train_test_split:]

train_labels = train_set.pop('isFraud')
test_labels = test_set.pop('isFraud')

train_set.head()

コードが完了すると、処理されたデータのサンプル行がいくつか出力されます。次のような結果が表示されます。

処理されたトレーニング データの最初の 5 行。

モデルの作成とトレーニングを行う

ノートブックの 3 番目のセルに次のコードをコピーして実行し、モデルを作成してトレーニングします。

# Define features
fc = tf.feature_column
CATEGORICAL_COLUMNS = ['type']
NUMERIC_COLUMNS = ['step', 'amount', 'oldbalanceOrg', 'newbalanceOrig', 'oldbalanceDest', 'newbalanceDest']
KEY_COLUMN = 'transactionId'
def one_hot_cat_column(feature_name, vocab):
    return tf.feature_column.indicator_column(tf.feature_column.categorical_column_with_vocabulary_list(feature_name, vocab))

feature_columns = []

for feature_name in CATEGORICAL_COLUMNS:
    vocabulary = train_set[feature_name].unique()
    feature_columns.append(one_hot_cat_column(feature_name, vocabulary))

for feature_name in NUMERIC_COLUMNS:
  feature_columns.append(tf.feature_column.numeric_column(feature_name,
                                           dtype=tf.float32))

# Define training and evaluation input functions
NUM_EXAMPLES = len(train_labels)
def make_input_fn(X, y, n_epochs=None, shuffle=True):
  def input_fn():
    dataset = tf.data.Dataset.from_tensor_slices((dict(X), y))
    if shuffle:
      dataset = dataset.shuffle(NUM_EXAMPLES)
    dataset = dataset.repeat(n_epochs)
    dataset = dataset.batch(NUM_EXAMPLES)
    return dataset
  return input_fn

train_input_fn = make_input_fn(train_set, train_labels)
eval_input_fn = make_input_fn(test_set, test_labels, shuffle=False, n_epochs=1)

# Define the model
n_batches = 1
model = tf.estimator.BoostedTreesClassifier(feature_columns,
                                          n_batches_per_layer=n_batches)
model = tf.contrib.estimator.forward_features(model,KEY_COLUMN)

# Train the model
model.train(train_input_fn, max_steps=100)

# Get metrics to evaluate the model's performance
result = model.evaluate(eval_input_fn)
print(pd.Series(result))

コードが完了すると、モデルのパフォーマンスを説明する一連の指標が出力されます。次のような結果が表示されます。accuracyauc の値は約 99% です。

ブーストツリー モデルのパフォーマンス指標。

モデルをテストする

モデルをテストします。次のコードをノートブックの 4 番目のセルにコピーして実行することで、不正なトランザクションが正しくラベル付けされていることを確認します。

pred_dicts = list(model.predict(eval_input_fn))
probabilities = pd.Series([pred['logistic'][0] for pred in pred_dicts])

for i,val in enumerate(probabilities[:30]):
  print('Predicted: ', round(val), 'Actual: ', test_labels.iloc[i])
  print()

コードが完了すると、テストデータの不正可能性の予測値と実際値が出力されます。次のような結果が表示されます。

テストデータの不正可能性の予測値と実際値

モデルをエクスポートする

トレーニングされたモデルから SavedModel を作成し、次のコードをノートブックの 5 番目のセルにコピーしてから実行して、Cloud Storage にエクスポートします。myProject は、このチュートリアルで使用するプロジェクトの ID に置き換えます。

GCP_PROJECT = 'myProject'
MODEL_BUCKET = 'gs://myProject-bucket'

!gsutil mb $MODEL_BUCKET

def json_serving_input_fn():
    feature_placeholders = {
        'type': tf.placeholder(tf.string, [None]),
        'step': tf.placeholder(tf.float32, [None]),
        'amount': tf.placeholder(tf.float32, [None]),
        'oldbalanceOrg': tf.placeholder(tf.float32, [None]),
        'newbalanceOrig': tf.placeholder(tf.float32, [None]),
        'oldbalanceDest': tf.placeholder(tf.float32, [None]),
        'newbalanceDest': tf.placeholder(tf.float32, [None]),
         KEY_COLUMN: tf.placeholder_with_default(tf.constant(['nokey']), [None])
    }
    features = {key: tf.expand_dims(tensor, -1)
                for key, tensor in feature_placeholders.items()}
    return tf.estimator.export.ServingInputReceiver(features,feature_placeholders)

export_path = model.export_saved_model(
    MODEL_BUCKET + '/explanations-with-key',
    serving_input_receiver_fn=json_serving_input_fn
).decode('utf-8')

!saved_model_cli show --dir $export_path --all

コードが完了すると、モデルの入力と出力を説明する SignatureDef が返されます。次のような結果が表示されます。

モデルの入力と出力を記述する SignatureDef。

モデルを AI Platform にデプロイする

ノートブックの 6 番目のセルに以下のコードをコピーして実行し、予測用のモデルをデプロイします。モデル バージョンの作成には数分かかります。

MODEL = 'fraud_detection_with_key'

!gcloud ai-platform models create $MODEL

VERSION = 'v1'
!gcloud beta ai-platform versions create $VERSION \
--model $MODEL \
--origin $export_path \
--runtime-version 1.15 \
--framework TENSORFLOW \
--python-version 3.7 \
--machine-type n1-standard-4 \
--num-paths 10

!gcloud ai-platform versions describe $VERSION --model $MODEL

デプロイされたモデルから予測を取得する

ノートブックの 7 番目のセルに次のコードをコピーして実行し、テスト データセットの予測を取得します。

fraud_indices = []

for i,val in enumerate(test_labels):
    if val == 1:
        fraud_indices.append(i)

num_test_examples = 10
import numpy as np

def convert(o):
    if isinstance(o, np.generic): return o.item()
    raise TypeError

for i in range(num_test_examples):
    test_json = {}
    ex = test_set.iloc[fraud_indices[i]]
    keys = ex.keys().tolist()
    vals = ex.values.tolist()
    for idx in range(len(keys)):
        test_json[keys[idx]] = vals[idx]

    print(test_json)
    with open('data.txt', 'a') as outfile:
        json.dump(test_json, outfile, default=convert)
        outfile.write('\n')

!gcloud ai-platform predict --model $MODEL \
--version $VERSION \
--json-instances='data.txt' \
--signature-name='predict'

コードが完了すると、テストデータの予測が返されます。次のような結果が表示されます。

テストデータ用にデプロイされたモデルからの予測。

パイプラインを作成して実行する

財務トランザクション データを読み取り、AI Platform モデルから各トランザクションの不正行為予測情報をリクエストし、分析用にトランザクションと不正行為の予測データの両方を BigQuery に書き込む Dataflow パイプラインを作成します。

BigQuery データセットとテーブルを作成する

  1. BigQuery コンソールを開きます
  2. [リソース] セクションで、このチュートリアルを実施するプロジェクトを選択します。
  3. [データセットを作成] をクリックします。

    [データセットを作成] ボタンの場所を表示。

  4. [データセットを作成] ページで次の操作を行います。

    • [データセット ID] に「fraud_detection」と入力します。
    • [データのロケーション] で [米国(US)] を選択します。
    • [データセットを作成] をクリックします。
  5. [クエリエディタ] ペインで、次の SQL ステートメントを実行して、transactions テーブルと fraud_prediction テーブルを作成します。

    CREATE OR REPLACE TABLE fraud_detection.transactions (
       step INT64,
       nameOrig STRING,
       nameDest STRING,
       isFlaggedFraud INT64,
       isFraud INT64,
       type STRING,
       amount FLOAT64,
       oldbalanceOrg FLOAT64,
       newbalanceOrig FLOAT64,
       oldbalanceDest FLOAT64,
       newbalanceDest FLOAT64,
       transactionId STRING
     );
    
    CREATE OR REPLACE TABLE fraud_detection.fraud_prediction (
        transactionId STRING,
        logistic FLOAT64,
        json_response STRING
    );
    

Pub/Sub トピックとサブスクリプションを作成する

  1. Pub/Sub コンソールを開きます
  2. [トピックを作成] をクリックします。
  3. [トピック ID] に「sample_data」と入力します。
  4. [トピックを作成] をクリックします。
  5. [サブスクリプション] をクリックします。
  6. [サブスクリプションを作成] をクリックします。
  7. [サブスクリプション ID] に「sample_data」と入力します。
  8. [Cloud Pub/Sub トピックを選択してください] で、projects/<myProject>/topics/sample_data を選択します。
  9. ページの一番下までスクロールして [CREATE] をクリックします。

Dataflow パイプラインを実行する

  1. Cloud Shell をアクティブにする
  2. Cloud Shell で次のコマンドを実行して Dataflow パイプラインを実行します。myProject は、このチュートリアルで使用するプロジェクトの ID に置き換えます。

    gcloud beta dataflow flex-template run "anomaly-detection" \
    --project=myProject \
    --region=us-central1 \
    --template-file-gcs-location=gs://df-ml-anomaly-detection-mock-data/dataflow-flex-template/dynamic_template_finserv_fraud_detection.json \
    --parameters=autoscalingAlgorithm="NONE",\
    numWorkers=30,\
    maxNumWorkers=30,\
    workerMachineType=n1-highmem-8,\
    subscriberId=projects/myProject/subscriptions/sample_data,\
    tableSpec=myProject:fraud_detection.transactions,\
    outlierTableSpec=myProject:fraud_detection.fraud_prediction,\
    inputFilePattern=gs://df-ml-anomaly-detection-mock-data/finserv_fraud_detection/fraud_data_kaggle.json,\
    modelId=fraud_detection_with_key,\
    versionId=v1,\
    keyRange=1024,\
    batchSize=500000
    

    このパイプラインを本番環境に適応させるには、batchSize および keyRange パラメータ値を変更して、予測リクエスト バッチのサイズとタイミングを制御します。次の点を考慮してください。

    • 小さいバッチサイズと高いキー範囲値を使用すると処理が速くなりますが、割り当て上限を超過する場合があり、追加の割り当てをリクエストする必要があります。
    • 大きいバッチサイズと低いキー範囲値を使用すると、速度は低下しますが、オペレーションが割り当てに収まる可能性は高くなります。
  3. Dataflow ジョブページを開きます

  4. ジョブリストで、[anomaly-detection] をクリックします。

  5. ジョブグラフが表示され、グラフの StreamFraudData 要素に 0 秒を超える実行時間が表示されるまで待ちます。

BigQuery でデータを確認する

クエリを実行して、不正と確認されたトランザクションを確認することで、データが BigQuery に書き込まれていることを確認します。

  1. BigQuery コンソールを開きます
  2. [クエリエディタ] ペインで、次のクエリを実行します。

    SELECT DISTINCT
      outlier.logistic as fraud_probablity,
      outlier.transactionId,
      transactions.* EXCEPT (transactionId,isFraud,isFlaggedFraud)
    FROM `fraud_detection.fraud_prediction` AS outlier
    JOIN fraud_detection.transactions AS transactions
    ON transactions.transactionId = outlier.transactionId
    WHERE logistic >0.99
    ORDER BY fraud_probablity DESC;
    

    次のような結果が表示されます。

    不正行為の可能性を示す結果の最初の 9 行。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトは削除せず、これらのリソースのみを削除します。

いずれにしても、今後これらのリソースについて料金が発生しないように、削除する必要があります。以降のセクションでは、このようなリソースを削除する方法を説明します。

プロジェクトの削除

課金を停止する最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

  1. Cloud Console で [リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

コンポーネントの削除

プロジェクトを削除しない場合は、以下のセクションに従って、このチュートリアルの課金対象コンポーネントを削除してください。

Dataflow ジョブを停止する

  1. Dataflow ジョブページを開きます
  2. ジョブリストで、[anomaly-detection] をクリックします。
  3. ジョブの詳細ページで、[停止] をクリックします。
  4. [キャンセル] を選択します。
  5. [ジョブの停止] をクリックします。

Cloud Storage バケットを削除する

  1. Cloud Storage ブラウザを開く
  2. <myProject>-bucket バケットと dataflow-staging-us-central1-<projectNumber> バケットのチェックボックスをオンにします。
  3. [削除] をクリックします。
  4. 上に重なったウィンドウで、「DELETE」と入力して [確認] をクリックします。

Pub/Sub トピックとサブスクリプションを削除する

  1. Pub/Sub サブスクリプション ページを開きます
  2. sample_data サブスクリプションのチェックボックスをオンにします。
  3. [削除] をクリックします。
  4. 上に重なったウィンドウで、[削除] をクリックして、サブスクリプションとそのコンテンツを削除することを確認します。
  5. [トピック] をクリックします。
  6. sample_data トピックのチェックボックスをオンにします。
  7. [削除] をクリックします。
  8. 上に重なったウィンドウで、「delete」と入力して [削除] をクリックします。

BigQuery のデータセットとテーブルを削除する

  1. BigQuery コンソールを開きます
  2. [リソース] セクションで、このチュートリアルを実施しているプロジェクトを展開し、fraud_detection データセットを選択します。
  3. データセット ペインのヘッダーにある [データセットの削除] をクリックします。
  4. 上に重なったウィンドウで、「fraud_detection」と入力して [削除] をクリックします。

AI Platform モデルを削除する

  1. AI Platform の [モデル] ページを開きます
  2. モデルリストで fraud_detection_with_key をクリックします。
  3. [モデルの詳細] ページで、v1(デフォルト)バージョンのチェックボックスを選択します。
  4. [その他] をクリックして、[削除] をクリックします。
  5. バージョンの削除が完了したら、[戻る] をクリックしてモデルの一覧に戻ります。
  6. fraud_detection_with_key モデルのチェックボックスをオンにします。
  7. [その他] をクリックして、[削除] をクリックします。

AI Platform ノートブックを削除する

  1. AI Platform Notebooks のページを開きます
  2. boosted-trees ノートブック インスタンスのチェックボックスをオンにします。
  3. [削除] をクリックします。
  4. 上に重なったウィンドウで [削除] をクリックします。

次のステップ