Crie um pipeline para a preparação contínua de modelos

Este documento explica os passos necessários para criar um pipeline que prepara automaticamente um modelo personalizado de forma periódica ou quando são inseridos novos dados no conjunto de dados através das funções do Cloud Run e dos pipelines da Vertex AI.

Objetivos

Os seguintes passos abrangem este processo:

  1. Adquira e prepare o conjunto de dados no BigQuery.

  2. Crie e carregue um pacote de preparação personalizado. Quando executado, lê dados do conjunto de dados e prepara o modelo.

  3. Crie um Vertex AI Pipeline. Este pipeline executa o pacote de preparação personalizado, carrega o modelo para o Registo de modelos do Vertex AI, executa a tarefa de avaliação e envia uma notificação por email.

  4. Execute o pipeline manualmente.

  5. Crie uma Cloud Function com um acionador do Eventarc que execute o pipeline sempre que novos dados forem inseridos no conjunto de dados do BigQuery.

Antes de começar

Configure o seu projeto e notebook.

Configuração do projeto

  1. In the Google Cloud console, go to the project selector page.

    Go to project selector

  2. Select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
  3. Verify that billing is enabled for your Google Cloud project.

Criar bloco de notas

Usamos um bloco de notas do Colab Enterprise para executar parte do código neste tutorial.

  1. Se não for o proprietário do projeto, peça a um proprietário do projeto que lhe conceda as funções de IAM roles/resourcemanager.projectIamAdmin e roles/aiplatform.colabEnterpriseUser.

    Tem de ter estas funções para usar o Colab Enterprise e conceder funções e autorizações do IAM a si próprio e a contas de serviço.

    Aceda ao IAM

  2. Na Google Cloud consola, aceda à página Blocos de notas do Colab Enterprise.

    O Colab Enterprise pede-lhe que ative as seguintes APIs necessárias se ainda não estiverem ativadas.

    • API Vertex AI
    • API Dataform
    • API Compute Engine

    Aceda ao Colab Enterprise

  3. No menu Região, selecione a região onde quer criar o seu bloco de notas. Se não tiver a certeza, use us-central1 como região.

    Use a mesma região para todos os recursos neste tutorial.

  4. Clique em Criar um novo bloco de notas.

O novo bloco de notas é apresentado no separador Os meus blocos de notas. Para executar código no seu bloco de notas, adicione uma célula de código e clique no botão  Executar célula.

Configure o ambiente de programação

  1. No seu bloco de notas, instale os seguintes pacotes Python3.

    ! pip3 install  google-cloud-aiplatform==1.34.0 \
                    google-cloud-pipeline-components==2.6.0 \
                    kfp==2.4.0 \
                    scikit-learn==1.0.2 \
                    mlflow==2.10.0
    
  2. Defina o projeto da CLI Google Cloud executando o seguinte comando:

    PROJECT_ID = "PROJECT_ID"
    
    # Set the project id
    ! gcloud config set project {PROJECT_ID}
    

    Substitua PROJECT_ID pelo ID do seu projeto. Se necessário, pode localizar o ID do projeto na Google Cloud consola.

  3. Atribua funções à sua Conta Google:

    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/bigquery.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/aiplatform.user
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/storage.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/pubsub.editor
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/cloudfunctions.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/logging.viewer
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/logging.configWriter
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/iam.serviceAccountUser
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/eventarc.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/aiplatform.colabEnterpriseUser
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/artifactregistry.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/serviceusage.serviceUsageAdmin
    
  4. Ative as seguintes APIs

    • API Artifact Registry
    • API BigQuery
    • API Cloud Build
    • Cloud Functions API
    • Cloud Logging API
    • Pub/Sub API
    • Cloud Run Admin API
    • API Cloud Storage
    • API Eventarc
    • API Service Usage
    • API Vertex AI
    ! gcloud services enable artifactregistry.googleapis.com bigquery.googleapis.com cloudbuild.googleapis.com cloudfunctions.googleapis.com logging.googleapis.com pubsub.googleapis.com run.googleapis.com storage-component.googleapis.com  eventarc.googleapis.com serviceusage.googleapis.com aiplatform.googleapis.com
    

  5. Conceda funções às contas de serviço do seu projeto:

    1. Veja os nomes das suas contas de serviço

      ! gcloud iam service-accounts list
      

      Tome nota do nome do agente do serviço Compute. Deve estar no formato xxxxxxxx-compute@developer.gserviceaccount.com.

    2. Conceda as funções necessárias ao agente do serviço.

      ! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:"SA_ID-compute@developer.gserviceaccount.com"" --role=roles/aiplatform.serviceAgent
      ! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:"SA_ID-compute@developer.gserviceaccount.com"" --role=roles/eventarc.eventReceiver
      

Adquira e prepare o conjunto de dados

