reasoning_engines.LangchainAgent
) per sviluppare un'applicazione. In questa sezione vengono descritti i passaggi per personalizzare il tuo modello di applicazione. Questa opzione può essere utile se le tue esigenze vanno oltre quelle offerte dal modello predefinito.
Un modello di applicazione in Reasoning Engine è definito come classe Python. Per fare un esempio, il seguente codice Python è un esempio di applicazione LangChain di cui è possibile eseguire il deployment su Vertex AI (puoi assegnare alla variabile CLASS_NAME
un valore come MyAgent
):
from typing import Any, Callable, Iterable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
"""All unpickle-able logic should go here.
The .set_up() method should not be called for an object that is being
prepared for deployment.
"""
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langchain.agents import AgentExecutor
from langchain.agents.format_scratchpad.tools import format_to_tool_messages
from langchain.agents.output_parsers.tools import ToolsAgentOutputParser
from langchain.tools.base import StructuredTool
from langchain_core import prompts
vertexai.init(project=self.project, location=self.location)
prompt = {
"input": lambda x: x["input"],
"agent_scratchpad": (
lambda x: format_to_tool_messages(x["intermediate_steps"])
),
} | prompts.ChatPromptTemplate.from_messages([
("user", "{input}"),
prompts.MessagesPlaceholder(variable_name="agent_scratchpad"),
])
llm = ChatVertexAI(model_name=self.model_name)
if self.tools:
llm = llm.bind_tools(tools=self.tools)
self.agent_executor = AgentExecutor(
agent=prompt | llm | ToolsAgentOutputParser(),
tools=[StructuredTool.from_function(tool) for tool in self.tools],
)
def query(self, input: str):
"""Query the application.
Args:
input: The user prompt.
Returns:
The output of querying the application with the given input.
"""
return self.agent_executor.invoke(input={"input": input})
def stream_query(self, input: str) -> Iterable[Any]:
"""Query the application and stream the output.
Args:
input: The user prompt.
Yields:
Chunks of the response as they become available.
"""
for chunk in self.agent_executor.stream(input={"input": input}):
yield chunk
Quando scrivi la classe Python, i seguenti tre metodi sono importanti per il motore di ragionamento:
__init__()
:- Utilizza questo metodo solo per i parametri di configurazione dell'applicazione. Ad esempio, puoi utilizzare questo metodo per raccogliere i parametri del modello e gli attributi di sicurezza come argomenti di input dagli utenti. Puoi anche utilizzare questo metodo per raccogliere parametri come l'ID progetto, la regione, le credenziali dell'applicazione e le chiavi API.
- Il costruttore restituisce un oggetto che deve essere "pickle-able" per poter essere implementato nel motore di ragionamento. Pertanto, devi inizializzare i client di servizio
e stabilire connessioni ai database nel metodo
.set_up
anziché nel metodo__init__
. - Questo metodo è facoltativo. Se non è specificato, Vertex AI utilizza il costruttore Python predefinito per la classe.
set_up()
:- Devi utilizzare questo metodo per definire la logica di inizializzazione dell'applicazione. Ad esempio, lo utilizzi per stabilire connessioni a database o servizi dipendenti, importare pacchetti dipendenti o precompilare i dati utilizzati per l'invio di query.
- Questo metodo è facoltativo. Se non è specificato, Vertex AI assume
che l'applicazione non debba chiamare un metodo
.set_up
prima di rispondere alle query degli utenti.
query()
/stream_query()
:- Utilizza
query()
per restituire la risposta completa come singolo risultato. - Utilizza
stream_query()
per restituire la risposta in blocchi man mano che diventano disponibili, in modo da offrire un'esperienza di streaming. Il metodostream_query
deve restituire un oggetto iterabile (ad esempio un generatore) per abilitare lo streaming. - Puoi implementare entrambi i metodi se vuoi supportare sia le interazioni con risposta singola sia quelle in streaming con la tua applicazione.
- Devi fornire a questo metodo una docstring chiara che ne definisce la funzionalità, ne documenta gli attributi e fornisce annotazioni di tipo per gli input.
Evita gli argomenti variabili nel metodo
query
estream_query
.
- Utilizza
Testa l'applicazione localmente
Esegui l'inizializzazione dell'applicazione nella memoria locale utilizzando il seguente codice:
agent = CLASS_NAME(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project=PROJECT_ID,
location=LOCATION,
)
agent.set_up()
Testa il metodo query
Puoi testare l'applicazione inviando query di test all'istanza locale:
response = agent.query(
input="What is the exchange rate from US dollars to Swedish currency?"
)
La risposta è un dizionario simile al seguente:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Testa il metodo stream_query
Puoi testare la query di streaming localmente chiamando il metodo stream_query
e iterando i risultati. Ecco un esempio:
import pprint
for chunk in agent.stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Questo codice stampa ogni blocco della risposta man mano che viene generato. L'output potrebbe avere il seguente aspetto:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
In questo esempio, ogni chunk contiene informazioni diverse sulla risposta, come le azioni intraprese dall'agente, i messaggi scambiati e l'output finale.
API Streaming
Di seguito sono riportati alcuni aspetti chiave da tenere presente quando utilizzi l'API di streaming:
- Timeout massimo: il timeout massimo per le risposte in streaming è di 10 minuti. Se la tua applicazione richiede tempi di elaborazione più lunghi, valuta la possibilità di suddividere l'attività in parti più piccole.
- Modelli e catene in streaming: l'interfaccia Runnable di LangChain supporta lo streaming, quindi puoi trasmettere in streaming le risposte non solo dagli agenti, ma anche da modelli e catene.
- Compatibilità con LangChain: tieni presente che il metodo
astream_event
di LangChain non è supportato. - Rallenta la generazione di contenuti: se riscontri problemi di backpressure (in cui il produttore genera dati più velocemente di quanto il consumatore possa elaborarli), rallenta la frequenza di generazione dei contenuti. In questo modo puoi evitare gli overflow del buffer e garantire un'esperienza di streaming fluida.
Personalizzare i nomi dei metodi
Per impostazione predefinita, i metodi query
e stream_query
sono registrati come operazioni
nell'applicazione di cui è stato eseguito il deployment.
Puoi ignorare il comportamento predefinito e definire l'insieme di operazioni da registrare utilizzando il metodo register_operations
.
Le operazioni possono essere registrate come modalità di chiamata standard (rappresentata da una stringa vuota""
) o di streaming ("stream"
).
Nel seguente codice di esempio, il metodo register_operations
farà in modo che l'applicazione di cui è stato eseguito il deployment offra custom_method_1
e custom_method_2
come operazioni per le chiamate standard e custom_stream_method_1
e custom_stream_method_2
come operazioni per le chiamate in streaming.
Queste operazioni sostituiscono le operazioni predefinite query
e stream_query
.
from typing import Dict, List, Any, Iterable
class CLASS_NAME:
# ... other methods ...
def custom_method_1(...):
# ...
def custom_method_2(...):
# ...
def custom_stream_method_1(...) -> Iterable[Any]:
# ...
def custom_stream_method_2(...) -> Iterable[Any]:
# ...
def register_operations(self) -> Dict[str, List[str]]:
return {
"": [
"custom_method_1", "custom_method_2",
],
"stream": [
"custom_stream_method_1", "custom_stream_method_2",
],
}
Puoi testare l'applicazione inviando query di test all'istanza come segue:
response = agent.custom_method_1(
input="What is the exchange rate from US dollars to Swedish currency?"
)
for chunk in agent.custom_stream_method_1(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Non è necessario registrare i metodi per entrambi i tipi di chiamata. Ad esempio, per supportare solo le chiamate standard, puoi procedere nel seguente modo:
from typing import Dict, List, Any
class CLASS_NAME:
# ... other methods ...
def custom_method_1(...):
# ...
def custom_method_2(...):
# ...
def custom_stream_method_1(...) -> Iterable[Any]:
# ...
def custom_stream_method_2(...) -> Iterable[Any]:
# ...
def register_operations(self) -> Dict[str, List[str]]:
return {
# The list of synchronous methods to be registered as operations.
"": [
"custom_method_1", "custom_method_2",
],
}
In questo esempio, solo custom_method_1
e custom_method_2
sono esposti come operazioni nelle applicazioni di cui è stato eseguito il deployment.