本教程介绍如何使用 Vertex AI Pipelines 运行端到端机器学习工作流,包括以下任务:
- 导入和转换数据。
- 使用所选的机器学习框架训练模型。
- 将经过训练的模型导入 Vertex AI Model Registry。
- 可选:使用 Vertex AI Prediction 部署用于在线服务的模型。
准备工作
确保您已完成设置 Google Cloud 项目和开发环境中的任务 1-3。
安装 Python 版 Vertex AI SDK 和 Kubeflow Pipelines SDK:
python3 -m pip install "kfp<2.0.0" "google-cloud-aiplatform>=1.16.0" --upgrade --quiet
运行机器学习模型训练流水线
在以下标签页中选择训练目标和机器学习框架,以获取可在您的环境中运行的示例代码。示例代码会执行以下操作:
- 从组件代码库加载组件,以用作流水线基本组件。
- 创建组件任务并使用参数在任务之间传递数据,从而构建流水线。
- 提交流水线以在 Vertex AI Pipelines 上执行。请参阅 Vertex AI Pipelines 价格。
将代码复制到开发环境中并运行代码。
表格分类
TensorFlow
# python3 -m pip install "kfp<2.0.0" "google-cloud-aiplatform>=1.16.0" --upgrade --quiet
from kfp import components
# %% Loading components
# Each *_op below is a component factory built from a component.yaml definition
# fetched by URL when this script runs. Every URL embeds the same commit hash of
# the vertex-ai-samples repository, so the component definitions are pinned.
download_from_gcs_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/storage/download/component.yaml")
select_columns_using_Pandas_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/pandas/Select_columns/in_CSV_format/component.yaml")
fill_all_missing_values_using_Pandas_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/pandas/Fill_all_missing_values/in_CSV_format/component.yaml")
binarize_column_using_Pandas_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/pandas/Binarize_column/in_CSV_format/component.yaml")
split_rows_into_subsets_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/dataset_manipulation/Split_rows_into_subsets/in_CSV/component.yaml")
create_fully_connected_tensorflow_network_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/tensorflow/Create_fully_connected_network/component.yaml")
train_model_using_Keras_on_CSV_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/tensorflow/Train_model_using_Keras/on_CSV/component.yaml")
predict_with_TensorFlow_model_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/tensorflow/Predict/on_CSV/component.yaml")
upload_Tensorflow_model_to_Google_Cloud_Vertex_AI_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/Vertex_AI/Models/Upload_Tensorflow_model/component.yaml")
deploy_model_to_endpoint_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/Vertex_AI/Models/Deploy_to_endpoint/component.yaml")
# %% Pipeline definition
def train_tabular_classification_model_using_TensorFlow_pipeline():
    """Define a pipeline that trains a binary classifier with TensorFlow/Keras.

    The pipeline downloads the Chicago taxi trips CSV from Cloud Storage, keeps
    the label and feature columns, replaces missing values with "0", derives a
    "class" column from "tips" using the predicate " > 0", and splits the rows
    into a training and a testing subset. It then creates a fully connected
    network, trains it on the training subset, runs predictions on the testing
    subset, and uploads the trained model to Vertex AI. The model is deployed
    to an endpoint only when ``deploy_model`` is set to True.

    The function takes no arguments; edit the constants at the top of the body
    to change the dataset, columns, or split fraction.
    """
    dataset_gcs_uri = "gs://ml-pipeline-dataset/Chicago_taxi_trips/chicago_taxi_trips_2019-01-01_-_2019-02-01_limit=10000.csv"
    feature_columns = [
        "trip_seconds",
        "trip_miles",
        "pickup_community_area",
        "dropoff_community_area",
        "fare",
        "tolls",
        "extras",
    ]  # Excluded "trip_total"
    label_column = "tips"
    training_set_fraction = 0.8
    # Deploying the model might incur additional costs over time
    deploy_model = False

    classification_label_column = "class"
    all_columns = [label_column] + feature_columns

    dataset = download_from_gcs_op(
        gcs_path=dataset_gcs_uri
    ).outputs["Data"]

    dataset = select_columns_using_Pandas_on_CSV_data_op(
        table=dataset,
        column_names=all_columns,
    ).outputs["transformed_table"]

    dataset = fill_all_missing_values_using_Pandas_on_CSV_data_op(
        table=dataset,
        replacement_value="0",
        # Optional:
        # column_names=None,  # =[...]
    ).outputs["transformed_table"]

    # Turn the numeric label column into the classification target column.
    classification_dataset = binarize_column_using_Pandas_on_CSV_data_op(
        table=dataset,
        column_name=label_column,
        predicate=" > 0",
        new_column_name=classification_label_column,
    ).outputs["transformed_table"]

    split_task = split_rows_into_subsets_op(
        table=classification_dataset,
        fraction_1=training_set_fraction,
    )
    classification_training_data = split_task.outputs["split_1"]
    classification_testing_data = split_task.outputs["split_2"]

    network = create_fully_connected_tensorflow_network_op(
        input_size=len(feature_columns),
        # Optional:
        hidden_layer_sizes=[10],
        activation_name="elu",
        output_activation_name="sigmoid",
        # output_size=1,
    ).outputs["model"]

    model = train_model_using_Keras_on_CSV_op(
        training_data=classification_training_data,
        model=network,
        label_column_name=classification_label_column,
        # Optional:
        loss_function_name="binary_crossentropy",
        number_of_epochs=10,
        # learning_rate=0.1,
        # optimizer_name="Adadelta",
        # optimizer_parameters={},
        # batch_size=32,
        # metric_names=["mean_absolute_error"],
        # random_seed=0,
    ).outputs["trained_model"]

    predictions = predict_with_TensorFlow_model_on_CSV_data_op(
        dataset=classification_testing_data,
        model=model,
        # label_column_name needs to be set when doing prediction on a dataset that has labels
        label_column_name=classification_label_column,
        # Optional:
        # batch_size=1000,
    ).outputs["predictions"]

    vertex_model_name = upload_Tensorflow_model_to_Google_Cloud_Vertex_AI_op(
        model=model,
    ).outputs["model_name"]

    # Deploying the model might incur additional costs over time
    if deploy_model:
        vertex_endpoint_name = deploy_model_to_endpoint_op(
            model_name=vertex_model_name,
        ).outputs["endpoint_name"]
