Detecta anomalías en transacciones financieras con AI Platform, Dataflow y BigQuery

;

En este instructivo, se muestra cómo implementar una aplicación de detección de anomalías que identifica transacciones fraudulentas mediante un modelo de árbol potenciado.

Este instructivo está dirigido a desarrolladores, ingenieros de datos y científicos de datos, y supone que tienes conocimientos básicos sobre lo siguiente:

  • Desarrollo de modelos de aprendizaje automático con TensorFlow y Python
  • SQL estándar
  • Canalizaciones de Dataflow compiladas mediante el SDK de Java de Apache Beam

Arquitectura

La aplicación de muestra consta de los siguientes componentes:

  • Un modelo de árbol potenciado que se desarrolló mediante TensorFlow y se implementó en AI Platform
  • Una canalización de Dataflow que completa las siguientes tareas:

    • Publica datos de transacciones de un bucket de Cloud Storage a un tema de Pub/Sub y, luego, lee esos datos como una transmisión de una suscripción de Pub/Sub a ese tema.
    • Obtiene estimaciones de probabilidad de fraude de cada transacción mediante la API de cronómetro de Apache Beam para llamadas por microlotes a la API de AI Platform Prediction.
    • Escribe datos de transacciones y datos de probabilidad de fraude en tablas de BigQuery para su análisis.

En el siguiente diagrama, se ilustra la arquitectura de la solución de detección de anomalías:

Diagrama que muestra la arquitectura de la solución de detección de anomalías.

Conjunto de datos

El modelo de árbol potenciado que se usa en este instructivo se entrenó en el Synthetic Financial Dataset For Fraud Detection (conjunto de datos financieros sintéticos para la detección de fraudes) de Kaggle. Este conjunto de datos se generó con el simulador PaySim.

Usamos un conjunto de datos sintéticos porque existen pocos conjuntos de datos financieros apropiados para la detección de fraudes, y estos suelen contener información de identificación personal (PII) que debe ser anónima.

Objetivos

  • Crear un modelo de árbol potenciado que calcule la probabilidad de fraude en transacciones financieras
  • Implementar el modelo en AI Platform para la predicción en línea
  • Usar una canalización de Dataflow para realizar las siguientes acciones:
    • Escribir datos de transacciones del conjunto de datos de muestra en una tabla transactions en BigQuery
    • Enviar solicitudes por microlotes al modelo alojado para recuperar predicciones de probabilidad de fraude y escribir los resultados en una tabla fraud_detection en BigQuery
  • Ejecutar una consulta de BigQuery que una estas tablas para ver la probabilidad de fraude por cada transacción

Costos

En este instructivo, se usan componentes facturables de Google Cloud, que incluyen los siguientes:

  • AI Platform
  • BigQuery
  • Cloud Storage
  • Compute Engine
  • Dataflow
  • Pub/Sub

Usa la calculadora de precios para generar una estimación de los costos según el uso previsto. Los usuarios nuevos de Google Cloud pueden ser elegibles para obtener una prueba gratuita.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  3. Comprueba que la facturación esté habilitada en tu proyecto.

    Descubre cómo puedes habilitar la facturación

  4. Habilita las API de AI Platform Training and Prediction, Cloud Storage, Compute Engine, Dataflow, and Notebooks.

    Habilita las API

Comprueba la disponibilidad de la cuota

  1. Abrir la página Cuotas de IAM
  2. Verifica que las siguientes cuotas de la API de Compute Engine estén disponibles en la región us-central1. Necesitas estas cuotas para ejecutar el trabajo de Dataflow que se usa en este instructivo. De lo contrario, solicita un aumento de la cuota.

    Nombre del límite Cuota
    CPU 241
    Direcciones IP en uso 30
    Grupos de instancias 1
    Plantillas de instancias 1
    Grupos de instancias administrados 1
    Persistent Disk estándar (GB): 12900

Crea e implementa el modelo

Sigue las instrucciones de esta sección para crear un modelo de árbol potenciado a fin de predecir posibles fraudes en las transacciones financieras.

Crea un notebook

  1. Abre la consola de Notebooks
  2. Haz clic en Instancia nueva (New instance).
  3. Elige TensorFlow Enterprise 1.15 without GPUs.

    Se muestra el tipo de instancia que se puede seleccionar.

  4. En Nombre de la instancia, escribe boosted-trees.

  5. Haz clic en Crear. La instancia de notebook toma unos minutos en crearse.

  6. Cuando la instancia esté disponible, haz clic en Abrir JupyterLab.

  7. En la sección Notebook del iniciador de JupyterLab, haz clic en Python 3.