Neste tutorial, vai criar um modelo que prevê a tarifa de uma viagem de táxi com base em funcionalidades como o tempo de viagem, a localização e a distância. Vamos usar dados do conjunto de dados público Chicago Taxi Trips. Este conjunto de dados inclui viagens de táxi de 2013 até ao presente, comunicadas à cidade de Chicago na sua função de organismo regulador. Para proteger a privacidade dos condutores e dos utilizadores do táxi em simultâneo e permitir que o agregador analise os dados, o ID do táxi é mantido consistente para qualquer número de medalhão de táxi específico, mas não mostra o número. Os setores censitários são suprimidos em alguns casos e as horas são arredondadas aos 15 minutos mais próximos.

Para mais informações, consulte o artigo Viagens de táxi em Chicago no Marketplace.

Crie um conjunto de dados do BigQuery

  1. Na Google Cloud consola, aceda ao BigQuery Studio.

    Aceda ao BigQuery

  2. No painel Explorador, localize o seu projeto, clique em Ações e, de seguida, clique em Criar conjunto de dados.

  3. Na página Criar conjunto de dados:

    • Para o ID do conjunto de dados, introduza mlops. Para mais informações, consulte o artigo sobre a nomenclatura de conjuntos de dados.

    • Em Tipo de localização, escolha a sua região múltipla. Por exemplo, escolha US (várias regiões nos Estados Unidos) se estiver a usar o us-central1. Após a criação de um conjunto de dados, não é possível alterar a localização.

    • Clique em Criar conjunto de dados.

Para mais informações, veja como criar conjuntos de dados.

Crie e preencha a tabela do BigQuery

Nesta secção, cria a tabela e importa dados relativos a um ano do conjunto de dados público para o conjunto de dados do seu projeto.

  1. Aceda ao BigQuery Studio

    Aceda ao BigQuery

  2. Clique em Criar consulta SQL e execute a seguinte consulta SQL clicando em Executar.

    CREATE OR REPLACE TABLE `PROJECT_ID.mlops.chicago`
    AS (
        WITH
          taxitrips AS (
          SELECT
            trip_start_timestamp,
            trip_end_timestamp,
            trip_seconds,
            trip_miles,
            payment_type,
            pickup_longitude,
            pickup_latitude,
            dropoff_longitude,
            dropoff_latitude,
            tips,
            tolls,
            fare,
            pickup_community_area,
            dropoff_community_area,
            company,
            unique_key
          FROM
            `bigquery-public-data.chicago_taxi_trips.taxi_trips`
          WHERE pickup_longitude IS NOT NULL
          AND pickup_latitude IS NOT NULL
          AND dropoff_longitude IS NOT NULL
          AND dropoff_latitude IS NOT NULL
          AND trip_miles > 0
          AND trip_seconds > 0
          AND fare > 0
          AND EXTRACT(YEAR FROM trip_start_timestamp) = 2019
        )
    
        SELECT
          trip_start_timestamp,
          EXTRACT(MONTH from trip_start_timestamp) as trip_month,
          EXTRACT(DAY from trip_start_timestamp) as trip_day,
          EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week,
          EXTRACT(HOUR from trip_start_timestamp) as trip_hour,
          trip_seconds,
          trip_miles,
          payment_type,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)
          ) AS pickup_grid,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)
          ) AS dropoff_grid,
          ST_Distance(
              ST_GeogPoint(pickup_longitude, pickup_latitude),
              ST_GeogPoint(dropoff_longitude, dropoff_latitude)
          ) AS euclidean,
          CONCAT(
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude,
                  pickup_latitude), 0.1)),
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude,
                  dropoff_latitude), 0.1))
          ) AS loc_cross,
          IF((tips/fare >= 0.2), 1, 0) AS tip_bin,
          tips,
          tolls,
          fare,
          pickup_longitude,
          pickup_latitude,
          dropoff_longitude,
          dropoff_latitude,
          pickup_community_area,
          dropoff_community_area,
          company,
          unique_key,
          trip_end_timestamp
        FROM
          taxitrips
        LIMIT 1000000
    )
    

    Esta consulta cria a tabela <PROJECT_ID>.mlops.chicago e preenche-a com dados da tabela pública bigquery-public-data.chicago_taxi_trips.taxi_trips.

  3. Para ver o esquema da tabela, clique em Aceder à tabela e, de seguida, clique no separador Esquema.

  4. Para ver o conteúdo da tabela, clique no separador Pré-visualizar.

Crie e carregue o pacote de preparação personalizado

Nesta secção, cria um pacote Python que contém o código que lê o conjunto de dados, divide os dados em conjuntos de preparação e de teste, e prepara o seu modelo personalizado. O pacote é executado como uma das tarefas no seu pipeline. Para mais informações, consulte o artigo criar uma aplicação de preparação Python para um contentor pré-criado.

