Erkennen von Anomalien in Finanztransaktionen mit AI Platform, Dataflow und BigQuery

;

In dieser Anleitung erfahren Sie, wie Sie eine Anwendung zur Anomalieerkennung implementieren, die betrügerische Transaktionen mithilfe eines Boosted Tree-Modells identifiziert.

Diese Anleitung richtet sich an Entwickler, Data Engineers und Data Scientists und setzt grundlegende Kenntnisse zu folgenden Themen voraus:

  • ML-Modellentwicklung mit TensorFlow und Python
  • Standard-SQL
  • Dataflow-Pipelines, die mit dem Apache Beam Java SDK erstellt wurden

Architektur

Die Beispielanwendung besteht aus folgenden Komponenten:

  • Ein Boosted Tree-Modell, das mit TensorFlow entwickelt und in AI Platform bereitgestellt wird.
  • Eine Dataflow-Pipeline, die die folgenden Aufgaben ausführt:

    • Veröffentlichen von Transaktionsdaten aus einem Cloud Storage-Bucket in ein Pub/Sub-Thema und anschließendes Auslesen dieser Daten als Stream von einem Pub/Sub-Abo zu diesem Thema.
    • Ermittelt Schätzungen der Betrugswahrscheinlichkeit für jede Transaktion durch Verwendung der Apache Beam Timer API für Micro-Batch-Aufrufe an die AI Platform Prediction API.
    • Schreibt Transaktionsdaten und Betrugswahrscheinlichkeitsdaten zur Analyse in BigQuery-Tabellen.

Das folgende Diagramm veranschaulicht die Architektur der Anomalieerkennung:

Diagramm mit der Architektur der Anomalieerkennungslösung.

Dataset

Das in dieser Anleitung verwendete Boosting-Tree-Modell wird mit dem synthetischen Finanz-Dataset für Betrugserkennung von Kaggle trainiert. Dieses Dataset wurde mit dem PaySim-Simulator generiert.

Wir verwenden ein synthetisches Dataset, da es mehrere Finanz-Datasets gibt, die für die Betrugserkennung geeignet sind, und solche, die häufig Daten enthalten, mit denen sich personenidentifizierbare Informationen erkennen lassen, die anonymisiert werden müssen.

Ziele

  • Erstellen eines Boosted Tree-Modell, das die Wahrscheinlichkeit eines Betrugs in Finanztransaktionen schätzt.
  • Bereitstellen eines Modells in AI Platform für Onlinevorhersagen.
  • Mit einer Dataflow-Pipeline können Sie:
    • Transaktionsdaten aus dem Beispiel-Dataset in eine transactions-Tabelle in BigQuery schreiben.
    • Micro-Batch-Anfragen an das gehostete Modell senden, um die Wahrscheinlichkeitsvorhersagen für Betrug abzurufen und die Ergebnisse in eine fraud_detection-Tabelle in BigQuery zu schreiben.
  • Eine BigQuery-Abfrage ausführen, die diese Tabellen zusammenführt, um die Wahrscheinlichkeit für den Betrug bei jeder Transaktion zu ermitteln.

Kosten

In dieser Anleitung werden kostenpflichtige Komponenten von Google Cloud verwendet, darunter:

  • AI Platform
  • BigQuery
  • Cloud Storage
  • Compute Engine
  • Dataflow
  • Pub/Sub

Sie können mithilfe des Preisrechners die Kosten für Ihre voraussichtliche Nutzung kalkulieren. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Hinweis

  1. Melden Sie sich bei Ihrem Google-Konto an.

    Wenn Sie noch kein Konto haben, melden Sie sich hier für ein neues Konto an.

  2. Wählen Sie in der Cloud Console auf der Seite für die Projektauswahl ein Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  3. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für Ihr Projekt aktiviert ist.

  4. AI Platform Training and Prediction, Cloud Storage, Compute Engine, Dataflow, and AI Platform Notebooks APIs aktivieren.

    Aktivieren Sie die APIs