# Alias used by the submission code below.
pipeline_func = train_tabular_classification_model_using_TensorFlow_pipeline

# %% Pipeline submission
if __name__ == '__main__':
    # Imported here so the Vertex AI SDK is only needed when running as a script.
    from google.cloud import aiplatform

    # Build a pipeline job from the pipeline function and submit it.
    aiplatform.PipelineJob.from_pipeline_func(pipeline_func=pipeline_func).submit()
PyTorch
# python3 -m pip install "kfp<2.0.0" "google-cloud-aiplatform>=1.16.0" --upgrade --quiet
from kfp import components
# %% Loading components
# Each *_op below is a component factory built from a component.yaml definition
# fetched by URL when this script runs. Every URL embeds the same commit hash of
# the vertex-ai-samples repository, so the component definitions are pinned.
download_from_gcs_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/storage/download/component.yaml")
select_columns_using_Pandas_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/pandas/Select_columns/in_CSV_format/component.yaml")
fill_all_missing_values_using_Pandas_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/pandas/Fill_all_missing_values/in_CSV_format/component.yaml")
binarize_column_using_Pandas_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/pandas/Binarize_column/in_CSV_format/component.yaml")
create_fully_connected_pytorch_network_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/PyTorch/Create_fully_connected_network/component.yaml")
train_pytorch_model_from_csv_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/PyTorch/Train_PyTorch_model/from_CSV/component.yaml")
create_pytorch_model_archive_with_base_handler_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/PyTorch/Create_PyTorch_Model_Archive/with_base_handler/component.yaml")
upload_PyTorch_model_archive_to_Google_Cloud_Vertex_AI_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/Vertex_AI/Models/Upload_PyTorch_model_archive/component.yaml")
deploy_model_to_endpoint_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/Vertex_AI/Models/Deploy_to_endpoint/component.yaml")
# %% Pipeline definition
def train_tabular_classification_model_using_PyTorch_pipeline():
    """Define a pipeline that trains a binary classifier with PyTorch.

    The pipeline downloads the Chicago taxi trips CSV from Cloud Storage, keeps
    the label and feature columns, replaces missing values with "0", and
    derives a "class" column from "tips" using the predicate " > 0". It then
    creates a fully connected network, trains it on the whole prepared table,
    packages the trained model as a PyTorch model archive, and uploads that
    archive to Vertex AI. The model is deployed to an endpoint only when
    ``deploy_model`` is set to True.

    The function takes no arguments; edit the constants at the top of the body
    to change the dataset or columns.
    """
    dataset_gcs_uri = "gs://ml-pipeline-dataset/Chicago_taxi_trips/chicago_taxi_trips_2019-01-01_-_2019-02-01_limit=10000.csv"
    feature_columns = [
        "trip_seconds",
        "trip_miles",
        "pickup_community_area",
        "dropoff_community_area",
        "fare",
        "tolls",
        "extras",
    ]  # Excluded "trip_total"
    label_column = "tips"
    # Deploying the model might incur additional costs over time
    deploy_model = False

    classification_label_column = "class"
    all_columns = [label_column] + feature_columns

    training_data = download_from_gcs_op(
        gcs_path=dataset_gcs_uri
    ).outputs["Data"]

    training_data = select_columns_using_Pandas_on_CSV_data_op(
        table=training_data,
        column_names=all_columns,
    ).outputs["transformed_table"]

    # Cleaning the NaN values.
    training_data = fill_all_missing_values_using_Pandas_on_CSV_data_op(
        table=training_data,
        replacement_value="0",
        # replacement_type_name="float",
    ).outputs["transformed_table"]

    # Turn the numeric label column into the classification target column.
    classification_training_data = binarize_column_using_Pandas_on_CSV_data_op(
        table=training_data,
        column_name=label_column,
        predicate=" > 0",
        new_column_name=classification_label_column,
    ).outputs["transformed_table"]

    network = create_fully_connected_pytorch_network_op(
        input_size=len(feature_columns),
        # Optional:
        hidden_layer_sizes=[10],
        activation_name="elu",
        output_activation_name="sigmoid",
        # output_size=1,
    ).outputs["model"]

    model = train_pytorch_model_from_csv_op(
        model=network,
        training_data=classification_training_data,
        label_column_name=classification_label_column,
        loss_function_name="binary_cross_entropy",
        # Optional:
        # number_of_epochs=1,
        # learning_rate=0.1,
        # optimizer_name="Adadelta",
        # optimizer_parameters={},
        # batch_size=32,
        # batch_log_interval=100,
        # random_seed=0,
    ).outputs["trained_model"]

    # The upload component takes a model archive rather than the trained model.
    model_archive = create_pytorch_model_archive_with_base_handler_op(
        model=model,
        # Optional:
        # model_name="model",
        # model_version="1.0",
    ).outputs["Model archive"]

    vertex_model_name = upload_PyTorch_model_archive_to_Google_Cloud_Vertex_AI_op(
        model_archive=model_archive,
    ).outputs["model_name"]

    # Deploying the model might incur additional costs over time
    if deploy_model:
        vertex_endpoint_name = deploy_model_to_endpoint_op(
            model_name=vertex_model_name,
        ).outputs["endpoint_name"]