Crie o pacote de preparação personalizado

  1. No seu bloco de notas do Colab, crie pastas principais para a aplicação de preparação:

    !mkdir -p training_package/trainer
    
  2. Crie um ficheiro __init__.py em cada pasta para a tornar um pacote através do seguinte comando:

    ! touch training_package/__init__.py
    ! touch training_package/trainer/__init__.py
    

    Pode ver os novos ficheiros e pastas no painel da pasta Ficheiros.

  3. No painel Ficheiros, crie um ficheiro denominado task.py na pasta training_package/trainer com o seguinte conteúdo.

    # Import the libraries
    from sklearn.model_selection import train_test_split, cross_val_score
    from sklearn.preprocessing import OneHotEncoder, StandardScaler
    from google.cloud import bigquery, bigquery_storage
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.compose import ColumnTransformer
    from sklearn.pipeline import Pipeline
    from google import auth
    from scipy import stats
    import numpy as np
    import argparse
    import joblib
    import pickle
    import csv
    import os
    
    # add parser arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('--project-id', dest='project_id',  type=str, help='Project ID.')
    parser.add_argument('--training-dir', dest='training_dir', default=os.getenv("AIP_MODEL_DIR"),
                        type=str, help='Dir to save the data and the trained model.')
    parser.add_argument('--bq-source', dest='bq_source',  type=str, help='BigQuery data source for training data.')
    args = parser.parse_args()
    
    # data preparation code
    BQ_QUERY = """
    with tmp_table as (
    SELECT trip_seconds, trip_miles, fare,
        tolls,  company,
        pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude,
        DATETIME(trip_start_timestamp, 'America/Chicago') trip_start_timestamp,
        DATETIME(trip_end_timestamp, 'America/Chicago') trip_end_timestamp,
        CASE WHEN (pickup_community_area IN (56, 64, 76)) OR (dropoff_community_area IN (56, 64, 76)) THEN 1 else 0 END is_airport,
    FROM `{}`
    WHERE
      dropoff_latitude IS NOT NULL and
      dropoff_longitude IS NOT NULL and
      pickup_latitude IS NOT NULL and
      pickup_longitude IS NOT NULL and
      fare > 0 and
      trip_miles > 0
      and MOD(ABS(FARM_FINGERPRINT(unique_key)), 100) between 0 and 99
    ORDER BY RAND()
    LIMIT 10000)
    SELECT *,
        EXTRACT(YEAR FROM trip_start_timestamp) trip_start_year,
        EXTRACT(MONTH FROM trip_start_timestamp) trip_start_month,
        EXTRACT(DAY FROM trip_start_timestamp) trip_start_day,
        EXTRACT(HOUR FROM trip_start_timestamp) trip_start_hour,
        FORMAT_DATE('%a', DATE(trip_start_timestamp)) trip_start_day_of_week
    FROM tmp_table
    """.format(args.bq_source)
    # Get default credentials
    credentials, project = auth.default()
    bqclient = bigquery.Client(credentials=credentials, project=args.project_id)
    bqstorageclient = bigquery_storage.BigQueryReadClient(credentials=credentials)
    df = (
        bqclient.query(BQ_QUERY)
        .result()
        .to_dataframe(bqstorage_client=bqstorageclient)
    )
    # Add 'N/A' for missing 'Company'
    df.fillna(value={'company':'N/A','tolls':0}, inplace=True)
    # Drop rows containing null data.
    df.dropna(how='any', axis='rows', inplace=True)
    # Pickup and dropoff locations distance
    df['abs_distance'] = (np.hypot(df['dropoff_latitude']-df['pickup_latitude'], df['dropoff_longitude']-df['pickup_longitude']))*100
    
    # Remove extremes, outliers
    possible_outliers_cols = ['trip_seconds', 'trip_miles', 'fare', 'abs_distance']
    df=df[(np.abs(stats.zscore(df[possible_outliers_cols].astype(float))) < 3).all(axis=1)].copy()
    # Reduce location accuracy
    df=df.round({'pickup_latitude': 3, 'pickup_longitude': 3, 'dropoff_latitude':3, 'dropoff_longitude':3})
    
    # Drop the timestamp col
    X=df.drop(['trip_start_timestamp', 'trip_end_timestamp'],axis=1)
    
    # Split the data into train and test
    X_train, X_test = train_test_split(X, test_size=0.10, random_state=123)
    
    ## Format the data for batch predictions
    # select string cols
    string_cols = X_test.select_dtypes(include='object').columns
    # Add quotes around string fields
    X_test[string_cols] = X_test[string_cols].apply(lambda x: '\"' + x + '\"')
    # Add quotes around column names
    X_test.columns = ['\"' + col + '\"' for col in X_test.columns]
    # Save DataFrame to csv
    X_test.to_csv(os.path.join(args.training_dir,"test.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ')
    # Save test data without the target for batch predictions
    X_test.drop('\"fare\"',axis=1,inplace=True)
    X_test.to_csv(os.path.join(args.training_dir,"test_no_target.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ')
    
    # Separate the target column
    y_train=X_train.pop('fare')
    # Get the column indexes
    col_index_dict = {col: idx for idx, col in enumerate(X_train.columns)}
    # Create a column transformer pipeline
    ct_pipe = ColumnTransformer(transformers=[
        ('hourly_cat', OneHotEncoder(categories=[range(0,24)], sparse = False), [col_index_dict['trip_start_hour']]),
        ('dow', OneHotEncoder(categories=[['Mon', 'Tue', 'Sun', 'Wed', 'Sat', 'Fri', 'Thu']], sparse = False), [col_index_dict['trip_start_day_of_week']]),
        ('std_scaler', StandardScaler(), [
            col_index_dict['trip_start_year'],
            col_index_dict['abs_distance'],
            col_index_dict['pickup_longitude'],
            col_index_dict['pickup_latitude'],
            col_index_dict['dropoff_longitude'],
            col_index_dict['dropoff_latitude'],
            col_index_dict['trip_miles'],
            col_index_dict['trip_seconds']])
    ])
    # Add the random-forest estimator to the pipeline
    rfr_pipe = Pipeline([
        ('ct', ct_pipe),
        ('forest_reg', RandomForestRegressor(
            n_estimators = 20,
            max_features = 1.0,
            n_jobs = -1,
            random_state = 3,
            max_depth=None,
            max_leaf_nodes=None,
        ))
    ])
    
    # train the model
    rfr_score = cross_val_score(rfr_pipe, X_train, y_train, scoring = 'neg_mean_squared_error', cv = 5)
    rfr_rmse = np.sqrt(-rfr_score)
    print ("Crossvalidation RMSE:",rfr_rmse.mean())
    final_model=rfr_pipe.fit(X_train, y_train)
    # Save the model pipeline
    with open(os.path.join(args.training_dir,"model.joblib"), 'wb') as model_file:
        pickle.dump(final_model, model_file)
    

    O código realiza as seguintes tarefas:

    1. Seleção de funcionalidades.
    2. Transformar a hora dos dados de recolha e entrega de UTC para a hora local de Chicago.
    3. Extrair a data, a hora, o dia da semana, o mês e o ano da data/hora de recolha.
    4. Calcular a duração da viagem com base na hora de início e de fim.
    5. Identificar e marcar viagens que começaram ou terminaram num aeroporto com base nas áreas comunitárias.
    6. O modelo de regressão de floresta aleatória é preparado para prever a tarifa da viagem de táxi através da framework scikit-learn.
    7. O modelo preparado é guardado num ficheiro pickle model.joblib.

      A abordagem e a engenharia de funcionalidades selecionadas baseiam-se na exploração e na análise de dados sobre a previsão da tarifa de táxi de Chicago.

  4. No painel Ficheiros, crie um ficheiro denominado setup.py na pasta training_package com o seguinte conteúdo.

    from setuptools import find_packages
    from setuptools import setup
    
    REQUIRED_PACKAGES=["google-cloud-bigquery[pandas]","google-cloud-bigquery-storage"]
    setup(
        name='trainer',
        version='0.1',
        install_requires=REQUIRED_PACKAGES,
        packages=find_packages(),
        include_package_data=True,
        description='Training application package for chicago taxi trip fare prediction.'
    )
    
  5. No bloco de notas, execute setup.py para criar a distribuição de origem para a sua aplicação de preparação:

    ! cd training_package && python setup.py sdist --formats=gztar && cd ..
    

No final desta secção, o painel Ficheiros deve conter os seguintes ficheiros e pastas em training-package.

dist
  trainer-0.1.tar.gz
trainer
  __init__.py
  task.py
trainer.egg-info
__init__.py
setup.py

Carregue o pacote de treino personalizado para o Cloud Storage

  1. Crie um contentor do Cloud Storage.

    REGION="REGION"
    BUCKET_NAME = "BUCKET_NAME"
    BUCKET_URI = f"gs://{BUCKET_NAME}"
    
    ! gcloud storage buckets create gs://$BUCKET_URI --location=$REGION --project=$PROJECT_ID
    

    Substitua os seguintes valores de parâmetros:

    • REGION: escolha a mesma região que escolhe quando cria o seu bloco de notas do Colab.

    • BUCKET_NAME: o nome do contentor.

  2. Carregue o pacote de preparação para o contentor do Cloud Storage.

    # Copy the training package to the bucket
    ! gcloud storage cp training_package/dist/trainer-0.1.tar.gz $BUCKET_URI/
    

Crie o seu pipeline

Um pipeline é uma descrição de um fluxo de trabalho de MLOps como um gráfico de passos denominado tarefas de pipeline.

Nesta secção, define as tarefas do pipeline, compila-as para YAML e regista o pipeline no Artifact Registry para que possa ser controlado por versões e executado várias vezes por um único utilizador ou por vários utilizadores.

Segue-se uma visualização das tarefas, incluindo a preparação do modelo, o carregamento do modelo, a avaliação do modelo e a notificação por email, no nosso pipeline:

Visualização de pipelines

Para mais informações, consulte o artigo sobre criar modelos de pipelines.

Defina constantes e inicialize clientes

  1. No bloco de notas, defina as constantes que vão ser usadas nos passos posteriores:

    import os
    
    EMAIL_RECIPIENTS = [ "NOTIFY_EMAIL" ]
    PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI)
    PIPELINE_NAME = "vertex-pipeline-datatrigger-tutorial"
    WORKING_DIR = f"{PIPELINE_ROOT}/mlops-datatrigger-tutorial"
    os.environ['AIP_MODEL_DIR'] = WORKING_DIR
    EXPERIMENT_NAME = PIPELINE_NAME + "-experiment"
    PIPELINE_FILE = PIPELINE_NAME + ".yaml"
    

    Substitua NOTIFY_EMAIL por um endereço de email. Quando a tarefa do pipeline é concluída, com êxito ou sem êxito, é enviado um email para esse endereço de email.

  2. Inicialize o SDK do Vertex AI com o projeto, o contentor de preparação, a localização e a experiência.

    from google.cloud import aiplatform
    
    aiplatform.init(
        project=PROJECT_ID,
        staging_bucket=BUCKET_URI,
        location=REGION,
        experiment=EXPERIMENT_NAME)
    
    aiplatform.autolog()
    

Defina as tarefas da conduta

No bloco de notas, defina o pipeline custom_model_training_evaluation_pipeline:

from kfp import dsl
from kfp.dsl import importer
from kfp.dsl import OneOf
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp
from google_cloud_pipeline_components.v1.model_evaluation import ModelEvaluationRegressionOp
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp
from google_cloud_pipeline_components.v1.endpoint import ModelDeployOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp
from google.cloud import aiplatform

# define the train-deploy pipeline
@dsl.pipeline(name="custom-model-training-evaluation-pipeline")
def custom_model_training_evaluation_pipeline(
    project: str,
    location: str,
    training_job_display_name: str,
    worker_pool_specs: list,
    base_output_dir: str,
    prediction_container_uri: str,
    model_display_name: str,
    batch_prediction_job_display_name: str,
    target_field_name: str,
    test_data_gcs_uri: list,
    ground_truth_gcs_source: list,
    batch_predictions_gcs_prefix: str,
    batch_predictions_input_format: str="csv",
    batch_predictions_output_format: str="jsonl",
    ground_truth_format: str="csv",
    parent_model_resource_name: str=None,
    parent_model_artifact_uri: str=None,
    existing_model: bool=False

):
    # Notification task
    notify_task = VertexNotificationEmailOp(
                    recipients= EMAIL_RECIPIENTS
                    )
    with dsl.ExitHandler(notify_task, name='MLOps Continuous Training Pipeline'):
        # Train the model
        custom_job_task = CustomTrainingJobOp(
                                    project=project,
                                    display_name=training_job_display_name,
                                    worker_pool_specs=worker_pool_specs,
                                    base_output_directory=base_output_dir,
                                    location=location
                            )

        # Import the unmanaged model
        import_unmanaged_model_task = importer(
                                        artifact_uri=base_output_dir,
                                        artifact_class=artifact_types.UnmanagedContainerModel,
                                        metadata={
                                            "containerSpec": {
                                                "imageUri": prediction_container_uri,
                                            },
                                        },
                                    ).after(custom_job_task)

        with dsl.If(existing_model == True):
            # Import the parent model to upload as a version
            import_registry_model_task = importer(
                                        artifact_uri=parent_model_artifact_uri,
                                        artifact_class=artifact_types.VertexModel,
                                        metadata={
                                            "resourceName": parent_model_resource_name
                                        },
                                    ).after(import_unmanaged_model_task)
            # Upload the model as a version
            model_version_upload_op = ModelUploadOp(
                                    project=project,
                                    location=location,
                                    display_name=model_display_name,
                                    parent_model=import_registry_model_task.outputs["artifact"],
                                    unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
                                )

        with dsl.Else():
            # Upload the model
            model_upload_op = ModelUploadOp(
                                    project=project,
                                    location=location,
                                    display_name=model_display_name,
                                    unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
                                )
        # Get the model (or model version)
        model_resource = OneOf(model_version_upload_op.outputs["model"], model_upload_op.outputs["model"])

        # Batch prediction
        batch_predict_task = ModelBatchPredictOp(
                            project= project,
                            job_display_name= batch_prediction_job_display_name,
                            model= model_resource,
                            location= location,
                            instances_format= batch_predictions_input_format,
                            predictions_format= batch_predictions_output_format,
                            gcs_source_uris= test_data_gcs_uri,
                            gcs_destination_output_uri_prefix= batch_predictions_gcs_prefix,
                            machine_type= 'n1-standard-2'
                            )
        # Evaluation task
        evaluation_task = ModelEvaluationRegressionOp(
                            project= project,
                            target_field_name= target_field_name,
                            location= location,
                            # model= model_resource,
                            predictions_format= batch_predictions_output_format,
                            predictions_gcs_source= batch_predict_task.outputs["gcs_output_directory"],
                            ground_truth_format= ground_truth_format,
                            ground_truth_gcs_source= ground_truth_gcs_source
                            )
    return

O seu pipeline consiste num gráfico de tarefas que usam os seguintes Google Cloud componentes do pipeline:

Compile a pipeline

Compile o pipeline através do compilador do Kubeflow Pipelines (KFP) num ficheiro YAML que contenha uma representação hermética do seu pipeline.

from kfp import dsl
from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=custom_model_training_evaluation_pipeline,
    package_path="{}.yaml".format(PIPELINE_NAME),
)

Deve ver um ficheiro YAML denominado vertex-pipeline-datatrigger-tutorial.yaml no diretório de trabalho.

Carregue o pipeline como um modelo

  1. Crie um repositório do tipo KFP no Artifact Registry.

    REPO_NAME = "mlops"
    # Create a repo in the artifact registry
    ! gcloud artifacts repositories create $REPO_NAME --location=$REGION --repository-format=KFP
    
  2. Carregue o pipeline compilado para o repositório.

    from kfp.registry import RegistryClient
    
    host = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}"
    client = RegistryClient(host=host)
    TEMPLATE_NAME, VERSION_NAME = client.upload_pipeline(
    file_name=PIPELINE_FILE,
    tags=["v1", "latest"],
    extra_headers={"description":"This is an example pipeline template."})
    TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest"
    
  3. Na Google Cloud consola, verifique se o seu modelo aparece em Modelos de pipeline.

    Aceda aos modelos de pipelines

Execute manualmente a tubagem

Para se certificar de que o pipeline funciona, execute-o manualmente.

  1. No bloco de notas, especifique os parâmetros necessários para executar o pipeline como uma tarefa.

    DATASET_NAME = "mlops"
    TABLE_NAME = "chicago"
    
    worker_pool_specs = [{
                            "machine_spec": {"machine_type": "e2-highmem-2"},
                            "replica_count": 1,
                            "python_package_spec":{
                                    "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",
                                    "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"],
                                    "python_module": "trainer.task",
                                    "args":["--project-id",PROJECT_ID, "--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"]
                            },
    }]
    
    parameters = {
        "project": PROJECT_ID,
        "location": REGION,
        "training_job_display_name": "taxifare-prediction-training-job",
        "worker_pool_specs": worker_pool_specs,
        "base_output_dir": BUCKET_URI,
        "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
        "model_display_name": "taxifare-prediction-model",
        "batch_prediction_job_display_name": "taxifare-prediction-batch-job",
        "target_field_name": "fare",
        "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"],
        "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"],
        "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output",
        "existing_model": False
    }
    
  2. Crie e execute uma tarefa de pipeline.

    # Create a pipeline job
    job = aiplatform.PipelineJob(
        display_name="triggered_custom_regression_evaluation",
        template_path=TEMPLATE_URI ,
        parameter_values=parameters,
        pipeline_root=BUCKET_URI,
        enable_caching=False
    )
    # Run the pipeline job
    job.run()
    

    A tarefa demora cerca de 30 minutos a ser concluída.

  3. Na consola, deve ver uma nova execução do pipeline na página Pipelines:

    Aceda a Execuções de pipelines

  4. Após a conclusão da execução do pipeline, deve ver um novo modelo denominado taxifare-prediction-model ou uma nova versão do modelo no Registo de modelos da Vertex AI:

    Aceda ao Registo de modelos

  5. Também deve ver uma nova tarefa de previsão em lote:

    Aceda a Previsões em lote