Kontingentverfügbarkeit prüfen

  1. Seite "IAM-Kontingente" öffnen
  2. Prüfen Sie, ob die folgenden Compute Engine API-Kontingente in der Region "us-central1" verfügbar sind. Sie benötigen diese Kontingente, um den in dieser Anleitung verwendeten Dataflow-Job auszuführen. Fordern Sie andernfalls eine Kontingenterhöhung an.

    Bezeichnung des Limits Kontingent
    CPUs 241
    IP-Adressen in Verwendung 30
    Instanzgruppen 1
    Instanzvorlagen 1
    Verwaltete Instanzgruppen 1
    Nichtflüchtiger Standardspeicher (GB) 12900

Modell erstellen und bereitstellen

Folgen Sie der Anleitung in diesem Abschnitt, um ein Boosted Tree-Modell zu erstellen, mit dem in Finanztransaktionen Betrugsfälle vorhergesagt werden können.

Notebook erstellen

  1. Öffnen Sie die AI Platform Notebooks-Konsole
  2. Klicken Sie auf Neue Instanz.
  3. Wählen Sie TensorFlow Enterprise 1.15 ohne GPUs aus.

    Instanztyp zur Auswahl anzeigen.

  4. Geben Sie für Instanzname boosted-trees ein.

  5. Klicken Sie auf Erstellen. Das Erstellen der Notebookinstanz dauert einige Minuten.

  6. Sobald die Instanz verfügbar ist, klicken Sie auf JupyterLab öffnen.

  7. Klicken Sie im JupyterLab Launcher im Abschnitt Notebook auf Python 3.

Beispieldaten herunterladen

Laden Sie eine Kopie der Beispieldatenbank herunter:

  1. Kopieren Sie folgenden Code in die erste Zelle des Notebooks:

    !gsutil cp gs://financial_fraud_detection/fraud_data_kaggle.csv .

  2. Klicken Sie in der Menüleiste auf Ausführen.

Daten für das Training vorbereiten

Die Beispieldaten sind unausgeglichen, was zu einem ungenauen Modell führen kann. Der folgende Code korrigiert das Ungleichgewicht durch die Verwendung von Downsampling und teilt die Daten anschließend in ein Trainings-Dataset und ein Test-Dataset auf.

Bereiten Sie die Daten vor, indem Sie den folgenden Code in die zweite Zelle des Notebooks kopieren und dann ausführen:

import uuid
import itertools
import numpy as np
import pandas as pd
import os
import tensorflow as tf
import json
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from sklearn.metrics import confusion_matrix

os.environ['TF_CPP_MIN_LOG_LEVEL']='3'

data = pd.read_csv('fraud_data_kaggle.csv')

# Split the data into 2 DataFrames
fraud = data[data['isFraud'] == 1]
not_fraud = data[data['isFraud'] == 0]

# Take a random sample of non fraud rows
not_fraud_sample = not_fraud.sample(random_state=2, frac=.005)

# Put it back together and shuffle
df = pd.concat([not_fraud_sample,fraud])
df = shuffle(df, random_state=2)

# Remove a few columns (isFraud is the label column we'll use, not isFlaggedFraud)
df = df.drop(columns=['nameOrig', 'nameDest', 'isFlaggedFraud'])

# Add transaction id to make it possible to map predictions to transactions
df['transactionId'] = [str(uuid.uuid4()) for _ in range(len(df.index))]

train_test_split = int(len(df) * .8)

# Split the dataset for training and testing
train_set = df[:train_test_split]
test_set = df[train_test_split:]

train_labels = train_set.pop('isFraud')
test_labels = test_set.pop('isFraud')

train_set.head()

Nach Abschluss des Codes werden mehrere Beispielzeilen der verarbeiteten Daten ausgegeben. Die Ergebnisse sollten in etwa so aussehen:

Die ersten fünf Zeilen verarbeiteter Trainingsdaten.

Modell erstellen und trainieren

Erstellen und trainieren Sie das Modell, indem Sie den folgenden Code in die dritte Zelle des Notebooks kopieren und es dann ausführen:

