Pipeline für kontinuierliches Modelltraining erstellen

In diesem Dokument erfahren Sie, wie Sie eine Pipeline erstellen, die ein benutzerdefiniertes Modell automatisch entweder in regelmäßigen Abständen oder wenn neue Daten in den Datensatz eingefügt werden, mithilfe von Vertex AI Pipelines und Cloud Run-Funktionen trainiert.

Lernziele

Dieser Vorgang umfasst die folgenden Schritte:

  1. Rufen Sie in BigQuery ein Dataset ab und bereiten Sie es vor.

  2. Erstellen Sie ein benutzerdefiniertes Trainingspaket und laden Sie es hoch. Bei der Ausführung liest es Daten aus dem Dataset und trainiert das Modell.

  3. Erstellen Sie eine Vertex AI-Pipeline. Diese Pipeline führt das benutzerdefinierte Trainingspaket aus, lädt das Modell in die Vertex AI Model Registry hoch, führt den Bewertungsjob aus und sendet eine E-Mail-Benachrichtigung.

  4. Pipeline manuell ausführen

  5. Erstellen Sie eine Cloud Function mit einem Eventarc-Trigger, der die Pipeline ausführt, wenn neue Daten in das BigQuery-Dataset eingefügt werden.

Vorbereitung

Richten Sie Ihr Projekt und Notebook ein.

Projekt einrichten

  1. In the Google Cloud console, go to the project selector page.

    Go to project selector

  2. Select or create a Google Cloud project.

  3. Make sure that billing is enabled for your Google Cloud project.

Notebook erstellen

Wir verwenden ein Colab Enterprise-Notebook, um einen Teil des Codes in dieser Anleitung auszuführen.

  1. Wenn Sie nicht der Projektinhaber sind, bitten Sie einen Projektinhaber, Ihnen die IAM-Rollen roles/resourcemanager.projectIamAdmin und roles/aiplatform.colabEnterpriseUser zuzuweisen.

    Sie benötigen diese Rollen, um Colab Enterprise verwenden und sich selbst und Dienstkonten IAM-Rollen und ‑Berechtigungen zuweisen zu können.

    IAM aufrufen

  2. Rufen Sie in der Google Cloud Console die Colab Enterprise-Seite Notebooks auf.

    In Colab Enterprise werden Sie aufgefordert, die folgenden erforderlichen APIs zu aktivieren, falls noch nicht geschehen.

    • Vertex AI API
    • Dataform API
    • Compute Engine API

    Zu Colab Enterprise

  3. Wählen Sie im Menü Region die Region aus, in der Sie Ihr Notebook erstellen möchten. Wenn Sie sich nicht sicher sind, verwenden Sie us-central1 als Region.

    Verwenden Sie für alle Ressourcen in dieser Anleitung dieselbe Region.

  4. Klicken Sie auf Neues Notizbuch erstellen.

Ihr neues Notebook wird auf dem Tab Meine Notebooks angezeigt. Wenn Sie Code in Ihrem Notebook ausführen möchten, fügen Sie eine Codezelle hinzu und klicken Sie auf die Schaltfläche  Zelle ausführen.

Entwicklungsumgebung einrichten

  1. Installieren Sie in Ihrem Notebook die folgenden Python3-Pakete.

    ! pip3 install  google-cloud-aiplatform==1.34.0 \
                    google-cloud-pipeline-components==2.6.0 \
                    kfp==2.4.0 \
                    scikit-learn==1.0.2 \
                    mlflow==2.10.0
    
  2. Legen Sie das Google Cloud CLI-Projekt fest:

    PROJECT_ID = "PROJECT_ID"
    
    # Set the project id
    ! gcloud config set project {PROJECT_ID}
    

    Ersetzen Sie PROJECT_ID durch Ihre Projekt-ID. Sie finden Ihre Projekt-ID gegebenenfalls in der Google Cloud Console.

  3. Gewähren Sie Ihrem Google-Konto Rollen:

    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/bigquery.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/aiplatform.user
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/storage.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/pubsub.editor
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/cloudfunctions.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/logging.viewer
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/logging.configWriter
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/iam.serviceAccountUser
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/eventarc.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/aiplatform.colabEnterpriseUser
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/artifactregistry.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/serviceusage.serviceUsageAdmin
    
  4. Aktivieren Sie folgende APIs

    • Artifact Registry API
    • BigQuery API
    • Cloud Build API
    • Cloud Functions API
    • Cloud Logging API
    • Pub/Sub API
    • Cloud Run Admin API
    • Cloud Storage API
    • Eventarc API
    • Service Usage API
    • Vertex AI API
    ! gcloud services enable artifactregistry.googleapis.com bigquery.googleapis.com cloudbuild.googleapis.com cloudfunctions.googleapis.com logging.googleapis.com pubsub.googleapis.com run.googleapis.com storage-component.googleapis.com  eventarc.googleapis.com serviceusage.googleapis.com aiplatform.googleapis.com
    

  5. Weisen Sie den Dienstkonten Ihres Projekts Rollen zu:

    1. Sehen Sie sich die Namen Ihrer Dienstkonten an

      ! gcloud iam service-accounts list
      

      Notieren Sie sich den Namen Ihres Compute-Dienstmitarbeiters. Er sollte das Format xxxxxxxx-compute@developer.gserviceaccount.com haben.

    2. Weisen Sie dem Dienst-Agent die erforderlichen Rollen zu.

      ! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:"SA_ID-compute@developer.gserviceaccount.com"" --role=roles/aiplatform.serviceAgent
      ! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:"SA_ID-compute@developer.gserviceaccount.com"" --role=roles/eventarc.eventReceiver
      