Descarga los datos de muestra

Descarga una copia de la base de datos de muestra:

  1. Copia el siguiente código en la primera celda del notebook:

    !gsutil cp gs://financial_fraud_detection/fraud_data_kaggle.csv .

  2. Haz clic en Ejecutar  en la barra de menú.

Prepara los datos para usarlos en el entrenamiento

Los datos de muestra están desequilibrados, lo que puede generar un modelo inexacto. Con el siguiente código, se corrige el desequilibrio mediante el uso de la reducción de muestreo, luego se dividen los datos en un conjunto de entrenamiento y un conjunto de prueba.

Para preparar los datos, copia el siguiente código en la segunda celda del notebook y, luego, ejecútalo:

import uuid
import itertools
import numpy as np
import pandas as pd
import os
import tensorflow as tf
import json
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from sklearn.metrics import confusion_matrix

os.environ['TF_CPP_MIN_LOG_LEVEL']='3'

data = pd.read_csv('fraud_data_kaggle.csv')

# Split the data into 2 DataFrames
fraud = data[data['isFraud'] == 1]
not_fraud = data[data['isFraud'] == 0]

# Take a random sample of non fraud rows
not_fraud_sample = not_fraud.sample(random_state=2, frac=.005)

# Put it back together and shuffle
df = pd.concat([not_fraud_sample,fraud])
df = shuffle(df, random_state=2)

# Remove a few columns (isFraud is the label column we'll use, not isFlaggedFraud)
df = df.drop(columns=['nameOrig', 'nameDest', 'isFlaggedFraud'])

# Add transaction id to make it possible to map predictions to transactions
df['transactionId'] = [str(uuid.uuid4()) for _ in range(len(df.index))]

train_test_split = int(len(df) * .8)

# Split the dataset for training and testing
train_set = df[:train_test_split]
test_set = df[train_test_split:]

train_labels = train_set.pop('isFraud')
test_labels = test_set.pop('isFraud')

train_set.head()

Una vez que se completa el código, se generan varias filas de ejemplo de los datos procesados. Deberías ver resultados similares a los siguientes:

Primeras 5 filas de los datos de entrenamiento procesados

Crea y entrena el modelo

Crea y entrena el modelo. Para ello, copia el siguiente código en la tercera celda del notebook y, luego, ejecútalo:

# Define features
fc = tf.feature_column
CATEGORICAL_COLUMNS = ['type']
NUMERIC_COLUMNS = ['step', 'amount', 'oldbalanceOrg', 'newbalanceOrig', 'oldbalanceDest', 'newbalanceDest']
KEY_COLUMN = 'transactionId'
def one_hot_cat_column(feature_name, vocab):
    return tf.feature_column.indicator_column(tf.feature_column.categorical_column_with_vocabulary_list(feature_name, vocab))

feature_columns = []

for feature_name in CATEGORICAL_COLUMNS:
    vocabulary = train_set[feature_name].unique()
    feature_columns.append(one_hot_cat_column(feature_name, vocabulary))

for feature_name in NUMERIC_COLUMNS:
  feature_columns.append(tf.feature_column.numeric_column(feature_name,
                                           dtype=tf.float32))

# Define training and evaluation input functions
NUM_EXAMPLES = len(train_labels)
def make_input_fn(X, y, n_epochs=None, shuffle=True):
  def input_fn():
    dataset = tf.data.Dataset.from_tensor_slices((dict(X), y))
    if shuffle:
      dataset = dataset.shuffle(NUM_EXAMPLES)
    dataset = dataset.repeat(n_epochs)
    dataset = dataset.batch(NUM_EXAMPLES)
    return dataset
  return input_fn

train_input_fn = make_input_fn(train_set, train_labels)
eval_input_fn = make_input_fn(test_set, test_labels, shuffle=False, n_epochs=1)

# Define the model
n_batches = 1
model = tf.estimator.BoostedTreesClassifier(feature_columns,
                                          n_batches_per_layer=n_batches)
model = tf.contrib.estimator.forward_features(model,KEY_COLUMN)