# Define features
fc = tf.feature_column
CATEGORICAL_COLUMNS = ['type']
NUMERIC_COLUMNS = ['step', 'amount', 'oldbalanceOrg', 'newbalanceOrig', 'oldbalanceDest', 'newbalanceDest']
KEY_COLUMN = 'transactionId'
def one_hot_cat_column(feature_name, vocab):
    return tf.feature_column.indicator_column(tf.feature_column.categorical_column_with_vocabulary_list(feature_name, vocab))

feature_columns = []

for feature_name in CATEGORICAL_COLUMNS:
    vocabulary = train_set[feature_name].unique()
    feature_columns.append(one_hot_cat_column(feature_name, vocabulary))

for feature_name in NUMERIC_COLUMNS:
  feature_columns.append(tf.feature_column.numeric_column(feature_name,
                                           dtype=tf.float32))

# Define training and evaluation input functions
NUM_EXAMPLES = len(train_labels)
def make_input_fn(X, y, n_epochs=None, shuffle=True):
  def input_fn():
    dataset = tf.data.Dataset.from_tensor_slices((dict(X), y))
    if shuffle:
      dataset = dataset.shuffle(NUM_EXAMPLES)
    dataset = dataset.repeat(n_epochs)
    dataset = dataset.batch(NUM_EXAMPLES)
    return dataset
  return input_fn

train_input_fn = make_input_fn(train_set, train_labels)
eval_input_fn = make_input_fn(test_set, test_labels, shuffle=False, n_epochs=1)

# Define the model
n_batches = 1
model = tf.estimator.BoostedTreesClassifier(feature_columns,
                                          n_batches_per_layer=n_batches)
model = tf.contrib.estimator.forward_features(model,KEY_COLUMN)

# Train the model
model.train(train_input_fn, max_steps=100)

# Get metrics to evaluate the model's performance
result = model.evaluate(eval_input_fn)
print(pd.Series(result))

Nach Abschluss des Codes wird eine Reihe von Messwerten ausgegeben, die die Leistung des Modells beschreiben. Die Ergebnisse sollten in etwa so aussehen: Die Werte accuracy und auc liegen bei etwa 99 %:

Leistungsmesswerte für das Boosted Tree-Modell

Modell testen

Testen Sie das Modell, um zu überprüfen, ob die betrügerischen Transaktionen korrekt gekennzeichnet sind. Kopieren Sie dazu den folgenden Code in die vierte Zelle des Notebooks und führen Sie es dann aus:

pred_dicts = list(model.predict(eval_input_fn))
probabilities = pd.Series([pred['logistic'][0] for pred in pred_dicts])

for i,val in enumerate(probabilities[:30]):
  print('Predicted: ', round(val), 'Actual: ', test_labels.iloc[i])
  print()

Nachdem der Code abgeschlossen ist, gibt er die vorhergesagte und tatsächliche Wahrscheinlichkeit für die Testdaten aus. Die Ergebnisse sollten in etwa so aussehen:

Prognostizierte und tatsächliche Betrugswahrscheinlichkeitsergebnisse für die Testdaten.

Modell exportieren

Erstellen Sie ein SavedModel aus dem trainierten Modell und exportieren Sie es in Cloud Storage. Kopieren Sie dazu den folgenden Code in die fünfte Zelle des Notebooks und führen Sie es dann aus. Ersetzen Sie myProject durch die ID des Projekts, das Sie für diese Anleitung verwenden.

GCP_PROJECT = 'myProject'
MODEL_BUCKET = 'gs://myProject-bucket'

!gsutil mb $MODEL_BUCKET

def json_serving_input_fn():
    feature_placeholders = {
        'type': tf.placeholder(tf.string, [None]),
        'step': tf.placeholder(tf.float32, [None]),
        'amount': tf.placeholder(tf.float32, [None]),
        'oldbalanceOrg': tf.placeholder(tf.float32, [None]),
        'newbalanceOrig': tf.placeholder(tf.float32, [None]),
        'oldbalanceDest': tf.placeholder(tf.float32, [None]),
        'newbalanceDest': tf.placeholder(tf.float32, [None]),
         KEY_COLUMN: tf.placeholder_with_default(tf.constant(['nokey']), [None])
    }
    features = {key: tf.expand_dims(tensor, -1)
                for key, tensor in feature_placeholders.items()}
    return tf.estimator.export.ServingInputReceiver(features,feature_placeholders)

