Develop a custom agent

Agent templates in Agent Engine are defined as Python classes. The following steps show you how to create a custom template for instantiating agents that are deployable on Vertex AI:

  1. Basic example
  2. (Optional) Stream responses
  3. (Optional) Register custom methods
  4. (Optional) Provide type annotations
  5. (Optional) Tracing using OpenTelemetry
  6. (Optional) Working with environment variables
  7. (Optional) Integrating with Secret Manager
  8. (Optional) Handling credentials

Basic example

As a basic example, the following Python class is a template for instantiating agents that are deployable on Vertex AI (replace CLASS_NAME with a name such as MyAgent):

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Deployment considerations

When writing your Python class, the following three methods are important:

  1. __init__():
    • Use this method for only agent configuration parameters. For example, you can use this method to collect the model parameters and safety attributes as input arguments from your users. You can also use this method to collect parameters such as the project ID, the region, application credentials, and API keys.
    • The constructor returns an object that must be "pickle-able" for it to be deployable to Agent Engine. Therefore, you should initialize service clients and establish connections to databases in the .set_up method instead of the __init__ method.
    • This method is optional. If it's not specified, Vertex AI uses the default Python constructor for the class.
  2. set_up():
    • Use this method to define agent initialization logic. For example, use it to establish connections to databases or dependent services, import dependent packages, or precompute data that's used for serving queries.
    • This method is optional. If it's not specified, Vertex AI assumes that the agent doesn't need to call a .set_up method before serving user queries.
  3. query() / stream_query():
    • Use query() to return the complete response as a single result.
    • Use stream_query() to return the response in chunks as it becomes available, enabling a streaming experience. The stream_query method must return an iterable object (for example a generator) to enable streaming.
    • You can implement both methods if you want to support both single-response and streaming interactions with your agent.
    • Give each of these methods a clear docstring that describes what it does and documents its arguments, and provide type annotations for its inputs. Avoid variable arguments in the query and stream_query methods.
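
The split between __init__() and set_up() described above can be checked locally. The following is a minimal sketch, not the Agent Engine implementation: it uses the standard pickle module as a stand-in for whatever serializer the service applies, and the class name PicklableAgent, the db_path parameter, and the state_is_picklable helper are hypothetical names introduced for illustration.

```python
import pickle
import sqlite3


class PicklableAgent:
    """Hypothetical agent that keeps __init__ limited to plain configuration."""

    def __init__(self, db_path: str = ":memory:"):
        # Store only simple configuration values in the constructor.
        self.db_path = db_path
        self.connection = None

    def set_up(self):
        # Open the connection at set-up time; connections cannot be pickled.
        self.connection = sqlite3.connect(self.db_path)

    def query(self, statement: str) -> list:
        """Runs a SQL statement and returns all resulting rows."""
        return self.connection.execute(statement).fetchall()


def state_is_picklable(obj) -> bool:
    """Returns True if the instance's attribute dictionary can be pickled."""
    try:
        pickle.dumps(vars(obj))
        return True
    except (TypeError, pickle.PicklingError):
        return False


agent = PicklableAgent()
picklable_before_set_up = state_is_picklable(agent)

agent.set_up()
picklable_after_set_up = state_is_picklable(agent)
```

In this sketch the state is picklable before set_up() runs and stops being picklable once the connection exists, which is why service clients and connections belong in set_up().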

Instantiate the agent locally

You can create a local instance of your agent using the following code:

agent = CLASS_NAME(
    model=model,  # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()
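
The snippet above passes a get_exchange_rate tool that isn't defined in this section. For local experiments, a stand-in along the following lines might be enough. This is only a sketch: the signature, the return shape, and the hard-coded rates are assumptions made for illustration (the rates are placeholders, not real market data), and a real tool would call an exchange-rate service instead.

```python
def get_exchange_rate(
    currency_from: str = "USD",
    currency_to: str = "SEK",
) -> dict:
    """Returns a placeholder exchange rate between two currencies.

    Args:
        currency_from: Code of the base currency, for example "USD".
        currency_to: Code of the target currency, for example "SEK".

    Returns:
        A dictionary with the base currency and the requested rate.
    """
    # Hypothetical, hard-coded values for local testing only.
    placeholder_rates = {("USD", "SEK"): 10.0, ("SEK", "USD"): 0.1}
    key = (currency_from.upper(), currency_to.upper())
    if key not in placeholder_rates:
        raise ValueError(f"No placeholder rate for {key[0]} -> {key[1]}")
    return {"base": key[0], "rates": {key[1]: placeholder_rates[key]}}
```

The docstring and type annotations matter here because the function is handed to the agent as a tool, and they describe it to the model.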

Test the query method

You can test the agent by sending queries to the local instance:

response = agent.query(
    input="What is the exchange rate from US dollars to Swedish currency?"
)

print(response)

The response is a dictionary that's similar to the following:

{"input": "What is the exchange rate from US dollars to Swedish currency?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

Streaming responses

To stream responses to queries, you can define a method named stream_query that yields responses. As an example, the following template extends the basic example to stream responses and is deployable on Vertex AI:

from typing import Iterable

class StreamingAgent(CLASS_NAME):

    def stream_query(self, **kwargs) -> Iterable:
        from langchain.load.dump import dumpd

        for chunk in self.graph.stream(**kwargs):
            yield dumpd(chunk)

Here are some key things to keep in mind when using the streaming API:

  • Maximum timeout: The maximum timeout for streaming responses is 10 minutes. If your agent requires longer processing times, consider breaking down the task into smaller chunks.
  • Streaming models and chains: LangChain's Runnable interface supports streaming, so you can stream responses from not only agents, but also models and chains.
  • LangChain compatibility: Asynchronous methods, such as LangChain's astream_events method, aren't supported at the moment.
  • Throttle content generation: If you encounter backpressure issues (where the producer generates data faster than the consumer can process it), you should throttle your content generation rate. This can help prevent buffer overflows and ensure a smooth streaming experience.
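
One way to apply the throttling advice above is to wrap the chunk iterator in a generator that enforces a minimum gap between chunks. The following is a rough sketch under that assumption; the throttled helper and its min_interval parameter are hypothetical names, not part of Agent Engine or LangChain.

```python
import time
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def throttled(chunks: Iterable[T], min_interval: float) -> Iterator[T]:
    """Yields chunks no faster than one per `min_interval` seconds."""
    last_emit = None
    for chunk in chunks:
        if last_emit is not None:
            # Sleep only for the part of the interval that has not elapsed yet.
            remaining = min_interval - (time.monotonic() - last_emit)
            if remaining > 0:
                time.sleep(remaining)
        last_emit = time.monotonic()
        yield chunk
```

Inside a stream_query method, the wrapper could be applied to the underlying stream, for example by iterating over throttled(self.graph.stream(**kwargs), min_interval=0.05). The interval value is arbitrary and would need tuning for a real consumer.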

Test the stream_query method

You can test the streaming query locally by calling the stream_query method and iterating through the results. Here's an example:

import pprint

for chunk in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

This code prints each chunk of the response as it's generated. The output might look something like this:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}

In this example, each chunk contains different information about the response, such as the actions taken by the agent, the messages exchanged, and the final output.

Registering custom methods

By default, the methods query and stream_query are registered as operations in the deployed agent. You can override the default behavior and define the set of operations to be registered using the register_operations method. Operations can be registered in either the standard execution mode (represented by an empty string "") or the streaming execution mode ("stream").

To register multiple operations, you can define a method named register_operations that lists the methods to be made available to users when the agent is deployed. In the following example code, the register_operations method results in the deployed agent registering query and get_state as operations that run synchronously, and stream_query and get_state_history as operations that stream their responses:

from typing import Iterable

class CustomAgent(StreamingAgent):

    def get_state(self, **kwargs) -> dict: # new synchronous method
        return self.graph.get_state(**kwargs)._asdict()

    def get_state_history(self, **kwargs) -> Iterable: # new streaming operation
        for state_snapshot in self.graph.get_state_history(**kwargs):
            yield state_snapshot._asdict()

    def register_operations(self):
        return {
            # The list of synchronous operations to be registered
            "": ["query", "get_state"],
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"],
        }

You can test the custom methods by calling them directly on the local instance of the agent, similarly to how you would test the query and stream_query methods.
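
Before deploying, it can also help to confirm that every name returned by register_operations refers to a method that actually exists on the agent. The sketch below shows one possible local check; DemoAgent and missing_operations are hypothetical names used only for illustration, and the check isn't something Agent Engine provides.

```python
from typing import Dict, Iterable, List


class DemoAgent:
    """Hypothetical stand-in agent with one standard and one streaming method."""

    def query(self, text: str) -> dict:
        """Returns the input text in upper case."""
        return {"output": text.upper()}

    def stream_query(self, text: str) -> Iterable[dict]:
        """Yields one chunk per whitespace-separated word."""
        for word in text.split():
            yield {"output": word}

    def register_operations(self) -> Dict[str, List[str]]:
        return {"": ["query"], "stream": ["stream_query"]}


def missing_operations(agent) -> List[str]:
    """Returns registered names that are not callable attributes of the agent."""
    missing = []
    for mode, names in agent.register_operations().items():
        for name in names:
            if not callable(getattr(agent, name, None)):
                missing.append(f"{mode or 'standard'}:{name}")
    return missing


demo_agent = DemoAgent()
unresolved = missing_operations(demo_agent)  # Empty when every name resolves.
streamed = list(demo_agent.stream_query("hello world"))
```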

Providing type annotations

You can use type annotations to specify the expected input and output types of your agent methods. When the agent is deployed, only JSON-serializable types are supported in the input and output of the operations supported by the agent. The schemas of the inputs and outputs can be annotated using TypedDict or Pydantic models.

In the following example, we annotate the input as a TypedDict, and convert the raw output from .get_state (which is a NamedTuple) into a serializable dictionary using its ._asdict() method:

from typing import Any, Dict, TypedDict

# schemas.py
class RunnableConfig(TypedDict, total=False):
    metadata: Dict[str, Any]
    configurable: Dict[str, Any]

# agents.py
class AnnotatedAgent(CLASS_NAME):

    def get_state(self, config: RunnableConfig) -> dict:
        return self.graph.get_state(config=config)._asdict()

    def register_operations(self):
        return {"": ["query", "get_state"]}
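
To see what such annotations look like at runtime, and to check that a method's output is JSON-serializable, a local experiment along these lines may be useful. It is a sketch only: StubAgent is a hypothetical stand-in that echoes its config instead of querying a graph, and the thread_id key is an illustrative value.

```python
import json
from typing import Any, Dict, TypedDict, get_type_hints


class RunnableConfig(TypedDict, total=False):
    metadata: Dict[str, Any]
    configurable: Dict[str, Any]


class StubAgent:
    """Hypothetical stand-in that echoes its config instead of using a graph."""

    def get_state(self, config: RunnableConfig) -> dict:
        """Returns a JSON-serializable snapshot for the given config."""
        return {"config": dict(config), "values": {}}


# Inspect the annotations that describe the operation's input and output.
hints = get_type_hints(StubAgent.get_state)

state = StubAgent().get_state({"configurable": {"thread_id": "demo"}})
# json.dumps raises TypeError if the output is not JSON-serializable.
encoded = json.dumps(state)
```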

Tracing using OpenTelemetry

To enable tracing with instrumentation libraries that support OpenTelemetry, you can import and initialize them in the .set_up method.

As an example, the following template is a modification of the basic example to export traces to Cloud Trace:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        # The additional code required for tracing instrumentation.
        from opentelemetry import trace
        from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
        from openinference.instrumentation.langchain import LangChainInstrumentor

        trace.set_tracer_provider(TracerProvider())
        cloud_trace_exporter = CloudTraceSpanExporter(project_id=self.project)
        trace.get_tracer_provider().add_span_processor(
            SimpleSpanProcessor(cloud_trace_exporter)
        )
        LangChainInstrumentor().instrument()
        # end of additional code required

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Working with environment variables

To set environment variables in a manner that works when the agent is deployed, set them inside the .set_up method, for example:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
        env_vars: dict[str, str], # <- new
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location
        self.env_vars = env_vars # <- new

    def set_up(self):
        # Code for setting the environment variables
        import os
        for env_var_name, env_var_value in self.env_vars.items():
            os.environ[env_var_name] = env_var_value
        # End of code for setting the environment variables

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)
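
The environment variable handling can be exercised on its own, without the model and graph setup. The following stripped-down sketch assumes a hypothetical EnvAgent class and adds an early type check, because os.environ accepts only string values; neither the class nor the check is part of the template above.

```python
import os
from typing import Dict


class EnvAgent:
    """Hypothetical stripped-down agent showing only the env var handling."""

    def __init__(self, env_vars: Dict[str, str]):
        # Reject non-string entries early; os.environ only accepts strings.
        for name, value in env_vars.items():
            if not isinstance(name, str) or not isinstance(value, str):
                raise TypeError(f"Environment variable {name!r} must map str to str")
        self.env_vars = dict(env_vars)

    def set_up(self):
        # Variables are applied here so that they are set wherever set_up runs.
        for name, value in self.env_vars.items():
            os.environ[name] = value
```

Calling EnvAgent({"DEMO_AGENT_FLAG": "1"}).set_up() would then make DEMO_AGENT_FLAG visible through os.environ in the current process.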

Integrating with Secret Manager

To integrate with Secret Manager:

  1. Install the client library by running

    pip install google-cloud-secret-manager
  2. Follow the instructions in Grant roles for a deployed agent to grant the Reasoning Engine service agent the "Secret Manager Secret Accessor" role (roles/secretmanager.secretAccessor) through the Google Cloud console.

  3. Import and initialize the client in the .set_up method and get the corresponding secret when needed. As an example, the following template is a modification of the basic example to use an API key for ChatAnthropic that was stored in Secret Manager:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        secret_id: str,  # <- new
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.secret_id = secret_id # <- new

    def set_up(self):
        from google.cloud import secretmanager
        from langchain_anthropic import ChatAnthropic
        from langgraph.prebuilt import create_react_agent

        # Get the API Key from Secret Manager here.
        self.secret_manager_client = secretmanager.SecretManagerServiceClient()
        secret_version = self.secret_manager_client.access_secret_version(request={
            "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
        })
        # Use the API Key from Secret Manager here.
        model = ChatAnthropic(
            model_name=self.model_name,
            model_kwargs={"api_key": secret_version.payload.data.decode()},  # <- new
        )
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)
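
The template above uses placeholders in the secret's resource name. If you prefer to build that name from values held by the agent, a small helper such as the following could be used. It is a sketch: secret_version_name is a hypothetical function, and the default of "latest" assumes you want the most recent version of the secret rather than a pinned one.

```python
def secret_version_name(project: str, secret_id: str, version: str = "latest") -> str:
    """Builds a secret version resource name from its three components."""
    parts = (("project", project), ("secret_id", secret_id), ("version", version))
    for label, value in parts:
        # Each component is a single path segment, so it can't be empty or contain "/".
        if not value or "/" in value:
            raise ValueError(f"Invalid {label}: {value!r}")
    return f"projects/{project}/secrets/{secret_id}/versions/{version}"
```

The result can be passed as the "name" field of the request given to access_secret_version, for example secret_version_name(self.project, self.secret_id).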

Handling credentials

When the agent is deployed, it might need to handle different types of credentials:

  1. Application default credentials (ADC) commonly arising from service accounts,
  2. OAuth commonly arising from user accounts, and
  3. Identity providers for credentials from external accounts (workload identity federation).

Application default credentials

You can obtain application default credentials as follows:

import google.auth

credentials, project = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

You can use these credentials in code as follows:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        tools: Sequence[Callable],
        location: str,
        project: str,
        # Parameters with default values must follow those without defaults.
        model: str = "meta/llama3-405b-instruct-maas",
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.endpoint = f"https://{location}-aiplatform.googleapis.com"
        self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'

    def query(self, **kwargs):
        import google.auth
        import google.auth.transport.requests
        from langchain_openai import ChatOpenAI
        from langgraph.prebuilt import create_react_agent

        # Note: the credential lives for 1 hour by default.
        # After expiration, it must be refreshed.
        creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
        creds.refresh(google.auth.transport.requests.Request())

        model = ChatOpenAI(
            model=self.model_name,
            base_url=self.base_url,
            api_key=creds.token,  # Use the token from the credentials here.
        )
        graph = create_react_agent(model, tools=self.tools)
        return graph.invoke(**kwargs)

For details, visit How Application Default Credentials works.
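
The query method above refreshes the credentials on every call. Since the token lives for about an hour, one possible refinement is to refresh only when the credentials are no longer valid. The sketch below illustrates that idea with a hypothetical fresh_token helper and a FakeCredentials stand-in, so that it runs without any Google libraries; it assumes the real credentials object exposes a valid flag, a token attribute, and a refresh(request) method.

```python
class FakeCredentials:
    """Hypothetical stand-in exposing `valid`, `token`, and `refresh()`."""

    def __init__(self):
        self.valid = False
        self.token = None
        self.refresh_count = 0

    def refresh(self, request):
        # A real credentials object would contact the token endpoint here.
        self.refresh_count += 1
        self.token = f"token-{self.refresh_count}"
        self.valid = True


def fresh_token(creds, request) -> str:
    """Refreshes the credentials only when they are not currently valid."""
    if not creds.valid:
        creds.refresh(request)
    return creds.token


creds = FakeCredentials()
first = fresh_token(creds, request=None)   # Triggers a refresh.
second = fresh_token(creds, request=None)  # Reuses the still-valid token.
```

With real credentials, the request argument would be a google.auth.transport.requests.Request() instance, as in the query method above.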

OAuth

User credentials are typically obtained using OAuth 2.0.

If you have an access token (e.g. from oauthlib), you can create a google.oauth2.credentials.Credentials instance. In addition, if you obtain a refresh token, you can also specify the refresh token and token URI to allow the credentials to be automatically refreshed:

import google.oauth2.credentials

credentials = google.oauth2.credentials.Credentials(
    token="ACCESS_TOKEN",
    refresh_token="REFRESH_TOKEN",  # Optional
    token_uri="TOKEN_URI",          # E.g. "https://oauth2.googleapis.com/token"
    client_id="CLIENT_ID",          # Optional
    client_secret="CLIENT_SECRET"   # Optional
)

Here, the TOKEN_URI, CLIENT_ID and CLIENT_SECRET are based on Create an OAuth client credential.

If you don't have an access token, you can use google_auth_oauthlib.flow to perform the OAuth 2.0 Authorization Grant Flow to obtain a corresponding google.oauth2.credentials.Credentials instance:

from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json

# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = secret_manager_client.access_secret_version(request={
    "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())

# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
    client_config,
    scopes=['https://www.googleapis.com/auth/cloud-platform'],
    state="OAUTH_FLOW_STATE"  # from flow.authorization_url(...)
)

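# After the flow has fetched a token (for example, through flow.fetch_token(...)
# or flow.run_local_server()), you can get the credentials from the flow object.
# flow.credentials is a google.oauth2.credentials.Credentials instance.
credentials = flow.credentials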

# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai

vertexai.init(
    project="PROJECT_ID",
    location="LOCATION",
    credentials=credentials, # specify the credentials here
)

For details, visit the documentation for the google_auth_oauthlib.flow module.

Identity provider

If you want to authenticate users using email/password, phone number, social providers like Google, Facebook or GitHub, or a custom authentication mechanism, you can use Identity Platform or Firebase Authentication, or any identity provider that supports OpenID Connect (OIDC).

For details, visit Accessing resources from an OIDC identity provider.

What's next