为自定义作业创建训练流水线

使用 create_training_pipeline 方法为自定义作业创建训练流水线。

深入探索

如需查看包含此代码示例的详细文档,请参阅以下内容:

代码示例

Java

在尝试此示例之前,请按照《Vertex AI 快速入门:使用客户端库》中的 Java 设置说明执行操作。如需了解详情,请参阅 Vertex AI Java API 参考文档

如需向 Vertex AI 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.cloud.aiplatform.v1.LocationName;
import com.google.cloud.aiplatform.v1.Model;
import com.google.cloud.aiplatform.v1.ModelContainerSpec;
import com.google.cloud.aiplatform.v1.PipelineServiceClient;
import com.google.cloud.aiplatform.v1.PipelineServiceSettings;
import com.google.cloud.aiplatform.v1.TrainingPipeline;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.protobuf.Value;
import com.google.protobuf.util.JsonFormat;
import java.io.IOException;

public class CreateTrainingPipelineCustomJobSample {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Replace these variables before running the sample.
    String project = "PROJECT";
    String displayName = "DISPLAY_NAME";
    String modelDisplayName = "MODEL_DISPLAY_NAME";
    String containerImageUri = "CONTAINER_IMAGE_URI";
    String baseOutputDirectoryPrefix = "BASE_OUTPUT_DIRECTORY_PREFIX";
    createTrainingPipelineCustomJobSample(
        project, displayName, modelDisplayName, containerImageUri, baseOutputDirectoryPrefix);
  }

  static void createTrainingPipelineCustomJobSample(
      String project,
      String displayName,
      String modelDisplayName,
      String containerImageUri,
      String baseOutputDirectoryPrefix)
      throws IOException {
    PipelineServiceSettings settings =
        PipelineServiceSettings.newBuilder()
            .setEndpoint("us-central1-aiplatform.googleapis.com:443")
            .build();
    String location = "us-central1";

    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (PipelineServiceClient client = PipelineServiceClient.create(settings)) {
      JsonObject jsonMachineSpec = new JsonObject();
      jsonMachineSpec.addProperty("machineType", "n1-standard-4");

      // A working docker image can be found at
      // gs://cloud-samples-data/ai-platform/mnist_tfrecord/custom_job
      // This sample image accepts a set of arguments including model_dir.
      JsonObject jsonContainerSpec = new JsonObject();
      jsonContainerSpec.addProperty("imageUri", containerImageUri);
      JsonArray jsonArgs = new JsonArray();
      jsonArgs.add("--model_dir=$(AIP_MODEL_DIR)");
      jsonContainerSpec.add("args", jsonArgs);

      JsonObject jsonJsonWorkerPoolSpec0 = new JsonObject();
      jsonJsonWorkerPoolSpec0.addProperty("replicaCount", 1);
      jsonJsonWorkerPoolSpec0.add("machineSpec", jsonMachineSpec);
      jsonJsonWorkerPoolSpec0.add("containerSpec", jsonContainerSpec);

      JsonArray jsonWorkerPoolSpecs = new JsonArray();
      jsonWorkerPoolSpecs.add(jsonJsonWorkerPoolSpec0);

      JsonObject jsonBaseOutputDirectory = new JsonObject();
      // The GCS location for outputs must be accessible by the project's AI Platform
      // service account.
      jsonBaseOutputDirectory.addProperty("output_uri_prefix", baseOutputDirectoryPrefix);

      JsonObject jsonTrainingTaskInputs = new JsonObject();
      jsonTrainingTaskInputs.add("workerPoolSpecs", jsonWorkerPoolSpecs);
      jsonTrainingTaskInputs.add("baseOutputDirectory", jsonBaseOutputDirectory);

      Value.Builder trainingTaskInputsBuilder = Value.newBuilder();
      JsonFormat.parser().merge(jsonTrainingTaskInputs.toString(), trainingTaskInputsBuilder);
      Value trainingTaskInputs = trainingTaskInputsBuilder.build();
      String trainingTaskDefinition =
          "gs://google-cloud-aiplatform/schema/trainingjob/definition/custom_task_1.0.0.yaml";
      String imageUri = "gcr.io/cloud-aiplatform/prediction/tf-cpu.1-15:latest";
      ModelContainerSpec containerSpec =
          ModelContainerSpec.newBuilder().setImageUri(imageUri).build();
      Model modelToUpload =
          Model.newBuilder()
              .setDisplayName(modelDisplayName)
              .setContainerSpec(containerSpec)
              .build();
      TrainingPipeline trainingPipeline =
          TrainingPipeline.newBuilder()
              .setDisplayName(displayName)
              .setTrainingTaskDefinition(trainingTaskDefinition)
              .setTrainingTaskInputs(trainingTaskInputs)
              .setModelToUpload(modelToUpload)
              .build();
      LocationName parent = LocationName.of(project, location);
      TrainingPipeline response = client.createTrainingPipeline(parent, trainingPipeline);
      System.out.format("response: %s\n", response);
      System.out.format("Name: %s\n", response.getName());
    }
  }
}

Python

在尝试此示例之前,请按照《Vertex AI 快速入门:使用客户端库》中的 Python 设置说明执行操作。如需了解详情,请参阅 Vertex AI Python API 参考文档

如需向 Vertex AI 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

from google.cloud import aiplatform
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value

def create_training_pipeline_custom_job_sample(
    project: str,
    display_name: str,
    model_display_name: str,
    container_image_uri: str,
    base_output_directory_prefix: str,
    location: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
):
    # The AI Platform services require regional API endpoints.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.gapic.PipelineServiceClient(client_options=client_options)

    training_task_inputs_dict = {
        "workerPoolSpecs": [
            {
                "replicaCount": 1,
                "machineSpec": {"machineType": "n1-standard-4"},
                "containerSpec": {
                    # A working docker image can be found at gs://cloud-samples-data/ai-platform/mnist_tfrecord/custom_job
                    "imageUri": container_image_uri,
                    "args": [
                        # AIP_MODEL_DIR is set by the service according to baseOutputDirectory.
                        "--model_dir=$(AIP_MODEL_DIR)",
                    ],
                },
            }
        ],
        "baseOutputDirectory": {
            # The GCS location for outputs must be accessible by the project's AI Platform service account.
            "output_uri_prefix": base_output_directory_prefix
        },
    }
    training_task_inputs = json_format.ParseDict(training_task_inputs_dict, Value())

    training_task_definition = "gs://google-cloud-aiplatform/schema/trainingjob/definition/custom_task_1.0.0.yaml"
    image_uri = "gcr.io/cloud-aiplatform/prediction/tf-cpu.1-15:latest"

    training_pipeline = {
        "display_name": display_name,
        "training_task_definition": training_task_definition,
        "training_task_inputs": training_task_inputs,
        "model_to_upload": {
            "display_name": model_display_name,
            "container_spec": {"image_uri": image_uri},
        },
    }
    parent = f"projects/{project}/locations/{location}"
    response = client.create_training_pipeline(
        parent=parent, training_pipeline=training_pipeline
    )
    print("response:", response)

后续步骤

如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器