Dataset abrufen und vorbereiten

In dieser Anleitung erstellen Sie ein Modell, mit dem die Fahrtkosten für eine Taxifahrt anhand von Merkmalen wie Fahrtzeit, Standort und Entfernung vorhergesagt werden. Wir verwenden Daten aus dem öffentlichen Dataset Chicago Taxi Trips. Dieses Dataset beinhaltet Taxifahrten von 2013 bis heute, die der Stadt Chicago in ihrer Rolle als Regulierungsbehörde gemeldet wurden. Um die Privatsphäre der Fahrer und Nutzer gleichzeitig zu schützen und dem Aggregator die Möglichkeit zu geben, die Daten zu analysieren, wird die Taxi-ID für jede Taxi-Medaillonnummer konsistent, ohne die Nummer zu zeigen. Erhebungsgebiete werden in einigen Fällen unterdrückt und die Zeiten werden auf die nächsten 15 Minuten aufgerundet.

Weitere Informationen finden Sie unter Chicago Taxi Trips im Marketplace.

Erstellen Sie ein BigQuery-Dataset

  1. Wechseln Sie in der Google Cloud Console zu BigQuery Studio.

    BigQuery aufrufen

  2. Suchen Sie im Bereich Explorer nach Ihrem Projekt, klicken Sie auf Aktionen und dann auf Dataset erstellen.

  3. Führen Sie auf der Seite Dataset erstellen die folgenden Schritte aus:

    • Geben Sie unter Dataset-ID mlops ein. Weitere Informationen finden Sie unter Datasets benennen.

    • Wählen Sie unter Standorttyp die Option „Mehrere Regionen“ aus. Wählen Sie beispielsweise USA (mehrere Regionen in den Vereinigten Staaten) aus, wenn Sie us-central1 verwenden. Nachdem ein Dataset erstellt wurde, kann der Standort nicht mehr geändert werden.

    • Klicken Sie auf Dataset erstellen.

Weitere Informationen finden Sie unter Datasets erstellen.

BigQuery-Tabelle erstellen und füllen

In diesem Abschnitt erstellen Sie die Tabelle und importieren Daten aus dem öffentlichen Dataset für ein Jahr in das Dataset Ihres Projekts.

  1. Zu BigQuery Studio

    BigQuery aufrufen

  2. Klicken Sie auf SQL-Abfrage erstellen und führen Sie die folgende SQL-Abfrage aus. Klicken Sie dazu auf Ausführen.

    CREATE OR REPLACE TABLE `PROJECT_ID.mlops.chicago`
    AS (
        WITH
          taxitrips AS (
          SELECT
            trip_start_timestamp,
            trip_end_timestamp,
            trip_seconds,
            trip_miles,
            payment_type,
            pickup_longitude,
            pickup_latitude,
            dropoff_longitude,
            dropoff_latitude,
            tips,
            tolls,
            fare,
            pickup_community_area,
            dropoff_community_area,
            company,
            unique_key
          FROM
            `bigquery-public-data.chicago_taxi_trips.taxi_trips`
          WHERE pickup_longitude IS NOT NULL
          AND pickup_latitude IS NOT NULL
          AND dropoff_longitude IS NOT NULL
          AND dropoff_latitude IS NOT NULL
          AND trip_miles > 0
          AND trip_seconds > 0
          AND fare > 0
          AND EXTRACT(YEAR FROM trip_start_timestamp) = 2019
        )
    
        SELECT
          trip_start_timestamp,
          EXTRACT(MONTH from trip_start_timestamp) as trip_month,
          EXTRACT(DAY from trip_start_timestamp) as trip_day,
          EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week,
          EXTRACT(HOUR from trip_start_timestamp) as trip_hour,
          trip_seconds,
          trip_miles,
          payment_type,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)
          ) AS pickup_grid,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)
          ) AS dropoff_grid,
          ST_Distance(
              ST_GeogPoint(pickup_longitude, pickup_latitude),
              ST_GeogPoint(dropoff_longitude, dropoff_latitude)
          ) AS euclidean,
          CONCAT(
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude,
                  pickup_latitude), 0.1)),
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude,
                  dropoff_latitude), 0.1))
          ) AS loc_cross,
          IF((tips/fare >= 0.2), 1, 0) AS tip_bin,
          tips,
          tolls,
          fare,
          pickup_longitude,
          pickup_latitude,
          dropoff_longitude,
          dropoff_latitude,
          pickup_community_area,
          dropoff_community_area,
          company,
          unique_key,
          trip_end_timestamp
        FROM
          taxitrips
        LIMIT 1000000
    )
    

    Mit dieser Abfrage wird die Tabelle <PROJECT_ID>.mlops.chicago erstellt und mit Daten aus der öffentlichen Tabelle bigquery-public-data.chicago_taxi_trips.taxi_trips gefüllt.

  3. Um das Tabellenschema aufzurufen, klicken Sie auf Zur Tabelle und dann auf den Tab Schema.

  4. Wenn Sie den Tabelleninhalt sehen möchten, klicken Sie auf den Tab Vorschau.

Erstellen Sie das benutzerdefinierte Trainingspaket und laden Sie es hoch

In diesem Abschnitt erstellen Sie ein Python-Paket mit dem Code, der den Datensatz liest, die Daten in Trainings- und Test-Sets aufteilt und Ihr benutzerdefiniertes Modell trainiert. Das Paket wird als eine der Aufgaben in Ihrer Pipeline ausgeführt. Weitere Informationen finden Sie unter Python-Trainingsanwendung für einen vordefinierten Container erstellen.

