Neste documento, descrevemos um exemplo de pipeline implementado no Google Cloud que realiza modelagem de propensão. Ele é destinado a engenheiros de dados, engenheiros de machine learning ou equipes de ciências de marketing que criam e implantam modelos de machine learning. Neste documento, presumimos que você conheça os conceitos de machine learning e que conheça os notebooks do Google Cloud, BigQuery, Vertex AI Pipelines, Python e Jupyter. Também é necessário que você tenha uma compreensão do Google Analytics 4 e do recurso de exportação bruto no BigQuery.
O pipeline com que você trabalha usa dados de amostra do Google Analytics. O o pipeline cria vários modelos usando o BigQuery ML e o XGBoost, e você executa o pipeline usando o Vertex AI Pipelines. Neste documento, você verá a descrição dos processos de treinamento dos modelos, da avaliação e da implantação deles. Descrevemos também como automatizar todo o processo.
O código completo do pipeline está em um notebook do Jupyter em um repositório do GitHub.
O que é modelagem de propensão?
A modelagem de propensão prevê ações que um consumidor pode realizar. Exemplos de estimativa de propensão incluem prever quais consumidores provavelmente comprarão um produto, inscrever-se em um serviço ou até mesmo se desligar e não se tornar mais um cliente ativo de uma marca.
A saída de um modelo de propensão é uma pontuação entre 0 e 1 para cada consumidor, em que essa pontuação representa a probabilidade de o consumidor realizar essa ação. Um dos principais fatores que levam as organizações à modelagem de propensão é a necessidade de fazer mais com os dados próprios. Para casos de uso de marketing, os melhores modelos de propensão incluem sinais de fontes on-line e off-line, como site analytics e dados de CRM.
Esta demonstração usa dados de amostra do GA4 no BigQuery. No seu caso de uso, considere outros sinais off-line.
Como o MLOps simplifica os pipelines de ML
A maioria dos modelos de ML não é usada na produção. Os resultados do modelo geram insights e, frequentemente, depois que as equipes de ciência de dados concluem um modelo, uma equipe de engenharia de ML ou engenharia de software precisa incluí-la no código para produção usando um framework, como Flask ou FastAPI. Esse processo geralmente exige que o modelo seja criado em um novo framework, o que significa que os dados precisam ser retransformados. Esse trabalho pode levar semanas ou meses, e muitos modelos não chegam à produção.
As operações de machine learning (MLOps) se tornaram importantes para receber valor de projetos de ML e MLOps e agora são um conjunto de habilidades em evolução para organizações de ciência de dados. Para ajudar as organizações a entender esse valor, o Google Cloud publicou um Guia de práticas para MLOps que fornece uma visão geral de MLOps.
Usando os princípios do MLOps e do Google Cloud, é possível enviar modelos para um endpoint usando um processo automático que remove grande parte da complexidade do processo manual. As ferramentas e o processo descritos neste documento discutem uma abordagem de propriedade do pipeline completa, o que ajuda você a colocar seus modelos em produção. O documento do guia de profissionais mencionado anteriormente fornece uma solução horizontal e um resumo do que é possível usar com MLOps e o Google Cloud.
O que é o Vertex AI Pipelines?
O Vertex AI Pipelines permite executar pipelines que foram criados usando o Kubeflow Pipelines ou o TensorFlow Extended (TFX). Sem a Vertex AI, a execução de qualquer um desses frameworks de código aberto em escala requer que você configure e mantenha seus próprios clusters do Kubernetes. O Vertex AI Pipelines aborda esse desafio. Por ser um serviço gerenciado, ele é escalonado verticalmente ou reduzido conforme necessário e não requer manutenção contínua.
Cada etapa do processo do Vertex AI Pipelines consiste em um contêiner independente que pode receber entradas ou produzir saídas na forma de artefatos. Por exemplo, se uma etapa no processo criar o conjunto de dados, a saída será o artefato do conjunto de dados. Esse artefato de conjunto de dados pode ser usado como entrada para a próxima etapa. Como cada componente é um contêiner separado, é necessário fornecer informações para cada componente do pipeline, como o nome da imagem base e uma lista de quaisquer dependências.
O processo de criação do pipeline
O exemplo descrito neste documento usa um notebook Juptyer para criar os componentes do pipeline e para compilar, executar e automatizar esses componentes. Como observado anteriormente, o notebook está em um repositório do GitHub.
É possível executar o código do notebook usando uma instância de notebooks gerenciados pelo usuário do Vertex AI Workbench, que processa a autenticação para você. O Vertex AI Workbench permite que você trabalhe com notebooks para criar máquinas, criar notebooks e se conectar ao Git. O Vertex AI Workbench inclui muitos outros recursos, mas eles não são abordados neste documento.
Quando a execução do pipeline for concluída, um diagrama semelhante ao seguinte será gerado no Vertex AI Pipelines:
O diagrama anterior é um gráfico acíclico dirigido (DAG, na sigla em inglês). Criar e revisar o DAG é uma etapa central para entender os pipelines de ML ou de dados. Os principais atributos dos DAGs são o fluxo dos componentes em uma única direção, nesse caso, de cima para baixo, e a ausência de ciclos. Ou seja, um componente pai não depende dos próprios componentes filhos. Alguns componentes podem ocorrer em paralelo, enquanto outros têm dependências e, portanto, ocorrem em série.
A caixa de seleção verde em cada componente significa que o código foi executado corretamente. Se houver erros, você verá um ponto de exclamação vermelho. Clique em cada componente no diagrama para ver mais detalhes do job.
O diagrama do DAG está incluído nessa seção do documento para servir como um modelo para cada componente criado pelo pipeline. Veja na lista a seguir uma descrição de cada componente.
O pipeline completo executa as seguintes etapas, conforme mostrado no diagrama do DAG:
create-input-view
: este componente cria uma visualização do BigQuery. O componente copia o SQL de um bucket do Cloud Storage e preenche os valores de parâmetros fornecidos. Essa visualização do BigQuery é o conjunto de dados de entrada usado em todos os modelos posteriormente no pipeline.build-bqml-logistic
: o pipeline usa o BigQuery ML para criar um modelo de regressão logística. Quando esse componente for concluído, um novo modelo ficará visível no console do BigQuery. Você pode usar esse objeto de modelo para visualizar o desempenho do modelo e depois para criar previsões.evaluate-bqml-logistic
: o pipeline usa esse componente para criar uma curva de precisão/recall (logistic_data_path
no diagrama do DAG) para a regressão logística. Esse artefato é armazenado em um bucket do Cloud Storage.build-bqml-xgboost
: este componente cria um modelo XGBoost usando o BigQuery ML. Quando esse componente for concluído, será possível visualizar um novo objeto de modelo (system.Model
) no console do BigQuery. Você pode usar esse objeto para ver o desempenho do modelo e depois para criar previsões.evaluate-bqml-xgboost
: este componente cria uma curva de precisão/recall chamadaxgboost_data_path
para o modelo XGBoost. Esse artefato é armazenado em um bucket do Cloud Storage.build-xgb-xgboost
: o pipeline cria um modelo XGBoost. Esse componente usa Python em vez do BigQuery ML para que você veja diferentes abordagens para criar o modelo. Quando esse componente é concluído, ele armazena um objeto de modelo e as métricas de desempenho em um bucket do Cloud Storage.deploy-xgb
: este componente implanta o modelo XGBoost. Ele cria um endpoint que permite previsões em lote ou on-line. Explore o endpoint na guia Modelos na página do console da Vertex AI. O endpoint é escalonado automaticamente para corresponder ao tráfego.build-bqml-automl
: o pipeline cria um modelo do AutoML usando o BigQuery ML. Quando esse componente for concluído, um novo objeto de modelo poderá ser visualizado no console do BigQuery. Você pode usar esse objeto para ver o desempenho do modelo e depois para criar previsões.evaluate-bqml-automl
: o pipeline cria uma curva de precisão/recall para o modelo do AutoML. O artefato é armazenado em um bucket do Cloud Storage.
O processo não envia os modelos do BigQuery ML para um endpoint. Isso ocorre porque é possível gerar previsões diretamente do objeto de modelo no BigQuery. Ao decidir entre usar o BigQuery ML e usar outras bibliotecas para sua solução, pense em como as previsões precisam ser geradas. Se uma predição em lote diária atender às suas necessidades, permanecer no ambiente do BigQuery poderá simplificar o fluxo de trabalho. No entanto, se você precisar de previsões em tempo real ou se o cenário precisar de uma funcionalidade que esteja em outra biblioteca, siga as etapas deste documento para enviar o modelo salvo a um endpoint.
Custos
Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:
Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços.
Antes de começar
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
O notebook do Jupyter para este cenário
As tarefas para criar e criar o pipeline são integradas a um notebook do Jupyter, que está em um repositório do GitHub.
Para executar as tarefas, você recebe o notebook e executa as células de código no notebook em ordem. O fluxo descrito neste documento presume que você está executando os notebooks no Vertex AI Workbench.
Abra o ambiente do Vertex AI Workbench
Comece clonando o repositório do GitHub em um ambiente Vertex AI Workbench.
- No Console do Google Cloud, selecione o projeto em que você quer criar o notebook.
Acesse a página do Vertex AI Workbench.
Na guia Notebooks gerenciados pelo usuário, clique em Novo notebook.
Na lista de tipos de notebook, escolha um notebook do Python 3.
Na caixa de diálogo Novo notebook, clique em Opções avançadas e, em Tipo de máquina, selecione o tipo de máquina que você quer usar. Se não tiver certeza, escolha n1-standard-1 (1 cVPU, 3.75 GB de RAM).
Clique em Criar.
Leva alguns instantes para que o ambiente de notebook seja criado.
Quando o notebook for criado, selecione-o e clique em Abrir o JupyterLab.
O ambiente do JupyterLab é aberto no navegador.
Para iniciar uma guia de terminal, selecione File > New > Launcher.
Clique no ícone Terminal na guia Acesso rápido.
No terminal, clone o repositório do GitHub
mlops-on-gcp
:git clone https://github.com/GoogleCloudPlatform/cloud-for-marketing/
Quando o comando for concluído, você verá a pasta
cloud-for-marketing
no navegador de arquivos.
Definir configurações de notebooks
Antes de executar o notebook, é preciso configurá-lo. O notebook requer um bucket do Cloud Storage para armazenar artefatos de pipeline. Portanto, comece criando esse bucket.
- Crie um bucket do Cloud Storage no qual o notebook pode armazenar artefatos de pipeline. O nome do bucket precisa ser globalmente exclusivo.
- Na pasta
cloud-for-marketing/marketing-analytics/predicting/kfp_pipeline/
, abra o notebookPropensity_Pipeline.ipynb
. - No notebook, defina o valor da variável
PROJECT_ID
como o ID do projeto do Google Cloud em que você quer executar o pipeline. - Defina o valor da variável
BUCKET_NAME
como o nome do bucket que você acabou de criar.
O restante deste documento descreve snippets de código que são importantes para entender como o pipeline funciona. Para ver a implementação completa, consulte o repositório do GitHub (em inglês).
Criar a visualização do BigQuery
A primeira etapa no pipeline gera os dados de entrada, que serão usados para criar cada modelo. Este componente do Vertex AI Pipelines gera uma visualização do BigQuery. Para simplificar o processo de criação da visualização, alguns SQL já foram gerados e salvos em um arquivo de texto no GitHub.
O código para cada componente começa decorando (modificando uma classe pai ou
a função por meio de atributos) da classe de componentes Vertex AI Pipelines. Em seguida, o código
define a função create_input_view
, que é uma etapa no pipeline.
A função requer várias entradas. No momento, alguns desses valores são codificados no código, como as datas de início e término. Ao automatizar o pipeline, é possível modificar o código para usar valores adequados (por exemplo, usando a função CURRENT_DATE
para uma data). ou você pode atualizar o componente para usar esses valores como
parâmetros, em vez de mantê-los codificados. Também é necessário alterar o valor de ga_data_ref
para o nome da sua tabela do GA4 e definir o valor da variável conversion
como sua conversão. Neste exemplo, usamos os dados públicos de amostra do GA4.
A listagem a seguir mostra o código para o componente create-input-view
.
@component( # this component builds a BigQuery view, which will be the underlying source for model packages_to_install=["google-cloud-bigquery", "google-cloud-storage"], base_image="python:3.9", output_component_file="output_component/create_input_view.yaml", ) def create_input_view(view_name: str, data_set_id: str, project_id: str, bucket_name: str, blob_path: str ): from google.cloud import bigquery from google.cloud import storage client = bigquery.Client(project=project_id) dataset = client.dataset(data_set_id) table_ref = dataset.table(view_name) ga_data_ref = 'bigquery-public-data.google_analytics_sample.ga_sessions_*' conversion = "hits.page.pageTitle like '%Shopping Cart%'" start_date = '20170101' end_date = '20170131' def get_sql(bucket_name, blob_path): from google.cloud import storage storage_client = storage.Client() bucket = storage_client.get_bucket(bucket_name) blob = bucket.get_blob(blob_path) content = blob.download_as_string() return content def if_tbl_exists(client, table_ref): ... else: content = get_sql() content = str(content, 'utf-8') create_base_feature_set_query = content. format(start_date = start_date, end_date = end_date, ga_data_ref = ga_data_ref, conversion = conversion) shared_dataset_ref = client.dataset(data_set_id) base_feature_set_view_ref = shared_dataset_ref.table(view_name) base_feature_set_view = bigquery.Table(base_feature_set_view_ref) base_feature_set_view.view_query = create_base_feature_set_query.format(project_id) base_feature_set_view = client.create_table(base_feature_set_view)
Crie o modelo do BigQuery ML
Após a criação da visualização, execute o componente chamado build_bqml_logistic
para
criar um modelo do BigQuery ML. Esse bloco de notebook é um componente essencial. Usando a visualização de treinamento que você criou no primeiro bloco, você cria um modelo do BigQuery ML. Neste exemplo, o notebook usa regressão logística.
Para informações sobre os tipos de modelo e os hiperparâmetros disponíveis, consulte a documentação de referência do BigQuery ML.
A listagem a seguir mostra o código desse componente.
@component( # this component builds a logistic regression with BigQuery ML packages_to_install=["google-cloud-bigquery"], base_image="python:3.9", output_component_file="output_component/create_bqml_model_logistic.yaml" ) def build_bqml_logistic(project_id: str, data_set_id: str, model_name: str, training_view: str ): from google.cloud import bigquery client = bigquery.Client(project=project_id) model_name = f"{project_id}.{data_set_id}.{model_name}" training_set = f"{project_id}.{data_set_id}.{training_view}" build_model_query_bqml_logistic = ''' CREATE OR REPLACE MODEL `{model_name}` OPTIONS(model_type='logistic_reg' , INPUT_LABEL_COLS = ['label'] , L1_REG = 1 , DATA_SPLIT_METHOD = 'RANDOM' , DATA_SPLIT_EVAL_FRACTION = 0.20 ) AS SELECT * EXCEPT (fullVisitorId, label), CASE WHEN label is null then 0 ELSE label end as label FROM `{training_set}` '''.format(model_name = model_name, training_set = training_set) job_config = bigquery.QueryJobConfig() client.query(build_model_query_bqml_logistic, job_config=job_config)
Usar o XGBoost em vez do BigQuery ML
O componente ilustrado na seção anterior usa o BigQuery ML. Na próxima seção de notebooks, você verá como usar o XGBoost diretamente no Python em vez de usar o BigQuery ML.
Execute o componente chamado build_bqml_xgboost
para criar o componente para executar um
modelo de classificação XGBoost padrão com uma pesquisa de grade. Em seguida, o código salva
o modelo como um artefato no bucket do Cloud Storage que você criou.
A função é compatível com outros parâmetros (metrics
e model
) para artefatos de saída. Esses parâmetros são exigidos pelo Vertex AI Pipelines.
@component( # this component builds an xgboost classifier with xgboost packages_to_install=["google-cloud-bigquery", "xgboost", "pandas", "sklearn", "joblib", "pyarrow"], base_image="python:3.9", output_component_file="output_component/create_xgb_model_xgboost.yaml" ) def build_xgb_xgboost(project_id: str, data_set_id: str, training_view: str, metrics: Output[Metrics], model: Output[Model] ): ... data_set = f"{project_id}.{data_set_id}.{training_view}" build_df_for_xgboost = ''' SELECT * FROM `{data_set}` '''.format(data_set = data_set) ... xgb_model = XGBClassifier(n_estimators=50, objective='binary:hinge', silent=True, nthread=1, eval_metric="auc") random_search = RandomizedSearchCV(xgb_model, param_distributions=params, n_iter=param_comb, scoring='precision', n_jobs=4, cv=skf.split(X_train,y_train), verbose=3, random_state=1001 ) random_search.fit(X_train, y_train) xgb_model_best = random_search.best_estimator_ predictions = xgb_model_best.predict(X_test) score = accuracy_score(y_test, predictions) auc = roc_auc_score(y_test, predictions) precision_recall = precision_recall_curve(y_test, predictions) metrics.log_metric("accuracy",(score * 100.0)) metrics.log_metric("framework", "xgboost") metrics.log_metric("dataset_size", len(df)) metrics.log_metric("AUC", auc) dump(xgb_model_best, model.path + ".joblib")
Criar um endpoint
Execute o componente chamado deploy_xgb
para criar um endpoint usando o
modelo XGBoost da seção anterior. O componente usa o artefato
do modelo XGBoost anterior, cria um contêiner e implanta o endpoint, além de
fornecer o URL do endpoint como um artefato para visualizá-lo. Quando
essa etapa for concluída, um endpoint da Vertex AI será criado e
poderá ser visualizado na página do console da Vertex AI.
@component( # Deploys xgboost model packages_to_install=["google-cloud-aiplatform", "joblib", "sklearn", "xgboost"], base_image="python:3.9", output_component_file="output_component/xgboost_deploy_component.yaml", ) def deploy_xgb( model: Input[Model], project_id: str, vertex_endpoint: Output[Artifact], vertex_model: Output[Model] ): from google.cloud import aiplatform aiplatform.init(project=project_id) deployed_model = aiplatform.Model.upload( display_name="tai-propensity-test-pipeline", artifact_uri = model.uri.replace("model", ""), serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-4:latest" ) endpoint = deployed_model.deploy(machine_type="n1-standard-4") # Save data to the output params vertex_endpoint.uri = endpoint.resource_name vertex_model.uri = deployed_model.resource_name
Definir o pipeline
Para definir o pipeline, defina cada operação com base nos componentes criados anteriormente. Em seguida, especifique a ordem dos elementos do pipeline se eles não forem explicitamente chamados no componente.
Por exemplo, o código a seguir no notebook define um pipeline. Nesse
caso, o código exige que o componente build_bqml_logistic_op
seja executado após o
componente create_input_view_op
.
@dsl.pipeline( # Default pipeline root. You can override it when submitting the pipeline. pipeline_root=PIPELINE_ROOT, # A name for the pipeline. name="pipeline-test", description='Propensity BigQuery ML Test' ) def pipeline(): create_input_view_op = create_input_view( view_name = VIEW_NAME, data_set_id = DATA_SET_ID, project_id = PROJECT_ID, bucket_name = BUCKET_NAME, blob_path = BLOB_PATH ) build_bqml_logistic_op = build_bqml_logistic( project_id = PROJECT_ID, data_set_id = DATA_SET_ID, model_name = 'bqml_logistic_model', training_view = VIEW_NAME ) # several components have been deleted for brevity build_bqml_logistic_op.after(create_input_view_op) build_bqml_xgboost_op.after(create_input_view_op) build_bqml_automl_op.after(create_input_view_op) build_xgb_xgboost_op.after(create_input_view_op) evaluate_bqml_logistic_op.after(build_bqml_logistic_op) evaluate_bqml_xgboost_op.after(build_bqml_xgboost_op) evaluate_bqml_automl_op.after(build_bqml_automl_op)
Compilar e executar o pipeline
Agora é possível compilar e executar o pipeline.
O código a seguir no notebook define o valor enable_caching
como verdadeiro para ativar o armazenamento em cache. Quando o armazenamento em cache é ativado, as execuções anteriores em que um componente foi concluído não são executadas novamente. Essa sinalização é útil
especialmente quando você está testando o pipeline porque, quando o armazenamento em cache está ativado, a
execução é concluída mais rapidamente e usa menos recursos.
compiler.Compiler().compile( pipeline_func=pipeline, package_path="pipeline.json" ) TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S") run = pipeline_jobs.PipelineJob( display_name="test-pipeine", template_path="pipeline.json", job_id="test-{0}".format(TIMESTAMP), enable_caching=True ) run.run()
Automatize o pipeline
Nesta etapa, você iniciou o primeiro pipeline. Acesse o console Vertex AI Pipelines no console para ver o status desse job. Você pode observar como cada contêiner é criado e executado. Também é possível rastrear erros em componentes específicos nesta seção clicando em cada um deles.
Para programar o pipeline, crie uma função do Cloud e use um programador semelhante a um cron job.
O código na última seção do notebook programa o pipeline para ser executado uma vez por dia, conforme mostrado no snippet de código a seguir:
from kfp.v2.google.client import AIPlatformClient api_client = AIPlatformClient(project_id=PROJECT_ID, region='us-central1' ) api_client.create_schedule_from_job_spec( job_spec_path='pipeline.json', schedule='0 * * * *', enable_caching=False )
Usar o canal concluído em produção
O pipeline concluído executou as seguintes tarefas:
- criou um conjunto de dados de entrada.
- treinou vários modelos usando o BigQuery ML e o XGBoost do Python.
- analisou resultados de modelos.
- implantou o modelo XGBoost.
Você também automatizou o pipeline usando o Cloud Functions e o Cloud Scheduler para execução diária.
O pipeline definido no notebook foi criado para ilustrar maneiras de criar vários modelos. No entanto, isso não seria possível porque ele já foi criado em um cenário de produção. No entanto, é possível usar esse pipeline como um guia e modificar os componentes para atender às suas necessidades. Por exemplo, é possível editar o processo de criação de recursos para aproveitar seus dados, modificar períodos e, talvez, criar modelos alternativos. Escolha também o modelo entre aqueles ilustrados que atendam melhor aos seus requisitos de produção.
Quando o pipeline estiver pronto para produção, será possível implementar tarefas adicionais. Por exemplo, é possível implementar um modelo campeão/desafiador, em que a cada dia um novo modelo é criado e o novo modelo (o desafiador) e o atual (o campeão) são pontuados em novos dados. Você coloca o novo modelo em produção somente se o desempenho dele for melhor do que o atual. Para monitorar o progresso do seu sistema, mantenha um registro do desempenho do modelo de cada dia e visualize o desempenho em alta.
A seguir
- Para saber como usar MLOps para criar sistemas de ML prontos para produção, consulte Guia de práticas para MLOps.
- Para saber mais sobre a Vertex AI, consulte a documentação da Vertex AI.
- Para saber mais sobre o Kubeflow Pipelines, consulte a documentação do KFP (em inglês).
- Para saber mais sobre o TensorFlow Extended, consulte o Guia do usuário do TFX.
- Para mais arquiteturas de referência, diagramas e práticas recomendadas, confira a Central de arquitetura do Cloud.
Colaboradores
Autor: Tai Conley | Engenheiro de clientes do Cloud
Outro colaborador: Lars Ahlfors | Engenheiro de clientes do Cloud