export_path = model.export_saved_model(
    MODEL_BUCKET + '/explanations-with-key',
    serving_input_receiver_fn=json_serving_input_fn
).decode('utf-8')

!saved_model_cli show --dir $export_path --all

Nach Abschluss des Codes wird ein SignatureDef zurückgegeben, der die Ein- und Ausgaben des Modells beschreibt. Die Ergebnisse sollten in etwa so aussehen.

SignatureDef, die die Eingaben und Ausgaben des Modells beschreibt.

Modell in AI Platform bereitstellen

Stellen Sie das Modell für Vorhersagen bereit. Kopieren Sie dazu den folgenden Code in die sechste Zelle des Notebooks und führen Sie ihn aus. Es kann einige Minuten dauern, bis die Modellversion erstellt wird.

MODEL = 'fraud_detection_with_key'

!gcloud ai-platform models create $MODEL

VERSION = 'v1'
!gcloud beta ai-platform versions create $VERSION \
--model $MODEL \
--origin $export_path \
--runtime-version 1.15 \
--framework TENSORFLOW \
--python-version 3.7 \
--machine-type n1-standard-4 \
--num-paths 10

!gcloud ai-platform versions describe $VERSION --model $MODEL

Vorhersagen aus dem bereitgestellten Modell abrufen

Kopieren Sie den folgenden Code in die siebte Zelle des Notebooks und führen Sie ihn aus, um Vorhersagen für das Test-Dataset zu erhalten:

fraud_indices = []

for i,val in enumerate(test_labels):
    if val == 1:
        fraud_indices.append(i)

num_test_examples = 10
import numpy as np

def convert(o):
    if isinstance(o, np.generic): return o.item()
    raise TypeError

for i in range(num_test_examples):
    test_json = {}
    ex = test_set.iloc[fraud_indices[i]]
    keys = ex.keys().tolist()
    vals = ex.values.tolist()
    for idx in range(len(keys)):
        test_json[keys[idx]] = vals[idx]

    print(test_json)
    with open('data.txt', 'a') as outfile:
        json.dump(test_json, outfile, default=convert)
        outfile.write('\n')

!gcloud ai-platform predict --model $MODEL \
--version $VERSION \
--json-instances='data.txt' \
--signature-name='predict'

Nach Abschluss des Codes werden Vorhersagen für die Testdaten zurückgegeben. Die Ergebnisse sollten in etwa so aussehen.

Vorhersagen aus dem bereitgestellten Modell für Testdaten.

Pipeline erstellen und ausführen

Erstellen Sie eine Dataflow-Pipeline, die Finanztransaktionsdaten liest, Informationen zur Betrugsvorhersage für jede Transaktion aus dem Modell der AI Platform anfordert und dann sowohl Transaktions- als auch Betrugsvorhersagedaten zur Analyse in BigQuery schreibt.

BigQuery-Dataset und -Tabellen erstellen

  1. Öffnen Sie die BigQuery Console:
  2. Wählen Sie im Abschnitt Ressourcen das Projekt aus, in dem Sie diese Anleitung durcharbeiten.
  3. Klicken Sie auf Dataset erstellen.

    Position der Schaltfläche

  4. Führen Sie auf der Seite Dataset erstellen die folgenden Schritte aus:

    • Geben Sie als Dataset-ID den String fraud_detection ein.
    • Wählen Sie unter Speicherort der Daten die Option USA aus.
    • Klicken Sie auf Dataset erstellen.
  5. Führen Sie im Bereich Abfrageeditor die folgenden SQL-Anweisungen aus, um die Tabellen transactions und fraud_prediction zu erstellen:

    CREATE OR REPLACE TABLE fraud_detection.transactions (
       step INT64,
       nameOrig STRING,
       nameDest STRING,
       isFlaggedFraud INT64,
       isFraud INT64,
       type STRING,
       amount FLOAT64,
       oldbalanceOrg FLOAT64,
       newbalanceOrig FLOAT64,
       oldbalanceDest FLOAT64,
       newbalanceDest FLOAT64,
       transactionId STRING
     );
    
    CREATE OR REPLACE TABLE fraud_detection.fraud_prediction (
        transactionId STRING,
        logistic FLOAT64,
        json_response STRING
    );
    

