AI Platform, Dataflow, BigQuery를 사용하여 금융 거래의 이상 감지

이 가이드에서는 부스트 트리 모델을 사용하여 사기성 거래를 식별하는 이상 감지 애플리케이션을 구현하는 방법을 보여줍니다.

이 가이드는 개발자, 데이터 엔지니어, 데이터 과학자를 대상으로 하며 다음과 같은 기본 지식이 있다고 가정합니다.

  • TensorFlow 및 Python을 사용한 머신러닝 모델 개발
  • 표준 SQL
  • Apache Beam 자바 SDK를 사용하여 빌드된 Dataflow 파이프라인

아키텍처

샘플 애플리케이션은 다음 구성요소로 구성됩니다.

  • TensorFlow를 사용하여 개발된 후 AI Platform에 배포되는 부스트 트리 모델
  • 다음 태스크를 완료하는 Dataflow 파이프라인

    • Cloud Storage 버킷의 Pub/Sub 주제로 트랜잭션 데이터를 게시한 후 이 데이터를 Pub/Sub 구독에서 해당 주제로 보내는 스트림으로 읽습니다.
    • Apache Beam Timer API를 사용하여 AI Platform Prediction API에 대한 마이크로 배치 호출을 통해 각 트랜잭션의 사기 가능성을 예측합니다.
    • 분석을 위해 트랜잭션 데이터와 사기 가능성 데이터를 BigQuery 테이블에 씁니다.

다음 다이어그램은 이상 감지 솔루션의 아키텍처를 보여줍니다.

이상 감지 솔루션의 아키텍처를 보여주는 다이어그램

데이터 세트

이 가이드에서 사용된 부스트 트리 모델은 Kaggle의 Synthetic Financial Dataset for Fraud Detection을 통해 학습됩니다. 이 데이터 세트는 PaySim 시뮬레이터를 사용하여 생성되었습니다.

사기 감지에 적합한 금융 데이터 세트가 거의 없으며 이러한 데이터 세트에는 익명 처리가 필요한 개인 식별 정보(PII)가 포함되는 경우가 많으므로 합성 데이터 세트를 사용합니다.

목표

  • 금융 거래의 사기 가능성을 예측하는 부스트 트리 모델을 만듭니다.
  • 온라인 예측을 위해 모델을 AI Platform에 배포합니다.
  • Dataflow 파이프라인을 사용하여 다음을 수행합니다.
    • 샘플 데이터 세트의 트랜잭션 데이터를 BigQuery의 transactions 테이블에 씁니다.
    • 호스팅된 모델에 마이크로 배치 요청을 전송하여 사기 가능성 예측을 검색하고 결과를 BigQuery의 fraud_detection 테이블에 씁니다.
  • 이러한 테이블을 조인하는 BigQuery 쿼리를 실행하여 각 트랜잭션의 사기 가능성을 확인합니다.

비용

이 가이드에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

  • AI Platform
  • BigQuery
  • Cloud Storage
  • Compute Engine
  • Dataflow
  • Pub/Sub

가격 계산기를 사용하여 예상 사용량을 기준으로 예상 비용을 산출합니다. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

시작하기 전에

  1. Google 계정으로 로그인합니다.

    아직 계정이 없으면 새 계정을 등록하세요.

  2. Cloud Console의 프로젝트 선택기 페이지에서 Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기 페이지로 이동

  3. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

  4. AI Platform Training and Prediction, Cloud Storage, Compute Engine, Dataflow, and AI Platform Notebooks API를 사용 설정합니다.

    API 사용 설정

할당량 사용 가능 여부 확인

  1. IAM 할당량 페이지 열기
  2. us-central1 리전에서 사용할 수 있는 Compute Engine API 할당량이 있는지 확인합니다. 이 가이드에서 사용된 Dataflow 작업을 실행하려면 이 할당량이 필요합니다. 그렇지 않은 경우 할당량 상향을 요청하세요.

    한도 이름 할당량
    CPU 241
    사용 중인 IP 주소 30
    인스턴스 그룹 1
    인스턴스 템플릿 1
    관리형 인스턴스 그룹 1
    Persistent Disk Standard(GB) 12900

모델 만들기 및 배포

이 섹션의 안내에 따라 금융 거래 사기를 예측하는 부스트 트리 모델을 만듭니다.