# Train the model
model.train(train_input_fn, max_steps=100)

# Get metrics to evaluate the model's performance
result = model.evaluate(eval_input_fn)
print(pd.Series(result))

Una vez que se completa el código, se genera un conjunto de métricas que describen el rendimiento del modelo. Deberías ver resultados similares a los siguientes, con los valores accuracy y auc cerca del 99%:

Métricas de rendimiento para el modelo de árbol potenciado

Prueba el modelo

Prueba el modelo a fin de verificar que etiqueta las transacciones fraudulentas de manera correcta. Para ello, copia el siguiente código en la cuarta celda del notebook y, luego, ejecútalo:

pred_dicts = list(model.predict(eval_input_fn))
probabilities = pd.Series([pred['logistic'][0] for pred in pred_dicts])

for i,val in enumerate(probabilities[:30]):
  print('Predicted: ', round(val), 'Actual: ', test_labels.iloc[i])
  print()

Una vez que se completa el código, se muestra la probabilidad prevista y la real de fraudes para los datos de prueba. Deberías ver resultados similares a los siguientes:

Resultados de probabilidad de fraude prevista y real para los datos de prueba

Exporta el modelo

Crea un modelo guardado a partir del modelo entrenado y expórtalo a Cloud Storage. Para ello, copia el siguiente código en la quinta celda del notebook y, luego, ejecútalo. Reemplaza myProject por el ID del proyecto que usarás para completar este instructivo.

GCP_PROJECT = 'myProject'
MODEL_BUCKET = 'gs://myProject-bucket'

!gsutil mb $MODEL_BUCKET

def json_serving_input_fn():
    feature_placeholders = {
        'type': tf.placeholder(tf.string, [None]),
        'step': tf.placeholder(tf.float32, [None]),
        'amount': tf.placeholder(tf.float32, [None]),
        'oldbalanceOrg': tf.placeholder(tf.float32, [None]),
        'newbalanceOrig': tf.placeholder(tf.float32, [None]),
        'oldbalanceDest': tf.placeholder(tf.float32, [None]),
        'newbalanceDest': tf.placeholder(tf.float32, [None]),
         KEY_COLUMN: tf.placeholder_with_default(tf.constant(['nokey']), [None])
    }
    features = {key: tf.expand_dims(tensor, -1)
                for key, tensor in feature_placeholders.items()}
    return tf.estimator.export.ServingInputReceiver(features,feature_placeholders)

export_path = model.export_saved_model(
    MODEL_BUCKET + '/explanations-with-key',
    serving_input_receiver_fn=json_serving_input_fn
).decode('utf-8')

!saved_model_cli show --dir $export_path --all

Una vez que se completa el código, se muestra una SignatureDef que describe las entradas y salidas del modelo. Deberías ver resultados similares a los siguientes:

SignatureDef que describe las entradas y salidas del modelo

Implementa el modelo en AI Platform

Implementa el modelo para las predicciones. Para ello, copia el siguiente código en la sexta celda del notebook y, luego, ejecútalo. La versión del modelo toma unos minutos en crearse.

MODEL = 'fraud_detection_with_key'

!gcloud ai-platform models create $MODEL

VERSION = 'v1'
!gcloud beta ai-platform versions create $VERSION \
--model $MODEL \
--origin $export_path \
--runtime-version 1.15 \
--framework TENSORFLOW \
--python-version 3.7 \
--machine-type n1-standard-4 \
--num-paths 10

!gcloud ai-platform versions describe $VERSION --model $MODEL

Obtén predicciones a partir del modelo implementado

A fin de obtener las predicciones para el conjunto de datos de prueba, copia el siguiente código en la séptima celda del notebook y, luego, ejecútalo:

fraud_indices = []

for i,val in enumerate(test_labels):
    if val == 1:
        fraud_indices.append(i)

num_test_examples = 10
import numpy as np

def convert(o):
    if isinstance(o, np.generic): return o.item()
    raise TypeError

for i in range(num_test_examples):
    test_json = {}
    ex = test_set.iloc[fraud_indices[i]]
    keys = ex.keys().tolist()
    vals = ex.values.tolist()
    for idx in range(len(keys)):
        test_json[keys[idx]] = vals[idx]

    print(test_json)
    with open('data.txt', 'a') as outfile:
        json.dump(test_json, outfile, default=convert)
        outfile.write('\n')

