Customize an application template

In Develop the application, we used a prebuilt template (that is, reasoning_engines.LangchainAgent) to develop an application. This section walks through the steps to create your own application template, which is useful when your needs go beyond what the prebuilt template provides.

An application template in Reasoning Engine is defined as a Python class. For example, the following Python code defines a LangChain application that is deployable on Vertex AI (you can give the CLASS_NAME variable a value such as MyAgent):

from typing import Any, Callable, Iterable, Sequence

class CLASS_NAME:
    def __init__(
            self,
            model: str,
            tools: Sequence[Callable],
            project: str,
            location: str,
        ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        """All logic that can't be pickled should go here.

        The .set_up() method should not be called for an object that is being
        prepared for deployment.
        """
        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langchain.agents import AgentExecutor
        from langchain.agents.format_scratchpad.tools import format_to_tool_messages
        from langchain.agents.output_parsers.tools import ToolsAgentOutputParser
        from langchain.tools.base import StructuredTool
        from langchain_core import prompts

        vertexai.init(project=self.project, location=self.location)

        prompt = {
            "input": lambda x: x["input"],
            "agent_scratchpad": (
                lambda x: format_to_tool_messages(x["intermediate_steps"])
            ),
        } | prompts.ChatPromptTemplate.from_messages([
            ("user", "{input}"),
            prompts.MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])

        llm = ChatVertexAI(model_name=self.model_name)
        if self.tools:
            llm = llm.bind_tools(tools=self.tools)

        self.agent_executor = AgentExecutor(
            agent=prompt | llm | ToolsAgentOutputParser(),
            tools=[StructuredTool.from_function(tool) for tool in self.tools],
        )

    def query(self, input: str):
        """Query the application.

        Args:
            input: The user prompt.

        Returns:
            The output of querying the application with the given input.
        """
        return self.agent_executor.invoke(input={"input": input})

    def stream_query(self, input: str) -> Iterable[Any]:
        """Query the application and stream the output.

        Args:
            input: The user prompt.

        Yields:
            Chunks of the response as they become available.
        """
        for chunk in self.agent_executor.stream(input={"input": input}):
            yield chunk

When writing your Python class, the following methods are important for Reasoning Engine:

  1. __init__():
    • Use this method only for application configuration parameters. For example, you can use this method to collect the model parameters and safety attributes as input arguments from your users. You can also use this method to collect parameters such as the project ID, the region, application credentials, and API keys.
    • The constructor returns an object that must be "pickle-able" for it to be deployable to Reasoning Engine. Therefore, initialize service clients and establish connections to databases in the .set_up method instead of in the __init__ method.
    • This method is optional. If it's not specified, Vertex AI uses the default Python constructor for the class.
  2. set_up():
    • Use this method to define application initialization logic. For example, use this method to establish connections to databases or dependent services, import dependent packages, or precompute data that's used for serving queries.
    • This method is optional. If it's not specified, Vertex AI assumes that the application doesn't need to call a .set_up method before serving user queries.
  3. query()/stream_query():
    • Use query() to return the complete response as a single result.
    • Use stream_query() to return the response in chunks as it becomes available, enabling a streaming experience. The stream_query method must return an iterable object (for example, a generator) to enable streaming.
    • You can implement both methods if you want to support both single-response and streaming interactions with your application.
    • Give each of these methods a clear docstring that describes what the method does, documents its arguments, and provides type annotations for its inputs. Avoid variable arguments (such as *args and **kwargs) in the query and stream_query methods.
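
The split between __init__ and set_up can be checked locally before deployment. The following is a minimal sketch, not the service's own validation. It assumes that the standard library pickle module is a reasonable stand-in for the serializer that Reasoning Engine uses. The EchoAgent class and the state_is_picklable helper are hypothetical names introduced for illustration, and a threading.Lock stands in for a service client or database connection because it can't be pickled.

```python
import pickle
import threading
from typing import Any


class EchoAgent:
    """Hypothetical agent that keeps only configuration in __init__."""

    def __init__(self, model: str, project: str, location: str):
        # Plain configuration values only, so the object stays picklable.
        self.model_name = model
        self.project = project
        self.location = location

    def set_up(self):
        # Stand-in for a service client or connection that can't be pickled.
        self.client_lock = threading.Lock()

    def query(self, input: str) -> dict:
        """Return the prompt unchanged (placeholder for a real model call)."""
        with self.client_lock:
            return {"input": input, "output": input}


def state_is_picklable(obj: Any) -> bool:
    """Return True if the instance attributes of obj can be pickled."""
    try:
        pickle.dumps(vars(obj))
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False


agent = EchoAgent(model="MODEL_NAME", project="PROJECT_ID", location="LOCATION")
picklable_before_set_up = state_is_picklable(agent)

# set_up() attaches the lock, so the state can no longer be pickled.
agent.set_up()
picklable_after_set_up = state_is_picklable(agent)

print(picklable_before_set_up, picklable_after_set_up)
```

The check passes before set_up() runs and fails afterwards, which is why set_up() shouldn't be called on an object that is being prepared for deployment.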

Test the application locally

Instantiate the application in local memory using the following code:

agent = CLASS_NAME(
    model=model,  # Required.
    tools=[get_exchange_rate],  # Optional.
    project=PROJECT_ID,
    location=LOCATION,
)
agent.set_up()
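
The call above relies on names that are defined outside this section: model, get_exchange_rate, PROJECT_ID, and LOCATION. For an offline dry run, a stand-in tool such as the following sketch could be used. It's hypothetical: the signature, the return shape, and the hard-coded rates are placeholders chosen for illustration. They aren't the tool from the earlier section, and they aren't real market data.

```python
def get_exchange_rate(
    currency_from: str = "USD",
    currency_to: str = "SEK",
) -> dict:
    """Look up the exchange rate between two currencies.

    Args:
        currency_from: ISO 4217 code of the base currency.
        currency_to: ISO 4217 code of the target currency.

    Returns:
        A dictionary with the base currency and the rate for the target.
    """
    # Placeholder values for illustration only; not real market data.
    placeholder_rates = {("USD", "SEK"): 10.0, ("SEK", "USD"): 0.1}
    rate = placeholder_rates.get((currency_from, currency_to))
    if rate is None:
        raise ValueError(
            f"No placeholder rate for {currency_from}->{currency_to}"
        )
    return {"base": currency_from, "rates": {currency_to: rate}}


result = get_exchange_rate("USD", "SEK")
print(result)
```

The docstring and type annotations matter here because the template wraps each tool with StructuredTool.from_function, which describes the tool to the model.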

Test the query method

You can test the application by sending test queries to the local instance:

response = agent.query(
    input="What is the exchange rate from US dollars to Swedish currency?"
)

The response is a dictionary that's similar to the following:

{"input": "What is the exchange rate from US dollars to Swedish currency?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

Test the stream_query method

You can test the streaming query locally by calling the stream_query method and iterating through the results. Here's an example:

import pprint

for chunk in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

This code prints each chunk of the response as it's generated. The output might look something like this:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}

In this example, each chunk contains different information about the response, such as the actions taken by the agent, the messages exchanged, and the final output.
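
If only the final answer is needed, the chunks can be reduced to their last 'output' value. The following sketch assumes that chunks are dictionaries shaped like the sample output above. The final_output helper and the fake_stream generator are hypothetical; fake_stream stands in for agent.stream_query so that the example runs without a model.

```python
from typing import Any, Dict, Iterable, Iterator, Optional


def final_output(chunks: Iterable[Dict[str, Any]]) -> Optional[str]:
    """Return the last 'output' value seen in a stream of chunk dicts."""
    output = None
    for chunk in chunks:
        if "output" in chunk:
            output = chunk["output"]
    return output


def fake_stream() -> Iterator[Dict[str, Any]]:
    """Hypothetical stand-in for agent.stream_query(...)."""
    yield {"actions": [], "messages": []}
    yield {"messages": [], "steps": []}
    yield {"messages": [], "output": "1 USD is 10.5751 SEK."}


answer = final_output(fake_stream())
print(answer)
```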

Streaming API

Here are some key things to keep in mind when using the streaming API:

  • Maximum timeout: The maximum timeout for streaming responses is 10 minutes. If your application requires longer processing times, consider breaking down the task into smaller chunks.
  • Streaming models and chains: LangChain's Runnable interface supports streaming, so you can stream responses from not only agents, but also models and chains.
  • LangChain compatibility: LangChain's astream_events method isn't supported.
  • Throttle content generation: If you encounter backpressure issues (where the producer generates data faster than the consumer can process it), throttle your content generation rate. This can help prevent buffer overflows and ensure a smooth streaming experience.
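
The throttling advice can be applied on the producer side by spacing out yields. The following is a minimal sketch under assumptions: throttled and min_interval_s are hypothetical names, and a fixed minimum interval between chunks is only one of several possible throttling policies.

```python
import time
from typing import Any, Iterable, Iterator


def throttled(chunks: Iterable[Any], min_interval_s: float) -> Iterator[Any]:
    """Yield chunks no faster than one per min_interval_s seconds."""
    last_yield = None
    for chunk in chunks:
        if last_yield is not None:
            # Sleep only for the part of the interval that hasn't elapsed.
            remaining = min_interval_s - (time.monotonic() - last_yield)
            if remaining > 0:
                time.sleep(remaining)
        last_yield = time.monotonic()
        yield chunk


# Usage: wrap any generator, for example the body of stream_query().
for chunk in throttled(["a", "b", "c"], min_interval_s=0.01):
    print(chunk)
```

Because the wrapper is itself a generator, a stream_query method that returns it still satisfies the requirement to return an iterable.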

Customize method names

By default, the methods query and stream_query are registered as operations in the deployed application. You can override the default behavior and define the set of operations to be registered using the register_operations method. Each operation is registered under one of two invocation modes: standard (represented by an empty string "") or streaming ("stream").

In the following example code, the register_operations method makes the deployed application offer custom_method_1 and custom_method_2 as operations for standard calls, and custom_stream_method_1 and custom_stream_method_2 as operations for streaming calls. These operations replace the default query and stream_query operations.

from typing import Dict, List, Any, Iterable

class CLASS_NAME:
    # ... other methods ...

    def custom_method_1(...):
        # ...

    def custom_method_2(...):
        # ...

    def custom_stream_method_1(...) -> Iterable[Any]:
        # ...

    def custom_stream_method_2(...) -> Iterable[Any]:
        # ...

    def register_operations(self) -> Dict[str, List[str]]:
        return {
            "": [
                "custom_method_1", "custom_method_2",
            ],
            "stream": [
                "custom_stream_method_1", "custom_stream_method_2",
            ],
        }

You can test the application by sending test queries to the local instance as follows:

response = agent.custom_method_1(
    input="What is the exchange rate from US dollars to Swedish currency?"
)

for chunk in agent.custom_stream_method_1(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

You don't need to register methods for both invocation types. For example, to support only standard calls, you can do the following:

from typing import Dict, List, Any, Iterable

class CLASS_NAME:
    # ... other methods ...

    def custom_method_1(...):
        # ...

    def custom_method_2(...):
        # ...

    def custom_stream_method_1(...) -> Iterable[Any]:
        # ...

    def custom_stream_method_2(...) -> Iterable[Any]:
        # ...

    def register_operations(self) -> Dict[str, List[str]]:
        return {
            # The list of synchronous methods to be registered as operations.
            "": [
                "custom_method_1", "custom_method_2",
            ],
        }

In this example, only custom_method_1 and custom_method_2 are exposed as operations in the deployed application.
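
Because register_operations returns method names as strings, a typo only surfaces when the operation is called. A local sanity check such as the following sketch can catch that earlier. It isn't part of the Vertex AI SDK: check_operations and DemoAgent are hypothetical names, the accepted modes are limited to the two described on this page, and the variable-argument check follows the earlier advice to avoid variable arguments.

```python
import inspect
from typing import Any, Dict, Iterable, List


def check_operations(agent: Any) -> List[str]:
    """Return a list of problems found in agent.register_operations()."""
    problems = []
    variadic = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    for mode, names in agent.register_operations().items():
        if mode not in ("", "stream"):
            problems.append(f"unknown invocation mode: {mode!r}")
        for name in names:
            method = getattr(agent, name, None)
            if not callable(method):
                problems.append(f"{name}: not a callable attribute")
                continue
            params = inspect.signature(method).parameters.values()
            if any(p.kind in variadic for p in params):
                problems.append(f"{name}: uses variable arguments")
    return problems


class DemoAgent:
    """Hypothetical agent with one deliberate typo and one loose signature."""

    def custom_method_1(self, input: str) -> str:
        return input.upper()

    def custom_stream_method_1(self, input: str) -> Iterable[str]:
        yield from input.split()

    def loose_method(self, *args, **kwargs):
        return None

    def register_operations(self) -> Dict[str, List[str]]:
        return {
            "": ["custom_method_1", "loose_method", "missing_method"],
            "stream": ["custom_stream_method_1"],
        }


problems = check_operations(DemoAgent())
for problem in problems:
    print(problem)
```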

What's next