Executar automaticamente o pipeline

Existem duas formas de executar automaticamente o pipeline: de acordo com um agendamento ou quando são inseridos novos dados no conjunto de dados.

Execute o pipeline com base numa programação

  1. No seu bloco de notas, chame PipelineJob.create_schedule.

    job_schedule = job.create_schedule(
      display_name="mlops tutorial schedule",
      cron="0 0 1 * *", #
      max_concurrent_run_count=1,
      max_run_count=12,
    )
    

    A expressão cron agenda a execução da tarefa no dia 1 de cada mês às 00:00 UTC.

    Para este tutorial, não queremos que vários trabalhos sejam executados em simultâneo, pelo que definimos max_concurrent_run_count como 1.

  2. Na Google Cloud consola, verifique se o seu schedule aparece em Agendamentos de pipelines.

    Aceda a Programações de pipelines

Executar o pipeline quando existirem novos dados

Crie uma função com um acionador do Eventarc

Crie uma função na nuvem (2.ª geração) que execute o pipeline sempre que novos dados forem inseridos na tabela do BigQuery.

Especificamente, usamos um Eventarc para acionar a função sempre que ocorre um evento google.cloud.bigquery.v2.JobService.InsertJob. Em seguida, a função executa o modelo de pipeline.

Para mais informações, consulte Acionadores do Eventarc e tipos de eventos suportados.

  1. Na Google Cloud consola, aceda às funções do Cloud Run.

    Aceder às funções do Cloud Run

  2. Clique no botão Criar função. Na página Configuração:

    1. Selecione 2.ª geração como ambiente.

    2. Para Nome da função, use mlops.

    3. Para Região, selecione a mesma região que o seu contentor do Cloud Storage e o repositório do Artifact Registry.

    4. Para Acionador, selecione Outro acionador. É aberto o painel Eventarc Trigger.

      1. Para Tipo de acionador, escolha Origens Google.

      2. Para Fornecedor de eventos, escolha BigQuery.

      3. Para Tipo de evento, escolha google.cloud.bigquery.v2.JobService.InsertJob.

      4. Para Recurso, escolha Recurso específico e especifique a tabela do BigQuery

        projects/PROJECT_ID/datasets/mlops/tables/chicago
        
      5. No campo Região, selecione uma localização para o acionador do Eventarc, se aplicável. Consulte Localização do acionador para mais informações.

      6. Clique em Guardar acionador.

    5. Se lhe for pedido que conceda funções a contas de serviço, clique em Conceder tudo.

  3. Clique em Seguinte para aceder à página Código. Na página Código:

    1. Defina o ambiente de execução como Python 3.12.

    2. Defina o Ponto de entrada como mlops_entrypoint.

    3. Com o editor inline, abra o ficheiro main.py e substitua o conteúdo pelo seguinte:

      Substitua PROJECT_ID,REGION e BUCKET_NAME pelos valores que usou anteriormente.

      import json
      import functions_framework
      import requests
      import google.auth
      import google.auth.transport.requests
      # CloudEvent function to be triggered by an Eventarc Cloud Audit Logging trigger
      # Note: this is NOT designed for second-party (Cloud Audit Logs -> Pub/Sub) triggers!
      @functions_framework.cloud_event
      def mlops_entrypoint(cloudevent):
          # Print out the CloudEvent's (required) `type` property
          # See https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#type
          print(f"Event type: {cloudevent['type']}")
      
          # Print out the CloudEvent's (optional) `subject` property
          # See https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#subject
          if 'subject' in cloudevent:
              # CloudEvent objects don't support `get` operations.
              # Use the `in` operator to verify `subject` is present.
              print(f"Subject: {cloudevent['subject']}")
      
          # Print out details from the `protoPayload`
          # This field encapsulates a Cloud Audit Logging entry
          # See https://cloud.google.com/logging/docs/audit#audit_log_entry_structure
      
          payload = cloudevent.data.get("protoPayload")
          if payload:
              print(f"API method: {payload.get('methodName')}")
              print(f"Resource name: {payload.get('resourceName')}")
              print(f"Principal: {payload.get('authenticationInfo', dict()).get('principalEmail')}")
              row_count = payload.get('metadata', dict()).get('tableDataChange',dict()).get('insertedRowsCount')
              print(f"No. of rows: {row_count} !!")
              if row_count:
                  if int(row_count) > 0:
                      print ("Pipeline trigger Condition met !!")
                      submit_pipeline_job()
              else:
                  print ("No pipeline triggered !!!")
      
      def submit_pipeline_job():
          PROJECT_ID = 'PROJECT_ID'
          REGION = 'REGION'
          BUCKET_NAME = "BUCKET_NAME"
          DATASET_NAME = "mlops"
          TABLE_NAME = "chicago"
      
          base_output_dir = BUCKET_NAME
          BUCKET_URI = "gs://{}".format(BUCKET_NAME)
          PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI)
          PIPELINE_NAME = "vertex-mlops-pipeline-tutorial"
          EXPERIMENT_NAME = PIPELINE_NAME + "-experiment"
          REPO_NAME ="mlops"
          TEMPLATE_NAME="custom-model-training-evaluation-pipeline"
          TRAINING_JOB_DISPLAY_NAME="taxifare-prediction-training-job"
          worker_pool_specs = [{
                              "machine_spec": {"machine_type": "e2-highmem-2"},
                              "replica_count": 1,
                              "python_package_spec":{
                                      "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",
                                      "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"],
                                      "python_module": "trainer.task",
                                      "args":["--project-id",PROJECT_ID,"--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"]
                              },
          }]
      
          parameters = {
              "project": PROJECT_ID,
              "location": REGION,
              "training_job_display_name": "taxifare-prediction-training-job",
              "worker_pool_specs": worker_pool_specs,
              "base_output_dir": BUCKET_URI,
              "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
              "model_display_name": "taxifare-prediction-model",
              "batch_prediction_job_display_name": "taxifare-prediction-batch-job",
              "target_field_name": "fare",
              "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"],
              "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"],
              "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output",
              "existing_model": False
          }
          TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest"
          print("TEMPLATE URI: ", TEMPLATE_URI)
          request_body = {
              "name": PIPELINE_NAME,
              "displayName": PIPELINE_NAME,
              "runtimeConfig":{
                  "gcsOutputDirectory": PIPELINE_ROOT,
                  "parameterValues": parameters,
              },
              "templateUri": TEMPLATE_URI
          }
          pipeline_url = "https://us-central1-aiplatform.googleapis.com/v1/projects/{}/locations/{}/pipelineJobs".format(PROJECT_ID, REGION)
          creds, project = google.auth.default()
          auth_req = google.auth.transport.requests.Request()
          creds.refresh(auth_req)
          headers = {
          'Authorization': 'Bearer {}'.format(creds.token),
          'Content-Type': 'application/json; charset=utf-8'
          }
          response = requests.request("POST", pipeline_url, headers=headers, data=json.dumps(request_body))
          print(response.text)
      
    4. Abra o ficheiro requirements.txt e substitua o conteúdo pelo seguinte:

      requests==2.31.0
      google-auth==2.25.1
      
  4. Clique em Implementar para implementar a função.