!gcloud ai-platform predict --model $MODEL \
--version $VERSION \
--json-instances='data.txt' \
--signature-name='predict'

Una vez que se completa el código, se muestran predicciones para los datos de prueba. Deberías ver resultados similares a los siguientes:

Predicciones a partir del modelo implementado para datos de prueba

Crea y ejecuta la canalización

Crea una canalización de Dataflow que lea los datos de las transacciones financieras, solicita información sobre la predicción de fraudes de cada transacción desde el modelo de AI Platform y, luego, escribe datos de predicciones de transacciones y fraudes en BigQuery para su análisis.

Crea las tablas y el conjunto de datos de BigQuery

  1. Abre la consola de BigQuery
  2. En la sección Recursos (Resources), selecciona el proyecto con el que deseas completar este instructivo.
  3. Haz clic en Crear conjunto de datos.

    Muestra la ubicación del botón Crear conjunto de datos

  4. En la página Crear conjunto de datos:

    • En ID de conjunto de datos, ingresa fraud_detection.
    • En Ubicación de los datos, elige Estados Unidos (EE.UU.).
    • Haz clic en Crear conjunto de datos (Create dataset).
  5. En el panel Editor de consultas, ejecuta las siguientes instrucciones de SQL para crear las tablas transactions y fraud_prediction:

    CREATE OR REPLACE TABLE fraud_detection.transactions (
       step INT64,
       nameOrig STRING,
       nameDest STRING,
       isFlaggedFraud INT64,
       isFraud INT64,
       type STRING,
       amount FLOAT64,
       oldbalanceOrg FLOAT64,
       newbalanceOrig FLOAT64,
       oldbalanceDest FLOAT64,
       newbalanceDest FLOAT64,
       transactionId STRING
     );
    
    CREATE OR REPLACE TABLE fraud_detection.fraud_prediction (
        transactionId STRING,
        logistic FLOAT64,
        json_response STRING
    );
    

Crea el tema y la suscripción de Pub/Sub

  1. Abre la consola de Pub/Sub.
  2. Haz clic en Crear tema.
  3. En ID del tema, escribe sample_data.
  4. Haz clic en Crear tema.
  5. Haz clic en Suscripciones.
  6. Haz clic en Crear suscripción.
  7. En ID de la suscripción, escribe sample_data.
  8. En Selecciona un tema de Cloud Pub/Sub, elige projects/<myProject>/topics/sample_data.
  9. Desplázate hasta el final de la página y haz clic en Crear.

Ejecuta la canalización de Dataflow

  1. Activa Cloud Shell.
  2. En Cloud Shell, ingresa el siguiente comando a fin de ejecutar la canalización de Dataflow y reemplaza myProject por el ID del proyecto que usas para completar este instructivo:

    gcloud beta dataflow flex-template run "anomaly-detection" \
    --project=myProject \
    --region=us-central1 \
    --template-file-gcs-location=gs://df-ml-anomaly-detection-mock-data/dataflow-flex-template/dynamic_template_finserv_fraud_detection.json \
    --parameters=autoscalingAlgorithm="NONE",\
    numWorkers=30,\
    maxNumWorkers=30,\
    workerMachineType=n1-highmem-8,\
    subscriberId=projects/myProject/subscriptions/sample_data,\
    tableSpec=myProject:fraud_detection.transactions,\
    outlierTableSpec=myProject:fraud_detection.fraud_prediction,\
    inputFilePattern=gs://df-ml-anomaly-detection-mock-data/finserv_fraud_detection/fraud_data_kaggle.json,\
    modelId=fraud_detection_with_key,\
    versionId=v1,\
    keyRange=1024,\
    batchSize=500000
    

    Para adaptar esta canalización a la producción, puedes modificar los valores de los parámetros batchSize y keyRange a fin de controlar el tamaño y el horario de los lotes de solicitud de predicción. Ten en cuenta lo siguiente:

    • El uso de un tamaño de lote pequeño y un valor de rango de claves alto da como resultado un procesamiento más rápido, pero también podría generar un exceso de las limitaciones de cuota, y es probable que debas solicitar una cuota adicional.
    • Si usas un tamaño de lote mayor y un valor de rango de claves bajo, será más lento, pero es más probable que las operaciones se mantengan dentro de la cuota.
  3. Abre la página Trabajos de Dataflow.

  4. En la lista de trabajos, haz clic en anomaly-detection.

  5. Espera hasta que aparezca el gráfico de trabajo y que, en el elemento StreamFraudData del gráfico, se muestre un tiempo de ejecución de más de 0 segundos.