메모장 만들기

  1. AI Platform Notebooks 콘솔 열기
  2. 새 인스턴스를 클릭합니다.
  3. GPU를 사용하지 않는 TensorFlow Enterprise 1.15를 선택합니다.

    선택할 인스턴스 유형을 표시합니다.

  4. 인스턴스 이름boosted-trees를 입력합니다.

  5. 만들기를 클릭합니다. 메모장 인스턴스가 생성되는 데 몇 분 정도 걸립니다.

  6. 인스턴스를 사용할 수 있으면 JupyterLab 열기를 클릭합니다.

  7. JupyterLab 런처의 메모장 섹션에서 Python 3을 클릭합니다.

샘플 데이터 다운로드

샘플 데이터베이스의 사본을 다운로드합니다.

  1. 다음 코드를 메모장의 첫 번째 셀에 복사합니다.

    !gsutil cp gs://financial_fraud_detection/fraud_data_kaggle.csv .

  2. 메뉴 바에서 실행을 클릭합니다.

학습에 사용할 데이터 준비

샘플 데이터는 균형이 맞지 않는 경우 이로 인해 부정확한 모델이 발생할 수 있습니다. 다음 코드는 다운샘플링을 사용하여 불균형을 수정한 다음 데이터를 학습 세트와 테스트 세트로 나눕니다.

다음 코드를 메모장의 두 번째 셀에 복사한 후 실행하여 데이터를 준비합니다.

import uuid
import itertools
import numpy as np
import pandas as pd
import os
import tensorflow as tf
import json
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from sklearn.metrics import confusion_matrix

os.environ['TF_CPP_MIN_LOG_LEVEL']='3'

data = pd.read_csv('fraud_data_kaggle.csv')

# Split the data into 2 DataFrames
fraud = data[data['isFraud'] == 1]
not_fraud = data[data['isFraud'] == 0]

# Take a random sample of non fraud rows
not_fraud_sample = not_fraud.sample(random_state=2, frac=.005)

# Put it back together and shuffle
df = pd.concat([not_fraud_sample,fraud])
df = shuffle(df, random_state=2)

# Remove a few columns (isFraud is the label column we'll use, not isFlaggedFraud)
df = df.drop(columns=['nameOrig', 'nameDest', 'isFlaggedFraud'])

# Add transaction id to make it possible to map predictions to transactions
df['transactionId'] = [str(uuid.uuid4()) for _ in range(len(df.index))]

train_test_split = int(len(df) * .8)

# Split the dataset for training and testing
train_set = df[:train_test_split]
test_set = df[train_test_split:]

train_labels = train_set.pop('isFraud')
test_labels = test_set.pop('isFraud')

train_set.head()

코드가 완료되면 처리된 데이터의 몇 가지 예시 행이 출력됩니다. 다음과 비슷한 결과가 표시됩니다.

처리된 학습 데이터의 처음 5개 행

모델 생성 및 학습

다음 코드를 메모장의 세 번째 셀에 복사한 후 실행하여 모델을 만들고 학습시킵니다.

# Define features
fc = tf.feature_column
CATEGORICAL_COLUMNS = ['type']
NUMERIC_COLUMNS = ['step', 'amount', 'oldbalanceOrg', 'newbalanceOrig', 'oldbalanceDest', 'newbalanceDest']
KEY_COLUMN = 'transactionId'
def one_hot_cat_column(feature_name, vocab):
    return tf.feature_column.indicator_column(tf.feature_column.categorical_column_with_vocabulary_list(feature_name, vocab))

feature_columns = []

for feature_name in CATEGORICAL_COLUMNS:
    vocabulary = train_set[feature_name].unique()
    feature_columns.append(one_hot_cat_column(feature_name, vocabulary))

for feature_name in NUMERIC_COLUMNS:
  feature_columns.append(tf.feature_column.numeric_column(feature_name,
                                           dtype=tf.float32))

# Define training and evaluation input functions
NUM_EXAMPLES = len(train_labels)
def make_input_fn(X, y, n_epochs=None, shuffle=True):
  def input_fn():
    dataset = tf.data.Dataset.from_tensor_slices((dict(X), y))
    if shuffle:
      dataset = dataset.shuffle(NUM_EXAMPLES)
    dataset = dataset.repeat(n_epochs)
    dataset = dataset.batch(NUM_EXAMPLES)
    return dataset
  return input_fn

train_input_fn = make_input_fn(train_set, train_labels)
eval_input_fn = make_input_fn(test_set, test_labels, shuffle=False, n_epochs=1)

# Define the model
n_batches = 1
model = tf.estimator.BoostedTreesClassifier(feature_columns,
                                          n_batches_per_layer=n_batches)
model = tf.contrib.estimator.forward_features(model,KEY_COLUMN)

# Train the model
model.train(train_input_fn, max_steps=100)

