Dokumen ini memandu Anda melalui langkah-langkah yang diperlukan untuk mem-build pipeline yang secara otomatis melatih model kustom pada jadwal berkala atau saat data baru disisipkan ke dalam set data menggunakan Vertex AI Pipelines dan fungsi Cloud Run.
Tujuan
Langkah-langkah berikut membahas proses ini:
Mendapatkan dan menyiapkan set data di BigQuery.
Buat dan upload paket pelatihan kustom. Saat dijalankan, kode ini akan membaca data dari set data dan melatih model.
Buat Pipeline Vertex AI. Pipeline ini menjalankan paket pelatihan kustom, mengupload model ke Vertex AI Model Registry, menjalankan tugas evaluasi, dan mengirim notifikasi email.
Jalankan pipeline secara manual.
Buat Cloud Function dengan pemicu Eventarc yang menjalankan pipeline setiap kali data baru disisipkan ke set data BigQuery.
Sebelum Memulai
Siapkan project dan notebook Anda.
Penyiapan project
-
In the Google Cloud console, go to the project selector page.
-
Select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
Buat notebook
Kita menggunakan notebook Colab Enterprise untuk menjalankan beberapa kode dalam tutorial ini.
Jika Anda bukan pemilik project, minta pemilik project untuk memberi Anda peran IAM
roles/resourcemanager.projectIamAdmin
danroles/aiplatform.colabEnterpriseUser
.Anda harus memiliki peran ini untuk menggunakan Colab Enterprise dan memberikan peran serta izin IAM kepada diri sendiri dan akun layanan.
Di konsol Google Cloud, buka halaman Notebook Colab Enterprise.
Colab Enterprise akan meminta Anda untuk mengaktifkan API yang diperlukan berikut jika belum diaktifkan.
- Vertex AI API
- Dataform API
- Compute Engine API
Di menu Region, pilih region tempat Anda ingin membuat notebook. Jika Anda tidak yakin, gunakan us-central1 sebagai region.
Gunakan region yang sama untuk semua resource dalam tutorial ini.
Klik Buat notebook baru.
Notebook baru Anda akan muncul di tab Notebook saya. Untuk menjalankan kode di notebook, tambahkan sel kode dan klik tombol Run cell.
Menyiapkan lingkungan pengembangan
Di notebook, instal paket Python3 berikut.
! pip3 install google-cloud-aiplatform==1.34.0 \ google-cloud-pipeline-components==2.6.0 \ kfp==2.4.0 \ scikit-learn==1.0.2 \ mlflow==2.10.0
Tetapkan project Google Cloud CLI dengan menjalankan perintah berikut:
PROJECT_ID = "PROJECT_ID" # Set the project id ! gcloud config set project {PROJECT_ID}
Ganti PROJECT_ID dengan project ID Anda. Jika perlu, Anda dapat menemukan project ID di konsol Google Cloud.
Berikan peran ke Akun Google Anda:
! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/bigquery.admin ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/aiplatform.user ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/storage.admin ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/pubsub.editor ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/cloudfunctions.admin ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/logging.viewer ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/logging.configWriter ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/iam.serviceAccountUser ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/eventarc.admin ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/aiplatform.colabEnterpriseUser ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/artifactregistry.admin ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/serviceusage.serviceUsageAdmin
Mengaktifkan API berikut
- Artifact Registry API
- BigQuery API
- Cloud Build API
- Cloud Functions API
- Cloud Logging API
- Pub/Sub API
- Cloud Run Admin API
- Cloud Storage API
- API Eventarc
- Service Usage API
- Vertex AI API
! gcloud services enable artifactregistry.googleapis.com bigquery.googleapis.com cloudbuild.googleapis.com cloudfunctions.googleapis.com logging.googleapis.com pubsub.googleapis.com run.googleapis.com storage-component.googleapis.com eventarc.googleapis.com serviceusage.googleapis.com aiplatform.googleapis.com
Berikan peran ke akun layanan project Anda:
Melihat nama akun layanan Anda
! gcloud iam service-accounts list
Catat nama agen layanan Compute Anda. URL tersebut harus dalam format
xxxxxxxx-compute@developer.gserviceaccount.com
.Berikan peran yang diperlukan ke agen layanan.
! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:"SA_ID-compute@developer.gserviceaccount.com"" --role=roles/aiplatform.serviceAgent ! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:"SA_ID-compute@developer.gserviceaccount.com"" --role=roles/eventarc.eventReceiver
Mendapatkan dan menyiapkan set data
Dalam tutorial ini, Anda akan membuat model yang memprediksi tarif perjalanan taksi berdasarkan fitur seperti waktu perjalanan, lokasi, dan jarak. Kita akan menggunakan data dari set data Chicago Taxi Trips publik. Set data ini mencakup perjalanan taksi dari tahun 2013 hingga saat ini, yang dilaporkan ke Kota Chicago dalam perannya sebagai lembaga pengatur. Untuk melindungi privasi pengemudi dan pengguna taksi secara bersamaan serta memungkinkan agregator menganalisis data, ID Taksi tetap konsisten untuk setiap nomor medali taksi tertentu, tetapi tidak menampilkan nomor, Tract Sensus disembunyikan dalam beberapa kasus, dan waktu dibulatkan ke 15 menit terdekat.
Untuk mengetahui informasi selengkapnya, lihat Perjalanan Taksi Chicago di Marketplace.
Membuat set data BigQuery
Di konsol Google Cloud, buka BigQuery Studio.
Di panel Explorer, cari project Anda, klik
Actions, lalu klik Create dataset.Di halaman Create dataset:
Untuk Dataset ID, masukkan
mlops
. Untuk mengetahui informasi selengkapnya, lihat penamaan set data.Untuk Jenis lokasi, pilih multi-region Anda. Misalnya, pilih US (multiple regions in the United States) jika Anda menggunakan
us-central1
. Setelah set data dibuat, lokasi tidak dapat diubah.Klik Create dataset.
Untuk mengetahui informasi selengkapnya, lihat cara membuat set data.
Membuat dan mengisi tabel BigQuery
Di bagian ini, Anda akan membuat tabel dan mengimpor data selama satu tahun dari set data publik ke set data project.
Buka BigQuery Studio
Klik Create SQL Query dan jalankan kueri SQL berikut dengan mengklik
Run.CREATE OR REPLACE TABLE `PROJECT_ID.mlops.chicago` AS ( WITH taxitrips AS ( SELECT trip_start_timestamp, trip_end_timestamp, trip_seconds, trip_miles, payment_type, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, tips, tolls, fare, pickup_community_area, dropoff_community_area, company, unique_key FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` WHERE pickup_longitude IS NOT NULL AND pickup_latitude IS NOT NULL AND dropoff_longitude IS NOT NULL AND dropoff_latitude IS NOT NULL AND trip_miles > 0 AND trip_seconds > 0 AND fare > 0 AND EXTRACT(YEAR FROM trip_start_timestamp) = 2019 ) SELECT trip_start_timestamp, EXTRACT(MONTH from trip_start_timestamp) as trip_month, EXTRACT(DAY from trip_start_timestamp) as trip_day, EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week, EXTRACT(HOUR from trip_start_timestamp) as trip_hour, trip_seconds, trip_miles, payment_type, ST_AsText( ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1) ) AS pickup_grid, ST_AsText( ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1) ) AS dropoff_grid, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude) ) AS euclidean, CONCAT( ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)), ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)) ) AS loc_cross, IF((tips/fare >= 0.2), 1, 0) AS tip_bin, tips, tolls, fare, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, pickup_community_area, dropoff_community_area, company, unique_key, trip_end_timestamp FROM taxitrips LIMIT 1000000 )
Kueri ini membuat tabel
<PROJECT_ID>.mlops.chicago
dan mengisinya dengan data dari tabelbigquery-public-data.chicago_taxi_trips.taxi_trips
publik.Untuk melihat skema tabel, klik Go to table, lalu klik tab Schema.
Untuk melihat isi tabel, klik tab Preview.
Membuat dan mengupload paket pelatihan kustom
Di bagian ini, Anda akan membuat paket Python yang berisi kode yang membaca set data, membagi data menjadi set pelatihan dan pengujian, serta melatih model kustom Anda. Paket akan dijalankan sebagai salah satu tugas dalam pipeline Anda. Untuk informasi selengkapnya, lihat mem-build aplikasi pelatihan Python untuk container bawaan.
Membuat paket pelatihan kustom
Di notebook Colab, buat folder induk untuk aplikasi pelatihan:
!mkdir -p training_package/trainer
Buat file
__init__.py
di setiap folder untuk menjadikannya paket menggunakan perintah berikut:! touch training_package/__init__.py ! touch training_package/trainer/__init__.py
Anda dapat melihat file dan folder baru di panel folder File.
Di panel File, buat file bernama
task.py
di folder training_package/trainer dengan konten berikut.# Import the libraries from sklearn.model_selection import train_test_split, cross_val_score from sklearn.preprocessing import OneHotEncoder, StandardScaler from google.cloud import bigquery, bigquery_storage from sklearn.ensemble import RandomForestRegressor from sklearn.compose import ColumnTransformer from sklearn.pipeline import Pipeline from google import auth from scipy import stats import numpy as np import argparse import joblib import pickle import csv import os # add parser arguments parser = argparse.ArgumentParser() parser.add_argument('--project-id', dest='project_id', type=str, help='Project ID.') parser.add_argument('--training-dir', dest='training_dir', default=os.getenv("AIP_MODEL_DIR"), type=str, help='Dir to save the data and the trained model.') parser.add_argument('--bq-source', dest='bq_source', type=str, help='BigQuery data source for training data.') args = parser.parse_args() # data preparation code BQ_QUERY = """ with tmp_table as ( SELECT trip_seconds, trip_miles, fare, tolls, company, pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude, DATETIME(trip_start_timestamp, 'America/Chicago') trip_start_timestamp, DATETIME(trip_end_timestamp, 'America/Chicago') trip_end_timestamp, CASE WHEN (pickup_community_area IN (56, 64, 76)) OR (dropoff_community_area IN (56, 64, 76)) THEN 1 else 0 END is_airport, FROM `{}` WHERE dropoff_latitude IS NOT NULL and dropoff_longitude IS NOT NULL and pickup_latitude IS NOT NULL and pickup_longitude IS NOT NULL and fare > 0 and trip_miles > 0 and MOD(ABS(FARM_FINGERPRINT(unique_key)), 100) between 0 and 99 ORDER BY RAND() LIMIT 10000) SELECT *, EXTRACT(YEAR FROM trip_start_timestamp) trip_start_year, EXTRACT(MONTH FROM trip_start_timestamp) trip_start_month, EXTRACT(DAY FROM trip_start_timestamp) trip_start_day, EXTRACT(HOUR FROM trip_start_timestamp) trip_start_hour, FORMAT_DATE('%a', DATE(trip_start_timestamp)) trip_start_day_of_week FROM tmp_table """.format(args.bq_source) # Get default credentials credentials, project = auth.default() bqclient = bigquery.Client(credentials=credentials, project=args.project_id) bqstorageclient = bigquery_storage.BigQueryReadClient(credentials=credentials) df = ( bqclient.query(BQ_QUERY) .result() .to_dataframe(bqstorage_client=bqstorageclient) ) # Add 'N/A' for missing 'Company' df.fillna(value={'company':'N/A','tolls':0}, inplace=True) # Drop rows containing null data. df.dropna(how='any', axis='rows', inplace=True) # Pickup and dropoff locations distance df['abs_distance'] = (np.hypot(df['dropoff_latitude']-df['pickup_latitude'], df['dropoff_longitude']-df['pickup_longitude']))*100 # Remove extremes, outliers possible_outliers_cols = ['trip_seconds', 'trip_miles', 'fare', 'abs_distance'] df=df[(np.abs(stats.zscore(df[possible_outliers_cols].astype(float))) < 3).all(axis=1)].copy() # Reduce location accuracy df=df.round({'pickup_latitude': 3, 'pickup_longitude': 3, 'dropoff_latitude':3, 'dropoff_longitude':3}) # Drop the timestamp col X=df.drop(['trip_start_timestamp', 'trip_end_timestamp'],axis=1) # Split the data into train and test X_train, X_test = train_test_split(X, test_size=0.10, random_state=123) ## Format the data for batch predictions # select string cols string_cols = X_test.select_dtypes(include='object').columns # Add quotes around string fields X_test[string_cols] = X_test[string_cols].apply(lambda x: '\"' + x + '\"') # Add quotes around column names X_test.columns = ['\"' + col + '\"' for col in X_test.columns] # Save DataFrame to csv X_test.to_csv(os.path.join(args.training_dir,"test.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ') # Save test data without the target for batch predictions X_test.drop('\"fare\"',axis=1,inplace=True) X_test.to_csv(os.path.join(args.training_dir,"test_no_target.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ') # Separate the target column y_train=X_train.pop('fare') # Get the column indexes col_index_dict = {col: idx for idx, col in enumerate(X_train.columns)} # Create a column transformer pipeline ct_pipe = ColumnTransformer(transformers=[ ('hourly_cat', OneHotEncoder(categories=[range(0,24)], sparse = False), [col_index_dict['trip_start_hour']]), ('dow', OneHotEncoder(categories=[['Mon', 'Tue', 'Sun', 'Wed', 'Sat', 'Fri', 'Thu']], sparse = False), [col_index_dict['trip_start_day_of_week']]), ('std_scaler', StandardScaler(), [ col_index_dict['trip_start_year'], col_index_dict['abs_distance'], col_index_dict['pickup_longitude'], col_index_dict['pickup_latitude'], col_index_dict['dropoff_longitude'], col_index_dict['dropoff_latitude'], col_index_dict['trip_miles'], col_index_dict['trip_seconds']]) ]) # Add the random-forest estimator to the pipeline rfr_pipe = Pipeline([ ('ct', ct_pipe), ('forest_reg', RandomForestRegressor( n_estimators = 20, max_features = 1.0, n_jobs = -1, random_state = 3, max_depth=None, max_leaf_nodes=None, )) ]) # train the model rfr_score = cross_val_score(rfr_pipe, X_train, y_train, scoring = 'neg_mean_squared_error', cv = 5) rfr_rmse = np.sqrt(-rfr_score) print ("Crossvalidation RMSE:",rfr_rmse.mean()) final_model=rfr_pipe.fit(X_train, y_train) # Save the model pipeline with open(os.path.join(args.training_dir,"model.pkl"), 'wb') as model_file: pickle.dump(final_model, model_file)
Kode ini menyelesaikan tugas-tugas berikut:
- Pemilihan fitur.
- Mengubah waktu data pengambilan dan pengantaran dari UTC ke waktu lokal Chicago.
- Mengekstrak tanggal, jam, hari dalam seminggu, bulan, dan tahun dari waktu pengambilan.
- Menghitung durasi perjalanan menggunakan waktu mulai dan waktu berakhir.
- Mengidentifikasi dan menandai perjalanan yang dimulai atau berakhir di bandara berdasarkan area komunitas.
- Model regresi Random Forest dilatih untuk memprediksi tarif perjalanan taksi menggunakan framework scikit-learn.
Model yang dilatih disimpan ke dalam file pickle
model.pkl
.Pendekatan yang dipilih dan pembuatan fitur didasarkan pada eksplorasi dan analisis data di Memprediksi Tarif Taksi Chicago.
Di panel File, buat file bernama
setup.py
di folder training_package dengan konten berikut.from setuptools import find_packages from setuptools import setup REQUIRED_PACKAGES=["google-cloud-bigquery[pandas]","google-cloud-bigquery-storage"] setup( name='trainer', version='0.1', install_requires=REQUIRED_PACKAGES, packages=find_packages(), include_package_data=True, description='Training application package for chicago taxi trip fare prediction.' )
Di notebook, jalankan
setup.py
untuk membuat distribusi sumber untuk aplikasi pelatihan Anda:! cd training_package && python setup.py sdist --formats=gztar && cd ..
Di akhir bagian ini, panel File Anda akan berisi file dan folder berikut di bagian training-package
.
dist
trainer-0.1.tar.gz
trainer
__init__.py
task.py
trainer.egg-info
__init__.py
setup.py
Mengupload paket pelatihan kustom ke Cloud Storage
Membuat bucket Cloud Storage.
REGION="REGION" BUCKET_NAME = "BUCKET_NAME" BUCKET_URI = f"gs://{BUCKET_NAME}" ! gcloud storage buckets create gs://$BUCKET_URI --location=$REGION --project=$PROJECT_ID
Ganti nilai parameter berikut:
REGION
: Pilih region yang sama dengan yang Anda pilih saat membuat notebook colab.BUCKET_NAME
: Nama bucket.
Upload paket pelatihan Anda ke bucket Cloud Storage.
# Copy the training package to the bucket ! gcloud storage cp training_package/dist/trainer-0.1.tar.gz $BUCKET_URI/
Membuat pipeline
Pipeline adalah deskripsi alur kerja MLOps sebagai grafik langkah yang disebut tugas pipeline.
Di bagian ini, Anda menentukan tugas pipeline, mengompilasikannya ke YAML, dan mendaftarkan pipeline di Artifact Registry sehingga dapat dikontrol versi dan berjalan beberapa kali, oleh satu pengguna atau beberapa pengguna.
Berikut adalah visualisasi tugas, termasuk pelatihan model, upload model, evaluasi model, dan notifikasi email, dalam pipeline kami:
Untuk mengetahui informasi selengkapnya, lihat membuat template pipeline.
Menentukan konstanta dan melakukan inisialisasi klien
Di notebook, tentukan konstanta yang akan digunakan di langkah berikutnya:
import os EMAIL_RECIPIENTS = [ "NOTIFY_EMAIL" ] PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI) PIPELINE_NAME = "vertex-pipeline-datatrigger-tutorial" WORKING_DIR = f"{PIPELINE_ROOT}/mlops-datatrigger-tutorial" os.environ['AIP_MODEL_DIR'] = WORKING_DIR EXPERIMENT_NAME = PIPELINE_NAME + "-experiment" PIPELINE_FILE = PIPELINE_NAME + ".yaml"
Ganti
NOTIFY_EMAIL
dengan alamat email. Saat tugas pipeline selesai, baik berhasil maupun gagal, email akan dikirim ke alamat email tersebut.Lakukan inisialisasi Vertex AI SDK dengan project, bucket staging, lokasi, dan eksperimen.
from google.cloud import aiplatform aiplatform.init( project=PROJECT_ID, staging_bucket=BUCKET_URI, location=REGION, experiment=EXPERIMENT_NAME) aiplatform.autolog()
Menentukan tugas pipeline
Di notebook, tentukan custom_model_training_evaluation_pipeline
pipeline Anda:
from kfp import dsl
from kfp.dsl import importer
from kfp.dsl import OneOf
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp
from google_cloud_pipeline_components.v1.model_evaluation import ModelEvaluationRegressionOp
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp
from google_cloud_pipeline_components.v1.endpoint import ModelDeployOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp
from google.cloud import aiplatform
# define the train-deploy pipeline
@dsl.pipeline(name="custom-model-training-evaluation-pipeline")
def custom_model_training_evaluation_pipeline(
project: str,
location: str,
training_job_display_name: str,
worker_pool_specs: list,
base_output_dir: str,
prediction_container_uri: str,
model_display_name: str,
batch_prediction_job_display_name: str,
target_field_name: str,
test_data_gcs_uri: list,
ground_truth_gcs_source: list,
batch_predictions_gcs_prefix: str,
batch_predictions_input_format: str="csv",
batch_predictions_output_format: str="jsonl",
ground_truth_format: str="csv",
parent_model_resource_name: str=None,
parent_model_artifact_uri: str=None,
existing_model: bool=False
):
# Notification task
notify_task = VertexNotificationEmailOp(
recipients= EMAIL_RECIPIENTS
)
with dsl.ExitHandler(notify_task, name='MLOps Continuous Training Pipeline'):
# Train the model
custom_job_task = CustomTrainingJobOp(
project=project,
display_name=training_job_display_name,
worker_pool_specs=worker_pool_specs,
base_output_directory=base_output_dir,
location=location
)
# Import the unmanaged model
import_unmanaged_model_task = importer(
artifact_uri=base_output_dir,
artifact_class=artifact_types.UnmanagedContainerModel,
metadata={
"containerSpec": {
"imageUri": prediction_container_uri,
},
},
).after(custom_job_task)
with dsl.If(existing_model == True):
# Import the parent model to upload as a version
import_registry_model_task = importer(
artifact_uri=parent_model_artifact_uri,
artifact_class=artifact_types.VertexModel,
metadata={
"resourceName": parent_model_resource_name
},
).after(import_unmanaged_model_task)
# Upload the model as a version
model_version_upload_op = ModelUploadOp(
project=project,
location=location,
display_name=model_display_name,
parent_model=import_registry_model_task.outputs["artifact"],
unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
)
with dsl.Else():
# Upload the model
model_upload_op = ModelUploadOp(
project=project,
location=location,
display_name=model_display_name,
unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
)
# Get the model (or model version)
model_resource = OneOf(model_version_upload_op.outputs["model"], model_upload_op.outputs["model"])
# Batch prediction
batch_predict_task = ModelBatchPredictOp(
project= project,
job_display_name= batch_prediction_job_display_name,
model= model_resource,
location= location,
instances_format= batch_predictions_input_format,
predictions_format= batch_predictions_output_format,
gcs_source_uris= test_data_gcs_uri,
gcs_destination_output_uri_prefix= batch_predictions_gcs_prefix,
machine_type= 'n1-standard-2'
)
# Evaluation task
evaluation_task = ModelEvaluationRegressionOp(
project= project,
target_field_name= target_field_name,
location= location,
# model= model_resource,
predictions_format= batch_predictions_output_format,
predictions_gcs_source= batch_predict_task.outputs["gcs_output_directory"],
ground_truth_format= ground_truth_format,
ground_truth_gcs_source= ground_truth_gcs_source
)
return
Pipeline Anda terdiri dari grafik tugas yang menggunakan Komponen Pipeline Google Cloud berikut:
CustomTrainingJobOp
: Menjalankan tugas pelatihan kustom di Vertex AI.ModelUploadOp
: Mengupload model machine learning yang dilatih ke registry model.ModelBatchPredictOp
: Membuat tugas prediksi batch.ModelEvaluationRegressionOp
: Mengevaluasi tugas batch regresi.VertexNotificationEmailOp
: Mengirim notifikasi email.
Mengompilasi pipeline
Kompilasi pipeline menggunakan compiler Kubeflow Pipelines (KFP) ke file YAML yang berisi representasi hermetis pipeline Anda.
from kfp import dsl
from kfp import compiler
compiler.Compiler().compile(
pipeline_func=custom_model_training_evaluation_pipeline,
package_path="{}.yaml".format(PIPELINE_NAME),
)
Anda akan melihat file YAML bernama vertex-pipeline-datatrigger-tutorial.yaml
di
direktori kerja.
Mengupload pipeline sebagai template
Buat repositori jenis
KFP
di Artifact Registry.REPO_NAME = "mlops" # Create a repo in the artifact registry ! gcloud artifacts repositories create $REPO_NAME --location=$REGION --repository-format=KFP
Upload pipeline yang dikompilasi ke repositori.
from kfp.registry import RegistryClient host = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}" client = RegistryClient(host=host) TEMPLATE_NAME, VERSION_NAME = client.upload_pipeline( file_name=PIPELINE_FILE, tags=["v1", "latest"], extra_headers={"description":"This is an example pipeline template."}) TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest"
Di konsol Google Cloud, pastikan template Anda muncul di Pipeline Templates.
Menjalankan pipeline secara manual
Untuk memastikan pipeline berfungsi, jalankan pipeline secara manual.
Di notebook, tentukan parameter yang diperlukan untuk menjalankan pipeline sebagai tugas.
DATASET_NAME = "mlops" TABLE_NAME = "chicago" worker_pool_specs = [{ "machine_spec": {"machine_type": "e2-highmem-2"}, "replica_count": 1, "python_package_spec":{ "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest", "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"], "python_module": "trainer.task", "args":["--project-id",PROJECT_ID, "--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"] }, }] parameters = { "project": PROJECT_ID, "location": REGION, "training_job_display_name": "taxifare-prediction-training-job", "worker_pool_specs": worker_pool_specs, "base_output_dir": BUCKET_URI, "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest", "model_display_name": "taxifare-prediction-model", "batch_prediction_job_display_name": "taxifare-prediction-batch-job", "target_field_name": "fare", "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"], "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"], "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output", "existing_model": False }
Membuat dan menjalankan tugas pipeline.
# Create a pipeline job job = aiplatform.PipelineJob( display_name="triggered_custom_regression_evaluation", template_path=TEMPLATE_URI , parameter_values=parameters, pipeline_root=BUCKET_URI, enable_caching=False ) # Run the pipeline job job.run()
Tugas ini memerlukan waktu sekitar 30 menit untuk diselesaikan.
Di konsol, Anda akan melihat pipeline baru berjalan di halaman Pipelines:
Setelah pipeline selesai dijalankan, Anda akan melihat model baru bernama
taxifare-prediction-model
atau versi model baru di Vertex AI Model Registry:Anda juga akan melihat tugas prediksi batch baru:
Menjalankan pipeline secara otomatis
Ada dua cara untuk menjalankan pipeline secara otomatis: sesuai jadwal atau saat data baru dimasukkan ke set data.
Menjalankan pipeline sesuai jadwal
Di notebook, panggil
PipelineJob.create_schedule
.job_schedule = job.create_schedule( display_name="mlops tutorial schedule", cron="0 0 1 * *", # max_concurrent_run_count=1, max_run_count=12, )
Ekspresi
cron
menjadwalkan tugas untuk berjalan setiap tanggal 1 bulan pukul 12.00 UTC.Untuk tutorial ini, kita tidak ingin beberapa tugas berjalan secara serentak, jadi kita menetapkan
max_concurrent_run_count
ke 1.Di konsol Google Cloud, pastikan
schedule
Anda muncul di Jadwal pipeline.
Menjalankan pipeline saat ada data baru
Membuat fungsi dengan pemicu Eventarc
Buat Cloud Function (generasi ke-2) yang menjalankan pipeline setiap kali data baru disisipkan ke tabel BigQuery.
Secara khusus, kita menggunakan Eventarc untuk memicu fungsi setiap kali peristiwa google.cloud.bigquery.v2.JobService.InsertJob
terjadi. Fungsi tersebut kemudian menjalankan template pipeline.
Untuk informasi selengkapnya, lihat Pemicu Eventarc dan jenis peristiwa yang didukung.
Di konsol Google Cloud, buka fungsi Cloud Run.
Klik tombol Create Function. Di halaman Configuration:
Pilih generasi ke-2 sebagai lingkungan Anda.
Untuk Function name, gunakan mlops.
Untuk Region, pilih region yang sama dengan bucket Cloud Storage dan repositori Artifact Registry Anda.
Untuk Pemicu, pilih Pemicu lainnya. Panel Pemicu Eventarc akan terbuka.
Untuk Trigger Type, pilih Google Sources.
Untuk Penyedia Peristiwa, pilih BigQuery.
Untuk Jenis peristiwa, pilih
google.cloud.bigquery.v2.JobService.InsertJob
.Untuk Resource, pilih Specific resource dan tentukan tabel BigQuery
projects/PROJECT_ID/datasets/mlops/tables/chicago
Di kolom Region, pilih lokasi untuk pemicu Eventarc, jika ada. Lihat Lokasi pemicu untuk mengetahui informasi selengkapnya.
Klik Save Trigger.
Jika Anda diminta untuk memberikan peran ke akun layanan, klik Berikan Semua.
Klik Berikutnya untuk membuka halaman Kode. Di halaman Code:
Tetapkan Runtime ke python 3.12.
Tetapkan Entry point ke
mlops_entrypoint
.Dengan Editor Inline, buka file
main.py
dan ganti kontennya dengan kode berikut:Ganti
PROJECT_ID
,REGION
,BUCKET_NAME
dengan nilai yang Anda gunakan sebelumnya.import json import functions_framework import requests import google.auth import google.auth.transport.requests # CloudEvent function to be triggered by an Eventarc Cloud Audit Logging trigger # Note: this is NOT designed for second-party (Cloud Audit Logs -> Pub/Sub) triggers! @functions_framework.cloud_event def mlops_entrypoint(cloudevent): # Print out the CloudEvent's (required) `type` property # See https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#type print(f"Event type: {cloudevent['type']}") # Print out the CloudEvent's (optional) `subject` property # See https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#subject if 'subject' in cloudevent: # CloudEvent objects don't support `get` operations. # Use the `in` operator to verify `subject` is present. print(f"Subject: {cloudevent['subject']}") # Print out details from the `protoPayload` # This field encapsulates a Cloud Audit Logging entry # See https://cloud.google.com/logging/docs/audit#audit_log_entry_structure payload = cloudevent.data.get("protoPayload") if payload: print(f"API method: {payload.get('methodName')}") print(f"Resource name: {payload.get('resourceName')}") print(f"Principal: {payload.get('authenticationInfo', dict()).get('principalEmail')}") row_count = payload.get('metadata', dict()).get('tableDataChange',dict()).get('insertedRowsCount') print(f"No. of rows: {row_count} !!") if row_count: if int(row_count) > 0: print ("Pipeline trigger Condition met !!") submit_pipeline_job() else: print ("No pipeline triggered !!!") def submit_pipeline_job(): PROJECT_ID = 'PROJECT_ID' REGION = 'REGION' BUCKET_NAME = "BUCKET_NAME" DATASET_NAME = "mlops" TABLE_NAME = "chicago" base_output_dir = BUCKET_NAME BUCKET_URI = "gs://{}".format(BUCKET_NAME) PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI) PIPELINE_NAME = "vertex-mlops-pipeline-tutorial" EXPERIMENT_NAME = PIPELINE_NAME + "-experiment" REPO_NAME ="mlops" TEMPLATE_NAME="custom-model-training-evaluation-pipeline" TRAINING_JOB_DISPLAY_NAME="taxifare-prediction-training-job" worker_pool_specs = [{ "machine_spec": {"machine_type": "e2-highmem-2"}, "replica_count": 1, "python_package_spec":{ "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest", "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"], "python_module": "trainer.task", "args":["--project-id",PROJECT_ID,"--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"] }, }] parameters = { "project": PROJECT_ID, "location": REGION, "training_job_display_name": "taxifare-prediction-training-job", "worker_pool_specs": worker_pool_specs, "base_output_dir": BUCKET_URI, "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest", "model_display_name": "taxifare-prediction-model", "batch_prediction_job_display_name": "taxifare-prediction-batch-job", "target_field_name": "fare", "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"], "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"], "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output", "existing_model": False } TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest" print("TEMPLATE URI: ", TEMPLATE_URI) request_body = { "name": PIPELINE_NAME, "displayName": PIPELINE_NAME, "runtimeConfig":{ "gcsOutputDirectory": PIPELINE_ROOT, "parameterValues": parameters, }, "templateUri": TEMPLATE_URI } pipeline_url = "https://us-central1-aiplatform.googleapis.com/v1/projects/{}/locations/{}/pipelineJobs".format(PROJECT_ID, REGION) creds, project = google.auth.default() auth_req = google.auth.transport.requests.Request() creds.refresh(auth_req) headers = { 'Authorization': 'Bearer {}'.format(creds.token), 'Content-Type': 'application/json; charset=utf-8' } response = requests.request("POST", pipeline_url, headers=headers, data=json.dumps(request_body)) print(response.text)
Buka file
requirements.txt
dan ganti kontennya dengan yang berikut ini:requests==2.31.0 google-auth==2.25.1
Klik Deploy untuk men-deploy fungsi.
Memasukkan data untuk memicu pipeline
Di konsol Google Cloud, buka BigQuery Studio.
Klik Create SQL Query dan jalankan kueri SQL berikut dengan mengklik
Run.INSERT INTO `PROJECT_ID.mlops.chicago` ( WITH taxitrips AS ( SELECT trip_start_timestamp, trip_end_timestamp, trip_seconds, trip_miles, payment_type, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, tips, tolls, fare, pickup_community_area, dropoff_community_area, company, unique_key FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` WHERE pickup_longitude IS NOT NULL AND pickup_latitude IS NOT NULL AND dropoff_longitude IS NOT NULL AND dropoff_latitude IS NOT NULL AND trip_miles > 0 AND trip_seconds > 0 AND fare > 0 AND EXTRACT(YEAR FROM trip_start_timestamp) = 2022 ) SELECT trip_start_timestamp, EXTRACT(MONTH from trip_start_timestamp) as trip_month, EXTRACT(DAY from trip_start_timestamp) as trip_day, EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week, EXTRACT(HOUR from trip_start_timestamp) as trip_hour, trip_seconds, trip_miles, payment_type, ST_AsText( ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1) ) AS pickup_grid, ST_AsText( ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1) ) AS dropoff_grid, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude) ) AS euclidean, CONCAT( ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)), ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)) ) AS loc_cross, IF((tips/fare >= 0.2), 1, 0) AS tip_bin, tips, tolls, fare, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, pickup_community_area, dropoff_community_area, company, unique_key, trip_end_timestamp FROM taxitrips LIMIT 1000000 )
Kueri SQL ini untuk menyisipkan baris baru ke dalam tabel.
Untuk memverifikasi apakah peristiwa dipicu, telusuri
pipeline trigger condition met
di log fungsi Anda.Jika fungsi berhasil dipicu, Anda akan melihat pipeline baru berjalan di Vertex AI Pipelines. Tugas pipeline memerlukan waktu sekitar 30 menit untuk diselesaikan.
Pembersihan
Untuk membersihkan semua resource Google Cloud yang digunakan untuk project ini, Anda dapat menghapus project Google Cloud yang digunakan untuk tutorial.
Atau, Anda dapat menghapus setiap resource yang dibuat untuk tutorial ini.
Hapus model sebagai berikut:
Di bagian Vertex AI, buka halaman Model Registry.
Di samping nama model, klik menu
Tindakan, lalu pilih Hapus model.
Hapus operasi pipeline:
Buka halaman Pipeline runs.
Di samping nama setiap operasi pipeline, klik menu
Tindakan, lalu pilih Hapus operasi pipeline.
Hapus tugas pelatihan kustom:
Di samping nama setiap tugas pelatihan kustom, klik menu
Tindakan, lalu pilih Hapus tugas pelatihan kustom.
Hapus tugas prediksi batch sebagai berikut:
Di samping nama setiap tugas prediksi batch, klik menu
Tindakan, lalu pilih Hapus tugas prediksi batch.