# Alias used by the submission code below.
pipeline_func = train_tabular_classification_model_using_PyTorch_pipeline

# %% Pipeline submission
if __name__ == '__main__':
    # Imported here so the Vertex AI SDK is only needed when running as a script.
    from google.cloud import aiplatform

    # Build a pipeline job from the pipeline function and submit it.
    aiplatform.PipelineJob.from_pipeline_func(pipeline_func=pipeline_func).submit()
XGBoost
# python3 -m pip install "kfp<2.0.0" "google-cloud-aiplatform>=1.16.0" --upgrade --quiet
from kfp import components
# %% Loading components
# Each *_op below is a component factory built from a component.yaml definition
# fetched by URL when this script runs. Every URL embeds the same commit hash of
# the vertex-ai-samples repository, so the component definitions are pinned.
download_from_gcs_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/storage/download/component.yaml")
select_columns_using_Pandas_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/pandas/Select_columns/in_CSV_format/component.yaml")
fill_all_missing_values_using_Pandas_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/pandas/Fill_all_missing_values/in_CSV_format/component.yaml")
binarize_column_using_Pandas_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/pandas/Binarize_column/in_CSV_format/component.yaml")
split_rows_into_subsets_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/dataset_manipulation/Split_rows_into_subsets/in_CSV/component.yaml")
train_XGBoost_model_on_CSV_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/XGBoost/Train/component.yaml")
xgboost_predict_on_CSV_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/XGBoost/Predict/component.yaml")
upload_XGBoost_model_to_Google_Cloud_Vertex_AI_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/Vertex_AI/Models/Upload_XGBoost_model/component.yaml")
deploy_model_to_endpoint_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/Vertex_AI/Models/Deploy_to_endpoint/component.yaml")
# %% Pipeline definition
def train_tabular_classification_model_using_XGBoost_pipeline():
    """Define a pipeline that trains a binary classifier with XGBoost.

    The pipeline downloads the Chicago taxi trips CSV from Cloud Storage, keeps
    the label and feature columns, replaces missing values with "0", derives a
    "class" column from "tips" using the predicate "> 0", and splits the rows
    into a training and a testing subset. It then trains an XGBoost model with
    the "binary:logistic" objective, runs predictions on the testing subset,
    and uploads the model to Vertex AI. The model is deployed to an endpoint
    only when ``deploy_model`` is set to True.

    The function takes no arguments; edit the constants at the top of the body
    to change the dataset, columns, or split fraction.
    """
    dataset_gcs_uri = "gs://ml-pipeline-dataset/Chicago_taxi_trips/chicago_taxi_trips_2019-01-01_-_2019-02-01_limit=10000.csv"
    feature_columns = [
        "trip_seconds",
        "trip_miles",
        "pickup_community_area",
        "dropoff_community_area",
        "fare",
        "tolls",
        "extras",
    ]  # Excluded "trip_total"
    label_column = "tips"
    training_set_fraction = 0.8
    # Deploying the model might incur additional costs over time
    deploy_model = False

    classification_label_column = "class"
    all_columns = [label_column] + feature_columns

    dataset = download_from_gcs_op(
        gcs_path=dataset_gcs_uri
    ).outputs["Data"]

    dataset = select_columns_using_Pandas_on_CSV_data_op(
        table=dataset,
        column_names=all_columns,
    ).outputs["transformed_table"]

    dataset = fill_all_missing_values_using_Pandas_on_CSV_data_op(
        table=dataset,
        replacement_value="0",
        # Optional:
        # column_names=None,  # =[...]
    ).outputs["transformed_table"]

    # Turn the numeric label column into the classification target column.
    classification_dataset = binarize_column_using_Pandas_on_CSV_data_op(
        table=dataset,
        column_name=label_column,
        predicate="> 0",
        new_column_name=classification_label_column,
    ).outputs["transformed_table"]

    split_task = split_rows_into_subsets_op(
        table=classification_dataset,
        fraction_1=training_set_fraction,
    )
    classification_training_data = split_task.outputs["split_1"]
    classification_testing_data = split_task.outputs["split_2"]

    model = train_XGBoost_model_on_CSV_op(
        training_data=classification_training_data,
        label_column_name=classification_label_column,
        objective="binary:logistic",
        # Optional:
        # starting_model=None,
        # num_iterations=10,
        # booster_params={},
        # booster="gbtree",
        # learning_rate=0.3,
        # min_split_loss=0,
        # max_depth=6,
    ).outputs["model"]

    # Predicting on the testing data
    predictions = xgboost_predict_on_CSV_op(
        data=classification_testing_data,
        model=model,
        # label_column_name needs to be set when doing prediction on a dataset that has labels
        label_column_name=classification_label_column,
    ).outputs["predictions"]

    vertex_model_name = upload_XGBoost_model_to_Google_Cloud_Vertex_AI_op(
        model=model,
    ).outputs["model_name"]

    # Deploying the model might incur additional costs over time
    if deploy_model:
        vertex_endpoint_name = deploy_model_to_endpoint_op(
            model_name=vertex_model_name,
        ).outputs["endpoint_name"]
