Detectar anomalias em transações financeiras usando o AI Platform, o Dataflow e o BigQuery

;

Neste tutorial, mostramos como implementar um aplicativo de detecção de anomalias que identifica transações fraudulentas usando um modelo de árvore elevada.

Este tutorial é destinado a desenvolvedores, engenheiros e cientistas de dados e pressupõe que você tenha conhecimento básico dos seguintes itens:

  • Desenvolvimento de modelos de machine learning com TensorFlow e Python
  • SQL padrão
  • Pipelines do Dataflow criados com o SDK do Apache Beam para Java

Arquitetura

O aplicativo de amostra consiste nos seguintes componentes:

  • Um modelo de árvore otimizado desenvolvido usando o TensorFlow e implantado no AI Platform.
  • Um pipeline do Dataflow que realiza as seguintes tarefas:

    • publica dados de transações de um bucket do Cloud Storage em um tópico do Pub/Sub e, em seguida, lê esses dados como um stream de uma assinatura do Pub/Sub para esse tópico;
    • Recebe estimativas de probabilidade de fraude para cada transação usando a API Timer do Apache Beam para chamadas em microlote para a API de previsão do AI Platform.
    • Grava dados de transações e de dados de probabilidade de fraude em tabelas do BigQuery para análise.

No seguinte diagrama, ilustramos a arquitetura da solução de detecção de anomalias:

Diagrama mostrando a arquitetura da solução de detecção de anomalias.

Conjunto de dados

O modelo de árvore aprimorada usado neste tutorial é treinado no Conjunto de dados financeiros sintéticos para detecção de fraudes do Kaggle. Este conjunto de dados foi gerado com o simulador PaySim.

Usamos um conjunto de dados sintéticos porque há poucos conjuntos de dados financeiros adequados para detecção de fraudes e aqueles que costumam conter informações de identificação pessoal (PII, na sigla em inglês) que precisam ser anônimas.

Objetivos

  • Crie um modelo de árvore otimizado que estima a probabilidade de fraude em transações financeiras.
  • Implantar o modelo no AI Platform para predição on-line.
  • Use um pipeline do Dataflow para:
    • Grave dados de transação do conjunto de dados de amostra em uma tabela transactions no BigQuery.
    • Enviar solicitações em lote para o modelo hospedado para recuperar previsões de probabilidade de fraude e gravar os resultados em uma tabela fraud_detection no BigQuery.
  • Execute uma consulta do BigQuery que una essas tabelas para ver a probabilidade de fraude de cada transação.

Custos

Neste tutorial, há componentes faturáveis do Google Cloud, a saber:

  • AI Platform
  • BigQuery
  • Cloud Storage
  • Compute Engine
  • Dataflow
  • Pub/Sub

Use a calculadora de preços para gerar uma estimativa de custos baseada na projeção de uso. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Antes de começar

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. No Console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  3. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como confirmar se o faturamento está ativado para o projeto.

  4. Ative as APIs AI Platform Training and Prediction, Cloud Storage, Compute Engine, Dataflow, and Notebooks.

    Ative as APIs

Verificar disponibilidade da cota

  1. Abrir a página Cotas do IAM
  2. Verifique se você tem as seguintes cotas da API Compute Engine disponíveis na região us-central1. Você precisa dessas cotas para executar o job do Dataflow usado neste tutorial. Caso contrário, solicite um aumento de cota.

    Nome do limite Cota
    CPUs 241
    Endereços IP em uso 30
    Grupos de instâncias 1
    Modelos de instâncias 1
    Grupos de instâncias gerenciadas 1
    Persistent Disk padrão (GB) 12900

Criar e implantar o modelo

Siga as instruções nesta seção para criar um modelo de árvore otimizado para prever fraudes em transações financeiras.

Criar um notebook

  1. Abrir o console do Notebooks
  2. Clique em Nova instância.
  3. Escolha TensorFlow Enterprise 1.15 sem GPUs.

    Mostrar o tipo de instância a ser selecionada.

  4. Em Nome da instância, digite boosted-trees.

  5. Clique em Criar A criação da instância do notebook leva alguns minutos.

  6. Quando a instância estiver disponível, clique em Abrir JupyterLab.

  7. Na seção Notebook do JupyterLab, clique em Python 3.

