Usar o Vertex AI Pipelines para modelagem de propensão no Google Cloud

Last reviewed 2024-06-25 UTC

Neste documento, descrevemos um exemplo de pipeline implementado no Google Cloud que realiza modelagem de propensão. Ele é destinado a engenheiros de dados, engenheiros de machine learning ou equipes de ciências de marketing que criam e implantam modelos de machine learning. Neste documento, presumimos que você conheça os conceitos de machine learning e que conheça os notebooks do Google Cloud, BigQuery, Vertex AI Pipelines, Python e Jupyter. Também é necessário que você tenha uma compreensão do Google Analytics 4 e do recurso de exportação bruto no BigQuery.

O pipeline com que você trabalha usa dados de amostra do Google Analytics. O o pipeline cria vários modelos usando o BigQuery ML e o XGBoost, e você executa o pipeline usando o Vertex AI Pipelines. Neste documento, você verá a descrição dos processos de treinamento dos modelos, da avaliação e da implantação deles. Descrevemos também como automatizar todo o processo.

O código completo do pipeline está em um notebook do Jupyter em um repositório do GitHub.

O que é modelagem de propensão?

A modelagem de propensão prevê ações que um consumidor pode realizar. Exemplos de estimativa de propensão incluem prever quais consumidores provavelmente comprarão um produto, inscrever-se em um serviço ou até mesmo se desligar e não se tornar mais um cliente ativo de uma marca.

A saída de um modelo de propensão é uma pontuação entre 0 e 1 para cada consumidor, em que essa pontuação representa a probabilidade de o consumidor realizar essa ação. Um dos principais fatores que levam as organizações à modelagem de propensão é a necessidade de fazer mais com os dados próprios. Para casos de uso de marketing, os melhores modelos de propensão incluem sinais de fontes on-line e off-line, como site analytics e dados de CRM.

Esta demonstração usa dados de amostra do GA4 no BigQuery. No seu caso de uso, considere outros sinais off-line.

Como o MLOps simplifica os pipelines de ML

A maioria dos modelos de ML não é usada na produção. Os resultados do modelo geram insights e, frequentemente, depois que as equipes de ciência de dados concluem um modelo, uma equipe de engenharia de ML ou engenharia de software precisa incluí-la no código para produção usando um framework, como Flask ou FastAPI. Esse processo geralmente exige que o modelo seja criado em um novo framework, o que significa que os dados precisam ser retransformados. Esse trabalho pode levar semanas ou meses, e muitos modelos não chegam à produção.

As operações de machine learning (MLOps) se tornaram importantes para receber valor de projetos de ML e MLOps e agora são um conjunto de habilidades em evolução para organizações de ciência de dados. Para ajudar as organizações a entender esse valor, o Google Cloud publicou um Guia de práticas para MLOps que fornece uma visão geral de MLOps.

Usando os princípios do MLOps e do Google Cloud, é possível enviar modelos para um endpoint usando um processo automático que remove grande parte da complexidade do processo manual. As ferramentas e o processo descritos neste documento discutem uma abordagem de propriedade do pipeline completa, o que ajuda você a colocar seus modelos em produção. O documento do guia de profissionais mencionado anteriormente fornece uma solução horizontal e um resumo do que é possível usar com MLOps e o Google Cloud.

O que é o Vertex AI Pipelines?

O Vertex AI Pipelines permite executar pipelines que foram criados usando o Kubeflow Pipelines ou o TensorFlow Extended (TFX). Sem a Vertex AI, a execução de qualquer um desses frameworks de código aberto em escala requer que você configure e mantenha seus próprios clusters do Kubernetes. O Vertex AI Pipelines aborda esse desafio. Por ser um serviço gerenciado, ele é escalonado verticalmente ou reduzido conforme necessário e não requer manutenção contínua.

Cada etapa do processo do Vertex AI Pipelines consiste em um contêiner independente que pode receber entradas ou produzir saídas na forma de artefatos. Por exemplo, se uma etapa no processo criar o conjunto de dados, a saída será o artefato do conjunto de dados. Esse artefato de conjunto de dados pode ser usado como entrada para a próxima etapa. Como cada componente é um contêiner separado, é necessário fornecer informações para cada componente do pipeline, como o nome da imagem base e uma lista de quaisquer dependências.

