Reasoning Engine provides a prebuilt LangChain template (`reasoning_engines.LangchainAgent`) for
developing an application. In this section, we go through the steps to customize
your own application template. This might be useful if you have needs that go
beyond what the prebuilt template provides.
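For contrast, using the prebuilt template looks roughly like the following sketch (the import path reflects the preview SDK, and the model name is a placeholder):

```python
from vertexai.preview import reasoning_engines

# A minimal sketch of the prebuilt LangChain template.
agent = reasoning_engines.LangchainAgent(
    model="gemini-1.5-pro",  # Placeholder model name.
)
response = agent.query(
    input="What is the exchange rate from US dollars to Swedish currency?"
)
```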
An application template in Reasoning Engine is defined as a Python class. As an
example, the following Python code defines a LangChain application that is
deployable on Vertex AI (you can give the `CLASS_NAME` variable a value such as
`MyAgent`):
```python
from typing import Any, Callable, Iterable, Sequence


class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        """All unpickle-able logic should go here.

        The .set_up() method should not be called for an object that is being
        prepared for deployment.
        """
        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langchain.agents import AgentExecutor
        from langchain.agents.format_scratchpad.tools import format_to_tool_messages
        from langchain.agents.output_parsers.tools import ToolsAgentOutputParser
        from langchain.tools.base import StructuredTool
        from langchain_core import prompts

        vertexai.init(project=self.project, location=self.location)

        prompt = {
            "input": lambda x: x["input"],
            "agent_scratchpad": (
                lambda x: format_to_tool_messages(x["intermediate_steps"])
            ),
        } | prompts.ChatPromptTemplate.from_messages([
            ("user", "{input}"),
            prompts.MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        llm = ChatVertexAI(model_name=self.model_name)
        if self.tools:
            llm = llm.bind_tools(tools=self.tools)

        self.agent_executor = AgentExecutor(
            agent=prompt | llm | ToolsAgentOutputParser(),
            tools=[StructuredTool.from_function(tool) for tool in self.tools],
        )

    def query(self, input: str):
        """Query the application.

        Args:
            input: The user prompt.

        Returns:
            The output of querying the application with the given input.
        """
        return self.agent_executor.invoke(input={"input": input})

    def stream_query(self, input: str) -> Iterable[Any]:
        """Query the application and stream the output.

        Args:
            input: The user prompt.

        Yields:
            Chunks of the response as they become available.
        """
        for chunk in self.agent_executor.stream(input={"input": input}):
            yield chunk
```
When writing your Python class, the following three methods are important for Reasoning Engine:

`__init__()`:
- Use this method only for application configuration parameters. For example, you can use this method to collect the model parameters and safety attributes as input arguments from your users. You can also use this method to collect parameters such as the project ID, the region, application credentials, and API keys.
- The constructor returns an object that must be "pickle-able" for it to be deployable to Reasoning Engine. Therefore, you should initialize service clients and establish connections to databases in the `.set_up` method instead of in the `__init__` method (see the sketch after this list).
- This method is optional. If it's not specified, Vertex AI uses the default Python constructor for the class.

`set_up()`:
- Use this method to define application initialization logic. For example, you can use this method to establish connections to databases or dependent services, import dependent packages, or precompute data that's used for serving queries.
- This method is optional. If it's not specified, Vertex AI assumes that the application doesn't need to call a `.set_up` method before serving user queries.

`query()` / `stream_query()`:
- Use `query()` to return the complete response as a single result.
- Use `stream_query()` to return the response in chunks as it becomes available, enabling a streaming experience. The `stream_query` method must return an iterable object (for example, a generator) to enable streaming.
- You can implement both methods if you want to support both single-response and streaming interactions with your application.
- You should give these methods clear docstrings that define what they do, document their attributes, and provide type annotations for their inputs. Avoid variable arguments in the `query` and `stream_query` methods.
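To make the pickling constraint concrete, here is a minimal sketch of the pattern, using a Cloud Storage client as a stand-in for whatever service client your application depends on (the client choice is an assumption for illustration):

```python
class MyAgent:
    def __init__(self, model: str, project: str, location: str):
        # Only pickle-able configuration goes here: plain strings and values.
        self.model_name = model
        self.project = project
        self.location = location

    def set_up(self):
        # Service clients are created here rather than in __init__, because
        # client objects generally can't be pickled for deployment.
        from google.cloud import storage

        self.storage_client = storage.Client(project=self.project)
```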
Test the application locally
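The examples below pass a `get_exchange_rate` function as a tool. If you don't already have one, a minimal sketch might look like the following (the frankfurter.app endpoint is an assumption for illustration; any exchange-rate API works):

```python
def get_exchange_rate(
    currency_from: str = "USD",
    currency_to: str = "SEK",
    currency_date: str = "latest",
):
    """Retrieves the exchange rate between two currencies on a specified date."""
    import requests

    response = requests.get(
        f"https://api.frankfurter.app/{currency_date}",
        params={"from": currency_from, "to": currency_to},
    )
    return response.json()
```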
Instantiate the application in local memory using the following code:
```python
agent = CLASS_NAME(
    model=model,                # Required.
    tools=[get_exchange_rate],  # Optional.
    project=PROJECT_ID,
    location=LOCATION,
)
agent.set_up()
```
Test the `query` method
You can test the application by sending test queries to the local instance:
```python
response = agent.query(
    input="What is the exchange rate from US dollars to Swedish currency?"
)
```
The response is a dictionary that's similar to the following:

```python
{"input": "What is the exchange rate from US dollars to Swedish currency?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
```
Test the `stream_query` method
You can test the streaming query locally by calling the `stream_query` method and iterating through the results. Here's an example:
```python
import pprint

for chunk in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output. To see the full content of a chunk, use print(chunk).
    pprint.pprint(chunk, depth=1)
```
This code prints each chunk of the response as it's generated. The output might look something like this:
```
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}
```
In this example, each chunk contains different information about the response, such as the actions taken by the agent, the messages exchanged, and the final output.
Streaming API
Here are some key things to keep in mind when using the streaming API:
- Maximum timeout: The maximum timeout for streaming responses is 10 minutes. If your application requires longer processing times, consider breaking the task down into smaller chunks.
- Streaming models and chains: LangChain's Runnable interface supports streaming, so you can stream responses not only from agents, but also from models and chains (see the sketch after this list).
- LangChain compatibility: Note that LangChain's `astream_event` method isn't supported.
- Throttle content generation: If you encounter backpressure issues (where the producer generates data faster than the consumer can process it), throttle your content generation rate. This can help prevent buffer overflows and ensure a smooth streaming experience.
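For example, because models implement the Runnable interface, you can stream from a model directly. Here is a minimal sketch that assumes the same `ChatVertexAI` setup used in the template above (the model name is a placeholder):

```python
from langchain_google_vertexai import ChatVertexAI

llm = ChatVertexAI(model_name="gemini-1.5-pro")  # Placeholder model name.

# Runnable.stream() yields message chunks as they are generated.
for chunk in llm.stream("What is the exchange rate from US dollars to SEK?"):
    print(chunk.content, end="")
```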
Customize method names
By default, the methods `query` and `stream_query` are registered as operations in the deployed application. You can override the default behavior and define the set of operations to be registered using the `register_operations` method. Operations can be registered as either standard (represented by an empty string `""`) or streaming (`"stream"`) invocation modes.

In the following example code, the `register_operations` method will result in the deployed application offering `custom_method_1` and `custom_method_2` as operations for standard calls, and `custom_stream_method_1` and `custom_stream_method_2` as operations for streaming calls. These operations replace the default `query` and `stream_query` operations.
```python
from typing import Dict, List, Any, Iterable


class CLASS_NAME:
    # ... other methods ...

    def custom_method_1(...):
        # ...

    def custom_method_2(...):
        # ...

    def custom_stream_method_1(...) -> Iterable[Any]:
        # ...

    def custom_stream_method_2(...) -> Iterable[Any]:
        # ...

    def register_operations(self) -> Dict[str, List[str]]:
        return {
            "": [
                "custom_method_1", "custom_method_2",
            ],
            "stream": [
                "custom_stream_method_1", "custom_stream_method_2",
            ],
        }
```
You can test the application by sending test queries to the instance, as follows:
```python
import pprint

response = agent.custom_method_1(
    input="What is the exchange rate from US dollars to Swedish currency?"
)

for chunk in agent.custom_stream_method_1(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output. To see the full content of a chunk, use print(chunk).
    pprint.pprint(chunk, depth=1)
```
You don't need to register methods for both invocation types. For example, to support only standard calls, you can do the following:
```python
from typing import Dict, List, Any, Iterable


class CLASS_NAME:
    # ... other methods ...

    def custom_method_1(...):
        # ...

    def custom_method_2(...):
        # ...

    def custom_stream_method_1(...) -> Iterable[Any]:
        # ...

    def custom_stream_method_2(...) -> Iterable[Any]:
        # ...

    def register_operations(self) -> Dict[str, List[str]]:
        return {
            # The list of synchronous methods to be registered as operations.
            "": [
                "custom_method_1", "custom_method_2",
            ],
        }
```
In this example, only `custom_method_1` and `custom_method_2` are exposed as operations in the deployed application.