Pub/Sub-Thema und -Abo erstellen

  1. Pub/Sub-Konsole öffnen
  2. Klicken Sie auf Thema erstellen.
  3. Geben Sie als Themen-ID sample_data ein.
  4. Klicken Sie auf Thema erstellen.
  5. Klicken Sie auf Abos.
  6. Klicken Sie auf Abo erstellen.
  7. Geben Sie als Abo-ID sample_data ein.
  8. Wählen Sie unter Cloud Pub/Sub-Thema auswählen die Option projects/<myProject>/topics/sample_data aus.
  9. Scrollen Sie zum Ende der Seite und klicken Sie auf Erstellen.

Dataflow-Pipeline ausführen

  1. Cloud Shell aktivieren
  2. Führen Sie in Cloud Shell den folgenden Befehl aus, um die Dataflow-Pipeline auszuführen. Ersetzen Sie dabei myProject durch die ID des Projekts, das Sie zum Ausführen dieser Anleitung verwenden:

    gcloud beta dataflow flex-template run "anomaly-detection" \
    --project=myProject \
    --region=us-central1 \
    --template-file-gcs-location=gs://df-ml-anomaly-detection-mock-data/dataflow-flex-template/dynamic_template_finserv_fraud_detection.json \
    --parameters=autoscalingAlgorithm="NONE",\
    numWorkers=30,\
    maxNumWorkers=30,\
    workerMachineType=n1-highmem-8,\
    subscriberId=projects/myProject/subscriptions/sample_data,\
    tableSpec=myProject:fraud_detection.transactions,\
    outlierTableSpec=myProject:fraud_detection.fraud_prediction,\
    inputFilePattern=gs://df-ml-anomaly-detection-mock-data/finserv_fraud_detection/fraud_data_kaggle.json,\
    modelId=fraud_detection_with_key,\
    versionId=v1,\
    keyRange=1024,\
    batchSize=500000
    

    Um diese Pipeline an die Produktion anzupassen, können Sie die Parameterwerte batchSize und keyRange ändern, um die Größe und den Zeitpunkt der Batches für Vorhersageanfragen zu steuern. Folgendes ist zu berücksichtigen:

    • Die Verwendung einer kleinen Batchgröße und eines hohen Schlüsselbereichswerts beschleunigt die Verarbeitung, kann jedoch auch Kontingentgrenzen überschreiten und erfordert die Anforderung zusätzlicher Kontingente.
    • Die Verwendung einer größeren Batchgröße und eines niedrigen Schlüsselbereichwerts ist langsamer, wird aber die Vorgänge eher innerhalb des Kontingents halten.
  3. Zur Seite "Dataflow-Jobs"

  4. Klicken Sie in der Jobliste auf Anomalieerkennung.

  5. Warten Sie, bis die Jobgrafik angezeigt wird und das Element StreamFraudData der Grafik eine Wiedergabezeit von mehr als 0 Sekunden zeigt.

Daten in BigQuery überprüfen

Überprüfen Sie, ob die Daten in BigQuery geschrieben wurden. Führen Sie dazu eine Abfrage aus, um die Transaktionen anzuzeigen, die als betrügerisch identifiziert wurden.

  1. Öffnen Sie die BigQuery Console:
  2. Führen Sie im Abfrageeditor die folgende Abfrage aus:

    SELECT DISTINCT
      outlier.logistic as fraud_probablity,
      outlier.transactionId,
      transactions.* EXCEPT (transactionId,isFraud,isFlaggedFraud)
    FROM `fraud_detection.fraud_prediction` AS outlier
    JOIN fraud_detection.transactions AS transactions
    ON transactions.transactionId = outlier.transactionId
    WHERE logistic >0.99
    ORDER BY fraud_probablity DESC;
    

    Die Ergebnisse sollten in etwa so aussehen:

    Die ersten neun Zeilen der Ergebnisse der Betrugswahrscheinlichkeit.