O processo de criação do pipeline

O exemplo descrito neste documento usa um notebook Juptyer para criar os componentes do pipeline e para compilar, executar e automatizar esses componentes. Como observado anteriormente, o notebook está em um repositório do GitHub.

É possível executar o código do notebook usando uma instância de notebooks gerenciados pelo usuário do Vertex AI Workbench, que processa a autenticação para você. O Vertex AI Workbench permite que você trabalhe com notebooks para criar máquinas, criar notebooks e se conectar ao Git. O Vertex AI Workbench inclui muitos outros recursos, mas eles não são abordados neste documento.

Quando a execução do pipeline for concluída, um diagrama semelhante ao seguinte será gerado no Vertex AI Pipelines:

Um gráfico acíclico dirigido que mostra os componentes executados pelo pipeline.

O diagrama anterior é um gráfico acíclico dirigido (DAG, na sigla em inglês). Criar e revisar o DAG é uma etapa central para entender os pipelines de ML ou de dados. Os principais atributos dos DAGs são o fluxo dos componentes em uma única direção, nesse caso, de cima para baixo, e a ausência de ciclos. Ou seja, um componente pai não depende dos próprios componentes filhos. Alguns componentes podem ocorrer em paralelo, enquanto outros têm dependências e, portanto, ocorrem em série.

A caixa de seleção verde em cada componente significa que o código foi executado corretamente. Se houver erros, você verá um ponto de exclamação vermelho. Clique em cada componente no diagrama para ver mais detalhes do job.

O diagrama do DAG está incluído nessa seção do documento para servir como um modelo para cada componente criado pelo pipeline. Veja na lista a seguir uma descrição de cada componente.

O pipeline completo executa as seguintes etapas, conforme mostrado no diagrama do DAG:

  1. create-input-view: este componente cria uma visualização do BigQuery. O componente copia o SQL de um bucket do Cloud Storage e preenche os valores de parâmetros fornecidos. Essa visualização do BigQuery é o conjunto de dados de entrada usado em todos os modelos posteriormente no pipeline.
  2. build-bqml-logistic: o pipeline usa o BigQuery ML para criar um modelo de regressão logística. Quando esse componente for concluído, um novo modelo ficará visível no console do BigQuery. Você pode usar esse objeto de modelo para visualizar o desempenho do modelo e depois para criar previsões.
  3. evaluate-bqml-logistic: o pipeline usa esse componente para criar uma curva de precisão/recall (logistic_data_path no diagrama do DAG) para a regressão logística. Esse artefato é armazenado em um bucket do Cloud Storage.
  4. build-bqml-xgboost: este componente cria um modelo XGBoost usando o BigQuery ML. Quando esse componente for concluído, será possível visualizar um novo objeto de modelo (system.Model) no console do BigQuery. Você pode usar esse objeto para ver o desempenho do modelo e depois para criar previsões.
  5. evaluate-bqml-xgboost: este componente cria uma curva de precisão/recall chamada xgboost_data_path para o modelo XGBoost. Esse artefato é armazenado em um bucket do Cloud Storage.
  6. build-xgb-xgboost: o pipeline cria um modelo XGBoost. Esse componente usa Python em vez do BigQuery ML para que você veja diferentes abordagens para criar o modelo. Quando esse componente é concluído, ele armazena um objeto de modelo e as métricas de desempenho em um bucket do Cloud Storage.
  7. deploy-xgb: este componente implanta o modelo XGBoost. Ele cria um endpoint que permite previsões em lote ou on-line. Explore o endpoint na guia Modelos na página do console da Vertex AI. O endpoint é escalonado automaticamente para corresponder ao tráfego.
  8. build-bqml-automl: o pipeline cria um modelo do AutoML usando o BigQuery ML. Quando esse componente for concluído, um novo objeto de modelo poderá ser visualizado no console do BigQuery. Você pode usar esse objeto para ver o desempenho do modelo e depois para criar previsões.
  9. evaluate-bqml-automl: o pipeline cria uma curva de precisão/recall para o modelo do AutoML. O artefato é armazenado em um bucket do Cloud Storage.