# Alias used by the submission code below.
pipeline_func = train_tabular_classification_model_using_XGBoost_pipeline

# %% Pipeline submission
if __name__ == '__main__':
    # Imported here so the Vertex AI SDK is only needed when running as a script.
    from google.cloud import aiplatform

    # Build a pipeline job from the pipeline function and submit it.
    aiplatform.PipelineJob.from_pipeline_func(pipeline_func=pipeline_func).submit()
Scikit-learn
# python3 -m pip install "kfp<2.0.0" "google-cloud-aiplatform>=1.16.0" --upgrade --quiet
from kfp import components
# %% Loading components
# Each *_op below is a component factory built from a component.yaml definition
# fetched by URL when this script runs. The URLs embed a commit hash of the
# vertex-ai-samples repository, so the component definitions are pinned.
download_from_gcs_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/storage/download/component.yaml")
select_columns_using_Pandas_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/pandas/Select_columns/in_CSV_format/component.yaml")
fill_all_missing_values_using_Pandas_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/pandas/Fill_all_missing_values/in_CSV_format/component.yaml")
binarize_column_using_Pandas_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/pandas/Binarize_column/in_CSV_format/component.yaml")
# NOTE: the training component below is loaded from a different commit hash and
# a different directory (ML_frameworks/Scikit_learn) than the other components.
train_logistic_regression_model_using_scikit_learn_from_CSV_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/1f5cf6e06409b704064b2086c0a705e4e6b4fcde/community-content/pipeline_components/ML_frameworks/Scikit_learn/Train_logistic_regression_model/from_CSV/component.yaml")
upload_Scikit_learn_pickle_model_to_Google_Cloud_Vertex_AI_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/Vertex_AI/Models/Upload_Scikit-learn_pickle_model/component.yaml")
deploy_model_to_endpoint_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/Vertex_AI/Models/Deploy_to_endpoint/component.yaml")
# %% Pipeline definition
def train_tabular_classification_logistic_regression_model_using_Scikit_learn_pipeline():
    """Define a pipeline that trains a logistic regression model with scikit-learn.

    The pipeline downloads the Chicago taxi trips CSV from Cloud Storage, keeps
    the label and feature columns, replaces missing values with "0", and
    derives a "class" column from "tips" using the predicate "> 0". It then
    trains a logistic regression model on the whole prepared table and uploads
    the resulting model to Vertex AI. The model is deployed to an endpoint only
    when ``deploy_model`` is set to True.

    The function takes no arguments; edit the constants at the top of the body
    to change the dataset or columns.
    """
    dataset_gcs_uri = "gs://ml-pipeline-dataset/Chicago_taxi_trips/chicago_taxi_trips_2019-01-01_-_2019-02-01_limit=10000.csv"
    feature_columns = [
        "trip_seconds",
        "trip_miles",
        "pickup_community_area",
        "dropoff_community_area",
        "fare",
        "tolls",
        "extras",
    ]  # Excluded "trip_total"
    label_column = "tips"
    # Deploying the model might incur additional costs over time
    deploy_model = False

    classification_label_column = "class"
    all_columns = [label_column] + feature_columns

    training_data = download_from_gcs_op(
        gcs_path=dataset_gcs_uri
    ).outputs["Data"]

    training_data = select_columns_using_Pandas_on_CSV_data_op(
        table=training_data,
        column_names=all_columns,
    ).outputs["transformed_table"]

    # Cleaning the NaN values.
    training_data = fill_all_missing_values_using_Pandas_on_CSV_data_op(
        table=training_data,
        replacement_value="0",
        # replacement_type_name="float",
    ).outputs["transformed_table"]

    # Turn the numeric label column into the classification target column.
    classification_training_data = binarize_column_using_Pandas_on_CSV_data_op(
        table=training_data,
        column_name=label_column,
        predicate="> 0",
        new_column_name=classification_label_column,
    ).outputs["transformed_table"]

    model = train_logistic_regression_model_using_scikit_learn_from_CSV_op(
        dataset=classification_training_data,
        label_column_name=classification_label_column,
        # Optional:
        # penalty="l2",
        # solver="lbfgs",
        # max_iterations=100,
        # multi_class_mode="auto",
        # random_seed=0,
    ).outputs["model"]

    vertex_model_name = upload_Scikit_learn_pickle_model_to_Google_Cloud_Vertex_AI_op(
        model=model,
    ).outputs["model_name"]

    # Deploying the model might incur additional costs over time
    if deploy_model:
        # Named like the other framework samples for consistency.
        vertex_endpoint_name = deploy_model_to_endpoint_op(
            model_name=vertex_model_name,
        ).outputs["endpoint_name"]
