构建流水线

借助 Vertex AI Pipelines,您可以采用无服务器方式编排机器学习 (ML) 工作流。在 Vertex AI Pipelines 可以编排机器学习工作流之前,您必须将工作流描述为流水线。机器学习流水线是可移植且可扩缩的机器学习工作流,它们基于容器和 Google Cloud 服务。

本指南介绍如何开始构建机器学习流水线。

我应该使用哪种流水线 SDK?

Vertex AI Pipelines 可以运行使用以下任一 SDK 构建的流水线:

  • Kubeflow Pipelines SDK v1.8 或更高版本(建议使用 v2)

  • TensorFlow Extended v0.30.0 或更高版本

如果您在处理 TB 级结构化数据或文本数据的机器学习工作流中使用 TensorFlow,我们建议您使用 TFX 构建流水线。

对于其他用例,我们建议您使用 Kubeflow Pipelines SDK 来构建流水线。若使用 Kubeflow Pipelines SDK 来构建流水线,您可以通过构建自定义组件或重复使用预构建组件(例如 Google Cloud 流水线组件)来实现工作流。Google Cloud 流水线组件可让您更轻松地在流水线中使用 AutoML 等 Vertex AI 服务。

本指南介绍如何使用 Kubeflow Pipelines SDK 来构建流水线。

准备工作

在构建和运行流水线之前,请按照以下说明设置 Google Cloud 项目和开发环境。

  1. 为了让您的 Google Cloud 项目为运行机器学习流水线做好准备,请按照配置 Google Cloud 项目指南中的说明操作。

  2. 如需使用 Kubeflow Pipelines SDK 构建流水线,请安装 Kubeflow Pipelines SDK v1.8 或更高版本

  3. 如需在流水线中使用 Vertex AI Python 客户端,请安装 Vertex AI 客户端库 v1.7 或更高版本

  4. 如需在流水线中使用 Vertex AI 服务,请安装 Google Cloud 流水线组件 SDK

开始构建流水线

如需在 Vertex AI Pipelines 上编排机器学习工作流,您必须先将工作流描述为流水线。以下示例演示如何将 Google Cloud 流水线组件与 Vertex AI 搭配使用来创建数据集、使用 AutoML 训练模型并部署经过训练的模型以进行预测。

在运行以下代码示例之前,必须先设置身份验证。

如何设置身份验证

如需设置身份验证,您必须创建服务账号密钥,并为服务账号密钥的路径设置环境变量。

  1. 创建服务账号:

    1. 在 Google Cloud 控制台中,转到创建服务账号页面。

      转到“创建服务账号”

    2. 服务账号名称字段中,输入一个名称。
    3. 可选:在服务账号说明字段中,输入说明。
    4. 点击创建
    5. 点击选择角色字段。在所有角色下,选择 Vertex AI > Vertex AI User
    6. 点击完成以创建服务账号。

      不要关闭浏览器窗口。您将在下一步骤中用到它。

  2. 创建用于身份验证的服务账号密钥:

    1. 在 Google Cloud 控制台中,点击您创建的服务账号的电子邮件地址。
    2. 点击密钥
    3. 依次点击添加密钥创建新密钥
    4. 点击创建。JSON 密钥文件将下载到您的计算机上。
    5. 点击关闭
  3. 向新服务账号授予用于运行流水线的服务账号的访问权限。
    1. 点击 返回到服务账号列表。
    2. 点击用于运行流水线的服务账号的名称。此时会显示服务账号详情页面。

      如果您之前按照为 Vertex AI Pipelines 配置项目指南中的说明操作,则这是您在使用细化权限配置服务账号部分中创建的服务账号。否则,Vertex AI 将使用 Compute Engine 默认服务账号运行流水线。Compute Engine 默认服务账号的名称如下:PROJECT_NUMBER-compute@developer.gserviceaccount.com

    3. 点击权限标签页。
    4. 点击授予使用权限。此时会显示添加主账号面板。
    5. 新的主账号框中,输入您在上一步中创建的服务账号的电子邮件地址。
    6. 角色下拉列表中,选择服务账号 > 服务账号用户
    7. 点击保存
  4. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含服务账号密钥的 JSON 文件的路径。此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。

    示例:Linux 或 macOS

    [PATH] 替换为包含您的服务账号密钥的 JSON 文件的路径。

    export GOOGLE_APPLICATION_CREDENTIALS="[PATH]"

    例如:

    export GOOGLE_APPLICATION_CREDENTIALS="/home/user/Downloads/service-account-file.json"

    示例:Windows

    [PATH] 替换为包含服务账号密钥的 JSON 文件的路径,将 [FILE_NAME] 替换为文件名。

    使用 PowerShell:

    $env:GOOGLE_APPLICATION_CREDENTIALS="[PATH]"

    例如:

    $env:GOOGLE_APPLICATION_CREDENTIALS="C:\Users\username\Downloads\[FILE_NAME].json"

    使用命令提示符:

    set GOOGLE_APPLICATION_CREDENTIALS=[PATH]