O processo não envia os modelos do BigQuery ML para um endpoint. Isso ocorre porque é possível gerar previsões diretamente do objeto de modelo no BigQuery. Ao decidir entre usar o BigQuery ML e usar outras bibliotecas para sua solução, pense em como as previsões precisam ser geradas. Se uma predição em lote diária atender às suas necessidades, permanecer no ambiente do BigQuery poderá simplificar o fluxo de trabalho. No entanto, se você precisar de previsões em tempo real ou se o cenário precisar de uma funcionalidade que esteja em outra biblioteca, siga as etapas deste documento para enviar o modelo salvo a um endpoint.

Custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Antes de começar

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Make sure that billing is enabled for your Google Cloud project.

O notebook do Jupyter para este cenário

As tarefas para criar e criar o pipeline são integradas a um notebook do Jupyter, que está em um repositório do GitHub.

Para executar as tarefas, você recebe o notebook e executa as células de código no notebook em ordem. O fluxo descrito neste documento presume que você está executando os notebooks no Vertex AI Workbench.

Abra o ambiente do Vertex AI Workbench

Comece clonando o repositório do GitHub em um ambiente Vertex AI Workbench.

  1. No Console do Google Cloud, selecione o projeto em que você quer criar o notebook.
  2. Acesse a página do Vertex AI Workbench.

    Acesse a página do Vertex AI Workbench

  3. Na guia Notebooks gerenciados pelo usuário, clique em Novo notebook.

  4. Na lista de tipos de notebook, escolha um notebook do Python 3.

  5. Na caixa de diálogo Novo notebook, clique em Opções avançadas e, em Tipo de máquina, selecione o tipo de máquina que você quer usar. Se não tiver certeza, escolha n1-standard-1 (1 cVPU, 3.75 GB de RAM).

  6. Clique em Criar.

    Leva alguns instantes para que o ambiente de notebook seja criado.

  7. Quando o notebook for criado, selecione-o e clique em Abrir o JupyterLab.

    O ambiente do JupyterLab é aberto no navegador.

  8. Para iniciar uma guia de terminal, selecione File > New > Launcher.

  9. Clique no ícone Terminal na guia Acesso rápido.

  10. No terminal, clone o repositório do GitHub mlops-on-gcp:

    git clone https://github.com/GoogleCloudPlatform/cloud-for-marketing/
    

    Quando o comando for concluído, você verá a pasta cloud-for-marketing no navegador de arquivos.

Definir configurações de notebooks

Antes de executar o notebook, é preciso configurá-lo. O notebook requer um bucket do Cloud Storage para armazenar artefatos de pipeline. Portanto, comece criando esse bucket.

  1. Crie um bucket do Cloud Storage no qual o notebook pode armazenar artefatos de pipeline. O nome do bucket precisa ser globalmente exclusivo.
  2. Na pasta cloud-for-marketing/marketing-analytics/predicting/kfp_pipeline/, abra o notebook Propensity_Pipeline.ipynb.
  3. No notebook, defina o valor da variável PROJECT_ID como o ID do projeto do Google Cloud em que você quer executar o pipeline.
  4. Defina o valor da variável BUCKET_NAME como o nome do bucket que você acabou de criar.

O restante deste documento descreve snippets de código que são importantes para entender como o pipeline funciona. Para ver a implementação completa, consulte o repositório do GitHub (em inglês).

Criar a visualização do BigQuery

A primeira etapa no pipeline gera os dados de entrada, que serão usados para criar cada modelo. Este componente do Vertex AI Pipelines gera uma visualização do BigQuery. Para simplificar o processo de criação da visualização, alguns SQL já foram gerados e salvos em um arquivo de texto no GitHub.

O código para cada componente começa decorando (modificando uma classe pai ou a função por meio de atributos) da classe de componentes Vertex AI Pipelines. Em seguida, o código define a função create_input_view, que é uma etapa no pipeline.