Verifica los datos en BigQuery

Verifica que los datos se hayan escrito en BigQuery mediante la ejecución de una consulta para ver las transacciones que se identificaron como fraudulentas.

  1. Abre la consola de BigQuery
  2. En el panel Editor de consultas, ejecuta la siguiente consulta:

    SELECT DISTINCT
      outlier.logistic as fraud_probablity,
      outlier.transactionId,
      transactions.* EXCEPT (transactionId,isFraud,isFlaggedFraud)
    FROM `fraud_detection.fraud_prediction` AS outlier
    JOIN fraud_detection.transactions AS transactions
    ON transactions.transactionId = outlier.transactionId
    WHERE logistic >0.99
    ORDER BY fraud_probablity DESC;
    

    Deberías ver resultados similares a los siguientes:

    Primeras 9 filas de los resultados de probabilidad de fraude

Limpia

Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que usaste en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto, pero borra solo esos recursos.

De cualquier manera, debes quitar esos recursos para que no se te cobre por ellos en el futuro. En las siguientes secciones, se describe cómo borrar estos recursos.

Borra el proyecto

La manera más fácil de eliminar la facturación es borrar el proyecto que creaste para el instructivo.

  1. En Cloud Console, ve a la página Administrar recursos.

    Ir a Administrar recursos

  2. En la lista de proyectos, elige el proyecto que quieres borrar y haz clic en Borrar.
  3. En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.

Borra los componentes

Si no deseas borrar el proyecto, usa las siguientes secciones para borrar los componentes facturables de este instructivo.

Detenga el trabajo de Dataflow

  1. Abre la página Trabajos de Dataflow.
  2. En la lista de trabajos, haz clic en anomaly-detection.
  3. En la página de detalles del trabajo, haz clic en Detener.
  4. Selecciona Cancelar.
  5. Haz clic en Detener trabajo.

Borra los depósitos de Cloud Storage

  1. Abre el navegador de Cloud Storage.
  2. Selecciona las casillas de verificación de los depósitos <myProject>-bucket y dataflow-staging-us-central1-<projectNumber>.
  3. Haz clic en Borrar.
  4. En la ventana de superposición que aparece, escribe DELETE y, luego, haz clic en Confirmar.

Borra el tema y la suscripción de Pub/Sub

  1. Abre la página Suscripciones de Pub/Sub.
  2. Selecciona la casilla de verificación de la suscripción sample_data.
  3. Haz clic en Borrar.
  4. En la ventana de superposición que aparece, haz clic en Borrar para confirmar que deseas borrar la suscripción y su contenido.
  5. Haz clic en Temas.
  6. Selecciona la casilla de verificación del tema sample_data.
  7. Haz clic en Borrar.
  8. En la ventana de superposición que aparece, escribe delete y, luego, haz clic en Borrar.

Borra las tablas y el conjunto de datos de BigQuery

  1. Abre la consola de BigQuery
  2. En la sección Recursos, expande el proyecto en el que completaste este instructivo y selecciona el conjunto de datos fraud_detection.
  3. Haz clic en Borrar conjunto de datos en el encabezado del panel de conjunto de datos.
  4. En la ventana de superposición que aparece, escribe fraud_detection y, luego, haz clic en Borrar.

Borra el modelo de AI Platform

  1. Abre la página Modelos de AI Platform.
  2. En la lista de modelos, haz clic en fraud_detection_with_key.
  3. En la página Detalles del modelo, selecciona la casilla de verificación para la versión v1 (predeterminado).
  4. Haz clic en Más  y, luego, en Borrar.
  5. Cuando se termine de borrar la versión, haz clic en Atrás  para volver a la lista de modelos.
  6. Selecciona la casilla de verificación para el modelo fraud_detection_with_key.
  7. Haz clic en Más  y, luego, en Borrar.

Borra el notebook de AI Platform

  1. Abre la página AI Platform Notebooks.
  2. Selecciona la casilla de verificación para la instancia de notebook boosted-trees.
  3. Haz clic en Borrar.
  4. En la ventana de superposición que aparece, haz clic en Borrar.

¿Qué sigue?