Fazer o download dos dados de amostra

Faça o download de uma cópia do banco de dados de amostra:

  1. Copie o seguinte código na primeira célula do notebook:

    !gsutil cp gs://financial_fraud_detection/fraud_data_kaggle.csv .

  2. Clique em Executar na barra de menu.

Preparar os dados para uso em treinamento

Os dados de amostra são desequilíbrios que podem levar a um modelo impreciso. O código a seguir corrige o desequilíbrio usando o downsampling e, em seguida, divide os dados em um conjunto de treinamento e em um conjunto de teste.

Prepare os dados copiando o código a seguir na segunda célula do notebook e executando-o:

import uuid
import itertools
import numpy as np
import pandas as pd
import os
import tensorflow as tf
import json
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from sklearn.metrics import confusion_matrix

os.environ['TF_CPP_MIN_LOG_LEVEL']='3'

data = pd.read_csv('fraud_data_kaggle.csv')

# Split the data into 2 DataFrames
fraud = data[data['isFraud'] == 1]
not_fraud = data[data['isFraud'] == 0]

# Take a random sample of non fraud rows
not_fraud_sample = not_fraud.sample(random_state=2, frac=.005)

# Put it back together and shuffle
df = pd.concat([not_fraud_sample,fraud])
df = shuffle(df, random_state=2)

# Remove a few columns (isFraud is the label column we'll use, not isFlaggedFraud)
df = df.drop(columns=['nameOrig', 'nameDest', 'isFlaggedFraud'])

# Add transaction id to make it possible to map predictions to transactions
df['transactionId'] = [str(uuid.uuid4()) for _ in range(len(df.index))]

train_test_split = int(len(df) * .8)

# Split the dataset for training and testing
train_set = df[:train_test_split]
test_set = df[train_test_split:]

train_labels = train_set.pop('isFraud')
test_labels = test_set.pop('isFraud')

train_set.head()

Depois que o código for concluído, ele gerará várias linhas de exemplo dos dados processados. Os resultados serão semelhantes aos exibidos abaixo:

Primeiras 10 linhas de dados de treinamento processados.

Criar e treinar o modelo

Para criar e treinar o modelo, copie o código a seguir na terceira célula do notebook e execute-o:

# Define features
fc = tf.feature_column
CATEGORICAL_COLUMNS = ['type']
NUMERIC_COLUMNS = ['step', 'amount', 'oldbalanceOrg', 'newbalanceOrig', 'oldbalanceDest', 'newbalanceDest']
KEY_COLUMN = 'transactionId'
def one_hot_cat_column(feature_name, vocab):
    return tf.feature_column.indicator_column(tf.feature_column.categorical_column_with_vocabulary_list(feature_name, vocab))

feature_columns = []

for feature_name in CATEGORICAL_COLUMNS:
    vocabulary = train_set[feature_name].unique()
    feature_columns.append(one_hot_cat_column(feature_name, vocabulary))

for feature_name in NUMERIC_COLUMNS:
  feature_columns.append(tf.feature_column.numeric_column(feature_name,
                                           dtype=tf.float32))

# Define training and evaluation input functions
NUM_EXAMPLES = len(train_labels)
def make_input_fn(X, y, n_epochs=None, shuffle=True):
  def input_fn():
    dataset = tf.data.Dataset.from_tensor_slices((dict(X), y))
    if shuffle:
      dataset = dataset.shuffle(NUM_EXAMPLES)
    dataset = dataset.repeat(n_epochs)
    dataset = dataset.batch(NUM_EXAMPLES)
    return dataset
  return input_fn

train_input_fn = make_input_fn(train_set, train_labels)
eval_input_fn = make_input_fn(test_set, test_labels, shuffle=False, n_epochs=1)

# Define the model
n_batches = 1
model = tf.estimator.BoostedTreesClassifier(feature_columns,
                                          n_batches_per_layer=n_batches)
model = tf.contrib.estimator.forward_features(model,KEY_COLUMN)

# Train the model
model.train(train_input_fn, max_steps=100)