A função requer várias entradas. No momento, alguns desses valores são codificados no código, como as datas de início e término. Ao automatizar o pipeline, é possível modificar o código para usar valores adequados (por exemplo, usando a função CURRENT_DATE para uma data). ou você pode atualizar o componente para usar esses valores como parâmetros, em vez de mantê-los codificados. Também é necessário alterar o valor de ga_data_ref para o nome da sua tabela do GA4 e definir o valor da variável conversion como sua conversão. Neste exemplo, usamos os dados públicos de amostra do GA4.

A listagem a seguir mostra o código para o componente create-input-view.

@component(
   # this component builds a BigQuery view, which will be the underlying source for model
   packages_to_install=["google-cloud-bigquery", "google-cloud-storage"],
   base_image="python:3.9",
   output_component_file="output_component/create_input_view.yaml",
)
def create_input_view(view_name: str,
                     data_set_id: str,
                     project_id: str,
                     bucket_name: str,
                     blob_path: str

):
   from google.cloud import bigquery
   from google.cloud import storage
   client = bigquery.Client(project=project_id)
   dataset = client.dataset(data_set_id)
   table_ref = dataset.table(view_name)
   ga_data_ref = 'bigquery-public-data.google_analytics_sample.ga_sessions_*'
   conversion = "hits.page.pageTitle like '%Shopping Cart%'"
   start_date = '20170101'
   end_date = '20170131'

def get_sql(bucket_name, blob_path):
       from google.cloud import storage
       storage_client = storage.Client()
       bucket = storage_client.get_bucket(bucket_name)
       blob = bucket.get_blob(blob_path)
       content = blob.download_as_string()
       return content
def if_tbl_exists(client, table_ref):

...

   else:
       content = get_sql()
       content = str(content, 'utf-8')
       create_base_feature_set_query = content.
                                   format(start_date = start_date,
                                   end_date = end_date,
                                   ga_data_ref = ga_data_ref,
                                   conversion = conversion)
shared_dataset_ref = client.dataset(data_set_id)
base_feature_set_view_ref = shared_dataset_ref.table(view_name)
base_feature_set_view = bigquery.Table(base_feature_set_view_ref)
base_feature_set_view.view_query = create_base_feature_set_query.format(project_id)
base_feature_set_view = client.create_table(base_feature_set_view)

Crie o modelo do BigQuery ML

Após a criação da visualização, execute o componente chamado build_bqml_logistic para criar um modelo do BigQuery ML. Esse bloco de notebook é um componente essencial. Usando a visualização de treinamento que você criou no primeiro bloco, você cria um modelo do BigQuery ML. Neste exemplo, o notebook usa regressão logística.

Para informações sobre os tipos de modelo e os hiperparâmetros disponíveis, consulte a documentação de referência do BigQuery ML.

A listagem a seguir mostra o código desse componente.

@component(
   # this component builds a logistic regression with BigQuery ML
   packages_to_install=["google-cloud-bigquery"],
   base_image="python:3.9",
   output_component_file="output_component/create_bqml_model_logistic.yaml"
)
def build_bqml_logistic(project_id: str,
                       data_set_id: str,
                       model_name: str,
                       training_view: str
):
   from google.cloud import bigquery
   client = bigquery.Client(project=project_id)
   model_name = f"{project_id}.{data_set_id}.{model_name}"
   training_set = f"{project_id}.{data_set_id}.{training_view}"
   build_model_query_bqml_logistic = '''
   CREATE OR REPLACE MODEL `{model_name}`
   OPTIONS(model_type='logistic_reg'
   , INPUT_LABEL_COLS = ['label']
   , L1_REG = 1
   , DATA_SPLIT_METHOD = 'RANDOM'
   , DATA_SPLIT_EVAL_FRACTION = 0.20
   ) AS
       SELECT * EXCEPT (fullVisitorId, label),
       CASE WHEN label is null then 0 ELSE label end as label
   FROM `{training_set}`
   '''.format(model_name = model_name, training_set = training_set)
job_config = bigquery.QueryJobConfig()
client.query(build_model_query_bqml_logistic, job_config=job_config)

Usar o XGBoost em vez do BigQuery ML

O componente ilustrado na seção anterior usa o BigQuery ML. Na próxima seção de notebooks, você verá como usar o XGBoost diretamente no Python em vez de usar o BigQuery ML.