Benutzerdefiniertes Trainingspaket erstellen

  1. Erstellen Sie in Ihrem Colab-Notebook übergeordnete Ordner für die Trainingsanwendung:

    !mkdir -p training_package/trainer
    
  2. Erstellen Sie mit dem folgenden Befehl in jedem Ordner eine Datei __init__.py, um dies zu einem Paket zu machen:

    ! touch training_package/__init__.py
    ! touch training_package/trainer/__init__.py
    

    Die neuen Dateien und Ordner finden Sie im klicken. Ordner Dateien.

  3. Erstellen Sie im Bereich Dateien im Ordner training_package/trainer eine Datei namens task.py mit folgendem Inhalt:

    # Import the libraries
    from sklearn.model_selection import train_test_split, cross_val_score
    from sklearn.preprocessing import OneHotEncoder, StandardScaler
    from google.cloud import bigquery, bigquery_storage
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.compose import ColumnTransformer
    from sklearn.pipeline import Pipeline
    from google import auth
    from scipy import stats
    import numpy as np
    import argparse
    import joblib
    import pickle
    import csv
    import os
    
    # add parser arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('--project-id', dest='project_id',  type=str, help='Project ID.')
    parser.add_argument('--training-dir', dest='training_dir', default=os.getenv("AIP_MODEL_DIR"),
                        type=str, help='Dir to save the data and the trained model.')
    parser.add_argument('--bq-source', dest='bq_source',  type=str, help='BigQuery data source for training data.')
    args = parser.parse_args()
    
    # data preparation code
    BQ_QUERY = """
    with tmp_table as (
    SELECT trip_seconds, trip_miles, fare,
        tolls,  company,
        pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude,
        DATETIME(trip_start_timestamp, 'America/Chicago') trip_start_timestamp,
        DATETIME(trip_end_timestamp, 'America/Chicago') trip_end_timestamp,
        CASE WHEN (pickup_community_area IN (56, 64, 76)) OR (dropoff_community_area IN (56, 64, 76)) THEN 1 else 0 END is_airport,
    FROM `{}`
    WHERE
      dropoff_latitude IS NOT NULL and
      dropoff_longitude IS NOT NULL and
      pickup_latitude IS NOT NULL and
      pickup_longitude IS NOT NULL and
      fare > 0 and
      trip_miles > 0
      and MOD(ABS(FARM_FINGERPRINT(unique_key)), 100) between 0 and 99
    ORDER BY RAND()
    LIMIT 10000)
    SELECT *,
        EXTRACT(YEAR FROM trip_start_timestamp) trip_start_year,
        EXTRACT(MONTH FROM trip_start_timestamp) trip_start_month,
        EXTRACT(DAY FROM trip_start_timestamp) trip_start_day,
        EXTRACT(HOUR FROM trip_start_timestamp) trip_start_hour,
        FORMAT_DATE('%a', DATE(trip_start_timestamp)) trip_start_day_of_week
    FROM tmp_table
    """.format(args.bq_source)
    # Get default credentials
    credentials, project = auth.default()
    bqclient = bigquery.Client(credentials=credentials, project=args.project_id)
    bqstorageclient = bigquery_storage.BigQueryReadClient(credentials=credentials)
    df = (
        bqclient.query(BQ_QUERY)
        .result()
        .to_dataframe(bqstorage_client=bqstorageclient)
    )
    # Add 'N/A' for missing 'Company'
    df.fillna(value={'company':'N/A','tolls':0}, inplace=True)
    # Drop rows containing null data.
    df.dropna(how='any', axis='rows', inplace=True)
    # Pickup and dropoff locations distance
    df['abs_distance'] = (np.hypot(df['dropoff_latitude']-df['pickup_latitude'], df['dropoff_longitude']-df['pickup_longitude']))*100
    
    # Remove extremes, outliers
    possible_outliers_cols = ['trip_seconds', 'trip_miles', 'fare', 'abs_distance']
    df=df[(np.abs(stats.zscore(df[possible_outliers_cols].astype(float))) < 3).all(axis=1)].copy()
    # Reduce location accuracy
    df=df.round({'pickup_latitude': 3, 'pickup_longitude': 3, 'dropoff_latitude':3, 'dropoff_longitude':3})
    
    # Drop the timestamp col
    X=df.drop(['trip_start_timestamp', 'trip_end_timestamp'],axis=1)
    
    # Split the data into train and test
    X_train, X_test = train_test_split(X, test_size=0.10, random_state=123)
    
    ## Format the data for batch predictions
    # select string cols
    string_cols = X_test.select_dtypes(include='object').columns
    # Add quotes around string fields
    X_test[string_cols] = X_test[string_cols].apply(lambda x: '\"' + x + '\"')
    # Add quotes around column names
    X_test.columns = ['\"' + col + '\"' for col in X_test.columns]
    # Save DataFrame to csv
    X_test.to_csv(os.path.join(args.training_dir,"test.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ')
    # Save test data without the target for batch predictions
    X_test.drop('\"fare\"',axis=1,inplace=True)
    X_test.to_csv(os.path.join(args.training_dir,"test_no_target.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ')
    
    # Separate the target column
    y_train=X_train.pop('fare')
    # Get the column indexes
    col_index_dict = {col: idx for idx, col in enumerate(X_train.columns)}
    # Create a column transformer pipeline
    ct_pipe = ColumnTransformer(transformers=[
        ('hourly_cat', OneHotEncoder(categories=[range(0,24)], sparse = False), [col_index_dict['trip_start_hour']]),
        ('dow', OneHotEncoder(categories=[['Mon', 'Tue', 'Sun', 'Wed', 'Sat', 'Fri', 'Thu']], sparse = False), [col_index_dict['trip_start_day_of_week']]),
        ('std_scaler', StandardScaler(), [
            col_index_dict['trip_start_year'],
            col_index_dict['abs_distance'],
            col_index_dict['pickup_longitude'],
            col_index_dict['pickup_latitude'],
            col_index_dict['dropoff_longitude'],
            col_index_dict['dropoff_latitude'],
            col_index_dict['trip_miles'],
            col_index_dict['trip_seconds']])
    ])
    # Add the random-forest estimator to the pipeline
    rfr_pipe = Pipeline([
        ('ct', ct_pipe),
        ('forest_reg', RandomForestRegressor(
            n_estimators = 20,
            max_features = 1.0,
            n_jobs = -1,
            random_state = 3,
            max_depth=None,
            max_leaf_nodes=None,
        ))
    ])
    
    # train the model
    rfr_score = cross_val_score(rfr_pipe, X_train, y_train, scoring = 'neg_mean_squared_error', cv = 5)
    rfr_rmse = np.sqrt(-rfr_score)
    print ("Crossvalidation RMSE:",rfr_rmse.mean())
    final_model=rfr_pipe.fit(X_train, y_train)
    # Save the model pipeline
    with open(os.path.join(args.training_dir,"model.pkl"), 'wb') as model_file:
        pickle.dump(final_model, model_file)
    

    Der Code führt folgende Aufgaben aus:

    1. Auswahl von Merkmalen.
    2. Umwandeln der Abhol- und Ablieferungs-Datenzeit von UTC in die Ortszeit von Chicago.
    3. Extrahieren von Datum, Stunde, Wochentag, Monat und Jahr aus der DateTime der Abholung.
    4. Berechnen der Dauer der Fahrt anhand der Start- und Endzeit.
    5. Identifizieren und Markieren von Fahrten, die an einem Flughafen gestartet oder beendet wurden, basierend auf den Gemeindegebiete.
    6. Das Random Forest-Regressionsmodell wird mit dem scikit-learn-Framework trainiert, um den Fahrpreis für die Taxifahrt vorherzusagen.
    7. Das trainierte Modell wird in einer Pickle-Datei model.pkl gespeichert.

      Der ausgewählte Ansatz und die Feature-Entwicklung basieren auf der explorativen Datenanalyse und der Analyse Predicting Chicago Taxi Fare.

  4. Erstellen Sie im Bereich Dateien im Ordner training_package eine Datei namens setup.py mit folgendem Inhalt:

    from setuptools import find_packages
    from setuptools import setup
    
    REQUIRED_PACKAGES=["google-cloud-bigquery[pandas]","google-cloud-bigquery-storage"]
    setup(
        name='trainer',
        version='0.1',
        install_requires=REQUIRED_PACKAGES,
        packages=find_packages(),
        include_package_data=True,
        description='Training application package for chicago taxi trip fare prediction.'
    )
    
  5. Führen Sie in Ihrem Notebook setup.py aus, um die Quelldistribution für Ihre Trainingsanwendung zu erstellen:

    ! cd training_package && python setup.py sdist --formats=gztar && cd ..
    

Am Ende dieses Abschnitts sollte der Bereich Dateien unter training-package die folgenden Dateien und Ordner enthalten.

dist
  trainer-0.1.tar.gz
trainer
  __init__.py
  task.py
trainer.egg-info
__init__.py
setup.py

Benutzerdefiniertes Trainingspaket in Cloud Storage hochladen

  1. Cloud Storage-Bucket erstellen

    REGION="REGION"
    BUCKET_NAME = "BUCKET_NAME"
    BUCKET_URI = f"gs://{BUCKET_NAME}"
    
    ! gcloud storage buckets create gs://$BUCKET_URI --location=$REGION --project=$PROJECT_ID
    

    Ersetzen Sie die folgenden Parameterwerte:

    • REGION: Wählen Sie dieselbe Region aus, die Sie beim Erstellen des Colab-Notebooks ausgewählt haben.

    • BUCKET_NAME: Der Bucket-Name.

  2. Laden Sie das Trainingspaket in den Cloud Storage-Bucket hoch.

    # Copy the training package to the bucket
    ! gcloud storage cp training_package/dist/trainer-0.1.tar.gz $BUCKET_URI/
    

Pipeline erstellen

Eine Pipeline ist eine Beschreibung eines MLOps-Workflows als Graph von Schritten, die als Pipelineaufgaben bezeichnet werden.

In diesem Abschnitt definieren Sie Ihre Pipelineaufgaben, kompilieren sie in YAML und registrieren die Pipeline in Artifact Registry, damit sie versioniert und mehrmals von einem einzelnen Nutzer oder von mehreren Nutzern ausgeführt werden kann.

Hier sehen Sie eine Visualisierung der Aufgaben in unserer Pipeline, einschließlich Modelltraining, Modellupload, Modellbewertung und E-Mail-Benachrichtigung:

Pipelinevisualisierung

Weitere Informationen finden Sie unter Pipelinevorlagen erstellen.

Konstanten definieren und Clients initialisieren

  1. Definieren Sie in Ihrem Notebook die Konstanten, die in späteren Schritten verwendet werden:

    import os
    
    EMAIL_RECIPIENTS = [ "NOTIFY_EMAIL" ]
    PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI)
    PIPELINE_NAME = "vertex-pipeline-datatrigger-tutorial"
    WORKING_DIR = f"{PIPELINE_ROOT}/mlops-datatrigger-tutorial"
    os.environ['AIP_MODEL_DIR'] = WORKING_DIR
    EXPERIMENT_NAME = PIPELINE_NAME + "-experiment"
    PIPELINE_FILE = PIPELINE_NAME + ".yaml"
    

    Ersetzen Sie NOTIFY_EMAIL durch eine E-Mail-Adresse. Unabhängig davon, ob der Pipeline-Job erfolgreich oder nicht erfolgreich abgeschlossen wurde, wird eine E-Mail an diese E-Mail-Adresse gesendet.

  2. Initialisieren Sie das Vertex AI SDK mit dem Projekt, dem Staging-Bucket, dem Speicherort und dem Test.

    from google.cloud import aiplatform
    
    aiplatform.init(
        project=PROJECT_ID,
        staging_bucket=BUCKET_URI,
        location=REGION,
        experiment=EXPERIMENT_NAME)
    
    aiplatform.autolog()
    

Pipelineaufgaben definieren

Definieren Sie in Ihrem Notebook die Pipeline custom_model_training_evaluation_pipeline:

from kfp import dsl
from kfp.dsl import importer
from kfp.dsl import OneOf
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp
from google_cloud_pipeline_components.v1.model_evaluation import ModelEvaluationRegressionOp
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp
from google_cloud_pipeline_components.v1.endpoint import ModelDeployOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp
from google.cloud import aiplatform

# define the train-deploy pipeline
@dsl.pipeline(name="custom-model-training-evaluation-pipeline")
def custom_model_training_evaluation_pipeline(
    project: str,
    location: str,
    training_job_display_name: str,
    worker_pool_specs: list,
    base_output_dir: str,
    prediction_container_uri: str,
    model_display_name: str,
    batch_prediction_job_display_name: str,
    target_field_name: str,
    test_data_gcs_uri: list,
    ground_truth_gcs_source: list,
    batch_predictions_gcs_prefix: str,
    batch_predictions_input_format: str="csv",
    batch_predictions_output_format: str="jsonl",
    ground_truth_format: str="csv",
    parent_model_resource_name: str=None,
    parent_model_artifact_uri: str=None,
    existing_model: bool=False

):
    # Notification task
    notify_task = VertexNotificationEmailOp(
                    recipients= EMAIL_RECIPIENTS
                    )
    with dsl.ExitHandler(notify_task, name='MLOps Continuous Training Pipeline'):
        # Train the model
        custom_job_task = CustomTrainingJobOp(
                                    project=project,
                                    display_name=training_job_display_name,
                                    worker_pool_specs=worker_pool_specs,
                                    base_output_directory=base_output_dir,
                                    location=location
                            )

        # Import the unmanaged model
        import_unmanaged_model_task = importer(
                                        artifact_uri=base_output_dir,
                                        artifact_class=artifact_types.UnmanagedContainerModel,
                                        metadata={
                                            "containerSpec": {
                                                "imageUri": prediction_container_uri,
                                            },
                                        },
                                    ).after(custom_job_task)

        with dsl.If(existing_model == True):
            # Import the parent model to upload as a version
            import_registry_model_task = importer(
                                        artifact_uri=parent_model_artifact_uri,
                                        artifact_class=artifact_types.VertexModel,
                                        metadata={
                                            "resourceName": parent_model_resource_name
                                        },
                                    ).after(import_unmanaged_model_task)
            # Upload the model as a version
            model_version_upload_op = ModelUploadOp(
                                    project=project,
                                    location=location,
                                    display_name=model_display_name,
                                    parent_model=import_registry_model_task.outputs["artifact"],
                                    unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
                                )

        with dsl.Else():
            # Upload the model
            model_upload_op = ModelUploadOp(
                                    project=project,
                                    location=location,
                                    display_name=model_display_name,
                                    unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
                                )
        # Get the model (or model version)
        model_resource = OneOf(model_version_upload_op.outputs["model"], model_upload_op.outputs["model"])

        # Batch prediction
        batch_predict_task = ModelBatchPredictOp(
                            project= project,
                            job_display_name= batch_prediction_job_display_name,
                            model= model_resource,
                            location= location,
                            instances_format= batch_predictions_input_format,
                            predictions_format= batch_predictions_output_format,
                            gcs_source_uris= test_data_gcs_uri,
                            gcs_destination_output_uri_prefix= batch_predictions_gcs_prefix,
                            machine_type= 'n1-standard-2'
                            )
        # Evaluation task
        evaluation_task = ModelEvaluationRegressionOp(
                            project= project,
                            target_field_name= target_field_name,
                            location= location,
                            # model= model_resource,
                            predictions_format= batch_predictions_output_format,
                            predictions_gcs_source= batch_predict_task.outputs["gcs_output_directory"],
                            ground_truth_format= ground_truth_format,
                            ground_truth_gcs_source= ground_truth_gcs_source
                            )
    return

Ihre Pipeline besteht aus einem Graph von Aufgaben, die die folgenden Google Cloud-Pipeline-Komponenten verwenden:

Pipeline kompilieren

Kompilieren Sie die Pipeline mit dem Kubeflow Pipelines (KFP) Compiler in eine YAML-Datei, die eine hermetische Darstellung Ihrer Pipeline enthält.

from kfp import dsl
from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=custom_model_training_evaluation_pipeline,
    package_path="{}.yaml".format(PIPELINE_NAME),
)

In Ihrem Arbeitsverzeichnis sollte eine YAML-Datei mit dem Namen vertex-pipeline-datatrigger-tutorial.yaml angezeigt werden.

Pipeline als Vorlage hochladen

  1. Erstellen Sie ein Repository vom Typ KFP in Artifact Registry.

    REPO_NAME = "mlops"
    # Create a repo in the artifact registry
    ! gcloud artifacts repositories create $REPO_NAME --location=$REGION --repository-format=KFP
    
  2. Laden Sie die kompilierte Pipeline in das Repository hoch.

    from kfp.registry import RegistryClient
    
    host = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}"
    client = RegistryClient(host=host)
    TEMPLATE_NAME, VERSION_NAME = client.upload_pipeline(
    file_name=PIPELINE_FILE,
    tags=["v1", "latest"],
    extra_headers={"description":"This is an example pipeline template."})
    TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest"
    
  3. Prüfen Sie in der Google Cloud Console, ob Ihre Vorlage unter Pipeline-Vorlagen angezeigt wird.

    Pipeline-Vorlagen aufrufen

Pipeline manuell ausführen

Führen Sie die Pipeline manuell aus, um sicherzustellen, dass sie funktioniert.

  1. Geben Sie in Ihrem Notebook die Parameter an, die zum Ausführen der Pipeline als Job erforderlich sind.

    DATASET_NAME = "mlops"
    TABLE_NAME = "chicago"
    
    worker_pool_specs = [{
                            "machine_spec": {"machine_type": "e2-highmem-2"},
                            "replica_count": 1,
                            "python_package_spec":{
                                    "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",
                                    "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"],
                                    "python_module": "trainer.task",
                                    "args":["--project-id",PROJECT_ID, "--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"]
                            },
    }]
    
    parameters = {
        "project": PROJECT_ID,
        "location": REGION,
        "training_job_display_name": "taxifare-prediction-training-job",
        "worker_pool_specs": worker_pool_specs,
        "base_output_dir": BUCKET_URI,
        "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
        "model_display_name": "taxifare-prediction-model",
        "batch_prediction_job_display_name": "taxifare-prediction-batch-job",
        "target_field_name": "fare",
        "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"],
        "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"],
        "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output",
        "existing_model": False
    }
    
  2. Erstellen Sie einen Pipelinejob und führen Sie ihn aus.

    # Create a pipeline job
    job = aiplatform.PipelineJob(
        display_name="triggered_custom_regression_evaluation",
        template_path=TEMPLATE_URI ,
        parameter_values=parameters,
        pipeline_root=BUCKET_URI,
        enable_caching=False
    )
    # Run the pipeline job
    job.run()
    

    Der Vorgang dauert etwa 30 Minuten.

  3. In der Console sollte auf der Seite Pipelines ein neuer Pipelinelauf angezeigt werden:

    Zu den Pipeline-Ausführungen

  4. Nach Abschluss der Pipelineausführung sollte in der Vertex AI Model Registry entweder ein neues Modell mit dem Namen taxifare-prediction-model oder eine neue Modellversion angezeigt werden:

    Zu Model Registry

  5. Außerdem sollte ein neuer Batchvorhersagejob angezeigt werden:

    Zu "Batchvorhersagen"