# Get metrics to evaluate the model's performance
result = model.evaluate(eval_input_fn)
print(pd.Series(result))

코드가 완료되면 모델의 성능을 설명하는 측정항목 세트가 출력됩니다. 99% 가량의 accuracyauc 값을 포함하여 다음과 비슷한 결과가 표시됩니다. .

부스트 트리 모델의 성능 측정항목

모델 테스트

다음 코드를 메모장의 네 번째 셀에 복사한 후 실행하여 모델이 사기 거래의 라벨을 올바르게 지정하는지 테스트를 통해 확인합니다.

pred_dicts = list(model.predict(eval_input_fn))
probabilities = pd.Series([pred['logistic'][0] for pred in pred_dicts])

for i,val in enumerate(probabilities[:30]):
  print('Predicted: ', round(val), 'Actual: ', test_labels.iloc[i])
  print()

코드가 완료되면 테스트 데이터에 대한 예상 및 실제 사기 가능성을 출력합니다. 다음과 비슷한 결과가 표시됩니다.

테스트 데이터에 대한 예상 및 실제 사기 가능성 결과

모델 내보내기

다음 코드를 메모장의 다섯 번째 셀로 복사한 후 실행하여 학습된 모델에서 저장된 모델을 만들고 Cloud Storage로 내보냅니다. myProject를 이 가이드를 완료하는 데 사용할 프로젝트의 ID로 바꿉니다.

GCP_PROJECT = 'myProject'
MODEL_BUCKET = 'gs://myProject-bucket'

!gsutil mb $MODEL_BUCKET

def json_serving_input_fn():
    feature_placeholders = {
        'type': tf.placeholder(tf.string, [None]),
        'step': tf.placeholder(tf.float32, [None]),
        'amount': tf.placeholder(tf.float32, [None]),
        'oldbalanceOrg': tf.placeholder(tf.float32, [None]),
        'newbalanceOrig': tf.placeholder(tf.float32, [None]),
        'oldbalanceDest': tf.placeholder(tf.float32, [None]),
        'newbalanceDest': tf.placeholder(tf.float32, [None]),
         KEY_COLUMN: tf.placeholder_with_default(tf.constant(['nokey']), [None])
    }
    features = {key: tf.expand_dims(tensor, -1)
                for key, tensor in feature_placeholders.items()}
    return tf.estimator.export.ServingInputReceiver(features,feature_placeholders)

export_path = model.export_saved_model(
    MODEL_BUCKET + '/explanations-with-key',
    serving_input_receiver_fn=json_serving_input_fn
).decode('utf-8')

!saved_model_cli show --dir $export_path --all

코드가 완료되면 모델의 입력과 출력을 설명하는 SignatureDef를 반환합니다. 다음과 비슷한 결과가 표시됩니다.

SignatureDef는 모델의 입력과 출력을 설명합니다.

AI Platform에 모델 배포

다음 코드를 메모장의 여섯 번째 셀에 복사한 후 실행하여 예측을 위한 모델을 배포합니다. 모델 버전을 만드는 데 몇 분 정도 걸립니다.

MODEL = 'fraud_detection_with_key'

!gcloud ai-platform models create $MODEL

VERSION = 'v1'
!gcloud beta ai-platform versions create $VERSION \
--model $MODEL \
--origin $export_path \
--runtime-version 1.15 \
--framework TENSORFLOW \
--python-version 3.7 \
--machine-type n1-standard-4 \
--num-paths 10

!gcloud ai-platform versions describe $VERSION --model $MODEL

배포된 모델에서 예측 가져오기

다음 코드를 메모장의 일곱 번째 셀에 복사한 후 실행하여 테스트 데이터 세트의 예측을 가져옵니다.

fraud_indices = []

for i,val in enumerate(test_labels):
    if val == 1:
        fraud_indices.append(i)

num_test_examples = 10
import numpy as np

def convert(o):
    if isinstance(o, np.generic): return o.item()
    raise TypeError

for i in range(num_test_examples):
    test_json = {}
    ex = test_set.iloc[fraud_indices[i]]
    keys = ex.keys().tolist()
    vals = ex.values.tolist()
    for idx in range(len(keys)):
        test_json[keys[idx]] = vals[idx]

    print(test_json)
    with open('data.txt', 'a') as outfile:
        json.dump(test_json, outfile, default=convert)
        outfile.write('\n')

!gcloud ai-platform predict --model $MODEL \
--version $VERSION \
--json-instances='data.txt' \
--signature-name='predict'