Execute o componente chamado build_bqml_xgboost para criar o componente para executar um modelo de classificação XGBoost padrão com uma pesquisa de grade. Em seguida, o código salva o modelo como um artefato no bucket do Cloud Storage que você criou. A função é compatível com outros parâmetros (metrics e model) para artefatos de saída. Esses parâmetros são exigidos pelo Vertex AI Pipelines.

@component(
   # this component builds an xgboost classifier with xgboost
   packages_to_install=["google-cloud-bigquery", "xgboost", "pandas", "sklearn", "joblib", "pyarrow"],
   base_image="python:3.9",
   output_component_file="output_component/create_xgb_model_xgboost.yaml"
)
def build_xgb_xgboost(project_id: str,
                     data_set_id: str,
                     training_view: str,
                     metrics: Output[Metrics],
                     model: Output[Model]
):

...

  data_set = f"{project_id}.{data_set_id}.{training_view}"
  build_df_for_xgboost = '''
                         SELECT * FROM `{data_set}`
                         '''.format(data_set = data_set)

...

  xgb_model = XGBClassifier(n_estimators=50,
                            objective='binary:hinge',
                            silent=True,
                            nthread=1,
                           eval_metric="auc")
   random_search = RandomizedSearchCV(xgb_model,
                                     param_distributions=params,
                                     n_iter=param_comb,
                                     scoring='precision',
                                     n_jobs=4,
                                     cv=skf.split(X_train,y_train),
                                     verbose=3,
                                     random_state=1001 )
  random_search.fit(X_train, y_train)
  xgb_model_best = random_search.best_estimator_
  predictions = xgb_model_best.predict(X_test)
  score = accuracy_score(y_test, predictions)
  auc = roc_auc_score(y_test, predictions)
  precision_recall = precision_recall_curve(y_test, predictions)

  metrics.log_metric("accuracy",(score * 100.0))
  metrics.log_metric("framework", "xgboost")
  metrics.log_metric("dataset_size", len(df))
  metrics.log_metric("AUC", auc)

  dump(xgb_model_best, model.path + ".joblib")

Criar um endpoint

Execute o componente chamado deploy_xgb para criar um endpoint usando o modelo XGBoost da seção anterior. O componente usa o artefato do modelo XGBoost anterior, cria um contêiner e implanta o endpoint, além de fornecer o URL do endpoint como um artefato para visualizá-lo. Quando essa etapa for concluída, um endpoint da Vertex AI será criado e poderá ser visualizado na página do console da Vertex AI.

@component(
   # Deploys xgboost model
   packages_to_install=["google-cloud-aiplatform", "joblib", "sklearn", "xgboost"],
   base_image="python:3.9",
   output_component_file="output_component/xgboost_deploy_component.yaml",
)
def deploy_xgb(
   model: Input[Model],
   project_id: str,
   vertex_endpoint: Output[Artifact],
   vertex_model: Output[Model]
):
   from google.cloud import aiplatform
   aiplatform.init(project=project_id)
   deployed_model = aiplatform.Model.upload(
       display_name="tai-propensity-test-pipeline",
       artifact_uri = model.uri.replace("model", ""),
       serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-4:latest"
   )
   endpoint = deployed_model.deploy(machine_type="n1-standard-4")
# Save data to the output params
   vertex_endpoint.uri = endpoint.resource_name
   vertex_model.uri = deployed_model.resource_name

Definir o pipeline

Para definir o pipeline, defina cada operação com base nos componentes criados anteriormente. Em seguida, especifique a ordem dos elementos do pipeline se eles não forem explicitamente chamados no componente.

Por exemplo, o código a seguir no notebook define um pipeline. Nesse caso, o código exige que o componente build_bqml_logistic_op seja executado após o componente create_input_view_op.

