Agent templates in Agent Engine are defined as Python classes. The following steps show you how to create a custom template for instantiating agents that are deployable on Vertex AI:
- Basic example
- (Optional) Stream responses
- (Optional) Register custom methods
- (Optional) Provide type annotations
- (Optional) Tracing using OpenTelemetry
- (Optional) Working with environment variables
- (Optional) Integrating with Secret Manager
- (Optional) Handling credentials
Basic example
To give a basic example, the following Python class is a template for instantiating agents that are deployable on Vertex AI (you can give the `CLASS_NAME` variable a value such as `MyAgent`):
```python
from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)
        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)
```
Deployment considerations
When writing your Python class, the following three methods are important:
- `__init__()`:
  - Use this method only for agent configuration parameters. For example, you can use it to collect the model parameters and safety attributes as input arguments from your users, or parameters such as the project ID, the region, application credentials, and API keys.
  - The constructor returns an object that must be "pickle-able" for it to be deployable to Agent Engine. Therefore, you should initialize service clients and establish connections to databases in the `set_up` method instead of the `__init__` method.
  - This method is optional. If it's not specified, Vertex AI uses the default Python constructor for the class.
- `set_up()`:
  - Use this method to define agent initialization logic. For example, use it to establish connections to databases or dependent services, import dependent packages, or precompute data that's used for serving queries.
  - This method is optional. If it's not specified, Vertex AI assumes that the agent doesn't need to call a `set_up` method before serving user queries.
- `query()` / `stream_query()`:
  - Use `query()` to return the complete response as a single result.
  - Use `stream_query()` to return the response in chunks as it becomes available, enabling a streaming experience. The `stream_query` method must return an iterable object (for example, a generator) to enable streaming.
  - You can implement both methods if you want to support both single-response and streaming interactions with your agent.
  - Give each method a clear docstring that defines what it does, documents its attributes, and provides type annotations for its inputs. Avoid variable arguments in the `query` and `stream_query` methods.
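The pickle-ability constraint on `__init__` can be illustrated with a minimal sketch. The class below is hypothetical and not part of the template; it only shows why client construction belongs in `set_up`:

```python
import pickle

class PicklableAgentSketch:
    """Hypothetical class illustrating the pickle-ability constraint."""

    def __init__(self, project: str):
        # Store only plain configuration in the constructor so the
        # instance stays "pickle-able" for deployment.
        self.project = project
        self.client = None

    def set_up(self):
        # Construct non-picklable resources (service clients, database
        # connections) here instead; this runs in the deployed environment.
        self.client = object()  # stand-in for a real service client

agent = PicklableAgentSketch(project="demo-project")
restored = pickle.loads(pickle.dumps(agent))  # works: only config is stored
```

If the constructor created a live client or connection instead, `pickle.dumps` could fail and the agent wouldn't deploy.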
Instantiate the agent locally
You can create a local instance of your agent using the following code:
```python
agent = CLASS_NAME(
    model=model,                # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()
```
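The `get_exchange_rate` tool passed to the agent above isn't defined on this page. A minimal self-contained stub might look like the following; the hardcoded rate and the function body are illustrative only, and a real tool would call a live currency API:

```python
def get_exchange_rate(
    currency_from: str = "USD",
    currency_to: str = "SEK",
) -> dict:
    """Returns the exchange rate between two currencies.

    Hypothetical stub with a hardcoded rate; a real implementation
    would query a live currency API instead.
    """
    rates = {("USD", "SEK"): 10.73}
    return {
        "from": currency_from,
        "to": currency_to,
        "rate": rates.get((currency_from, currency_to)),
    }
```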
Test the `query` method
You can test the agent by sending queries to the local instance:
```python
response = agent.query(
    input="What is the exchange rate from US dollars to Swedish currency?"
)
print(response)
```
The response is a dictionary that's similar to the following:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Streaming responses
To stream responses to queries, define a method named `stream_query` that yields responses. As an example, the following template extends the basic example to stream responses and is deployable on Vertex AI:
```python
from typing import Iterable

class StreamingAgent(CLASS_NAME):
    def stream_query(self, **kwargs) -> Iterable:
        from langchain.load.dump import dumpd

        for chunk in self.graph.stream(**kwargs):
            yield dumpd(chunk)
```
Here are some key things to keep in mind when using the streaming API:
- Maximum timeout: The maximum timeout for streaming responses is 10 minutes. If your agent requires longer processing times, consider breaking the task into smaller chunks.
- Streaming models and chains: LangChain's Runnable interface supports streaming, so you can stream responses not only from agents, but also from models and chains.
- LangChain compatibility: Asynchronous methods such as LangChain's `astream_event` method aren't supported at the moment.
- Throttle content generation: If you encounter backpressure issues (where the producer generates data faster than the consumer can process it), throttle your content generation rate. This can help prevent buffer overflows and ensure a smooth streaming experience.
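The throttling advice can be sketched as a generator wrapper around a fast producer. The helper name and interval are hypothetical; this only illustrates capping the emission rate:

```python
import time
from typing import Iterable, Iterator

def throttled(chunks: Iterable, min_interval: float = 0.1) -> Iterator:
    """Yields chunks no faster than one per min_interval seconds."""
    last_emit = 0.0
    for chunk in chunks:
        wait = min_interval - (time.monotonic() - last_emit)
        if wait > 0:
            time.sleep(wait)
        last_emit = time.monotonic()
        yield chunk

# Example: cap the emission rate of a fast producer.
slowed = list(throttled(range(3), min_interval=0.01))
```

Inside a `stream_query` implementation, you would wrap the underlying stream (for example, `self.graph.stream(**kwargs)`) the same way.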
Test the `stream_query` method
You can test the streaming query locally by calling the `stream_query` method and iterating through the results. Here's an example:
```python
import pprint

for chunk in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output. To see the full content of a chunk, use print(chunk).
    pprint.pprint(chunk, depth=1)
```
This code prints each chunk of the response as it's generated. The output might look something like this:
```
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}
```
In this example, each chunk contains different information about the response, such as the actions taken by the agent, the messages exchanged, and the final output.
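If you need only the final answer from a stream, you can scan the chunks for the key that carries it. A sketch, assuming chunks are dictionaries and the last one carries an `output` key as in the sample above (the chunk values here are illustrative):

```python
# Simulated stream of chunks, shaped like the sample output above.
chunks = [
    {"actions": ["tool_call"], "messages": []},
    {"steps": ["lookup"], "messages": []},
    {"messages": [], "output": "1 USD is 10.5751 SEK."},
]

# Keep the last chunk that carries an "output" key.
final_output = None
for chunk in chunks:
    if "output" in chunk:
        final_output = chunk["output"]
```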
Registering custom methods
By default, the methods `query` and `stream_query` are registered as operations in the deployed agent. You can override the default behavior and define the set of operations to be registered using the `register_operations` method. Operations can be registered with either the standard (represented by an empty string `""`) or streaming (`"stream"`) execution mode.

To register multiple operations, define a method named `register_operations` that lists the methods to be made available to users when the agent is deployed. In the following example code, the `register_operations` method results in the deployed agent registering `query` and `get_state` as operations that run synchronously, and `stream_query` and `get_state_history` as operations that stream responses:
```python
from typing import Iterable

class CustomAgent(StreamingAgent):
    def get_state(self, **kwargs) -> dict:  # new synchronous method
        return self.graph.get_state(**kwargs)._asdict()

    def get_state_history(self, **kwargs) -> Iterable:  # new streaming operation
        for state_snapshot in self.graph.get_state_history(**kwargs):
            yield state_snapshot._asdict()

    def register_operations(self):
        return {
            # The list of synchronous operations to be registered
            "": ["query", "get_state"],
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"],
        }
```
You can test the custom methods by calling them directly on the local instance of the agent, similar to how you would test the `query` and `stream_query` methods.
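The registration pattern itself can be exercised without any deployment dependencies. The following is a minimal sketch with a hypothetical agent class (its method bodies are placeholders, not the template's behavior):

```python
from typing import Iterable

class MinimalAgent:
    """Hypothetical agent illustrating the two execution modes."""

    def query(self, *, input: str) -> dict:
        return {"output": input.upper()}

    def stream_query(self, *, input: str) -> Iterable:
        for word in input.split():
            yield {"chunk": word}

    def register_operations(self) -> dict:
        # "" marks synchronous operations; "stream" marks streaming ones.
        return {"": ["query"], "stream": ["stream_query"]}

agent = MinimalAgent()
operations = agent.register_operations()
```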
Providing Type Annotations
You can use type annotations to specify the expected input and output types of your agent methods. When the agent is deployed, only JSON-serializable types are supported in the input and output of the operations supported by the agent. The schemas of the inputs and outputs can be annotated using `TypedDict` or Pydantic models.

In the following example, the input is annotated as a `TypedDict`, and the raw output from `.get_state` (which is a `NamedTuple`) is converted into a serializable dictionary using its `._asdict()` method:
```python
from typing import Any, Dict, TypedDict

# schemas.py
class RunnableConfig(TypedDict, total=False):
    metadata: Dict[str, Any]
    configurable: Dict[str, Any]

# agents.py
class AnnotatedAgent(CLASS_NAME):
    def get_state(self, config: RunnableConfig) -> dict:
        return self.graph.get_state(config=config)._asdict()

    def register_operations(self):
        return {"": ["query", "get_state"]}
```
Tracing using OpenTelemetry
To enable tracing with instrumentation libraries that support OpenTelemetry, import and initialize them in the `set_up` method.
As an example, the following template is a modification of the basic example to export traces to Cloud Trace:
```python
from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        # The additional code required for tracing instrumentation.
        from opentelemetry import trace
        from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
        from openinference.instrumentation.langchain import LangChainInstrumentor

        trace.set_tracer_provider(TracerProvider())
        cloud_trace_exporter = CloudTraceSpanExporter(project_id=self.project)
        trace.get_tracer_provider().add_span_processor(
            SimpleSpanProcessor(cloud_trace_exporter)
        )
        LangChainInstrumentor().instrument()
        # End of additional code required for tracing instrumentation.

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)
        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)
```
Working with Environment Variables
To set environment variables in a manner that works when the agent is deployed, set them inside the `set_up` method. For example:
```python
from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
        env_vars: dict[str, str],  # <- new
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location
        self.env_vars = env_vars  # <- new

    def set_up(self):
        # Code for setting the environment variables
        import os
        for env_var_name, env_var_value in self.env_vars.items():
            os.environ[env_var_name] = env_var_value
        # End of code for setting the environment variables

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)
        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)
```
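The environment-variable loop in `set_up` can also be factored into a small standalone helper. A sketch (the function name and the example variable are hypothetical):

```python
import os

def apply_env_vars(env_vars: dict) -> None:
    """Sets each key/value pair as a process environment variable."""
    for name, value in env_vars.items():
        os.environ[name] = value

# Hypothetical example variable; a real agent might set API endpoints or flags.
apply_env_vars({"EXAMPLE_API_ENDPOINT": "https://example.invalid"})
```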
Integrating with Secret Manager
To integrate with Secret Manager:

1. Install the client library by running `pip install google-cloud-secret-manager`.
2. Follow the instructions in Grant roles for a deployed agent to grant the Reasoning Engine service agent the "Secret Manager Secret Accessor" role (`roles/secretmanager.secretAccessor`) through the Google Cloud console.
3. Import and initialize the client in the `set_up` method, and get the corresponding secret when needed.

As an example, the following template is a modification of the basic example to use an API key for `ChatAnthropic` that was stored in Secret Manager:
```python
from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        secret_id: str,  # <- new
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.secret_id = secret_id  # <- new

    def set_up(self):
        from google.cloud import secretmanager
        from langchain_anthropic import ChatAnthropic
        from langgraph.prebuilt import create_react_agent

        # Get the API key from Secret Manager here.
        self.secret_manager_client = secretmanager.SecretManagerServiceClient()
        secret_version = self.secret_manager_client.access_secret_version(request={
            "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
        })

        # Use the API key from Secret Manager here.
        model = ChatAnthropic(
            model_name=self.model_name,
            model_kwargs={"api_key": secret_version.payload.data.decode()},  # <- new
        )
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)
```
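The resource name passed to `access_secret_version` follows the `projects/*/secrets/*/versions/*` format. A small helper (hypothetical function name) can build it from the values stored on the agent instead of hardcoding the string:

```python
def secret_version_name(project: str, secret_id: str, version: str = "latest") -> str:
    """Builds the fully qualified Secret Manager secret version name."""
    return f"projects/{project}/secrets/{secret_id}/versions/{version}"

name = secret_version_name("my-project", "anthropic-api-key")
```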
Handling credentials
When the agent is deployed, it might need to handle different types of credentials:
- Application default credentials (ADC), commonly arising from service accounts
- OAuth, commonly arising from user accounts
- Identity providers, for credentials from external accounts (workload identity federation)
Application default credentials
You can obtain application default credentials as follows:

```python
import google.auth

credentials, project = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
```

These credentials can be used in code in the following way:
```python
from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        tools: Sequence[Callable],
        location: str,
        project: str,
        model: str = "meta/llama3-405b-instruct-maas",
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.endpoint = f"https://{location}-aiplatform.googleapis.com"
        self.base_url = f"{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi"

    def query(self, **kwargs):
        import google.auth
        import google.auth.transport.requests
        from langchain_openai import ChatOpenAI
        from langgraph.prebuilt import create_react_agent

        # Note: the credential lives for 1 hour by default.
        # After expiration, it must be refreshed.
        creds, _ = google.auth.default(
            scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )
        creds.refresh(google.auth.transport.requests.Request())

        model = ChatOpenAI(
            model=self.model_name,
            base_url=self.base_url,
            api_key=creds.token,  # Use the token from the credentials here.
        )
        graph = create_react_agent(model, tools=self.tools)
        return graph.invoke(**kwargs)
```
For details, visit How Application Default Credentials works.
OAuth
User credentials are typically obtained using OAuth 2.0.
If you have an access token (for example, from `oauthlib`), you can create a `google.oauth2.credentials.Credentials` instance. In addition, if you obtain a refresh token, you can also specify the refresh token and token URI to allow the credentials to be automatically refreshed:

```python
credentials = google.oauth2.credentials.Credentials(
    token="ACCESS_TOKEN",
    refresh_token="REFRESH_TOKEN",  # Optional
    token_uri="TOKEN_URI",          # E.g. "https://oauth2.googleapis.com/token"
    client_id="CLIENT_ID",          # Optional
    client_secret="CLIENT_SECRET",  # Optional
)
```

Here, `TOKEN_URI`, `CLIENT_ID`, and `CLIENT_SECRET` are based on Create an OAuth client credential.
If you don't have an access token, you can use `google_auth_oauthlib.flow` to perform the OAuth 2.0 Authorization Grant Flow to obtain a corresponding `google.oauth2.credentials.Credentials` instance:
```python
import json

import google.oauth2.credentials
import vertexai
from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow

# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = secret_manager_client.access_secret_version(request={
    "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())

# Create a flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
    client_config,
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
    state="OAUTH_FLOW_STATE",  # from flow.authorization_url(...)
)

# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials

# After obtaining the credentials, you can authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to Vertex AI services, specify them in vertexai.init(credentials=...).
vertexai.init(
    project="PROJECT_ID",
    location="LOCATION",
    credentials=credentials,  # Specify the credentials here.
)
```
For details, visit the documentation for the `google_auth_oauthlib.flow` module.
Identity Provider
If you want to authenticate users using email/password, phone number, social providers like Google, Facebook or GitHub, or a custom authentication mechanism, you can use Identity Platform or Firebase Authentication, or any identity provider that supports OpenID Connect (OIDC).
For details, visit Accessing resources from an OIDC identity provider.