# Alias used by the submission code below.
pipeline_func = train_tabular_classification_logistic_regression_model_using_Scikit_learn_pipeline

# %% Pipeline submission
if __name__ == '__main__':
    # Imported here so the Vertex AI SDK is only needed when running as a script.
    from google.cloud import aiplatform

    # Build a pipeline job from the pipeline function and submit it.
    aiplatform.PipelineJob.from_pipeline_func(pipeline_func=pipeline_func).submit()
表格回归
TensorFlow
# python3 -m pip install "kfp<2.0.0" "google-cloud-aiplatform>=1.16.0" --upgrade --quiet
from kfp import components
# %% Loading components
# Each *_op below is a component factory built from a component.yaml definition
# fetched by URL when this script runs. Every URL embeds the same commit hash of
# the vertex-ai-samples repository, so the component definitions are pinned.
download_from_gcs_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/storage/download/component.yaml")
select_columns_using_Pandas_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/pandas/Select_columns/in_CSV_format/component.yaml")
fill_all_missing_values_using_Pandas_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/pandas/Fill_all_missing_values/in_CSV_format/component.yaml")
split_rows_into_subsets_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/dataset_manipulation/Split_rows_into_subsets/in_CSV/component.yaml")
create_fully_connected_tensorflow_network_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/tensorflow/Create_fully_connected_network/component.yaml")
train_model_using_Keras_on_CSV_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/tensorflow/Train_model_using_Keras/on_CSV/component.yaml")
predict_with_TensorFlow_model_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/tensorflow/Predict/on_CSV/component.yaml")
upload_Tensorflow_model_to_Google_Cloud_Vertex_AI_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/Vertex_AI/Models/Upload_Tensorflow_model/component.yaml")
deploy_model_to_endpoint_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/Vertex_AI/Models/Deploy_to_endpoint/component.yaml")
# %% Pipeline definition
def train_tabular_regression_model_using_Tensorflow_pipeline():
    """Define a pipeline that trains a regression model with TensorFlow/Keras.

    The pipeline downloads the Chicago taxi trips CSV from Cloud Storage, keeps
    the label and feature columns, replaces missing values with "0", and splits
    the rows into a training and a testing subset. It then creates a fully
    connected network, trains it to predict the "tips" column, runs predictions
    on the testing subset, and uploads the trained model to Vertex AI. The
    model is deployed to an endpoint only when ``deploy_model`` is set to True.

    The function takes no arguments; edit the constants at the top of the body
    to change the dataset, columns, or split fraction.
    """
    dataset_gcs_uri = "gs://ml-pipeline-dataset/Chicago_taxi_trips/chicago_taxi_trips_2019-01-01_-_2019-02-01_limit=10000.csv"
    feature_columns = [
        "trip_seconds",
        "trip_miles",
        "pickup_community_area",
        "dropoff_community_area",
        "fare",
        "tolls",
        "extras",
    ]  # Excluded "trip_total"
    label_column = "tips"
    training_set_fraction = 0.8
    # Deploying the model might incur additional costs over time
    deploy_model = False

    all_columns = [label_column] + feature_columns

    dataset = download_from_gcs_op(
        gcs_path=dataset_gcs_uri
    ).outputs["Data"]

    dataset = select_columns_using_Pandas_on_CSV_data_op(
        table=dataset,
        column_names=all_columns,
    ).outputs["transformed_table"]

    dataset = fill_all_missing_values_using_Pandas_on_CSV_data_op(
        table=dataset,
        replacement_value="0",
        # Optional:
        # column_names=None,  # =[...]
    ).outputs["transformed_table"]

    split_task = split_rows_into_subsets_op(
        table=dataset,
        fraction_1=training_set_fraction,
    )
    training_data = split_task.outputs["split_1"]
    testing_data = split_task.outputs["split_2"]

    network = create_fully_connected_tensorflow_network_op(
        input_size=len(feature_columns),
        # Optional:
        hidden_layer_sizes=[10],
        activation_name="elu",
        # output_activation_name=None,
        # output_size=1,
    ).outputs["model"]

    model = train_model_using_Keras_on_CSV_op(
        training_data=training_data,
        model=network,
        label_column_name=label_column,
        # Optional:
        # loss_function_name="mean_squared_error",
        number_of_epochs=10,
        # learning_rate=0.1,
        # optimizer_name="Adadelta",
        # optimizer_parameters={},
        # batch_size=32,
        metric_names=["mean_absolute_error"],
        # random_seed=0,
    ).outputs["trained_model"]

    predictions = predict_with_TensorFlow_model_on_CSV_data_op(
        dataset=testing_data,
        model=model,
        # label_column_name needs to be set when doing prediction on a dataset that has labels
        label_column_name=label_column,
        # Optional:
        # batch_size=1000,
    ).outputs["predictions"]

    vertex_model_name = upload_Tensorflow_model_to_Google_Cloud_Vertex_AI_op(
        model=model,
    ).outputs["model_name"]

    # Deploying the model might incur additional costs over time
    if deploy_model:
        vertex_endpoint_name = deploy_model_to_endpoint_op(
            model_name=vertex_model_name,
        ).outputs["endpoint_name"]