Pipeline automatisch ausführen

Es gibt zwei Möglichkeiten, die Pipeline automatisch auszuführen: nach einem Zeitplan oder wenn neue Daten in den Datensatz eingefügt werden.

Pipeline nach Zeitplan ausführen

  1. Rufen Sie in Ihrem Notebook PipelineJob.create_schedule auf.

    job_schedule = job.create_schedule(
      display_name="mlops tutorial schedule",
      cron="0 0 1 * *", #
      max_concurrent_run_count=1,
      max_run_count=12,
    )
    

    Mit dem Ausdruck cron wird der Job so geplant, dass er jeden 1. des Monats um 00:00 Uhr UTC ausgeführt wird.

    In dieser Anleitung sollen nicht mehrere Jobs gleichzeitig ausgeführt werden. Daher setzen wir max_concurrent_run_count auf 1.

  2. Prüfen Sie in der Google Cloud Console, ob Ihre schedule in den Zeitplänen für Pipelines aufgeführt ist.

    Zu „Pipelines“ > „Zeitpläne“

Pipeline bei neuen Daten ausführen

Funktion mit Eventarc-Trigger erstellen

Erstellen Sie eine Cloud Functions-Funktion (2. Generation), die die Pipeline ausführt, wenn neue Daten in die BigQuery-Tabelle eingefügt werden.

