Build a pipeline

Vertex AI Pipelines lets you orchestrate your machine learning (ML) workflows in a serverless manner. Before Vertex AI Pipelines can orchestrate your ML workflow, you must describe your workflow as a pipeline. ML pipelines are portable and scalable ML workflows that are based on containers and Google Cloud services.

This guide describes how to get started building ML pipelines.

Which pipelines SDK should I use?

Vertex AI Pipelines can run pipelines built using any of the following SDKs:

  • Kubeflow Pipelines SDK v1.8 or later (v2 is recommended)

  • TensorFlow Extended v0.30.0 or later

If you use TensorFlow in an ML workflow that processes terabytes of structured data or text data, we recommend that you build your pipeline using TFX.

For other use cases, we recommend that you build your pipeline using the Kubeflow Pipelines SDK. By building a pipeline with the Kubeflow Pipelines SDK, you can implement your workflow by building custom components or reusing prebuilt components, such as the Google Cloud Pipeline Components. Google Cloud Pipeline Components make it easier to use Vertex AI services like AutoML in your pipeline.

This guide describes how to build pipelines using the Kubeflow Pipelines SDK.

Before you begin

Before you build and run your pipelines, use the following instructions to set up your Google Cloud project and development environment.

  1. To get your Google Cloud project ready to run ML pipelines, follow the instructions in the guide to configuring your Google Cloud project.

  2. Install v2 or later of the Kubeflow Pipelines SDK.

    pip install --upgrade "kfp>=2,<3"
    
  3. To use the Vertex AI Python client in your pipelines, install the Vertex AI client libraries v1.7 or later.

  4. To use Vertex AI services in your pipelines, install the Google Cloud Pipeline Components SDK.
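
After installing, it can help to confirm that the installed SDK falls within the requested range. The following is a minimal sketch that uses only the Python standard library. The helper names in_kfp_v2_range and installed_kfp_version are illustrative and not part of any SDK, and the check assumes a plain MAJOR.MINOR.PATCH version string (pre-release versions are not handled).

```python
from importlib import metadata
from typing import Optional


def in_kfp_v2_range(version: str) -> bool:
    """Return True if `version` satisfies the constraint kfp>=2,<3."""
    # For this constraint only the major version matters.
    major = int(version.split(".")[0])
    return major == 2


def installed_kfp_version() -> Optional[str]:
    """Return the installed kfp version string, or None if kfp is not installed."""
    try:
        return metadata.version("kfp")
    except metadata.PackageNotFoundError:
        return None


found = installed_kfp_version()
if found is None:
    print("kfp is not installed")
elif in_kfp_v2_range(found):
    print(f"kfp {found} is within >=2,<3")
else:
    print(f"kfp {found} is outside >=2,<3")
```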

Getting started building a pipeline

To orchestrate your ML workflow on Vertex AI Pipelines, you must first describe your workflow as a pipeline. The following sample demonstrates how to use the Google Cloud Pipeline Components with Vertex AI to create a dataset, train a model using AutoML, and deploy the trained model for predictions.

Before you run the following code sample, you must set up authentication.

How to set up authentication

To set up authentication, you must create a service account key, and set an environment variable for the path to the service account key.

  1. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

    2. In the Service account name field, enter a name.
    3. Optional: In the Service account description field, enter a description.
    4. Click Create.
    5. Click the Select a role field. Under All roles, select Vertex AI > Vertex AI User.
    6. Click Done to create the service account.

      Do not close your browser window. You will use it in the next step.

  2. Create a service account key for authentication:

    1. In the Google Cloud console, click the email address for the service account that you created.
    2. Click Keys.
    3. Click Add key, then Create new key.
    4. Click Create. A JSON key file is downloaded to your computer.
    5. Click Close.
  3. Grant your new service account access to the service account that you use to run pipelines.
    1. Return to the list of service accounts.
    2. Click the name of the service account that you use to run pipelines. The Service account details page appears.

      If you followed the instructions in the guide to configuring your project for Vertex AI Pipelines, this is the same service account that you created in the Configure a service account with granular permissions section. Otherwise, Vertex AI uses the Compute Engine default service account to run pipelines. The name of the Compute Engine default service account has the following format: PROJECT_NUMBER-compute@developer.gserviceaccount.com

    3. Click the Permissions tab.
    4. Click Grant access. The Add principals panel appears.
    5. In the New principals box, enter the email address for the service account you created in a previous step.
    6. In the Role drop-down list, select Service accounts > Service account user.
    7. Click Save.
  4. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your service account key. This variable only applies to your current shell session, so if you open a new session, set the variable again.

    Example: Linux or macOS

    Replace [PATH] with the path of the JSON file that contains your service account key.

    export GOOGLE_APPLICATION_CREDENTIALS="[PATH]"

    For example:

    export GOOGLE_APPLICATION_CREDENTIALS="/home/user/Downloads/service-account-file.json"

    Example: Windows

    Replace [PATH] with the path of the JSON file that contains your service account key, and [FILE_NAME] with the filename.

    With PowerShell:

    $env:GOOGLE_APPLICATION_CREDENTIALS="[PATH]"

    For example:

    $env:GOOGLE_APPLICATION_CREDENTIALS="C:\Users\username\Downloads\[FILE_NAME].json"

    With command prompt:

    set GOOGLE_APPLICATION_CREDENTIALS=[PATH]
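
Before you run the sample, a quick check from Python can catch an unset variable or a mistyped path early. The following is a minimal sketch using only the standard library. The function name credentials_problem is hypothetical, and the check relies on the assumption that a service account key file is a JSON object whose "type" field is "service_account". It does not verify that the key is valid or that the account has the required roles.

```python
import json
import os
from typing import Mapping, Optional


def credentials_problem(environ: Mapping[str, str] = os.environ) -> Optional[str]:
    """Return a description of the first problem found, or None if all checks pass."""
    path = environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not path:
        return "GOOGLE_APPLICATION_CREDENTIALS is not set"
    if not os.path.isfile(path):
        return f"no file at {path}"
    try:
        with open(path, encoding="utf-8") as key_file:
            key = json.load(key_file)
    except (OSError, ValueError) as err:
        # ValueError covers both malformed JSON and undecodable bytes.
        return f"cannot read JSON key: {err}"
    # Assumption: service account key files carry "type": "service_account".
    if not isinstance(key, dict) or key.get("type") != "service_account":
        return "file is not a service account key"
    return None


problem = credentials_problem()
print(problem or "credentials file looks usable")
```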

Define your workflow using Kubeflow Pipelines DSL package

The kfp.dsl package contains the domain-specific language (DSL) that you can use to define and interact with pipelines and components.

Kubeflow pipeline components are factory functions that create pipeline steps. Each component describes its inputs, outputs, and implementation. For example, in the code sample below, ImageDatasetCreateOp is a component, and ds_op is the pipeline step that is created by calling it.

When a pipeline runs, each step is executed as soon as the data it depends on becomes available. For example, a training component could take a CSV file as an input and use it to train a model.

import kfp
from google.cloud import aiplatform
from google_cloud_pipeline_components.v1.dataset import ImageDatasetCreateOp
from google_cloud_pipeline_components.v1.automl.training_job import AutoMLImageTrainingJobRunOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp

project_id = PROJECT_ID
pipeline_root_path = PIPELINE_ROOT

# Define the workflow of the pipeline.
@kfp.dsl.pipeline(
    name="automl-image-training-v2",
    pipeline_root=pipeline_root_path)
def pipeline(project_id: str):
    # The first step of your workflow is a dataset generator.
    # This step takes a Google Cloud Pipeline Component, providing the necessary
    # input arguments, and uses the Python variable `ds_op` to define its
    # output. Note that here the `ds_op` only stores the definition of the
    # output but not the actual returned object from the execution. The value
    # of the object is not accessible at the dsl.pipeline level, and can only be
    # retrieved by providing it as the input to a downstream component.
    ds_op = ImageDatasetCreateOp(
        project=project_id,
        display_name="flowers",
        gcs_source="gs://cloud-samples-data/vision/automl_classification/flowers/all_data_v2.csv",
        import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification,
    )

    # The second step is a model training component. It takes the dataset
    # output by the first step as an input argument
    # (see `dataset=ds_op.outputs["dataset"]`) and stores the definition of
    # its own outputs in `training_job_run_op`.
    training_job_run_op = AutoMLImageTrainingJobRunOp(
        project=project_id,
        display_name="train-iris-automl-mbsdk-1",
        prediction_type="classification",
        model_type="CLOUD",
        dataset=ds_op.outputs["dataset"],
        model_display_name="iris-classification-model-mbsdk",
        training_fraction_split=0.6,
        validation_fraction_split=0.2,
        test_fraction_split=0.2,
        budget_milli_node_hours=8000,
    )

    # The third and fourth steps deploy the model.
    create_endpoint_op = EndpointCreateOp(
        project=project_id,
        display_name="create-endpoint",
    )

    model_deploy_op = ModelDeployOp(
        model=training_job_run_op.outputs["model"],
        endpoint=create_endpoint_op.outputs["endpoint"],
        automatic_resources_min_replica_count=1,
        automatic_resources_max_replica_count=1,
    )

Replace the following:

  • PROJECT_ID: The Google Cloud project that this pipeline runs in.
  • PIPELINE_ROOT: A Cloud Storage URI that your pipelines service account can access. The artifacts of your pipeline runs are stored within the pipeline root.

    The pipeline root can be set as an argument of the @kfp.dsl.pipeline decorator on the pipeline function, or it can be set when you create a pipeline run, for example through the pipeline_root argument of PipelineJob.
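
Because the pipeline root must be a Cloud Storage URI, a small format check can catch a local path or a missing bucket before a run is submitted. The following is a minimal sketch using only the standard library. The function name split_gcs_uri and the bucket name my-bucket are hypothetical, and the check validates only the shape of the URI, not whether the bucket exists or is accessible.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_gcs_uri(uri: str) -> Tuple[str, str]:
    """Split a gs:// URI into (bucket, object_prefix); raise ValueError if malformed."""
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(f"expected a URI like gs://BUCKET/PATH, got {uri!r}")
    # Drop the leading slash so the prefix is relative to the bucket.
    return parsed.netloc, parsed.path.lstrip("/")


# Hypothetical bucket and prefix, for illustration only.
bucket, prefix = split_gcs_uri("gs://my-bucket/pipeline_root")
print(bucket, prefix)
```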

Compile your pipeline into a YAML file

After you define the workflow of your pipeline, compile the pipeline into YAML format. The YAML file includes all the information needed to execute your pipeline on Vertex AI Pipelines.

from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path='image_classif_pipeline.yaml'
)

Submit your pipeline run

After the workflow of your pipeline is compiled into the YAML format, you can use the Vertex AI Python client to submit and run your pipeline.

import google.cloud.aiplatform as aip

# Before initializing, make sure to set the GOOGLE_APPLICATION_CREDENTIALS
# environment variable to the path of your service account key file.
aip.init(
    project=project_id,
    location=PROJECT_REGION,
)

# Prepare the pipeline job
job = aip.PipelineJob(
    display_name="automl-image-training-v2",
    template_path="image_classif_pipeline.yaml",
    pipeline_root=pipeline_root_path,
    parameter_values={
        'project_id': project_id
    }
)

job.submit()

Replace the following:

  • PROJECT_REGION: The region that this pipeline runs in.

In the preceding example:

  1. A Kubeflow pipeline is defined as a Python function. The function is decorated with @kfp.dsl.pipeline, which specifies the pipeline's name and root path. The pipeline root path is the location where the pipeline's artifacts are stored.
  2. The pipeline's workflow steps are created using the Google Cloud Pipeline Components. By using the outputs of one component as an input of another component, you define the pipeline's workflow as a graph. For example, training_job_run_op depends on the dataset output of ds_op.
  3. You compile the pipeline using kfp.compiler.Compiler.
  4. You create a pipeline run on Vertex AI Pipelines using the Vertex AI Python client. When you run a pipeline, you can override the pipeline name and the pipeline root path. Pipeline runs can be grouped using the pipeline name. Overriding the pipeline name can help you distinguish between production and experimental pipeline runs.
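
The dependency graph described in item 2 can be illustrated without any pipeline SDK. The following toy sketch uses graphlib from the Python standard library (Python 3.9 or later) to group the four steps of the sample by when their inputs become available. The step names mirror the variables in the sample. This is only an illustration of dependency ordering, not a description of how Vertex AI Pipelines schedules steps internally.

```python
from graphlib import TopologicalSorter

# Each key is a step; its value is the set of steps whose outputs it consumes.
dependencies = {
    "ds_op": set(),
    "training_job_run_op": {"ds_op"},
    "create_endpoint_op": set(),
    "model_deploy_op": {"training_job_run_op", "create_endpoint_op"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()

waves = []
while sorter.is_active():
    # get_ready() returns every step whose dependencies have all finished.
    ready = sorted(sorter.get_ready())
    waves.append(ready)
    sorter.done(*ready)

for number, steps in enumerate(waves, start=1):
    print(f"wave {number}: {', '.join(steps)}")
```

In this sketch, ds_op and create_endpoint_op land in the first wave because neither consumes another step's output, while model_deploy_op comes last because it needs both the trained model and the endpoint.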

To learn more about building pipelines, read the building Kubeflow pipelines section, and follow the samples and tutorials.

Test a pipeline locally (optional)

After you define your pipelines and components, you can test the component code by executing the code in your local authoring environment. By executing your pipeline or a component locally, you can identify and debug potential issues before you create a pipeline run in a remote environment, such as Vertex AI Pipelines. For more information about locally executing pipelines and components, see Local execution in the KFP documentation.

This section shows you how to define and run a pipeline that consists of two tasks.

Set up your local environment

  1. Optional: Install Docker. Docker is required only if you use the DockerRunner runner type.

  2. Use the following code sample to define a simple pipeline:

    from kfp import dsl
    
    # Define a component to add two numbers.
    @dsl.component
    def add(a: int, b: int) -> int:
        return a + b
    
    # Define a simple pipeline using the component.
    @dsl.pipeline
    def addition_pipeline(x: int, y: int, z: int) -> int:
        task1 = add(a=x, b=y)
        task2 = add(a=task1.output, b=z)
        return task2.output
    

Invoke a local execution

Initialize a local session using the local.init() function. When you use local.init(), the KFP SDK locally executes your pipelines and components when you call them.

When you use local.init(), you must specify a runner type. The runner type indicates how KFP should run each task.

Use the following sample to specify the DockerRunner runner type for running each task in a container. For more information about local runners supported by KFP, see Local runners in the KFP documentation.

from kfp import local

local.init(runner=local.DockerRunner())

pipeline_task = addition_pipeline(x=1, y=2, z=3)

Use the following code to view the output of the pipeline task upon local execution:

print(f'Result: {pipeline_task.output}')
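
To sanity-check the value that the local run prints, the same arithmetic can be reproduced in plain Python with no KFP involved. The following sketch mirrors the two chained add tasks. The function name addition is illustrative and stands in for addition_pipeline.

```python
def add(a: int, b: int) -> int:
    """Plain-Python counterpart of the `add` component."""
    return a + b


def addition(x: int, y: int, z: int) -> int:
    """Chain two additions the same way the pipeline chains two tasks."""
    first = add(a=x, b=y)
    return add(a=first, b=z)


expected = addition(x=1, y=2, z=3)
print(f"Expected result: {expected}")  # prints "Expected result: 6"
```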

Building Kubeflow pipelines

Use the following process to build a pipeline.

  1. Design your pipeline as a series of components. To promote reusability, each component should have a single responsibility. Whenever possible, design your pipeline to reuse proven components such as the Google Cloud Pipeline Components.

    Learn more about designing pipelines.

  2. Build any custom components that are required to implement your ML workflow using Kubeflow Pipelines SDK. Components are self-contained sets of code that perform a step in your ML workflow. Use the following options to create your pipeline components.

  3. Build your pipeline as a Python function.

    Learn more about defining your pipeline as a Python function.

  4. Use the Kubeflow Pipelines SDK compiler to compile your pipeline.

    from kfp import compiler
    
    compiler.Compiler().compile(
        pipeline_func=PIPELINE_FUNCTION,
        package_path=PIPELINE_PACKAGE_PATH)
    

    Replace the following:

    • PIPELINE_FUNCTION: The name of your pipeline's function.
    • PIPELINE_PACKAGE_PATH: The path to where to store your compiled pipeline.
  5. Run your pipeline using the Google Cloud console or Python.

Accessing Google Cloud resources in a pipeline

If you do not specify a service account when you run a pipeline, Vertex AI Pipelines uses the Compute Engine default service account to run your pipeline. Vertex AI Pipelines also uses a pipeline run's service account to authorize your pipeline to access Google Cloud resources. The Compute Engine default service account has the Project Editor role by default. This may grant your pipelines excessive access to Google Cloud resources in your Google Cloud project.

We recommend that you create a service account to run your pipelines and then grant this account granular permissions to the Google Cloud resources that are needed to run your pipeline.

Learn more about using Identity and Access Management to create a service account and manage the access granted to a service account.
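
One way to notice that a run would fall back to the broadly scoped default account is to compare the service account email against the default account's name format, PROJECT_NUMBER-compute@developer.gserviceaccount.com. The following is a minimal sketch using only the standard library. The function name is_compute_default_account and the example emails are hypothetical, and the pattern assumes that the project number consists only of digits.

```python
import re

# Pattern derived from the default account name format:
# PROJECT_NUMBER-compute@developer.gserviceaccount.com
_COMPUTE_DEFAULT = re.compile(r"\d+-compute@developer\.gserviceaccount\.com")


def is_compute_default_account(email: str) -> bool:
    """Return True if `email` looks like a Compute Engine default service account."""
    return _COMPUTE_DEFAULT.fullmatch(email.strip()) is not None


# Hypothetical emails, for illustration only.
for email in (
    "123456789012-compute@developer.gserviceaccount.com",
    "pipelines-runner@my-project.iam.gserviceaccount.com",
):
    label = "default account" if is_compute_default_account(email) else "dedicated account"
    print(f"{email}: {label}")
```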

Keep your pipelines up-to-date

The SDK clients and container images that you use to build and run pipelines are periodically updated to new versions to patch security vulnerabilities and add new functionality. To keep your pipelines up to date with the latest version, we recommend that you do the following:

What's next