# Alias used by the submission code below.
pipeline_func = train_tabular_regression_model_using_Tensorflow_pipeline

# %% Pipeline submission
if __name__ == '__main__':
    # Imported here so the Vertex AI SDK is only needed when running as a script.
    from google.cloud import aiplatform

    # Build a pipeline job from the pipeline function and submit it.
    aiplatform.PipelineJob.from_pipeline_func(pipeline_func=pipeline_func).submit()
PyTorch
# python3 -m pip install "kfp<2.0.0" "google-cloud-aiplatform>=1.16.0" --upgrade --quiet
from kfp import components
# %% Loading components
# Each *_op below is a component factory built from a component.yaml definition
# fetched by URL when this script runs. Every URL embeds the same commit hash of
# the vertex-ai-samples repository, so the component definitions are pinned.
download_from_gcs_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/storage/download/component.yaml")
select_columns_using_Pandas_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/pandas/Select_columns/in_CSV_format/component.yaml")
fill_all_missing_values_using_Pandas_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/pandas/Fill_all_missing_values/in_CSV_format/component.yaml")
create_fully_connected_pytorch_network_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/PyTorch/Create_fully_connected_network/component.yaml")
train_pytorch_model_from_csv_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/PyTorch/Train_PyTorch_model/from_CSV/component.yaml")
create_pytorch_model_archive_with_base_handler_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/PyTorch/Create_PyTorch_Model_Archive/with_base_handler/component.yaml")
upload_PyTorch_model_archive_to_Google_Cloud_Vertex_AI_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/Vertex_AI/Models/Upload_PyTorch_model_archive/component.yaml")
deploy_model_to_endpoint_op = components.load_component_from_url("https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/Vertex_AI/Models/Deploy_to_endpoint/component.yaml")
# %% Pipeline definition
def train_tabular_regression_model_using_PyTorch_pipeline():
    """Define a pipeline that trains a regression model with PyTorch.

    The pipeline downloads the Chicago taxi trips CSV from Cloud Storage, keeps
    the label and feature columns, and replaces missing values with "0". It
    then creates a fully connected network, trains it on the whole prepared
    table to predict the "tips" column, packages the trained model as a PyTorch
    model archive, and uploads that archive to Vertex AI. The model is deployed
    to an endpoint only when ``deploy_model`` is set to True.

    The function takes no arguments; edit the constants at the top of the body
    to change the dataset or columns.
    """
    dataset_gcs_uri = "gs://ml-pipeline-dataset/Chicago_taxi_trips/chicago_taxi_trips_2019-01-01_-_2019-02-01_limit=10000.csv"
    feature_columns = [
        "trip_seconds",
        "trip_miles",
        "pickup_community_area",
        "dropoff_community_area",
        "fare",
        "tolls",
        "extras",
    ]  # Excluded "trip_total"
    label_column = "tips"
    all_columns = [label_column] + feature_columns
    # Deploying the model might incur additional costs over time
    deploy_model = False

    training_data = download_from_gcs_op(
        gcs_path=dataset_gcs_uri
    ).outputs["Data"]

    training_data = select_columns_using_Pandas_on_CSV_data_op(
        table=training_data,
        column_names=all_columns,
    ).outputs["transformed_table"]

    # Cleaning the NaN values.
    training_data = fill_all_missing_values_using_Pandas_on_CSV_data_op(
        table=training_data,
        replacement_value="0",
        # replacement_type_name="float",
    ).outputs["transformed_table"]

    network = create_fully_connected_pytorch_network_op(
        input_size=len(feature_columns),
        # Optional:
        hidden_layer_sizes=[10],
        activation_name="elu",
        # output_activation_name=None,
        # output_size=1,
    ).outputs["model"]

    model = train_pytorch_model_from_csv_op(
        model=network,
        training_data=training_data,
        label_column_name=label_column,
        # Optional:
        # loss_function_name="mse_loss",
        # number_of_epochs=1,
        # learning_rate=0.1,
        # optimizer_name="Adadelta",
        # optimizer_parameters={},
        # batch_size=32,
        # batch_log_interval=100,
        # random_seed=0,
    ).outputs["trained_model"]

    # The upload component takes a model archive rather than the trained model.
    model_archive = create_pytorch_model_archive_with_base_handler_op(
        model=model,
        # Optional:
        # model_name="model",
        # model_version="1.0",
    ).outputs["Model archive"]

    vertex_model_name = upload_PyTorch_model_archive_to_Google_Cloud_Vertex_AI_op(
        model_archive=model_archive,
    ).outputs["model_name"]

    # Deploying the model might incur additional costs over time
    if deploy_model:
        vertex_endpoint_name = deploy_model_to_endpoint_op(
            model_name=vertex_model_name,
        ).outputs["endpoint_name"]
# Alias used by the submission code below.
pipeline_func = train_tabular_regression_model_using_PyTorch_pipeline

# %% Pipeline submission
if __name__ == '__main__':
    # Imported here so the Vertex AI SDK is only needed when running as a script.
    from google.cloud import aiplatform

    # Build a pipeline job from the pipeline function and submit it.
    aiplatform.PipelineJob.from_pipeline_func(pipeline_func=pipeline_func).submit()