Bereinigen

Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder behalten Sie das Projekt bei und löschen Sie nur diese Ressourcen.

In beiden Fällen sollten Sie diese Ressourcen entfernen, damit sie Ihnen nicht weiter in Rechnung gestellt werden. In den folgenden Abschnitten erfahren Sie, wie Sie diese Ressourcen löschen.

Projekt löschen

Am einfachsten vermeiden Sie weitere Kosten, wenn Sie das für die Anleitung erstellte Projekt löschen.

  1. Wechseln Sie in der Cloud Console zur Seite Ressourcen verwalten.

    Zur Seite "Ressourcen verwalten"

  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie dann auf Löschen .
  3. Geben Sie im Dialogfeld die Projekt-ID ein und klicken Sie auf Beenden, um das Projekt zu löschen.

Komponenten löschen

Wenn Sie das Projekt nicht löschen möchten, verwenden Sie die folgenden Abschnitte, um die abrechenbaren Komponenten dieser Anleitung zu löschen.

Beenden Sie den Dataflow-Job

  1. Zur Seite "Dataflow-Jobs"
  2. Klicken Sie in der Jobliste auf Anomalieerkennung.
  3. Klicken Sie auf der Seite mit den Jobdetails auf Beenden.
  4. Wählen Sie Abbrechen aus.
  5. Klicken Sie auf Job beenden.

Cloud Storage-Buckets löschen

  1. Zur Seite "Storage-Browser"
  2. Klicken Sie die Kästchen für die Buckets <myProject>-bucket und dataflow-staging-us-central1-<projectNumber> an.
  3. Klicken Sie auf Löschen.
  4. Geben Sie im eingeblendeten Fenster DELETE ein und klicken Sie dann auf Bestätigen.

Löschen Sie das Pub/Sub-Thema und -Abo

  1. Seite "Pub/Sub-Abos" öffnen
  2. Klicken Sie auf das Kästchen des Abos sample_data.
  3. Klicken Sie auf Löschen.
  4. Bestätigen Sie im eingeblendeten Fenster, dass Sie das Abo und dessen Inhalt löschen möchten. Klicken Sie dazu auf Löschen.
  5. Klicken Sie auf Themen.
  6. Klicken Sie auf das Kästchen für das Thema sample_data.
  7. Klicken Sie auf Löschen.
  8. Geben Sie im eingeblendeten Fenster delete ein und klicken Sie dann auf Löschen.

BigQuery-Dataset und -Tabellen löschen

  1. Öffnen Sie die BigQuery Console
  2. Maximieren Sie im Abschnitt Ressourcen das Projekt, in dem Sie diese Anleitung abschließen, und wählen Sie das Dataset fraud_detection aus.
  3. Klicken Sie im Header-Bereich des Datasets auf Dataset löschen.
  4. Geben Sie im eingeblendeten Fenster fraud_detection ein und klicken Sie dann auf Löschen.

AI Platform-Modell löschen

  1. Öffnen Sie die Seite mit den AI Platform-Modellen
  2. Klicken Sie in der Liste der Modelle auf fraud_detection_with_key.
  3. Klicken Sie auf der Seite Modelldetails das Kästchen für die Version v1 (Standard) an.
  4. Klicken Sie auf Mehr und anschließend auf Löschen.
  5. Klicken Sie nach dem Löschen der Version auf Zurück, um zur Modellliste zurückzukehren.
  6. Klicken Sie auf das Kästchen für das Modell fraud_detection_with_key.
  7. Klicken Sie auf Mehr und anschließend auf Löschen.

AI Platform-Notebook löschen

  1. Zur Seite "AI Platform Notebooks"
  2. Klicken Sie auf das Kästchen für die Notebookinstanz boosted-trees.
  3. Klicken Sie auf Löschen.
  4. Klicken Sie im eingeblendeten Fenster auf Löschen.

Nächste Schritte