# Get metrics to evaluate the model's performance
result = model.evaluate(eval_input_fn)
print(pd.Series(result))

Depois que o código é concluído, ele gera um conjunto de métricas que descreve o desempenho do modelo. Você verá resultados semelhantes aos seguintes, com os valores accuracy e auc cerca de 99%:

Métricas de desempenho para o modelo de árvore aprimoradas.

Testar o modelo

Teste o modelo para verificar se ele rotula as transações fraudulentas corretamente. Basta copiar o código a seguir na quarta célula do notebook e executá-lo:

pred_dicts = list(model.predict(eval_input_fn))
probabilities = pd.Series([pred['logistic'][0] for pred in pred_dicts])

for i,val in enumerate(probabilities[:30]):
  print('Predicted: ', round(val), 'Actual: ', test_labels.iloc[i])
  print()

Depois que o código é concluído, ele produz a probabilidade de fraude estimada e real para os dados de teste. Os resultados serão semelhantes aos exibidos abaixo:

Resultados de probabilidade de fraude reais e previstos para os dados de teste.

Exportar o modelo

Crie um SavedModel a partir do modelo treinado e exporte-o para o Cloud Storage copiando o código a seguir na quinta célula do notebook e executando-o. Substitua myProject pelo ID do projeto que você está usando para concluir este tutorial.

GCP_PROJECT = 'myProject'
MODEL_BUCKET = 'gs://myProject-bucket'

!gsutil mb $MODEL_BUCKET

def json_serving_input_fn():
    feature_placeholders = {
        'type': tf.placeholder(tf.string, [None]),
        'step': tf.placeholder(tf.float32, [None]),
        'amount': tf.placeholder(tf.float32, [None]),
        'oldbalanceOrg': tf.placeholder(tf.float32, [None]),
        'newbalanceOrig': tf.placeholder(tf.float32, [None]),
        'oldbalanceDest': tf.placeholder(tf.float32, [None]),
        'newbalanceDest': tf.placeholder(tf.float32, [None]),
         KEY_COLUMN: tf.placeholder_with_default(tf.constant(['nokey']), [None])
    }
    features = {key: tf.expand_dims(tensor, -1)
                for key, tensor in feature_placeholders.items()}
    return tf.estimator.export.ServingInputReceiver(features,feature_placeholders)

export_path = model.export_saved_model(
    MODEL_BUCKET + '/explanations-with-key',
    serving_input_receiver_fn=json_serving_input_fn
).decode('utf-8')

!saved_model_cli show --dir $export_path --all

Depois que o código é concluído, ele retorna um SignatureDef que descreve as entradas e saídas do modelo. Os resultados serão semelhantes aos exibidos abaixo:

SignatureDef que descreve as entradas e saídas do modelo.

Implantar o modelo no AI Platform

Implante o modelo para as predições. Para isso, copie o código a seguir na 6a célula do notebook e execute-o. A criação da versão do modelo leva alguns minutos.

MODEL = 'fraud_detection_with_key'

!gcloud ai-platform models create $MODEL

VERSION = 'v1'
!gcloud beta ai-platform versions create $VERSION \
--model $MODEL \
--origin $export_path \
--runtime-version 1.15 \
--framework TENSORFLOW \
--python-version 3.7 \
--machine-type n1-standard-4 \
--num-paths 10

!gcloud ai-platform versions describe $VERSION --model $MODEL

Receber previsões do modelo implantado

Para fazer previsões para o conjunto de dados de teste, copie o seguinte código na sétima célula do notebook e execute-o:

fraud_indices = []

for i,val in enumerate(test_labels):
    if val == 1:
        fraud_indices.append(i)

num_test_examples = 10
import numpy as np

def convert(o):
    if isinstance(o, np.generic): return o.item()
    raise TypeError

for i in range(num_test_examples):
    test_json = {}
    ex = test_set.iloc[fraud_indices[i]]
    keys = ex.keys().tolist()
    vals = ex.values.tolist()
    for idx in range(len(keys)):
        test_json[keys[idx]] = vals[idx]

    print(test_json)
    with open('data.txt', 'a') as outfile:
        json.dump(test_json, outfile, default=convert)
        outfile.write('\n')

