Détecter les anomalies dans les transactions financières à l'aide d'AI Platform, Dataflow et BigQuery

;

Ce tutoriel explique comment mettre en œuvre une application de détection d'anomalies qui identifie les transactions frauduleuses à l'aide d'un modèle d'arbre de décision à boosting.

Ce tutoriel est destiné aux développeurs, aux ingénieurs de données et aux data scientists. Il part du principe que vous possédez des connaissances de base sur les éléments suivants:

  • Développement de modèles de machine learning avec TensorFlow et Python
  • SQL standard
  • Pipelines Dataflow créés à l'aide du SDK Java Apache Beam

Architecture

L'exemple d'application comprend les composants suivants:

  • Modèle d'arbre de décision à boosting développé à l'aide de TensorFlow et déployé dans AI Platform.
  • Un pipeline Dataflow qui effectue les tâches suivantes:

    • Il publie les données de transaction d'un bucket Cloud Storage dans un sujet Pub/Sub, puis lit ces données sous forme de flux d'un abonnement Pub/Sub vers ce sujet.
    • Récupère les estimations de probabilité de fraude pour chaque transaction à l'aide de l'API Apache Beam Timer pour traiter par micro-lots les appels vers l'API Prediction d'AI Platform.
    • Écriture des données de transaction et de probabilité de fraude dans des tables BigQuery pour analyse.

Le schéma suivant illustre l'architecture de la solution de détection d'anomalies:

Schéma illustrant l'architecture de la solution de détection d'anomalies.

Dataset

Le modèle en arbre de décision à boosting utilisé dans ce tutoriel est entraîné sur l'ensemble de données financier synthétique pour la détection des fraudes de Kaggle. Cet ensemble de données a été généré à l'aide du simulateur PaySim.

Nous utilisons un ensemble de données synthétique, car il y a peu d'ensembles de données financiers adaptés à la détection des fraudes, et ceux qui existent contiennent souvent des informations personnelles devant être anonymisés.

Objectifs

  • Créer un modèle d'arbre de décision à boosting qui estime la probabilité de fraude dans les transactions financières.
  • Déployer le modèle dans AI Platform pour la prédiction en ligne
  • Utilisez un pipeline Dataflow pour effectuer les opérations suivantes :
    • Écrire les données de transaction de l'exemple d'ensemble de données dans une table transactions de BigQuery
    • Envoyer des requêtes de microtraitement par lot au modèle hébergé pour récupérer des prédictions de probabilité de fraude et écrire les résultats dans une table fraud_detection dans BigQuery.
  • Exécutez une requête BigQuery qui joint ces tables pour afficher la probabilité de fraude pour chaque transaction.

Coûts

Ce tutoriel utilise des composants facturables de Google Cloud, dont :

  • AI Platform
  • BigQuery
  • Cloud Storage
  • Instance
  • Dataflow
  • Pub/Sub

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Avant de commencer

  1. Connectez-vous à votre compte Google.

    Si vous n'en possédez pas déjà un, vous devez en créer un.

  2. Dans Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Cloud.

    Accéder à la page de sélection du projet

  3. Vérifiez que la facturation est activée pour votre projet Google Cloud. Découvrez comment vérifier que la facturation est activée pour votre projet.

  4. Activer les API AI Platform Training and Prediction, Cloud Storage, Compute Engine, Dataflow, and AI Platform Notebooks.

    Activer les API

Vérifier la disponibilité des quotas

  1. Ouvrir la page Quotas IAM
  2. Vérifiez que les quotas d'API Compute Engine suivants sont disponibles dans la région us-central1. vous avez besoin de ces quotas pour exécuter la tâche Dataflow utilisée dans ce tutoriel. S'ils ne sont pas disponibles, demandez une augmentation du quota.

    Nom de la limite Quota
    Processeurs 241
    Adresses IP en cours d'utilisation 30
    Groupes d'instances 1
    Modèles d'instances 1
    Groupes d'instances gérés 1
    Disque persistant standard (Go) 12900

Créer et déployer le modèle

Suivez les instructions de cette section pour créer un modèle d'arbre de décision à boosting pour prédire la fraude lors des transactions financières.

