Vertex AI Pipelines lets you orchestrate your machine learning (ML) workflows in a serverless manner. Before Vertex AI Pipelines can orchestrate your ML workflow, you must describe your workflow as a pipeline. ML pipelines are portable and scalable ML workflows that are based on containers and Google Cloud services.
This guide describes how to get started building ML pipelines.
Which pipelines SDK should I use?
Vertex AI Pipelines can run pipelines built using any of the following SDKs:
Kubeflow Pipelines SDK v1.8 or later (v2 is recommended)
TensorFlow Extended v0.30.0 or later
If you use TensorFlow in an ML workflow that processes terabytes of structured data or text data, we recommend that you build your pipeline using TFX.
- To learn more about building a TFX pipeline, follow the TFX getting started tutorials.
- To learn more about using Vertex AI Pipelines to run a TFX pipeline, follow the TFX on Google Cloud tutorials.
For other use cases, we recommend that you build your pipeline using the Kubeflow Pipelines SDK. By building a pipeline with the Kubeflow Pipelines SDK, you can implement your workflow by building custom components or reusing prebuilt components, such as the Google Cloud Pipeline Components. Google Cloud Pipeline Components make it easier to use Vertex AI services like AutoML in your pipeline.
This guide describes how to build pipelines using the Kubeflow Pipelines SDK.
Before you begin
Before you build and run your pipelines, use the following instructions to set up your Google Cloud project and development environment.
- To get your Google Cloud project ready to run ML pipelines, follow the instructions in the guide to configuring your Google Cloud project.
- To build your pipeline using the Kubeflow Pipelines SDK, install the Kubeflow Pipelines SDK v1.8 or later.
- To use the Vertex AI Python client in your pipelines, install the Vertex AI client libraries v1.7 or later.
- To use Vertex AI services in your pipelines, install the Google Cloud Pipeline Components SDK.
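For example, you can install all three dependencies with pip; the package names below are the PyPI distributions for each SDK, and you may want to pin versions to match your project's requirements.

pip install --upgrade kfp
pip install --upgrade google-cloud-aiplatform
pip install --upgrade google-cloud-pipeline-components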
Getting started building a pipeline
To orchestrate your ML workflow on Vertex AI Pipelines, you must first describe your workflow as a pipeline. The following sample demonstrates how to use the Google Cloud Pipeline Components with Vertex AI to create a dataset, train a model using AutoML, and deploy the trained model for predictions.
Before you run the following code sample, you must set up authentication.
How to set up authentication
To set up authentication, you must create a service account key, and set an environment variable for the path to the service account key.
- Create a service account:
  - In the Google Cloud console, go to the Create service account page.
  - In the Service account name field, enter a name.
  - Optional: In the Service account description field, enter a description.
  - Click Create.
  - Click the Select a role field. Under All roles, select Vertex AI > Vertex AI User.
  - Click Done to create the service account. Do not close your browser window; you will use it in the next step.
- Create a service account key for authentication:
  - In the Google Cloud console, click the email address for the service account that you created.
  - Click Keys.
  - Click Add key, then Create new key.
  - Click Create. A JSON key file is downloaded to your computer.
  - Click Close.
- Grant your new service account access to the service account that runs your pipelines:
  - Return to the list of service accounts, and then click the name of the service account that you use to run pipelines. The Service account details page appears.
    If you followed the instructions in the guide to configuring your project for Vertex AI Pipelines, this is the same service account that you created in the Configure a service account with granular permissions section. Otherwise, Vertex AI uses the Compute Engine default service account to run pipelines. The Compute Engine default service account is named as follows:
    PROJECT_NUMBER-compute@developer.gserviceaccount.com
  - Click the Permissions tab.
  - Click Grant access. The Add principals panel appears.
  - In the New principals box, enter the email address of the service account that you created in the previous step.
  - In the Role drop-down list, select Service accounts > Service account user.
  - Click Save.
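If you prefer the command line, the following gcloud commands are one possible equivalent of the preceding console steps; the service account name pipelines-tutorial-sa, the key.json path, PROJECT_ID, and RUN_SA_EMAIL are placeholders you would replace.

# Create the new service account (name is a placeholder).
gcloud iam service-accounts create pipelines-tutorial-sa \
    --project=PROJECT_ID \
    --display-name="pipelines-tutorial-sa"

# Grant it the Vertex AI User role on the project.
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:pipelines-tutorial-sa@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/aiplatform.user"

# Create and download a JSON key for the new service account.
gcloud iam service-accounts keys create key.json \
    --iam-account=pipelines-tutorial-sa@PROJECT_ID.iam.gserviceaccount.com

# Allow the new service account to act as the service account that runs your
# pipelines (RUN_SA_EMAIL is that account's email address).
gcloud iam service-accounts add-iam-policy-binding RUN_SA_EMAIL \
    --member="serviceAccount:pipelines-tutorial-sa@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/iam.serviceAccountUser"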
Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your service account key. This variable only applies to your current shell session, so if you open a new session, set the variable again.
Example: Linux or macOS
Replace [PATH] with the path of the JSON file that contains your service account key.
export GOOGLE_APPLICATION_CREDENTIALS="[PATH]"
For example:
export GOOGLE_APPLICATION_CREDENTIALS="/home/user/Downloads/service-account-file.json"
Example: Windows
Replace [PATH] with the path of the JSON file that contains your service account key, and [FILE_NAME] with the filename.
With PowerShell:
$env:GOOGLE_APPLICATION_CREDENTIALS="[PATH]"
For example:
$env:GOOGLE_APPLICATION_CREDENTIALS="C:\Users\username\Downloads\[FILE_NAME].json"
With command prompt:
set GOOGLE_APPLICATION_CREDENTIALS=[PATH]
Define your workflow using Kubeflow Pipelines DSL package
The kfp.dsl package contains the domain-specific language (DSL) that you can use to define and interact with pipelines and components.

Kubeflow pipeline components are factory functions that create pipeline steps. Each component describes the inputs, outputs, and implementation of the component. For example, in the following code sample, ds_op is a component.
Components are used to create pipeline steps. When a pipeline runs, steps are executed as the data they depend on becomes available. For example, a training component could take a CSV file as an input and use it to train a model.
import kfp
from google.cloud import aiplatform
from google_cloud_pipeline_components.v1.dataset import ImageDatasetCreateOp
from google_cloud_pipeline_components.v1.automl.training_job import AutoMLImageTrainingJobRunOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp
project_id = PROJECT_ID
pipeline_root_path = PIPELINE_ROOT_PATH
# Define the workflow of the pipeline.
@kfp.dsl.pipeline(
    name="automl-image-training-v2",
    pipeline_root=pipeline_root_path)
def pipeline(project_id: str):
    # The first step of your workflow is a dataset generator.
    # This step takes a Google Cloud Pipeline Component, providing the necessary
    # input arguments, and uses the Python variable `ds_op` to define its
    # output. Note that here the `ds_op` only stores the definition of the
    # output but not the actual returned object from the execution. The value
    # of the object is not accessible at the dsl.pipeline level, and can only be
    # retrieved by providing it as an input to a downstream component.
    ds_op = ImageDatasetCreateOp(
        project=project_id,
        display_name="flowers",
        gcs_source="gs://cloud-samples-data/vision/automl_classification/flowers/all_data_v2.csv",
        import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification,
    )

    # The second step is a model training component. It takes the dataset
    # outputted from the first step, supplies it as an input argument to the
    # component (see `dataset=ds_op.outputs["dataset"]`), and will put its
    # outputs into `training_job_run_op`.
    training_job_run_op = AutoMLImageTrainingJobRunOp(
        project=project_id,
        display_name="train-iris-automl-mbsdk-1",
        prediction_type="classification",
        model_type="CLOUD",
        dataset=ds_op.outputs["dataset"],
        model_display_name="iris-classification-model-mbsdk",
        training_fraction_split=0.6,
        validation_fraction_split=0.2,
        test_fraction_split=0.2,
        budget_milli_node_hours=8000,
    )

    # The third and fourth steps deploy the trained model to an endpoint.
    create_endpoint_op = EndpointCreateOp(
        project=project_id,
        display_name="create-endpoint",
    )

    model_deploy_op = ModelDeployOp(
        model=training_job_run_op.outputs["model"],
        endpoint=create_endpoint_op.outputs["endpoint"],
        automatic_resources_min_replica_count=1,
        automatic_resources_max_replica_count=1,
    )
Replace the following:
- PROJECT_ID: The Google Cloud project that this pipeline runs in.
- PIPELINE_ROOT_PATH: Specify a Cloud Storage URI that your pipelines service account can access. The artifacts of your pipeline runs are stored within the pipeline root.
  The pipeline root can be set as an argument of the @kfp.dsl.pipeline annotation on the pipeline function, or it can be set when you call create_run_from_job_spec to create a pipeline run.
Compile your pipeline into a YAML file
After the workflow of your pipeline is defined, you can proceed to compile the pipeline into YAML format. The YAML file includes all the information for executing your pipeline on Vertex AI Pipelines.
from kfp import compiler
compiler.Compiler().compile(
pipeline_func=pipeline,
package_path='image_classif_pipeline.yaml'
)
Submit your pipeline run
After the workflow of your pipeline is compiled into the YAML format, you can use the Vertex AI Python client to submit and run your pipeline.
import google.cloud.aiplatform as aip
# Before initializing, make sure to set the GOOGLE_APPLICATION_CREDENTIALS
# environment variable to the path of your service account key file.
aip.init(
    project=project_id,
    location=PROJECT_REGION,
)

# Prepare the pipeline job.
job = aip.PipelineJob(
    display_name="automl-image-training-v2",
    template_path="image_classif_pipeline.yaml",
    pipeline_root=pipeline_root_path,
    parameter_values={
        "project_id": project_id
    },
)

job.submit()
Replace the following:
- PROJECT_REGION: The region that this pipeline runs in.
In the preceding example:
- A Kubeflow pipeline is defined as a Python function. The function is annotated with the @kfp.dsl.pipeline decorator, which specifies the pipeline's name and root path. The pipeline root path is the location where the pipeline's artifacts are stored.
- The pipeline's workflow steps are created using the Google Cloud Pipeline Components. By using the outputs of a component as an input to another component, you define the pipeline's workflow as a graph. For example, training_job_run_op depends on the dataset output of ds_op.
- You compile the pipeline using kfp.compiler.Compiler.
- You create a pipeline run on Vertex AI Pipelines using the Vertex AI Python client. When you run a pipeline, you can override the pipeline name and the pipeline root path. Pipeline runs can be grouped using the pipeline name. Overriding the pipeline name can help you distinguish between production and experimental pipeline runs, as shown in the sketch after this list.
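For example, the following minimal sketch reuses the compiled template but overrides the display name and the pipeline root for an experimental run; the experiments bucket path is a hypothetical placeholder.

import google.cloud.aiplatform as aip

# Assumes aip.init(...) and `project_id` are set up as in the preceding sample.
experimental_job = aip.PipelineJob(
    # A distinct display name groups experimental runs separately from
    # production runs of the same pipeline.
    display_name="automl-image-training-v2-experiment",
    template_path="image_classif_pipeline.yaml",
    # Hypothetical Cloud Storage location used only for experimental artifacts.
    pipeline_root="gs://YOUR_EXPERIMENTS_BUCKET/pipeline_root",
    parameter_values={"project_id": project_id},
)
experimental_job.submit()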
To learn more about building pipelines, read the building Kubeflow pipelines section, and follow the samples and tutorials.
Test a pipeline locally (optional)
After you define your pipelines and components, you can test the component code by executing the code in your local authoring environment. By executing your pipeline or a component locally, you can identify and debug potential issues before you create a pipeline run in a remote environment, such as Vertex AI Pipelines. For more information about locally executing pipelines and components, see Local execution in the KFP documentation.
This section shows you how to define and run a pipeline that consists of two tasks.
Set up your local environment
Install the latest minor version of the Kubeflow Pipelines (KFP) SDK v2:
pip install --upgrade "kfp>=2,<3"
Optional: Install Docker.
Use the following code sample to define a simple pipeline:
from kfp import dsl

# Define a component to add two numbers.
@dsl.component
def add(a: int, b: int) -> int:
    return a + b

# Define a simple pipeline using the component.
@dsl.pipeline
def addition_pipeline(x: int, y: int, z: int) -> int:
    task1 = add(a=x, b=y)
    task2 = add(a=task1.output, b=z)
    return task2.output
Invoke a local execution
Initialize a local session using the local.init() function. When you use local.init(), the KFP SDK locally executes your pipelines and components when you call them.

When you use local.init(), you must specify a runner type. The runner type indicates how KFP should run each task.

Use the following sample to specify the DockerRunner runner type for running each task in a container. For more information about the local runners supported by KFP, see Local runners in the KFP documentation.
from kfp import local
local.init(runner=local.DockerRunner())
pipeline_task = addition_pipeline(x=1, y=2, z=3)
Use the following code to view the output of the pipeline task upon local execution:
print(f'Result: {pipeline_task.output}')
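If installing Docker isn't an option, the KFP SDK also provides a subprocess-based local runner. The following is a minimal sketch that reuses the addition_pipeline defined above; note that with SubprocessRunner, any packages your components need must already be installed in your current Python environment.

from kfp import local

# Run each task in a local subprocess instead of a container.
local.init(runner=local.SubprocessRunner())

pipeline_task = addition_pipeline(x=1, y=2, z=3)
print(f'Result: {pipeline_task.output}')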
Building Kubeflow pipelines
Use the following process to build a pipeline.
- Design your pipeline as a series of components. To promote reusability, each component should have a single responsibility. Whenever possible, design your pipeline to reuse proven components such as the Google Cloud Pipeline Components.
- Build any custom components that are required to implement your ML workflow using the Kubeflow Pipelines SDK. Components are self-contained sets of code that perform a step in your ML workflow. Use one of the following options to create your pipeline components (a sketch of the second option follows these steps):
  - Package your component's code as a container image. This option lets you include code in your pipeline that was written in any language that can be packaged as a container image.
  - Implement your component's code as a standalone Python function and use the Kubeflow Pipelines SDK to package your function as a component. This option makes it easier to build Python-based components.
- Build your pipeline as a Python function. Learn more about defining your pipeline as a Python function.
- Use the Kubeflow Pipelines SDK compiler to compile your pipeline.
from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=PIPELINE_FUNCTION,
    package_path=PIPELINE_PACKAGE_PATH)
Replace the following:
- PIPELINE_FUNCTION: The name of your pipeline's function.
- PIPELINE_PACKAGE_PATH: The path to where to store your compiled pipeline.
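As a sketch of the second component option above, the following packages a standalone Python function as a component with the KFP SDK's @dsl.component decorator. The base image, the packages_to_install list, and the count_rows function are illustrative assumptions rather than part of the sample pipeline in this guide.

from kfp import dsl

# A hypothetical lightweight Python component. The decorator packages the
# function as a component; the listed packages are installed into the base
# image when the task runs.
@dsl.component(
    base_image="python:3.10",
    packages_to_install=["pandas", "gcsfs"],
)
def count_rows(csv_path: str) -> int:
    import pandas as pd

    # Read a CSV file (for example, a gs:// path, which requires gcsfs) and
    # return the number of rows.
    df = pd.read_csv(csv_path)
    return len(df)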
Accessing Google Cloud resources in a pipeline
If you do not specify a service account when you run a pipeline, Vertex AI Pipelines uses the Compute Engine default service account to run your pipeline. Vertex AI Pipelines also uses a pipeline run's service account to authorize your pipeline to access Google Cloud resources. The Compute Engine default service account has the Project Editor role by default. This may grant your pipelines excessive access to Google Cloud resources in your Google Cloud project.
We recommend that you create a service account to run your pipelines and then grant this account granular permissions to the Google Cloud resources that are needed to run your pipeline.
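For example, once you have a dedicated service account, you can pass its email address when you submit a pipeline run. The following is a minimal sketch that builds on the earlier submission sample; the service account email is a placeholder.

import google.cloud.aiplatform as aip

# Assumes `project_id`, `pipeline_root_path`, and aip.init(...) are set up as
# in the earlier submission sample.
job = aip.PipelineJob(
    display_name="automl-image-training-v2",
    template_path="image_classif_pipeline.yaml",
    pipeline_root=pipeline_root_path,
    parameter_values={"project_id": project_id},
)

# Run the pipeline as a dedicated, least-privilege service account instead of
# the Compute Engine default service account (placeholder email address).
job.submit(
    service_account="SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com"
)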
Learn more about using Identity and Access Management to create a service account and manage the access granted to a service account.
Keep your pipelines up-to-date
The SDK clients and container images that you use to build and run pipelines are periodically updated to new versions to patch security vulnerabilities and add new functionality. To keep your pipelines up to date with the latest version, we recommend that you do the following:
Review the Vertex AI framework support policy and Supported frameworks list.
Subscribe to the Vertex AI release notes and to the PyPI RSS feeds for the SDKs that you use (Kubeflow Pipelines SDK, Google Cloud Pipeline Components SDK, or TensorFlow Extended SDK) to stay aware of new releases.
If you have a pipeline template or definition that references a container with security vulnerabilities, you should do the following:
- Install the latest patched version of the SDK.
- Rebuild and recompile your pipeline template or definition.
- Re-upload the template or definition to Artifact Registry or Cloud Storage.
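For example, if you store pipeline templates in Artifact Registry, one way to re-upload a recompiled template is with the KFP SDK's RegistryClient. The following is a minimal sketch that assumes a Kubeflow Pipelines repository named REPO_NAME in us-central1; the host URL and tags are placeholders.

from kfp.registry import RegistryClient

# Connect to a Kubeflow Pipelines repository in Artifact Registry
# (placeholder project and repository names).
client = RegistryClient(
    host="https://us-central1-kfp.pkg.dev/PROJECT_ID/REPO_NAME"
)

# Upload the recompiled pipeline definition as a new template version.
template_name, version_name = client.upload_pipeline(
    file_name="image_classif_pipeline.yaml",
    tags=["latest"],
)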
What's next
- Read the introduction to Vertex AI Pipelines to learn more about orchestrating ML workflows.
- Learn how to run a pipeline.
- Visualize and analyze the results of your pipeline runs.