!gcloud ai-platform predict --model $MODEL \
--version $VERSION \
--json-instances='data.txt' \
--signature-name='predict'

Depois que o código é concluído, ele retorna previsões para os dados de teste. Os resultados serão semelhantes aos exibidos abaixo:

Previsões do modelo implantado para dados de teste.

Criar e executar o pipeline

Crie um pipeline do Dataflow que leia os dados de transações financeiras, solicite informações de previsão de fraude para cada transação do modelo do AI Platform e, em seguida, grave os dados de previsão de transação e fraude no BigQuery para análise.

Criar o conjunto de dados e as tabelas do BigQuery

  1. Abra o console do BigQuery
  2. Na seção Recursos, selecione o projeto em que você está fazendo este tutorial.
  3. Clique em Criar conjunto de dados.

    Mostrar local do botão "Criar conjunto de dados".

  4. Na página Criar conjunto de dados, faça o seguinte:

    • Em ID do conjunto de dados, digite fraud_detection.
    • Em Local dos dados, selecione Estados Unidos.
    • Clique em Criar conjunto de dados.
  5. No painel Editor de consultas, execute as seguintes instruções SQL para criar as tabelas transactions e fraud_prediction:

    CREATE OR REPLACE TABLE fraud_detection.transactions (
       step INT64,
       nameOrig STRING,
       nameDest STRING,
       isFlaggedFraud INT64,
       isFraud INT64,
       type STRING,
       amount FLOAT64,
       oldbalanceOrg FLOAT64,
       newbalanceOrig FLOAT64,
       oldbalanceDest FLOAT64,
       newbalanceDest FLOAT64,
       transactionId STRING
     );
    
    CREATE OR REPLACE TABLE fraud_detection.fraud_prediction (
        transactionId STRING,
        logistic FLOAT64,
        json_response STRING
    );
    

Criar o tópico e a assinatura do Pub/Sub

  1. Abra o console do Pub/Sub
  2. Clique em Criar tópico.
  3. Em ID do tópico, digite sample_data.
  4. Clique em Criar tópico.
  5. Clique em Assinaturas.
  6. Clique em Criar assinatura.
  7. Para o ID da assinatura, digite sample_data.
  8. Em Selecionar um tópico do Cloud Pub/Sub, selecione projects/<myProject>/topics/sample_data.
  9. Role até a parte inferior da página e clique em Criar.

Execute o pipeline do Dataflow

  1. Ativar o Cloud Shell
  2. No Cloud Shell, execute o comando abaixo para executar o pipeline do Dataflow. Substitua myProject pelo ID do projeto que você está usando para concluir este tutorial:

    gcloud beta dataflow flex-template run "anomaly-detection" \
    --project=myProject \
    --region=us-central1 \
    --template-file-gcs-location=gs://df-ml-anomaly-detection-mock-data/dataflow-flex-template/dynamic_template_finserv_fraud_detection.json \
    --parameters=autoscalingAlgorithm="NONE",\
    numWorkers=30,\
    maxNumWorkers=30,\
    workerMachineType=n1-highmem-8,\
    subscriberId=projects/myProject/subscriptions/sample_data,\
    tableSpec=myProject:fraud_detection.transactions,\
    outlierTableSpec=myProject:fraud_detection.fraud_prediction,\
    inputFilePattern=gs://df-ml-anomaly-detection-mock-data/finserv_fraud_detection/fraud_data_kaggle.json,\
    modelId=fraud_detection_with_key,\
    versionId=v1,\
    keyRange=1024,\
    batchSize=500000
    

    Para adaptar esse pipeline à produção, altere os valores de parâmetro batchSize e keyRange para controlar o tamanho e o tempo dos lotes de solicitação de previsão. Considere o seguinte:

    • Usar um tamanho de lote pequeno e um valor de intervalo de chave alto resulta em um processamento mais rápido, mas também pode exceder as limitações da cota e exigir que você solicite cota adicional.
    • Usar um tamanho de lote maior e um valor de intervalo de chaves baixo será mais lento, mas tem maior probabilidade de manter as operações dentro da cota.
  3. Abrir a página "Jobs do Dataflow"

  4. Na lista de jobs, clique em detecção de anomalias.

  5. Aguarde até que o gráfico do job seja exibido e o elemento StreamFractionData do gráfico seja maior que um tempo de execução de 0 segundo.