코드가 완료되면 테스트 데이터에 대한 예측을 반환합니다. 다음과 비슷한 결과가 표시됩니다.

배포된 모델에서 도출된 테스트 데이터 예측

파이프라인 생성 및 실행

AI Platform 모델에서 금융 거래 데이터를 읽고, AI Platform 모델에서 각 트랜잭션에 대한 사기 예측 정보를 요청한 다음, 트랜잭션과 사기 예측 데이터를 모두 BigQuery에 기록하여 분석할 수 있도록 Dataflow 파이프라인을 만듭니다.

BigQuery 데이터 세트 및 테이블 만들기

  1. BigQuery 콘솔 열기
  2. 리소스 섹션에서 이 가이드를 완료하려는 프로젝트를 선택합니다.
  3. 데이터 세트 만들기를 클릭합니다.

    데이터 세트 만들기 버튼의 위치를 표시합니다.

  4. 데이터 세트 만들기 페이지에서 다음을 수행합니다.

    • 데이터 세트 IDfraud_detection를 입력합니다.
    • 데이터 위치미국(US)을 선택합니다.
    • 데이터 세트 만들기를 클릭합니다.
  5. 쿼리 편집기 창에서 다음 SQL 문을 실행하여 transactionsfraud_prediction 테이블을 만듭니다.

    CREATE OR REPLACE TABLE fraud_detection.transactions (
       step INT64,
       nameOrig STRING,
       nameDest STRING,
       isFlaggedFraud INT64,
       isFraud INT64,
       type STRING,
       amount FLOAT64,
       oldbalanceOrg FLOAT64,
       newbalanceOrig FLOAT64,
       oldbalanceDest FLOAT64,
       newbalanceDest FLOAT64,
       transactionId STRING
     );
    
    CREATE OR REPLACE TABLE fraud_detection.fraud_prediction (
        transactionId STRING,
        logistic FLOAT64,
        json_response STRING
    );
    

Pub/Sub 주제 및 구독 만들기

  1. Pub/Sub 콘솔 열기
  2. 주제 만들기를 클릭합니다.
  3. 주제 IDsample_data을 입력합니다.
  4. 주제 만들기를 클릭합니다.
  5. 구독을 클릭합니다.
  6. 구독 만들기를 클릭합니다.
  7. 구독 IDsample_data를 입력합니다.
  8. Cloud Pub/Sub 주제 선택에서 projects/<myProject>/topics/sample_data를 선택합니다.
  9. 페이지 하단으로 스크롤하여 만들기를 클릭합니다.

Dataflow 파이프라인 실행

  1. Cloud Shell 활성화
  2. Cloud Shell에서 아래 명령어를 실행하여 Dataflow 파이프라인을 실행하고 myProject를 이 가이드를 완료하는 데 사용할 프로젝트의 ID로 바꿉니다.

    gcloud beta dataflow flex-template run "anomaly-detection" \
    --project=myProject \
    --region=us-central1 \
    --template-file-gcs-location=gs://df-ml-anomaly-detection-mock-data/dataflow-flex-template/dynamic_template_finserv_fraud_detection.json \
    --parameters=autoscalingAlgorithm="NONE",\
    numWorkers=30,\
    maxNumWorkers=30,\
    workerMachineType=n1-highmem-8,\
    subscriberId=projects/myProject/subscriptions/sample_data,\
    tableSpec=myProject:fraud_detection.transactions,\
    outlierTableSpec=myProject:fraud_detection.fraud_prediction,\
    inputFilePattern=gs://df-ml-anomaly-detection-mock-data/finserv_fraud_detection/fraud_data_kaggle.json,\
    modelId=fraud_detection_with_key,\
    versionId=v1,\
    keyRange=1024,\
    batchSize=500000
    

    이 파이프라인을 프로덕션에 조정하려면 batchSizekeyRange 매개변수 값을 변경하여 예측 요청 배치의 크기와 타이밍을 제어할 수 있습니다. 다음과 같은 경우를 생각해보세요.

    • 작은 배치 크기와 높은 키 범위 값을 사용하면 처리 속도가 빨라지지만 할당량 제한이 초과될 수 있으며 추가 할당량을 요청해야 할 수 있습니다.
    • 더 큰 배치 크기와 더 작은 키 범위 값을 사용하면 속도가 느려지지만 작업이 할당량을 초과하지 않을 가능성이 큽니다.
  3. Dataflow 작업 페이지 열기

  4. 작업 목록에서 이상 감지를 클릭합니다.

  5. 작업 그래프가 나타날 때까지 기다리면 그래프의 StreamFraudData 요소가 0초 이상의 실행 시간을 표시합니다.