Insbesondere verwenden wir einen Eventarc, um die Funktion jedes Mal auszulösen, wenn ein google.cloud.bigquery.v2.JobService.InsertJob-Ereignis auftritt. Die Funktion führt dann die Pipeline-Vorlage aus.

Weitere Informationen finden Sie unter Eventarc-Trigger und Unterstützte Ereignistypen.

  1. Rufen Sie in der Google Cloud Console die Cloud Run-Funktionen auf.

    Zu den Cloud Run-Funktionen

  2. Klicken Sie auf die Schaltfläche Funktion erstellen. Auf der Seite Konfiguration:

    1. Wählen Sie als Umgebung 2nd gen aus.

    2. Geben Sie als Funktionsname mlops ein.

    3. Wählen Sie unter Region dieselbe Region wie für Ihren Cloud Storage-Bucket und Ihr Artifact Registry-Repository aus.

    4. Wählen Sie unter Trigger die Option Sonstige Trigger aus. Der Bereich Eventarc-Trigger wird geöffnet.

      1. Wählen Sie als Triggertyp die Option Google-Quellen aus.

      2. Wählen Sie als Ereignisanbieter BigQuery aus.

      3. Wählen Sie als Ereignistyp google.cloud.bigquery.v2.JobService.InsertJob aus.

      4. Wählen Sie unter Ressource die Option Bestimmte Ressource aus und geben Sie die BigQuery-Tabelle an.

        projects/PROJECT_ID/datasets/mlops/tables/chicago
        
      5. Wählen Sie im Feld Region einen Speicherort für den Eventarc-Trigger aus, falls vorhanden. Weitere Informationen finden Sie unter Triggerstandort.

      6. Klicken Sie auf Trigger speichern.

    5. Wenn Sie aufgefordert werden, Dienstkonten Rollen zuzuweisen, klicken Sie auf Alle gewähren.

  3. Klicken Sie auf Weiter, um zur Seite Code zu gelangen. Auf der Seite Code:

    1. Legen Sie als Laufzeit „Python 3.12“ fest.

    2. Legen Sie als Einstiegspunkt mlops_entrypoint fest.

    3. Öffnen Sie die Datei main.py mit dem Inline-Editor und ersetzen Sie den Inhalt durch Folgendes:

      Ersetzen Sie PROJECT_ID,REGION und BUCKET_NAME durch die Werte,die Sie zuvor verwendet haben.

      import json
      import functions_framework
      import requests
      import google.auth
      import google.auth.transport.requests
      # CloudEvent function to be triggered by an Eventarc Cloud Audit Logging trigger
      # Note: this is NOT designed for second-party (Cloud Audit Logs -> Pub/Sub) triggers!
      @functions_framework.cloud_event
      def mlops_entrypoint(cloudevent):
          # Print out the CloudEvent's (required) `type` property
          # See https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#type
          print(f"Event type: {cloudevent['type']}")
      
          # Print out the CloudEvent's (optional) `subject` property
          # See https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#subject
          if 'subject' in cloudevent:
              # CloudEvent objects don't support `get` operations.
              # Use the `in` operator to verify `subject` is present.
              print(f"Subject: {cloudevent['subject']}")
      
          # Print out details from the `protoPayload`
          # This field encapsulates a Cloud Audit Logging entry
          # See https://cloud.google.com/logging/docs/audit#audit_log_entry_structure
      
          payload = cloudevent.data.get("protoPayload")
          if payload:
              print(f"API method: {payload.get('methodName')}")
              print(f"Resource name: {payload.get('resourceName')}")
              print(f"Principal: {payload.get('authenticationInfo', dict()).get('principalEmail')}")
              row_count = payload.get('metadata', dict()).get('tableDataChange',dict()).get('insertedRowsCount')
              print(f"No. of rows: {row_count} !!")
              if row_count:
                  if int(row_count) > 0:
                      print ("Pipeline trigger Condition met !!")
                      submit_pipeline_job()
              else:
                  print ("No pipeline triggered !!!")
      
      def submit_pipeline_job():
          PROJECT_ID = 'PROJECT_ID'
          REGION = 'REGION'
          BUCKET_NAME = "BUCKET_NAME"
          DATASET_NAME = "mlops"
          TABLE_NAME = "chicago"
      
          base_output_dir = BUCKET_NAME
          BUCKET_URI = "gs://{}".format(BUCKET_NAME)
          PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI)
          PIPELINE_NAME = "vertex-mlops-pipeline-tutorial"
          EXPERIMENT_NAME = PIPELINE_NAME + "-experiment"
          REPO_NAME ="mlops"
          TEMPLATE_NAME="custom-model-training-evaluation-pipeline"
          TRAINING_JOB_DISPLAY_NAME="taxifare-prediction-training-job"
          worker_pool_specs = [{
                              "machine_spec": {"machine_type": "e2-highmem-2"},
                              "replica_count": 1,
                              "python_package_spec":{
                                      "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",
                                      "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"],
                                      "python_module": "trainer.task",
                                      "args":["--project-id",PROJECT_ID,"--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"]
                              },
          }]
      
          parameters = {
              "project": PROJECT_ID,
              "location": REGION,
              "training_job_display_name": "taxifare-prediction-training-job",
              "worker_pool_specs": worker_pool_specs,
              "base_output_dir": BUCKET_URI,
              "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
              "model_display_name": "taxifare-prediction-model",
              "batch_prediction_job_display_name": "taxifare-prediction-batch-job",
              "target_field_name": "fare",
              "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"],
              "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"],
              "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output",
              "existing_model": False
          }
          TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest"
          print("TEMPLATE URI: ", TEMPLATE_URI)
          request_body = {
              "name": PIPELINE_NAME,
              "displayName": PIPELINE_NAME,
              "runtimeConfig":{
                  "gcsOutputDirectory": PIPELINE_ROOT,
                  "parameterValues": parameters,
              },
              "templateUri": TEMPLATE_URI
          }
          pipeline_url = "https://us-central1-aiplatform.googleapis.com/v1/projects/{}/locations/{}/pipelineJobs".format(PROJECT_ID, REGION)
          creds, project = google.auth.default()
          auth_req = google.auth.transport.requests.Request()
          creds.refresh(auth_req)
          headers = {
          'Authorization': 'Bearer {}'.format(creds.token),
          'Content-Type': 'application/json; charset=utf-8'
          }
          response = requests.request("POST", pipeline_url, headers=headers, data=json.dumps(request_body))
          print(response.text)
      
    4. Öffnen Sie die Datei requirements.txt und ersetzen Sie den Inhalt durch Folgendes:

      requests==2.31.0
      google-auth==2.25.1
      
  4. Klicken Sie auf Bereitstellen, um die Funktion bereitzustellen.