使用 Kubeflow Pipelines DSL 软件包定义工作流

kfp.dsl 软件包包含网域特定的语言 (DSL),可用于定义流水线和组件并与之交互。

Kubeflow 流水线组件是创建流水线步骤的工厂函数。每个组件都描述了组件的输入、输出和实现。例如,在下面的代码示例中,ds_op 是一个组件。

组件用于创建流水线步骤。当流水线运行时,步骤会在其依赖的数据可用时执行。例如,训练组件可以将 CSV 文件用作输入,并使用它训练模型。

import kfp
from google.cloud import aiplatform
from google_cloud_pipeline_components.v1.dataset import ImageDatasetCreateOp
from google_cloud_pipeline_components.v1.automl.training_job import AutoMLImageTrainingJobRunOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp

project_id = PROJECT_ID
pipeline_root_path = PIPELINE_ROOT

# Define the workflow of the pipeline.
@kfp.dsl.pipeline(
    name="automl-image-training-v2",
    pipeline_root=pipeline_root_path)
def pipeline(project_id: str):
    # The first step of your workflow is a dataset generator.
    # This step takes a Google Cloud Pipeline Component, providing the necessary
    # input arguments, and uses the Python variable `ds_op` to define its
    # output. Note that here the `ds_op` only stores the definition of the
    # output but not the actual returned object from the execution. The value
    # of the object is not accessible at the dsl.pipeline level, and can only be
    # retrieved by providing it as the input to a downstream component.
    ds_op = ImageDatasetCreateOp(
        project=project_id,
        display_name="flowers",
        gcs_source="gs://cloud-samples-data/vision/automl_classification/flowers/all_data_v2.csv",
        import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification,
    )

    # The second step is a model training component. It takes the dataset
    # outputted from the first step, supplies it as an input argument to the
    # component (see `dataset=ds_op.outputs["dataset"]`), and will put its
    # outputs into `training_job_run_op`.
    training_job_run_op = AutoMLImageTrainingJobRunOp(
        project=project_id,
        display_name="train-iris-automl-mbsdk-1",
        prediction_type="classification",
        model_type="CLOUD",
        dataset=ds_op.outputs["dataset"],
        model_display_name="iris-classification-model-mbsdk",
        training_fraction_split=0.6,
        validation_fraction_split=0.2,
        test_fraction_split=0.2,
        budget_milli_node_hours=8000,
    )

    # The third and fourth step are for deploying the model.
    create_endpoint_op = EndpointCreateOp(
        project=project_id,
        display_name = "create-endpoint",
    )

    model_deploy_op = ModelDeployOp(
        model=training_job_run_op.outputs["model"],
        endpoint=create_endpoint_op.outputs['endpoint'],
        automatic_resources_min_replica_count=1,
        automatic_resources_max_replica_count=1,
    )

请替换以下内容:

  • PROJECT_ID:在其中运行此流水线的 Google Cloud 项目。
  • PIPELINE_ROOT_PATH:指定流水线服务账号可以访问的 Cloud Storage URI。流水线运行的工件存储在流水线根目录中。

    流水线根目录可以在流水线函数上设置为 @kfp.dsl.pipeline 注释的参数,也可以在调用 create_run_from_job_spec 以创建流水线运行时设置。

将流水线编译为 YAML 文件

定义流水线的工作流后,您可以继续将流水线编译为 YAML 格式。YAML 文件包含在 Vertex AI Pipelines 上执行流水线的所有信息。

from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path='image_classif_pipeline.yaml'
)

