Building a pipeline

Vertex AI Pipelines lets you orchestrate your machine learning (ML) workflows in a serverless manner. Before Vertex AI Pipelines can orchestrate your ML workflow, you must describe your workflow as a pipeline. ML pipelines are portable and scalable ML workflows that are based on containers and Google Cloud services.

This guide describes how to get started building ML pipelines.

Which pipelines SDK should I use?

Vertex AI Pipelines can run pipelines built using the Kubeflow Pipelines SDK v1.8.9 or higher, or TensorFlow Extended v0.30.0 or higher.

  • If you use TensorFlow in an ML workflow that processes terabytes of structured data or text data, we recommend that you build your pipeline using TFX.

  • For other use cases, we recommend that you build your pipeline using the Kubeflow Pipelines SDK. By building a pipeline with the Kubeflow Pipelines SDK, you can implement your workflow by building custom components or reusing prebuilt components, such as the Google Cloud Pipeline Components. Google Cloud Pipeline Components make it easier to use Vertex AI services like AutoML in your pipeline.

This guide describes how to build pipelines using the Kubeflow Pipelines SDK.

Before you begin

Before you build and run your pipelines, use the following instructions to set up your Google Cloud project and development environment.

  1. To get your Cloud project ready to run ML pipelines, follow the instructions in the guide to configuring your Cloud project.

  2. To build your pipeline using the Kubeflow Pipelines SDK, install the Kubeflow Pipelines SDK v1.8.9 or higher.

  3. To use the Vertex AI Python client in your pipelines, install the Vertex AI client libraries v1.7 or higher.

  4. To use Vertex AI services in your pipelines, install the Google Cloud Pipeline Components.
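To confirm that your development environment meets the version requirements listed above, you could run a short check like the following. This is a minimal sketch, not an official tool: the helper names are illustrative, and the PyPI package names `kfp`, `google-cloud-aiplatform`, and `google-cloud-pipeline-components` are assumed to be the distributions that you installed in steps 2 through 4.

```python
import re
from importlib import metadata

# Minimum versions named in the steps above. No minimum is listed for the
# Google Cloud Pipeline Components because this guide does not state one.
REQUIRED = {
    "kfp": "1.8.9",
    "google-cloud-aiplatform": "1.7",
    "google-cloud-pipeline-components": None,
}


def version_tuple(version: str) -> tuple:
    """Turn '1.8.9' into (1, 8, 9), ignoring suffixes such as 'rc1'."""
    parts = []
    for piece in version.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break
        parts.append(int(match.group()))
    # Pad to three components so that '1.7' and '1.7.0' compare as equal.
    parts += [0] * (3 - len(parts))
    return tuple(parts)


def meets_minimum(installed: str, minimum: str) -> bool:
    """Compare numerically, so that '1.8.10' counts as newer than '1.8.9'."""
    return version_tuple(installed) >= version_tuple(minimum)


def check_environment() -> dict:
    """Map each required package to 'ok', 'too old', or 'missing'."""
    report = {}
    for package, minimum in REQUIRED.items():
        try:
            installed = metadata.version(package)
        except metadata.PackageNotFoundError:
            report[package] = "missing"
            continue
        if minimum is None or meets_minimum(installed, minimum):
            report[package] = "ok"
        else:
            report[package] = "too old"
    return report


if __name__ == "__main__":
    for package, status in check_environment().items():
        print(f"{package}: {status}")
```

Comparing parsed version tuples instead of raw strings avoids treating `1.8.10` as older than `1.8.9`.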

Getting started building a pipeline

To orchestrate your ML workflow on Vertex AI Pipelines, you must first describe your workflow as a pipeline. The following sample demonstrates how to use the Google Cloud Pipeline Components to create a dataset, train a model using AutoML, and deploy the trained model for predictions on Vertex AI.

Before you run the following code sample, you must set up authentication.

Define your workflow using the Kubeflow Pipelines DSL package

The kfp.dsl package contains the domain-specific language (DSL) that you can use to define and interact with pipelines and components.

Kubeflow pipeline components are factory functions that create pipeline steps. Each component describes its inputs, outputs, and implementation. For example, in the code sample below, ds_op is a component.

When a pipeline runs, each step is executed as soon as the data that it depends on becomes available. For example, a training component could take a CSV file as an input and use it to train a model.

import kfp
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip

project_id = PROJECT_ID
region = REGION
pipeline_root_path = PIPELINE_ROOT

# Define the workflow of the pipeline.
@kfp.dsl.pipeline(
    name="automl-image-training-v2",
    pipeline_root=pipeline_root_path)
def pipeline(project_id: str):
    # The first step of your workflow is a dataset generator.
    # This step takes a Google Cloud pipeline component, providing the necessary
    # input arguments, and uses the python variable `ds_op` to define its
    # output. Note that here the `ds_op` only stores the definition of the
    # output but not the actual returned object from the execution. The value
    # of the object is not accessible at the dsl.pipeline level, and can only be
    # retrieved by providing it as the input to a downstream component.
    ds_op = gcc_aip.ImageDatasetCreateOp(
        project=project_id,
        display_name="flowers",
        gcs_source="gs://cloud-samples-data/vision/automl_classification/flowers/all_data_v2.csv",
        import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification,
    )

    # The second step is a model training component. It takes the dataset
    # outputted from the first step, supplies it as an input argument to the
    # component (see `dataset=ds_op.outputs["dataset"]`), and will put its
    # outputs into `training_job_run_op`.
    training_job_run_op = gcc_aip.AutoMLImageTrainingJobRunOp(
        project=project_id,
        display_name="train-iris-automl-mbsdk-1",
        prediction_type="classification",
        model_type="CLOUD",
        base_model=None,
        dataset=ds_op.outputs["dataset"],
        model_display_name="iris-classification-model-mbsdk",
        training_fraction_split=0.6,
        validation_fraction_split=0.2,
        test_fraction_split=0.2,
        budget_milli_node_hours=8000,
    )

    # The third and fourth steps deploy the model: create an endpoint, then
    # deploy the trained model to it.
    create_endpoint_op = gcc_aip.EndpointCreateOp(
        project=project_id,
        display_name="create-endpoint",
    )

    model_deploy_op = gcc_aip.ModelDeployOp(
        model=training_job_run_op.outputs["model"],
        endpoint=create_endpoint_op.outputs["endpoint"],
        automatic_resources_min_replica_count=1,
        automatic_resources_max_replica_count=1,
    )

Replace the following:

  • PROJECT_ID: The Google Cloud project that this pipeline runs in.
  • REGION: The region that this pipeline runs in.
  • PIPELINE_ROOT: A Cloud Storage URI that your pipelines service account can access. The artifacts of your pipeline runs are stored within the pipeline root.

    The pipeline root can be set as an argument of the @kfp.dsl.pipeline decorator on the pipeline function, or it can be set when you create a pipeline run, as this guide's sample does with the pipeline_root argument of PipelineJob.
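Because the pipeline root must be a Cloud Storage URI, it can help to check the value before you pass it to the decorator or to a pipeline run. The following is a minimal sketch under stated assumptions: the helper names and the bucket `my-bucket` are hypothetical, and the check covers only the syntax of the URI, not whether your service account can access the bucket.

```python
from urllib.parse import urlparse


def validate_pipeline_root(pipeline_root: str) -> str:
    """Return the root without a trailing slash, or raise if it is not a gs:// URI."""
    parsed = urlparse(pipeline_root)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(
            f"Expected a gs://BUCKET/PATH URI, got {pipeline_root!r}"
        )
    return pipeline_root.rstrip("/")


def root_for(base_root: str, environment: str) -> str:
    """Derive a separate root per environment, such as 'experimental' or 'production'."""
    return f"{validate_pipeline_root(base_root)}/{environment}"


if __name__ == "__main__":
    # 'my-bucket' is a placeholder; replace it with a bucket that you own.
    print(root_for("gs://my-bucket/pipeline_root/", "experimental"))
```

Keeping a separate root per environment is one way to stop the artifacts of experimental runs from mixing with those of production runs.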

Compile your pipeline into a JSON file

After you define the workflow of your pipeline, compile the pipeline into JSON format. The JSON file includes all of the information needed to execute your pipeline on Vertex AI Pipelines.

from kfp.v2 import compiler
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='image_classif_pipeline.json')
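After compiling, you might want to confirm that the output file exists and parses as JSON before you submit it. The following sketch uses only the Python standard library. The function name is illustrative, and the sketch makes no assumption about which keys the compiler writes; it reports whatever top-level keys it finds.

```python
import json
from pathlib import Path


def summarize_pipeline_file(path: str) -> dict:
    """Load a compiled pipeline file and report its size and top-level keys."""
    text = Path(path).read_text(encoding="utf-8")
    document = json.loads(text)
    if not isinstance(document, dict):
        raise ValueError(f"{path} does not contain a JSON object")
    return {
        "bytes": len(text.encode("utf-8")),
        "top_level_keys": sorted(document),
    }


if __name__ == "__main__":
    # Assumes that the compile step above has already written this file.
    print(summarize_pipeline_file("image_classif_pipeline.json"))
```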

Submit your pipeline run

After the workflow of your pipeline is compiled into JSON format, you can use the Vertex AI Python client to submit and run your pipeline.

import google.cloud.aiplatform as aip

job = aip.PipelineJob(
    display_name="automl-image-training-v2",
    template_path="image_classif_pipeline.json",
    pipeline_root=pipeline_root_path,
    parameter_values={
        'project_id': project_id
    }
)

job.run()

In the preceding example:

  1. A Kubeflow pipeline is defined as a Python function. The function is annotated with the @kfp.dsl.pipeline decorator, which specifies the pipeline's name and root path. The pipeline root path is the location where the pipeline's artifacts are stored.
  2. The pipeline's workflow steps are created using the Google Cloud Pipeline Components. By using the outputs of one component as an input of another component, you define the pipeline's workflow as a graph. For example, training_job_run_op depends on the dataset output of ds_op.
  3. You compile the pipeline using kfp.v2.compiler.Compiler.
  4. You create a pipeline run on Vertex AI Pipelines using the Vertex AI Python client. When you run a pipeline, you can override the pipeline name and the pipeline root path. Pipeline runs can be grouped using the pipeline name. Overriding the pipeline name can help you distinguish between production and experimental pipeline runs.
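To see how passing outputs between components defines a graph, the following sketch models the four steps of the sample with the standard library's `graphlib` module (Python 3.9 or later). It is only an illustration of dependency ordering, not a description of how Vertex AI Pipelines schedules work internally. The `execution_waves` helper is hypothetical.

```python
from graphlib import TopologicalSorter

# Each step maps to the steps whose outputs it consumes in the sample.
dependencies = {
    "ds_op": set(),
    "create_endpoint_op": set(),
    "training_job_run_op": {"ds_op"},
    "model_deploy_op": {"training_job_run_op", "create_endpoint_op"},
}


def execution_waves(graph: dict) -> list:
    """Group steps into waves; a step joins a wave once all of its inputs exist."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


if __name__ == "__main__":
    for number, wave in enumerate(execution_waves(dependencies), start=1):
        print(f"wave {number}: {', '.join(wave)}")
```

In this model, ds_op and create_endpoint_op have no inputs from other steps, so they can start together. model_deploy_op has to wait for both the trained model and the endpoint.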

To learn more about building pipelines, read the Building Kubeflow pipelines section, and follow the samples and tutorials.

Building Kubeflow pipelines

Use the following process to build a pipeline.

  1. Design your pipeline as a series of components. To promote reusability, each component should have a single responsibility. Whenever possible, design your pipeline to reuse proven components such as the Google Cloud Pipeline Components.

    Learn more about designing pipelines.

  2. Build any custom components that are required to implement your ML workflow using Kubeflow Pipelines SDK v2. Components are self-contained sets of code that perform a step in your ML workflow. Use the following options to create your pipeline components.

  3. Build your pipeline as a Python function.

    Learn more about defining your pipeline as a Python function.

  4. Use the Kubeflow Pipelines SDK v2 compiler to compile your pipeline.

    from kfp.v2 import compiler
    
    compiler.Compiler().compile(
        pipeline_func=PIPELINE_FUNCTION,
        package_path=PIPELINE_PACKAGE_PATH)
    

    Replace the following:

    • PIPELINE_FUNCTION: The name of your pipeline's function.
    • PIPELINE_PACKAGE_PATH: The path to where to store your compiled pipeline.
  5. Run your pipeline using the Google Cloud console or Python.
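Step 2 of the process above calls for custom components. One way to build one is to wrap a self-contained Python function with the `component` decorator from `kfp.v2.dsl`. The following is a minimal sketch, assuming Kubeflow Pipelines SDK v1.8.9 or higher is installed when you call `make_add_component`. The function names and the `python:3.9` base image are illustrative choices, not recommendations from this guide.

```python
def add(a: float, b: float) -> float:
    """Step logic: a self-contained function with typed inputs and a typed output."""
    return a + b


def make_add_component(base_image: str = "python:3.9"):
    """Wrap `add` as a pipeline component. Requires the Kubeflow Pipelines SDK."""
    # Imported here so that the plain function above can be tested without
    # the SDK installed.
    from kfp.v2.dsl import component

    # The returned factory creates a pipeline step when it is called inside
    # a function decorated with @kfp.dsl.pipeline.
    return component(base_image=base_image)(add)


if __name__ == "__main__":
    # The step logic is ordinary Python, so it can be checked locally.
    print(add(1.5, 2.0))
```

Keeping the step logic in a plain function lets you unit test it locally before the step runs in a container.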

Accessing Google Cloud resources in a pipeline

If you do not specify a service account when you run a pipeline, Vertex AI Pipelines uses the Compute Engine default service account to run your pipeline. Vertex AI Pipelines also uses a pipeline run's service account to authorize your pipeline to access Google Cloud resources. The Compute Engine default service account has the Project Editor role by default, which may grant your pipelines more access to the Google Cloud resources in your project than they need.

We recommend that you create a service account to run your pipelines and then grant this account granular permissions to the Google Cloud resources that are needed to run your pipeline.

Learn more about using Identity and Access Management to create a service account and manage the access granted to a service account.
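After you create a dedicated service account, you can pass it when you start the pipeline run. The following sketch assumes the `service_account` argument of `PipelineJob.run` in the Vertex AI Python client and reuses the display name and parameter from this guide's sample. The helper names and the account ID in the comments are hypothetical, and the service account must already exist with the permissions that your pipeline needs.

```python
def service_account_email(account_id: str, project_id: str) -> str:
    """Build the email address of a user-managed service account."""
    return f"{account_id}@{project_id}.iam.gserviceaccount.com"


def run_with_service_account(
    template_path: str,
    pipeline_root: str,
    project_id: str,
    region: str,
    account_id: str,
):
    """Submit the compiled pipeline so that it runs as the given service account."""
    # Imported here so that the helper above works without the client library.
    from google.cloud import aiplatform

    aiplatform.init(project=project_id, location=region)
    job = aiplatform.PipelineJob(
        display_name="automl-image-training-v2",
        template_path=template_path,
        pipeline_root=pipeline_root,
        parameter_values={"project_id": project_id},
    )
    # Without this argument, the run uses the Compute Engine default
    # service account.
    job.run(service_account=service_account_email(account_id, project_id))
    return job


if __name__ == "__main__":
    # 'pipeline-runner' and 'my-project' are placeholders.
    print(service_account_email("pipeline-runner", "my-project"))
```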

What's next