Daten einfügen, um die Pipeline auszulösen

  1. Wechseln Sie in der Google Cloud Console zu BigQuery Studio.

    BigQuery aufrufen

  2. Klicken Sie auf SQL-Abfrage erstellen und führen Sie die folgende SQL-Abfrage aus. Klicken Sie dazu auf Ausführen.

    INSERT INTO `PROJECT_ID.mlops.chicago`
    (
        WITH
          taxitrips AS (
          SELECT
            trip_start_timestamp,
            trip_end_timestamp,
            trip_seconds,
            trip_miles,
            payment_type,
            pickup_longitude,
            pickup_latitude,
            dropoff_longitude,
            dropoff_latitude,
            tips,
            tolls,
            fare,
            pickup_community_area,
            dropoff_community_area,
            company,
            unique_key
          FROM
            `bigquery-public-data.chicago_taxi_trips.taxi_trips`
          WHERE pickup_longitude IS NOT NULL
          AND pickup_latitude IS NOT NULL
          AND dropoff_longitude IS NOT NULL
          AND dropoff_latitude IS NOT NULL
          AND trip_miles > 0
          AND trip_seconds > 0
          AND fare > 0
          AND EXTRACT(YEAR FROM trip_start_timestamp) = 2022
        )
    
        SELECT
          trip_start_timestamp,
          EXTRACT(MONTH from trip_start_timestamp) as trip_month,
          EXTRACT(DAY from trip_start_timestamp) as trip_day,
          EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week,
          EXTRACT(HOUR from trip_start_timestamp) as trip_hour,
          trip_seconds,
          trip_miles,
          payment_type,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)
          ) AS pickup_grid,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)
          ) AS dropoff_grid,
          ST_Distance(
              ST_GeogPoint(pickup_longitude, pickup_latitude),
              ST_GeogPoint(dropoff_longitude, dropoff_latitude)
          ) AS euclidean,
          CONCAT(
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude,
                  pickup_latitude), 0.1)),
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude,
                  dropoff_latitude), 0.1))
          ) AS loc_cross,
          IF((tips/fare >= 0.2), 1, 0) AS tip_bin,
          tips,
          tolls,
          fare,
          pickup_longitude,
          pickup_latitude,
          dropoff_longitude,
          dropoff_latitude,
          pickup_community_area,
          dropoff_community_area,
          company,
          unique_key,
          trip_end_timestamp
        FROM
          taxitrips
        LIMIT 1000000
    )
    

    Diese SQL-Abfrage fügt neue Zeilen in die Tabelle ein.

  3. Wenn Sie prüfen möchten, ob das Ereignis ausgelöst wurde, suchen Sie im Protokoll Ihrer Funktion nach pipeline trigger condition met.

    Zu den Cloud Run-Funktionen

  4. Wenn die Funktion erfolgreich ausgelöst wurde, sollte in Vertex AI Pipelines eine neue Pipeline ausgeführt werden. Die Ausführung des Pipeline-Jobs dauert etwa 30 Minuten.

    Zu Vertex AI Pipelines