提交流水线运行

将流水线的工作流编译为 YAML 格式后,您可以使用 Vertex AI Python 客户端提交并运行该流水线。

import google.cloud.aiplatform as aip

# Before initializing, make sure to set the GOOGLE_APPLICATION_CREDENTIALS
# environment variable to the path of your service account.
aip.init(
    project=project_id,
    location=PROJECT_REGION,
)

# Prepare the pipeline job
job = aip.PipelineJob(
    display_name="automl-image-training-v2",
    template_path="image_classif_pipeline.yaml",
    pipeline_root=pipeline_root_path,
    parameter_values={
        'project_id': project_id
    }
)

job.submit()

请替换以下内容:

  • PROJECT_REGION:此流水线运行所在的区域。

在前面的示例中:

  1. Kubeflow 流水线定义为 Python 函数。该函数带有 @kfp.dsl.pipeline 修饰器,后者指定流水线的名称和根路径。流水线根路径是存储流水线工件的位置。
  2. 流水线的工作流步骤是使用 Google Cloud 流水线组件创建的。通过使用组件的输出作为另一个组件的输入,您可以将流水线的工作流定义为图表。例如:training_job_run_op 依赖于 ds_opdataset 输出。
  3. 您可以使用 kfp.compiler.Compiler 来编译流水线。
  4. 您可以使用 Vertex AI Python 客户端创建在 Vertex AI Pipelines 上运行的流水线。运行流水线时,您可以覆盖流水线名称和流水线根路径。可以使用流水线名称对流水线运行进行分组。覆盖流水线名称可以帮助您区分生产和实验性流水线运行。

如需详细了解如何构建流水线,请阅读构建 Kubeflow 流水线部分,并遵循示例和教程进行操作。

构建 Kubeflow 流水线

使用以下过程来构建流水线。

  1. 将流水线设计为一系列组件。为了提升可重用性,每个组件都应承担单一责任。如有可能,请将您的流水线设计为重复使用经过验证的组件,例如 Google Cloud 流水线组件

    详细了解如何设计流水线

  2. 使用 Kubeflow Pipelines SDK 构建实现机器学习工作流所需的任何自定义组件。组件是自包含代码集,用于执行机器学习工作流中的步骤。使用以下选项来创建流水线组件。

    • 将组件的代码打包为容器映像。通过此选项,您可以在流水线中包含用用任何语言编写、可打包为容器映像的代码。

      了解如何构建 Kubeflow 流水线组件

    • 将组件代码实现为独立的 Python 函数,并使用 Kubeflow Pipelines SDK 将函数打包为组件。此选项可让您更轻松地构建基于 Python 的组件。

      了解如何构建基于 Python 的组件

  3. 将流水线构建为 Python 函数。

    详细了解如何将流水线定义为 Python 函数

  4. 使用 Kubeflow Pipelines SDK 编译器来编译流水线。

    from kfp import compiler
    
    compiler.Compiler().compile(
        pipeline_func=PIPELINE_FUNCTION,
        package_path=PIPELINE_PACKAGE_PATH)
    

    请替换以下内容:

    • PIPELINE_FUNCTION:流水线函数的名称。
    • PIPELINE_PACKAGE_PATH:用于存储已编译流水线的路径。
  5. 使用 Google Cloud 控制台或 Python 运行流水线

访问流水线中的 Google Cloud 资源

如果您在运行流水线时未指定服务账号,则 Vertex AI Pipelines 将使用 Compute Engine 默认服务账号来运行流水线。Vertex AI Pipelines 还会使用流水线运行的服务账号来授权您的流水线访问 Google Cloud 资源。默认情况下,Compute Engine 默认服务账号具有 Project Editor 角色。这可能会向您的流水线授予对 Google Cloud 项目中 Google Cloud 资源的过多访问权限。

我们建议您创建服务账号以运行流水线,然后向此账号授予运行流水线所需的对 Google Cloud 资源的精细权限

详细了解如何使用 Identity and Access Management 来创建服务账号管理授予服务账号的访问权限

使流水线保持最新

您用于构建和运行流水线的 SDK 客户端和容器映像会定期更新为新版本,以修补安全漏洞并添加新功能。为了使您的流水线保持最新版本,我们建议您执行以下操作:

后续步骤