Insira dados para acionar a conduta

  1. Na Google Cloud consola, aceda ao BigQuery Studio.

    Aceda ao BigQuery

  2. Clique em Criar consulta SQL e execute a seguinte consulta SQL clicando em Executar.

    INSERT INTO `PROJECT_ID.mlops.chicago`
    (
        WITH
          taxitrips AS (
          SELECT
            trip_start_timestamp,
            trip_end_timestamp,
            trip_seconds,
            trip_miles,
            payment_type,
            pickup_longitude,
            pickup_latitude,
            dropoff_longitude,
            dropoff_latitude,
            tips,
            tolls,
            fare,
            pickup_community_area,
            dropoff_community_area,
            company,
            unique_key
          FROM
            `bigquery-public-data.chicago_taxi_trips.taxi_trips`
          WHERE pickup_longitude IS NOT NULL
          AND pickup_latitude IS NOT NULL
          AND dropoff_longitude IS NOT NULL
          AND dropoff_latitude IS NOT NULL
          AND trip_miles > 0
          AND trip_seconds > 0
          AND fare > 0
          AND EXTRACT(YEAR FROM trip_start_timestamp) = 2022
        )
    
        SELECT
          trip_start_timestamp,
          EXTRACT(MONTH from trip_start_timestamp) as trip_month,
          EXTRACT(DAY from trip_start_timestamp) as trip_day,
          EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week,
          EXTRACT(HOUR from trip_start_timestamp) as trip_hour,
          trip_seconds,
          trip_miles,
          payment_type,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)
          ) AS pickup_grid,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)
          ) AS dropoff_grid,
          ST_Distance(
              ST_GeogPoint(pickup_longitude, pickup_latitude),
              ST_GeogPoint(dropoff_longitude, dropoff_latitude)
          ) AS euclidean,
          CONCAT(
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude,
                  pickup_latitude), 0.1)),
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude,
                  dropoff_latitude), 0.1))
          ) AS loc_cross,
          IF((tips/fare >= 0.2), 1, 0) AS tip_bin,
          tips,
          tolls,
          fare,
          pickup_longitude,
          pickup_latitude,
          dropoff_longitude,
          dropoff_latitude,
          pickup_community_area,
          dropoff_community_area,
          company,
          unique_key,
          trip_end_timestamp
        FROM
          taxitrips
        LIMIT 1000000
    )
    

    Esta consulta SQL para inserir novas linhas na tabela.

  3. Para verificar se o evento foi acionado, pesquise pipeline trigger condition met no registo da sua função.

    Aceder às funções do Cloud Run

  4. Se a função for acionada com êxito, deve ver uma nova execução do pipeline no Vertex AI Pipelines. A tarefa de pipeline demora cerca de 30 minutos a ser concluída.

    Aceder ao Vertex AI Pipelines