Bereinigen

Wenn Sie alle für dieses Projekt verwendeten Google Cloud-Ressourcen bereinigen möchten, können Sie das Google Cloud-Projekt löschen, das Sie für diese Anleitung verwendet haben.

Andernfalls können Sie die einzelnen Ressourcen löschen, die Sie für diese Anleitung erstellt haben.

  1. Löschen Sie das Colab Enterprise-Notebook.

    Zu Colab Enterprise

  2. Löschen Sie das Dataset in BigQuery.

    BigQuery aufrufen

  3. Löschen Sie den Cloud Storage-Bucket.

    Cloud Storage aufrufen

  4. Löschen Sie das Modell so:

    1. Rufen Sie im Bereich „Vertex AI“ die Seite Model Registry auf.

      Zur Seite Model Registry

    2. Klicken Sie neben dem Namen Ihres Modells auf das Menü Aktionen und wählen Sie Modell löschen aus.

  5. So löschen Sie Pipelineausführungen:

    1. Rufen Sie die Seite Pipeline-Ausführungen auf.

      Zu den Pipeline-Ausführungen

    2. Klicken Sie neben dem Namen jeder Pipelineausführung auf das Menü Aktionen und wählen Sie Pipelineausführung löschen aus.

  6. Löschen Sie die benutzerdefinierten Trainingsjobs:

    1. Benutzerdefinierte Trainingsjobs aufrufen

    2. Klicken Sie neben dem Namen jedes benutzerdefinierten Trainingsjobs auf das Dreipunkt-Menü Aktionen und wählen Sie Benutzerdefinierten Trainingsjob löschen aus.

  7. So löschen Sie die Batchvorhersagejobs:

    1. Zur Seite "Batchvorhersagen"

    2. Klicken Sie neben dem Namen jedes Batchvorhersagejobs auf das Dreipunkt-Menü Aktionen und wählen Sie Batchvorhersagejob löschen aus.

  8. Löschen Sie das Repository aus Artifact Registry.

    Zu Artifact Registry

  9. Cloud Functions-Funktion löschen

    Zu den Cloud Run-Funktionen