@dsl.pipeline(
   # Default pipeline root. You can override it when submitting the pipeline.
   pipeline_root=PIPELINE_ROOT,
   # A name for the pipeline.
   name="pipeline-test",
   description='Propensity BigQuery ML Test'
)
def pipeline():

   create_input_view_op = create_input_view(
                          view_name = VIEW_NAME,
                          data_set_id = DATA_SET_ID,
                          project_id = PROJECT_ID,
                          bucket_name = BUCKET_NAME,
                          blob_path = BLOB_PATH
                                            )
    build_bqml_logistic_op = build_bqml_logistic(
                        project_id = PROJECT_ID,
                        data_set_id = DATA_SET_ID,
                        model_name = 'bqml_logistic_model',
                        training_view = VIEW_NAME
                                                  )

 # several components have been deleted for brevity

   build_bqml_logistic_op.after(create_input_view_op)
   build_bqml_xgboost_op.after(create_input_view_op)
   build_bqml_automl_op.after(create_input_view_op)
   build_xgb_xgboost_op.after(create_input_view_op)

   evaluate_bqml_logistic_op.after(build_bqml_logistic_op)
   evaluate_bqml_xgboost_op.after(build_bqml_xgboost_op)
   evaluate_bqml_automl_op.after(build_bqml_automl_op)

Compilar e executar o pipeline

Agora é possível compilar e executar o pipeline.

O código a seguir no notebook define o valor enable_caching como verdadeiro para ativar o armazenamento em cache. Quando o armazenamento em cache é ativado, as execuções anteriores em que um componente foi concluído não são executadas novamente. Essa sinalização é útil especialmente quando você está testando o pipeline porque, quando o armazenamento em cache está ativado, a execução é concluída mais rapidamente e usa menos recursos.

compiler.Compiler().compile(
   pipeline_func=pipeline, package_path="pipeline.json"
)
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
run = pipeline_jobs.PipelineJob(
   display_name="test-pipeine",
   template_path="pipeline.json",

   job_id="test-{0}".format(TIMESTAMP),
   enable_caching=True
)
run.run()

Automatize o pipeline

Nesta etapa, você iniciou o primeiro pipeline. Acesse o console Vertex AI Pipelines no console para ver o status desse job. Você pode observar como cada contêiner é criado e executado. Também é possível rastrear erros em componentes específicos nesta seção clicando em cada um deles.

Para programar o pipeline, crie uma função do Cloud e use um programador semelhante a um cron job.

O código na última seção do notebook programa o pipeline para ser executado uma vez por dia, conforme mostrado no snippet de código a seguir:

from kfp.v2.google.client import AIPlatformClient
api_client = AIPlatformClient(project_id=PROJECT_ID,
                            region='us-central1'
                            )
api_client.create_schedule_from_job_spec(
   job_spec_path='pipeline.json',
   schedule='0 * * * *',
   enable_caching=False
)

Usar o canal concluído em produção

O pipeline concluído executou as seguintes tarefas:

  • criou um conjunto de dados de entrada.
  • treinou vários modelos usando o BigQuery ML e o XGBoost do Python.
  • analisou resultados de modelos.
  • implantou o modelo XGBoost.

Você também automatizou o pipeline usando o Cloud Functions e o Cloud Scheduler para execução diária.

O pipeline definido no notebook foi criado para ilustrar maneiras de criar vários modelos. No entanto, isso não seria possível porque ele já foi criado em um cenário de produção. No entanto, é possível usar esse pipeline como um guia e modificar os componentes para atender às suas necessidades. Por exemplo, é possível editar o processo de criação de recursos para aproveitar seus dados, modificar períodos e, talvez, criar modelos alternativos. Escolha também o modelo entre aqueles ilustrados que atendam melhor aos seus requisitos de produção.

Quando o pipeline estiver pronto para produção, será possível implementar tarefas adicionais. Por exemplo, é possível implementar um modelo campeão/desafiador, em que a cada dia um novo modelo é criado e o novo modelo (o desafiador) e o atual (o campeão) são pontuados em novos dados. Você coloca o novo modelo em produção somente se o desempenho dele for melhor do que o atual. Para monitorar o progresso do seu sistema, mantenha um registro do desempenho do modelo de cada dia e visualize o desempenho em alta.

A seguir

Colaboradores

Autor: Tai Conley | Engenheiro de clientes do Cloud

Outro colaborador: Lars Ahlfors | Engenheiro de clientes do Cloud