BigQuery에서 데이터 확인

사기로 식별된 트랜잭션이 있는지 보기 위해 쿼리를 실행하여 BigQuery에 데이터가 작성되었는지 확인합니다.

  1. BigQuery 콘솔 열기
  2. 쿼리 편집기 창에서 다음 쿼리를 실행합니다.

    SELECT DISTINCT
      outlier.logistic as fraud_probablity,
      outlier.transactionId,
      transactions.* EXCEPT (transactionId,isFraud,isFlaggedFraud)
    FROM `fraud_detection.fraud_prediction` AS outlier
    JOIN fraud_detection.transactions AS transactions
    ON transactions.transactionId = outlier.transactionId
    WHERE logistic >0.99
    ORDER BY fraud_probablity DESC;
    

    다음과 비슷한 결과가 표시됩니다.

    사기 가능성이 있는 처음 9개 행의 결과

삭제

이 가이드에서 사용한 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 프로젝트를 삭제하거나 프로젝트만 유지하고 해당 리소스를 삭제합니다.

어느 쪽이든 향후 리소스 비용이 청구되지 않도록 이러한 리소스를 삭제해야 합니다. 다음 섹션에서는 이러한 리소스를 삭제하는 방법을 설명합니다.

프로젝트 삭제

청구되지 않도록 하는 가장 쉬운 방법은 가이드에서 만든 프로젝트를 삭제하는 것입니다.

  1. Cloud Console에서 리소스 관리 페이지로 이동합니다.

    리소스 관리 페이지로 이동

  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제 를 클릭합니다.
  3. 대화상자에서 프로젝트 ID를 입력한 다음 종료를 클릭하여 프로젝트를 삭제합니다.

구성요소 삭제

프로젝트를 삭제하지 않으려면 다음 섹션을 사용하여 이 가이드의 청구 가능 구성요소를 삭제합니다.

Dataflow 작업 중지

  1. Dataflow 작업 페이지 열기
  2. 작업 목록에서 이상 감지를 클릭합니다.
  3. 작업 세부정보 페이지에서 중지를 클릭합니다.
  4. 취소를 선택합니다.
  5. 작업 중지를 클릭합니다.

Cloud Storage 버킷 삭제

  1. Cloud Storage 브라우저 열기
  2. <myProject>-bucketdataflow-staging-us-central1-<projectNumber> 버킷의 체크박스를 선택합니다.
  3. 삭제를 클릭합니다.
  4. 오버레이 창이 나타나면 DELETE를 입력한 다음 확인을 클릭합니다.

Pub/Sub 주제 및 구독 삭제

  1. Pub/Sub 구독 페이지 열기
  2. sample_data 구독의 체크박스를 선택합니다.
  3. 삭제를 클릭합니다.
  4. 오버레이 창이 나타나면 삭제를 클릭하여 구독과 그 콘텐츠를 삭제할 것인지 확인합니다.
  5. 주제를 클릭합니다.
  6. sample_data 주제의 체크박스를 선택합니다.
  7. 삭제를 클릭합니다.
  8. 오버레이 창이 나타나면 delete를 입력한 다음 삭제를 클릭합니다.

BigQuery 데이터 세트 및 테이블 삭제

  1. BigQuery 콘솔 열기
  2. 리소스 섹션에서 이 가이드를 완료할 프로젝트를 확장하고 fraud_detection 데이터 세트를 선택합니다.
  3. 데이터 세트 창의 헤더에서 데이터 세트 삭제를 클릭합니다.
  4. 오버레이 창이 나타나면 fraud_detection를 입력한 다음 삭제를 클릭합니다.

AI Platform 모델 삭제

  1. AI Platform Models 페이지 열기
  2. 모델 목록에서 fraud_detection_with_key를 클릭합니다.
  3. 모델 세부정보 페이지에서 v1(기본값) 버전의 체크박스를 선택합니다.
  4. 더보기를 클릭한 다음 삭제를 클릭합니다.
  5. 버전 삭제가 완료되면 뒤로를 클릭하여 모델 목록으로 돌아갑니다.
  6. fraud_detection_with_key 모델의 체크박스를 선택합니다.
  7. 더보기를 클릭한 다음 삭제를 클릭합니다.

AI Platform 메모장 삭제

  1. AI Platform Notebooks 페이지 열기
  2. boost-trees 메모장 인스턴스의 체크박스를 선택합니다.
  3. 삭제를 클릭합니다.
  4. 오버레이 창이 나타나면 삭제를 클릭합니다.

다음 단계