Verifique os dados no BigQuery

Verifique se os dados foram gravados no BigQuery executando uma consulta para ver as transações identificadas como fraudulentas.

  1. Abra o console do BigQuery
  2. No painel Editor de consultas, execute a seguinte consulta:

    SELECT DISTINCT
      outlier.logistic as fraud_probablity,
      outlier.transactionId,
      transactions.* EXCEPT (transactionId,isFraud,isFlaggedFraud)
    FROM `fraud_detection.fraud_prediction` AS outlier
    JOIN fraud_detection.transactions AS transactions
    ON transactions.transactionId = outlier.transactionId
    WHERE logistic >0.99
    ORDER BY fraud_probablity DESC;
    

    Os resultados serão semelhantes aos exibidos abaixo:

    Primeiras nove linhas de resultados de probabilidade de fraude.

Limpeza

Para evitar cobranças na sua conta do Google Cloud pelos recursos usados neste tutorial, exclua o projeto que contém os recursos ou mantenha o projeto, mas exclua apenas esses recursos.

Seja como for, remova esses recursos para que não sejam faturados no futuro. Nas seções a seguir, você verá como excluir esses recursos.

Excluir o projeto

A maneira mais fácil de eliminar o faturamento é excluir o projeto criado para o tutorial.

  1. No Console do Cloud, acesse a página Gerenciar recursos:

    Acessar "Gerenciar recursos"

  2. Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir .
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

Excluir os componentes

Se você não quiser excluir o projeto, use as seguintes seções para excluir os componentes faturáveis deste tutorial.

interrompa o job do Dataflow

  1. Abrir a página "Jobs do Dataflow"
  2. Na lista de jobs, clique em detecção de anomalias.
  3. Na página de detalhes do job, clique em Parar.
  4. Selecione Cancelar.
  5. Clique em Interromper job.

Excluir os buckets do Cloud Storage

  1. Abra o navegador do Cloud Storage
  2. Marque as caixas de seleção dos buckets <myProject>-bucket e dataflow-staging-us-central1-<projectNumber>.
  3. Clique em Excluir.
  4. Na janela de sobreposição que aparece, digite DELETE e clique em Confirmar.

Exclua o tópico do Pub/Sub e a assinatura

  1. Abrir a página de assinaturas do Pub/Sub
  2. Marque a caixa de seleção da assinatura sample_data.
  3. Clique em Excluir.
  4. Na janela de sobreposição que aparece, clique em Excluir e confirme a exclusão da assinatura e do conteúdo dela.
  5. Clique em Tópicos.
  6. Marque a caixa de seleção do tópico sample_data.
  7. Clique em Excluir.
  8. Na janela de sobreposição que aparece, digite delete e clique em Excluir.

Excluir o conjunto de dados e as tabelas do BigQuery

  1. Abra o console do BigQuery
  2. Na seção Recursos, expanda o projeto em que você está concluindo este tutorial e selecione o conjunto de dados fraud_detection.
  3. Clique em Excluir conjunto de dados no cabeçalho do painel do conjunto de dados.
  4. Na janela de sobreposição que aparece, digite fraud_detection e clique em Excluir.

Excluir o modelo do AI Platform

  1. Abra a página de modelos do AI Platform
  2. Na lista de modelos, clique em fraud_detection_with_key.
  3. Na página Detalhes do modelo, marque a caixa de seleção da versão v1 (padrão).
  4. Clique em Mais e em Excluir.
  5. Quando a exclusão da versão for concluída, clique em Voltar para retornar à lista de modelos.
  6. Marque a caixa de seleção do modelo fraud_detection_with_key.
  7. Clique em Mais e em Excluir.

Excluir o AI Platform Notebooks

  1. Abrir a página do AI Platform Notebooks
  2. Marque a caixa de seleção da instância do notebook boosted-trees.
  3. Clique em Excluir.
  4. Na janela de sobreposição que aparece, clique em Excluir.

A seguir