XGBoost
# python3 -m pip install "kfp<2.0.0" "google-cloud-aiplatform>=1.16.0" --upgrade --quiet
from kfp import components
# %% Loading components
# Each component definition is fetched from a URL that embeds a fixed revision
# hash of the vertex-ai-samples repository.

# Data import and preparation components.
download_from_gcs_op = components.load_component_from_url(
    "https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/storage/download/component.yaml"
)
select_columns_using_Pandas_on_CSV_data_op = components.load_component_from_url(
    "https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/pandas/Select_columns/in_CSV_format/component.yaml"
)
fill_all_missing_values_using_Pandas_on_CSV_data_op = components.load_component_from_url(
    "https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/pandas/Fill_all_missing_values/in_CSV_format/component.yaml"
)
split_rows_into_subsets_op = components.load_component_from_url(
    "https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/dataset_manipulation/Split_rows_into_subsets/in_CSV/component.yaml"
)

# XGBoost training and prediction components.
train_XGBoost_model_on_CSV_op = components.load_component_from_url(
    "https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/XGBoost/Train/component.yaml"
)
xgboost_predict_on_CSV_op = components.load_component_from_url(
    "https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/XGBoost/Predict/component.yaml"
)

# Vertex AI model upload and deployment components.
upload_XGBoost_model_to_Google_Cloud_Vertex_AI_op = components.load_component_from_url(
    "https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/Vertex_AI/Models/Upload_XGBoost_model/component.yaml"
)
deploy_model_to_endpoint_op = components.load_component_from_url(
    "https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/Vertex_AI/Models/Deploy_to_endpoint/component.yaml"
)
# %% Pipeline definition
def train_tabular_regression_model_using_XGBoost_pipeline():
    """Define a pipeline that trains an XGBoost regression model on CSV data.

    The function takes no parameters; all settings are local constants.
    The steps wired together are, in order:

    1. Download the dataset CSV from Cloud Storage.
    2. Keep only the label column and the feature columns.
    3. Fill missing values with the replacement value "0".
    4. Split the rows into two subsets (``fraction_1`` is 0.8); ``split_1``
       is used for training and ``split_2`` for testing.
    5. Train an XGBoost model on the training subset.
    6. Run predictions on the testing subset.
    7. Upload the trained model to Vertex AI.
    8. Deploy the model to an endpoint, only when ``deploy_model`` is True.
    """
    dataset_gcs_uri = "gs://ml-pipeline-dataset/Chicago_taxi_trips/chicago_taxi_trips_2019-01-01_-_2019-02-01_limit=10000.csv"
    feature_columns = ["trip_seconds", "trip_miles", "pickup_community_area", "dropoff_community_area", "fare", "tolls", "extras"]  # Excluded "trip_total"
    label_column = "tips"
    training_set_fraction = 0.8
    # Deploying the model might incur additional costs over time
    deploy_model = False

    # The label column is placed first, followed by the feature columns.
    all_columns = [label_column] + feature_columns

    dataset = download_from_gcs_op(
        gcs_path=dataset_gcs_uri
    ).outputs["Data"]

    dataset = select_columns_using_Pandas_on_CSV_data_op(
        table=dataset,
        column_names=all_columns,
    ).outputs["transformed_table"]

    dataset = fill_all_missing_values_using_Pandas_on_CSV_data_op(
        table=dataset,
        replacement_value="0",
        # # Optional:
        # column_names=None,  # =[...]
    ).outputs["transformed_table"]

    split_task = split_rows_into_subsets_op(
        table=dataset,
        fraction_1=training_set_fraction,
    )
    training_data = split_task.outputs["split_1"]
    testing_data = split_task.outputs["split_2"]

    model = train_XGBoost_model_on_CSV_op(
        training_data=training_data,
        label_column_name=label_column,
        # Optional:
        #starting_model=None,
        #num_iterations=10,
        #booster_params={},
        #objective="reg:squarederror",
        #booster="gbtree",
        #learning_rate=0.3,
        #min_split_loss=0,
        #max_depth=6,
    ).outputs["model"]

    # Predicting on the testing data
    # NOTE: the predictions output is not consumed by any later step here.
    predictions = xgboost_predict_on_CSV_op(
        data=testing_data,
        model=model,
        # label_column needs to be set when doing prediction on a dataset that has labels
        label_column_name=label_column,
    ).outputs["predictions"]

    vertex_model_name = upload_XGBoost_model_to_Google_Cloud_Vertex_AI_op(
        model=model,
    ).outputs["model_name"]

    # Deploying the model might incur additional costs over time
    if deploy_model:
        # NOTE: the endpoint name is not consumed by any later step here.
        vertex_endpoint_name = deploy_model_to_endpoint_op(
            model_name=vertex_model_name,
        ).outputs["endpoint_name"]
# Alias the pipeline function under a generic name used by the submission step.
pipeline_func = train_tabular_regression_model_using_XGBoost_pipeline

# %% Pipeline submission
if __name__ == '__main__':
    # The Vertex AI SDK is imported inside the guard, so it is only needed
    # when this file is executed as a script.
    from google.cloud import aiplatform

    # Build a pipeline job from the pipeline function and submit it.
    aiplatform.PipelineJob.from_pipeline_func(pipeline_func=pipeline_func).submit()