Créer un notebook

  1. Ouvrir la console AI Platform Notebooks
  2. Cliquez sur Nouvelle instance.
  3. Choisissez TensorFlow Enterprise 1.15 sans GPU.

    Affiche le type d'instance à sélectionner.

  4. Dans le champ Instance name (Nom de l'instance), saisissez boosted-trees.

  5. Cliquez sur Créer. La création de l'instance de notebook prend quelques minutes.

  6. Lorsque l'instance est disponible, cliquez sur Ouvrir JupyterLab.

  7. Dans la section Notebook du lanceur d'applications de JupyterLab, cliquez sur Python 3.

Télécharger les données de l'exemple

Téléchargez une copie de l'exemple de base de données:

  1. Copiez le code suivant dans la première cellule du notebook:

    !gsutil cp gs://financial_fraud_detection/fraud_data_kaggle.csv .

  2. Cliquez sur Exécuter dans la barre de menu.

Préparer les données à utiliser pour l'entraînement

Les données de l'exemple sont non équilibrées, ce qui peut entraîner un modèle inexact. Le code suivant corrige l'équilibre à l'aide d'un sous-échantillonnage, puis divise les données en un ensemble d'entraînement et un ensemble de test.

Préparez les données en copiant le code suivant dans la deuxième cellule du notebook, puis en l'exécutant:

import uuid
import itertools
import numpy as np
import pandas as pd
import os
import tensorflow as tf
import json
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from sklearn.metrics import confusion_matrix

os.environ['TF_CPP_MIN_LOG_LEVEL']='3'

data = pd.read_csv('fraud_data_kaggle.csv')

# Split the data into 2 DataFrames
fraud = data[data['isFraud'] == 1]
not_fraud = data[data['isFraud'] == 0]

# Take a random sample of non fraud rows
not_fraud_sample = not_fraud.sample(random_state=2, frac=.005)

# Put it back together and shuffle
df = pd.concat([not_fraud_sample,fraud])
df = shuffle(df, random_state=2)

# Remove a few columns (isFraud is the label column we'll use, not isFlaggedFraud)
df = df.drop(columns=['nameOrig', 'nameDest', 'isFlaggedFraud'])

# Add transaction id to make it possible to map predictions to transactions
df['transactionId'] = [str(uuid.uuid4()) for _ in range(len(df.index))]

train_test_split = int(len(df) * .8)

# Split the dataset for training and testing
train_set = df[:train_test_split]
test_set = df[train_test_split:]

train_labels = train_set.pop('isFraud')
test_labels = test_set.pop('isFraud')

train_set.head()

Une fois le code terminé, plusieurs exemples de lignes des données traitées sont générés. Un résultat semblable aux lignes suivantes doit s'afficher:

Cinq premières lignes de données d'entraînement traitées.

Créer et entraîner le modèle

Créez et entraînez le modèle en copiant le code suivant dans la troisième cellule du notebook, puis en l'exécutant:

# Define features
fc = tf.feature_column
CATEGORICAL_COLUMNS = ['type']
NUMERIC_COLUMNS = ['step', 'amount', 'oldbalanceOrg', 'newbalanceOrig', 'oldbalanceDest', 'newbalanceDest']
KEY_COLUMN = 'transactionId'
def one_hot_cat_column(feature_name, vocab):
    return tf.feature_column.indicator_column(tf.feature_column.categorical_column_with_vocabulary_list(feature_name, vocab))

feature_columns = []

for feature_name in CATEGORICAL_COLUMNS:
    vocabulary = train_set[feature_name].unique()
    feature_columns.append(one_hot_cat_column(feature_name, vocabulary))

for feature_name in NUMERIC_COLUMNS:
  feature_columns.append(tf.feature_column.numeric_column(feature_name,
                                           dtype=tf.float32))

# Define training and evaluation input functions
NUM_EXAMPLES = len(train_labels)
def make_input_fn(X, y, n_epochs=None, shuffle=True):
  def input_fn():
    dataset = tf.data.Dataset.from_tensor_slices((dict(X), y))
    if shuffle:
      dataset = dataset.shuffle(NUM_EXAMPLES)
    dataset = dataset.repeat(n_epochs)
    dataset = dataset.batch(NUM_EXAMPLES)
    return dataset
  return input_fn

train_input_fn = make_input_fn(train_set, train_labels)
eval_input_fn = make_input_fn(test_set, test_labels, shuffle=False, n_epochs=1)

# Define the model
n_batches = 1
model = tf.estimator.BoostedTreesClassifier(feature_columns,
                                          n_batches_per_layer=n_batches)
model = tf.contrib.estimator.forward_features(model,KEY_COLUMN)

# Train the model
model.train(train_input_fn, max_steps=100)

# Get metrics to evaluate the model's performance
result = model.evaluate(eval_input_fn)
print(pd.Series(result))

Une fois le code terminé, il génère un ensemble de métriques décrivant les performances du modèle. Un résultat semblable aux lignes suivantes doit s'afficher, avec les valeurs accuracy et auc autour de 99 %. :

Métriques de performances pour le modèle d'arbre de décision à boosting.

Tester le modèle

Testez le modèle pour vous assurer qu'il marque correctement les transactions frauduleuses en copiant le code suivant dans la quatrième cellule du notebook, puis en l'exécutant:

pred_dicts = list(model.predict(eval_input_fn))
probabilities = pd.Series([pred['logistic'][0] for pred in pred_dicts])

for i,val in enumerate(probabilities[:30]):
  print('Predicted: ', round(val), 'Actual: ', test_labels.iloc[i])
  print()

Une fois le code terminé, il affiche la probabilité de fraude estimée et réelle pour les données de test. Un résultat semblable aux lignes suivantes doit s'afficher:

Résultat de probabilité de fraude prédite et réelle pour les données de test.

Exporter le modèle

Créez un modèle SavedModel à partir du modèle entraîné, puis exportez-le dans Cloud Storage. Pour ce faire, copiez le code suivant dans la cinquième cellule du notebook, puis exécutez-le. Remplacez myProject par l'ID du projet que vous utilisez pour suivre ce tutoriel.

GCP_PROJECT = 'myProject'
MODEL_BUCKET = 'gs://myProject-bucket'

!gsutil mb $MODEL_BUCKET

def json_serving_input_fn():
    feature_placeholders = {
        'type': tf.placeholder(tf.string, [None]),
        'step': tf.placeholder(tf.float32, [None]),
        'amount': tf.placeholder(tf.float32, [None]),
        'oldbalanceOrg': tf.placeholder(tf.float32, [None]),
        'newbalanceOrig': tf.placeholder(tf.float32, [None]),
        'oldbalanceDest': tf.placeholder(tf.float32, [None]),
        'newbalanceDest': tf.placeholder(tf.float32, [None]),
         KEY_COLUMN: tf.placeholder_with_default(tf.constant(['nokey']), [None])
    }
    features = {key: tf.expand_dims(tensor, -1)
                for key, tensor in feature_placeholders.items()}
    return tf.estimator.export.ServingInputReceiver(features,feature_placeholders)

export_path = model.export_saved_model(
    MODEL_BUCKET + '/explanations-with-key',
    serving_input_receiver_fn=json_serving_input_fn
).decode('utf-8')

!saved_model_cli show --dir $export_path --all

Une fois le code terminé, il renvoie un message SignatureDef qui décrit les entrées et les sorties du modèle. Un résultat semblable aux lignes suivantes doit s'afficher:

SignatureDeef qui décrit les entrées et les sorties du modèle.

Déployer le modèle sur AI Platform

Déployez le modèle pour les prédictions en copiant le code suivant dans la sixième cellule du notebook, puis en l'exécutant. La création de la version du modèle prend quelques minutes.

MODEL = 'fraud_detection_with_key'

!gcloud ai-platform models create $MODEL

VERSION = 'v1'
!gcloud beta ai-platform versions create $VERSION \
--model $MODEL \
--origin $export_path \
--runtime-version 1.15 \
--framework TENSORFLOW \
--python-version 3.7 \
--machine-type n1-standard-4 \
--num-paths 10

!gcloud ai-platform versions describe $VERSION --model $MODEL

Obtenir des prédictions à partir du modèle déployé

Pour obtenir des prédictions pour l'ensemble de données de test, copiez le code suivant dans la septième cellule du notebook, puis exécutez-le:

fraud_indices = []

for i,val in enumerate(test_labels):
    if val == 1:
        fraud_indices.append(i)

num_test_examples = 10
import numpy as np

def convert(o):
    if isinstance(o, np.generic): return o.item()
    raise TypeError

for i in range(num_test_examples):
    test_json = {}
    ex = test_set.iloc[fraud_indices[i]]
    keys = ex.keys().tolist()
    vals = ex.values.tolist()
    for idx in range(len(keys)):
        test_json[keys[idx]] = vals[idx]

    print(test_json)
    with open('data.txt', 'a') as outfile:
        json.dump(test_json, outfile, default=convert)
        outfile.write('\n')

!gcloud ai-platform predict --model $MODEL \
--version $VERSION \
--json-instances='data.txt' \
--signature-name='predict'

Une fois le code terminé, il renvoie des prédictions pour les données de test. Un résultat semblable aux lignes suivantes doit s'afficher :

Prédictions à partir du modèle déployé pour les données de test.

Créer et exécuter le pipeline

Créez un pipeline Dataflow qui lit les données de transaction financière, demande des informations sur la prédiction de fraude pour chaque transaction du modèle AI Platform, puis écrit les données de transaction et de prédiction de fraude dans BigQuery pour analyse.

Créer l'ensemble de données et les tables BigQuery

  1. Ouvrez la console BigQuery
  2. Dans la section Ressources, sélectionnez le projet dans lequel vous suivez ce tutoriel.
  3. Cliquez sur Créer un ensemble de données.

    Affiche l'emplacement du bouton

  4. Sur la page Create dataset (Créer un ensemble de données), procédez comme suit :

    • Dans le champ ID de l'ensemble de données, saisissez fraud_detection.
    • Dans le champ Data location (Emplacement des données), sélectionnez United States (US) (États-Unis).
    • Cliquez sur Créer un ensemble de données.
  5. Dans le volet de l'éditeur de requête, exécutez les instructions SQL suivantes pour créer les tables transactions et fraud_prediction:

    CREATE OR REPLACE TABLE fraud_detection.transactions (
       step INT64,
       nameOrig STRING,
       nameDest STRING,
       isFlaggedFraud INT64,
       isFraud INT64,
       type STRING,
       amount FLOAT64,
       oldbalanceOrg FLOAT64,
       newbalanceOrig FLOAT64,
       oldbalanceDest FLOAT64,
       newbalanceDest FLOAT64,
       transactionId STRING
     );
    
    CREATE OR REPLACE TABLE fraud_detection.fraud_prediction (
        transactionId STRING,
        logistic FLOAT64,
        json_response STRING
    );
    

Créer le sujet et l'abonnement Pub/Sub

  1. Ouvrir la console Pub/Sub
  2. Cliquez sur Créer un sujet.
  3. Dans le champ ID du sujet, saisissez sample_data.
  4. Cliquez sur Créer un sujet.
  5. Cliquez sur Abonnements.
  6. Cliquez sur Créer un abonnement.
  7. Pour l'ID d'abonnement, saisissez sample_data.
  8. Pour le champ Select a Cloud Pub/Sub topic (Sélectionner un sujet Cloud Pub/Sub), choisissez projects/<myProject>/topics/sample_data.
  9. Faites défiler la page jusqu'en bas, puis cliquez sur Create (Créer).

Exécuter le pipeline Dataflow

  1. Activer Cloud Shell
  2. Dans Cloud Shell, exécutez la commande ci-dessous pour exécuter le pipeline Dataflow, en remplaçant myProject par l'ID du projet que vous utilisez pour suivre ce tutoriel:

    gcloud beta dataflow flex-template run "anomaly-detection" \
    --project=myProject \
    --region=us-central1 \
    --template-file-gcs-location=gs://df-ml-anomaly-detection-mock-data/dataflow-flex-template/dynamic_template_finserv_fraud_detection.json \
    --parameters=autoscalingAlgorithm="NONE",\
    numWorkers=30,\
    maxNumWorkers=30,\
    workerMachineType=n1-highmem-8,\
    subscriberId=projects/myProject/subscriptions/sample_data,\
    tableSpec=myProject:fraud_detection.transactions,\
    outlierTableSpec=myProject:fraud_detection.fraud_prediction,\
    inputFilePattern=gs://df-ml-anomaly-detection-mock-data/finserv_fraud_detection/fraud_data_kaggle.json,\
    modelId=fraud_detection_with_key,\
    versionId=v1,\
    keyRange=1024,\
    batchSize=500000
    

    Pour adapter ce pipeline à l'environnement de production, vous pouvez modifier les valeurs des paramètres batchSize et keyRange afin de contrôler la taille et la chronologie des lots de requêtes de prédiction. Réfléchissez aux réalités suivantes :

    • L'utilisation d'une petite taille de lot et d'une valeur de plage de clés élevée accélère le traitement, mais peut également dépasser les limites de quota et vous demander de demander un quota supplémentaire.
    • L'utilisation d'une taille de lot plus importante et d'une valeur de plage de clés faible est plus lente, mais il est plus probable que les opérations ne dépassent pas le quota.
  3. Ouvrir la page Tâches Dataflow

  4. Dans la liste des tâches, cliquez sur anomaly-detection.

  5. Attendez que le graphique de la tâche s'affiche et que l'élément StreamFraudData du graphique affiche une durée d'exécution supérieure à 0 seconde.

Vérifier les données dans BigQuery

Vérifiez que les données ont été écrites dans BigQuery en exécutant une requête pour afficher les transactions identifiées comme frauduleuses.

  1. Ouvrez la console BigQuery
  2. Dans le volet de l'éditeur de requête, exécutez la requête suivante:

    SELECT DISTINCT
      outlier.logistic as fraud_probablity,
      outlier.transactionId,
      transactions.* EXCEPT (transactionId,isFraud,isFlaggedFraud)
    FROM `fraud_detection.fraud_prediction` AS outlier
    JOIN fraud_detection.transactions AS transactions
    ON transactions.transactionId = outlier.transactionId
    WHERE logistic >0.99
    ORDER BY fraud_probablity DESC;
    

    Un résultat semblable aux lignes suivantes doit s'afficher:

    9 premières lignes de résultats représentant une probabilité de fraude.

Nettoyer

Pour éviter que les ressources utilisées lors de ce tutoriel soient facturées sur votre compte Google Cloud, vous pouvez soit supprimer le projet contenant les ressources, soit le conserver, mais supprimer uniquement ces ressources.

Dans les deux cas, vous devez supprimer ces ressources afin d'éviter qu'elles ne vous soient facturées par la suite. Dans les sections suivantes, nous allons voir comment supprimer ces ressources.

Supprimer le projet

Le moyen le plus simple d'empêcher la facturation est de supprimer le projet que vous avez créé pour ce tutoriel.

  1. Dans Cloud Console, accédez à la page Gérer les ressources.

    Accéder à la page Gérer les ressources

  2. Dans la liste des projets, sélectionnez le projet que vous souhaitez supprimer, puis cliquez sur Supprimer .
  3. Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

Supprimer les composants

Si vous ne souhaitez pas supprimer le projet, reportez-vous aux sections suivantes pour supprimer les composants facturables de ce tutoriel.

Arrêtez la tâche Dataflow.

  1. Ouvrir la page Tâches Dataflow
  2. Dans la liste des tâches, cliquez sur anomaly-detection.
  3. Sur la page des détails de la tâche, cliquez sur Arrêter.
  4. Sélectionnez Annuler.
  5. Cliquez sur Stop job (Arrêter la tâche).

Supprimer les buckets Cloud Storage

  1. Ouvrir le navigateur Cloud Storage
  2. Cochez les cases des buckets <myProject>-bucket et dataflow-staging-us-central1-<projectNumber>.
  3. Cliquez sur Supprimer.
  4. Dans la fenêtre qui apparaît en superposition, saisissez DELETE, puis cliquez sur Confirm (Confirmer).

Supprimez le sujet et l'abonnement Pub/Sub :

  1. Ouvrir la page "Abonnements Pub/Sub"
  2. Cochez la case correspondant à l'abonnement sample_data.
  3. Cliquez sur Supprimer.
  4. Dans la fenêtre qui apparaît en superposition, cliquez sur Supprimer pour confirmer que vous souhaitez supprimer l'abonnement et son contenu.
  5. Cliquez sur Sujets.
  6. Cochez la case correspondant au sujet sample_data.
  7. Cliquez sur Supprimer.
  8. Dans la fenêtre qui apparaît en superposition, saisissez delete, puis cliquez sur Supprimer.

Supprimer l'ensemble de données et les tables BigQuery

  1. Ouvrez la console BigQuery
  2. Dans la section Ressources, développez le projet dans lequel vous avez terminé ce tutoriel, puis sélectionnez l'ensemble de données fraude_detection.
  3. Cliquez sur Supprimer l'ensemble de données dans l'en-tête du volet de l'ensemble de données.
  4. Dans la fenêtre qui apparaît en superposition, saisissez fraud_detection, puis cliquez sur Supprimer.

Supprimer le modèle AI Platform

  1. Ouvrez la page des modèles AI Platform.
  2. Dans la liste des modèles, cliquez sur fraude_detection_with_key.
  3. Sur la page Informations sur le modèle, cochez la case correspondant à la version v1 (par défaut).
  4. Cliquez sur Plus, puis sur Supprimer.
  5. Lorsque la suppression de la version est terminée, cliquez sur Retour pour revenir à la liste des modèles.
  6. Cochez la case pour le modèle fraude_detection_with_key.
  7. Cliquez sur Plus, puis sur Supprimer.

Supprimer le notebook AI Platform

  1. Ouvrir la page AI Platform Notebooks
  2. Cochez la case correspondant à l'instance de notebook boosted-trees.
  3. Cliquez sur Supprimer.
  4. Dans la fenêtre qui apparaît en superposition, cliquez sur Supprimer.

Étape suivante