Limpar

Para limpar todos os Google Cloud recursos usados para este projeto, pode eliminar o Google Cloud projeto que usou para o tutorial.

Caso contrário, pode eliminar os recursos individuais que criou para este tutorial.

  1. Elimine o bloco de notas do Colab Enterprise.

    Aceda ao Colab Enterprise

  2. Elimine o conjunto de dados no BigQuery.

    Aceda ao BigQuery

  3. Elimine o contentor do Cloud Storage.

    Aceda ao Cloud Storage

  4. Elimine o modelo da seguinte forma:

    1. Na secção Vertex AI, aceda à página Registo de modelos.

      Aceda à página Registo de modelos

    2. Junto ao nome do modelo, clique no menu Ações e escolha Eliminar modelo.

  5. Elimine as execuções da pipeline:

    1. Aceda à página Execuções de pipelines.

      Aceda a Execuções de pipelines

    2. Junto ao nome de cada execução do pipeline, clique no menu Ações e escolha Eliminar execução do pipeline.

  6. Elimine as tarefas de preparação personalizada:

    1. Aceda a Tarefas de preparação personalizadas

    2. Junto ao nome de cada tarefa de preparação personalizada, clique no menu Ações e escolha Eliminar tarefa de preparação personalizada.

  7. Elimine as tarefas de previsão em lote da seguinte forma:

    1. Aceda à página Previsões em lote

    2. Junto ao nome de cada tarefa de previsão em lote, clique no menu Ações e escolha Eliminar tarefa de previsão em lote.

  8. Elimine o repositório do Artifact Registry.

    Aceda ao Artifact Registry

  9. Elimine a função do Google Cloud.

    Aceder às funções do Cloud Run