Scikit-learn
# python3 -m pip install "kfp<2.0.0" "google-cloud-aiplatform>=1.16.0" --upgrade --quiet
from kfp import components
# %% Loading components
# Each component definition is fetched from a URL that embeds a fixed revision
# hash of the vertex-ai-samples repository.

# Data import and preparation components.
download_from_gcs_op = components.load_component_from_url(
    "https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/storage/download/component.yaml"
)
select_columns_using_Pandas_on_CSV_data_op = components.load_component_from_url(
    "https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/pandas/Select_columns/in_CSV_format/component.yaml"
)
fill_all_missing_values_using_Pandas_on_CSV_data_op = components.load_component_from_url(
    "https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/pandas/Fill_all_missing_values/in_CSV_format/component.yaml"
)

# Scikit-learn training component.
# Note: this URL uses a different revision hash than the other components.
train_linear_regression_model_using_scikit_learn_from_CSV_op = components.load_component_from_url(
    "https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/1f5cf6e06409b704064b2086c0a705e4e6b4fcde/community-content/pipeline_components/ML_frameworks/Scikit_learn/Train_linear_regression_model/from_CSV/component.yaml"
)

# Vertex AI model upload and deployment components.
upload_Scikit_learn_pickle_model_to_Google_Cloud_Vertex_AI_op = components.load_component_from_url(
    "https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/Vertex_AI/Models/Upload_Scikit-learn_pickle_model/component.yaml"
)
deploy_model_to_endpoint_op = components.load_component_from_url(
    "https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/399405402d95f4a011e2d2e967c96f8508ba5688/community-content/pipeline_components/google-cloud/Vertex_AI/Models/Deploy_to_endpoint/component.yaml"
)
# %% Pipeline definition
def train_tabular_regression_linear_model_using_Scikit_learn_pipeline():
    """Define a pipeline that trains a scikit-learn linear regression model.

    The function takes no parameters; all settings are local constants.
    The steps wired together are, in order:

    1. Download the dataset CSV from Cloud Storage.
    2. Keep only the label column and the feature columns.
    3. Fill missing values with the replacement value "0".
    4. Train a linear regression model on the prepared table.
    5. Upload the trained model to Vertex AI.
    6. Deploy the model to an endpoint, only when ``deploy_model`` is True.
    """
    dataset_gcs_uri = "gs://ml-pipeline-dataset/Chicago_taxi_trips/chicago_taxi_trips_2019-01-01_-_2019-02-01_limit=10000.csv"
    feature_columns = ["trip_seconds", "trip_miles", "pickup_community_area", "dropoff_community_area", "fare", "tolls", "extras"]  # Excluded "trip_total"
    label_column = "tips"
    # The label column is placed first, followed by the feature columns.
    all_columns = [label_column] + feature_columns
    # Deploying the model might incur additional costs over time
    deploy_model = False

    training_data = download_from_gcs_op(
        gcs_path=dataset_gcs_uri
    ).outputs["Data"]

    training_data = select_columns_using_Pandas_on_CSV_data_op(
        table=training_data,
        column_names=all_columns,
    ).outputs["transformed_table"]

    # Cleaning the NaN values.
    training_data = fill_all_missing_values_using_Pandas_on_CSV_data_op(
        table=training_data,
        replacement_value="0",
        #replacement_type_name="float",
    ).outputs["transformed_table"]

    model = train_linear_regression_model_using_scikit_learn_from_CSV_op(
        dataset=training_data,
        label_column_name=label_column,
    ).outputs["model"]

    vertex_model_name = upload_Scikit_learn_pickle_model_to_Google_Cloud_Vertex_AI_op(
        model=model,
    ).outputs["model_name"]

    # Deploying the model might incur additional costs over time
    if deploy_model:
        # NOTE: the endpoint name is not consumed by any later step here.
        vertex_endpoint_name = deploy_model_to_endpoint_op(
            model_name=vertex_model_name,
        ).outputs["endpoint_name"]
# Alias the pipeline function under a generic name used by the submission step.
pipeline_func = train_tabular_regression_linear_model_using_Scikit_learn_pipeline

# %% Pipeline submission
if __name__ == '__main__':
    # The Vertex AI SDK is imported inside the guard, so it is only needed
    # when this file is executed as a script.
    from google.cloud import aiplatform

    # Build a pipeline job from the pipeline function and submit it.
    aiplatform.PipelineJob.from_pipeline_func(pipeline_func=pipeline_func).submit()
对于提供的代码示例,请注意以下事项:
- Kubeflow 流水线定义为 Python 函数。
- 流水线的工作流步骤是使用 Kubeflow 流水线组件创建的。通过将一个组件的输出用作另一个组件的输入,您可以将流水线的工作流定义为图。例如,`fill_all_missing_values_using_Pandas_on_CSV_data_op` 组件任务依赖于 `select_columns_using_Pandas_on_CSV_data_op` 组件任务的 `transformed_table` 输出。
- 您可以使用 Vertex AI SDK for Python 在 Vertex AI Pipelines 上创建流水线运行。
监控流水线
在 Google Cloud 控制台的 Vertex AI 部分中,转到流水线页面并打开运行标签页。
后续步骤
- 如需详细了解 Vertex AI Pipelines,请参阅 Vertex AI Pipelines 简介。