I modelli di agenti in Vertex AI Agent Engine sono definiti come classi Python. I seguenti passaggi mostrano come creare un modello personalizzato per l'istanza di agenti che possono essere implementati su Vertex AI:
- Esempio di base
- (Facoltativo) Risposte dinamiche
- (Facoltativo) Registrare metodi personalizzati
- (Facoltativo) Fornisci annotazioni di tipo
- (Facoltativo) Invio di tracce a Cloud Trace
- (Facoltativo) Utilizzo delle variabili di ambiente
- (Facoltativo) Integrazione con Secret Manager
- (Facoltativo) Gestione delle credenziali
- (Facoltativo) Gestione degli errori
Esempio di base
Per fare un esempio di base, la seguente classe Python è un modello per
l'istanza di agenti di cui è possibile eseguire il deployment su Vertex AI (puoi assegnare alla
variabile CLASS_NAME
un valore come MyAgent
):
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Considerazioni sul deployment
Quando scrivi la classe Python, sono importanti i tre metodi seguenti:
__init__()
:- Utilizza questo metodo solo per i parametri di configurazione dell'agente. Ad esempio, puoi utilizzare questo metodo per raccogliere i parametri del modello e gli attributi di sicurezza come argomenti di input dai tuoi utenti. Puoi anche utilizzare questo metodo per raccogliere parametri come l'ID progetto, la regione, le credenziali dell'applicazione e le chiavi API.
- Il costruttore restituisce un oggetto che deve essere "serializzabile" per
poter essere sottoposto a deployment in Vertex AI Agent Engine. Pertanto, devi inizializzare
i client di servizio e stabilire connessioni ai database nel metodo
.set_up
anziché nel metodo__init__
. - Questo metodo è facoltativo. Se non viene specificato, Vertex AI utilizza il costruttore Python predefinito per la classe.
set_up()
:- Devi utilizzare questo metodo per definire la logica di inizializzazione dell'agente. Ad esempio, utilizzi questo metodo per stabilire connessioni a database o servizi dipendenti, importare pacchetti dipendenti o precalcolare i dati utilizzati per l'elaborazione delle query.
- Questo metodo è facoltativo. Se non viene specificato, Vertex AI presuppone
che l'agente non debba chiamare un metodo
.set_up
prima di rispondere alle query degli utenti.
query()
/stream_query()
:- Utilizza
query()
per restituire la risposta completa come singolo risultato. - Utilizza
stream_query()
per restituire la risposta in blocchi man mano che diventa disponibile, consentendo un'esperienza di streaming. Il metodostream_query
deve restituire un oggetto iterabile (ad esempio un generatore) per abilitare lo streaming. - Puoi implementare entrambi i metodi se vuoi supportare sia le interazioni con risposta singola sia quelle di streaming con il tuo agente.
- Devi fornire a questo metodo una docstring chiara che definisca cosa fa,
documenti i suoi attributi e fornisca annotazioni di tipo per i suoi input.
Evita argomenti variabili nel metodo
query
estream_query
.
- Utilizza
Istanzia l'agente localmente
Puoi creare un'istanza locale del tuo agente utilizzando il seguente codice:
agent = CLASS_NAME(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Testare il metodo query
Puoi testare l'agente inviando query all'istanza locale:
response = agent.query(
input="What is the exchange rate from US dollars to Swedish currency?"
)
print(response)
La risposta è un dizionario simile al seguente:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Eseguire query in modo asincrono
Per rispondere alle query in modo asincrono, puoi definire un metodo (ad esempio async_query
)
che restituisce una coroutine Python. Ad esempio, il seguente modello estende l'esempio di base per rispondere in modo asincrono ed è implementabile su Vertex AI:
class AsyncAgent(CLASS_NAME):
async def async_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.ainvoke(**kwargs):
yield dumpd(chunk)
agent = AsyncAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Testare il metodo async_query
Puoi testare l'agente localmente chiamando il metodo async_query
. Ecco un
esempio:
response = await agent.async_query(
input="What is the exchange rate from US dollars to Swedish Krona today?"
)
print(response)
La risposta è un dizionario simile al seguente:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Risposte dinamiche
Per trasmettere in streaming le risposte alle query, puoi definire un metodo denominato stream_query
che genera le risposte. Ad esempio, il seguente modello estende l'esempio
di base per trasmettere in streaming le risposte ed è implementabile su Vertex AI:
from typing import Iterable
class StreamingAgent(CLASS_NAME):
def stream_query(self, **kwargs) -> Iterable:
from langchain.load.dump import dumpd
for chunk in self.graph.stream(**kwargs):
yield dumpd(chunk)
agent = StreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Ecco alcuni aspetti chiave da tenere presente quando utilizzi l'API di streaming:
- Timeout massimo: il timeout massimo per le risposte di streaming è di 10 minuti. Se il tuo agente richiede tempi di elaborazione più lunghi, valuta la possibilità di suddividere l'attività in parti più piccole.
- Modelli e catene di streaming: l'interfaccia Runnable di LangChain supporta lo streaming, in modo da poter trasmettere in streaming le risposte non solo degli agenti, ma anche di modelli e catene.
- Compatibilità con LangChain: tieni presente che al momento non sono supportati metodi asincroni come il metodo
astream_event
di LangChain. - Limita la generazione di contenuti: se riscontri problemi di contropressione (in cui il produttore genera dati più velocemente di quanto il consumer possa elaborarli), devi limitare la velocità di generazione dei contenuti. In questo modo puoi evitare overflow del buffer e garantire un'esperienza di streaming fluida.
Testare il metodo stream_query
Puoi testare la query di streaming localmente chiamando il metodo stream_query
e scorrendo i risultati. Ecco un esempio:
import pprint
for chunk in agent.stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Questo codice stampa ogni blocco della risposta man mano che viene generato. L'output potrebbe essere simile al seguente:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
In questo esempio, ogni blocco contiene informazioni diverse sulla risposta, come le azioni intraprese dall'agente, i messaggi scambiati e l'output finale.
Risposte dinamiche in modo asincrono
Per trasmettere le risposte in streaming in modo asincrono, puoi definire un metodo (ad es. async_stream_query
)
che restituisce un generatore asincrono. Ad esempio, il seguente modello estende l'esempio di base per trasmettere le risposte in modo asincrono ed è implementabile su Vertex AI:
class AsyncStreamingAgent(CLASS_NAME):
async def async_stream_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.astream(**kwargs):
yield dumpd(chunk)
agent = AsyncStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Testare il metodo async_stream_query
Analogamente al codice per testare le query di streaming, puoi testare l'agente localmente chiamando il metodo async_stream_query
e scorrendo i risultati. Ecco un esempio:
import pprint
async for chunk in agent.async_stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Questo codice stampa ogni blocco della risposta man mano che viene generato. L'output potrebbe essere simile al seguente:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
Registrazione di metodi personalizzati
Per impostazione predefinita, i metodi query
e stream_query
vengono registrati come operazioni
nell'agente di cui è stato eseguito il deployment.
Puoi ignorare il comportamento predefinito e definire l'insieme di operazioni da registrare utilizzando il metodo register_operations
.
Le operazioni possono essere registrate come modalità di esecuzione standard (rappresentata da una stringa vuota
""
) o di streaming ("stream"
).
Per registrare più operazioni, puoi definire un metodo denominato
register_operations
che elenca i metodi da rendere disponibili agli utenti quando
l'agente viene implementato. Nel seguente codice di esempio, il metodo register_operations
comporterà la registrazione dell'agente di cui è stato eseguito il deployment di query
e get_state
come
operazioni eseguite in modo sincrono e di stream_query
e get_state_history
come
operazioni che trasmettono in streaming le risposte:
from typing import Iterable
class CustomAgent(StreamingAgent):
def get_state(self) -> dict: # new synchronous method
return self.graph.get_state(**kwargs)._asdict()
def get_state_history(self) -> Iterable: # new streaming operation
for state_snapshot in self.graph.get_state_history(**kwargs):
yield state_snapshot._asdict()
def register_operations(self):
return {
# The list of synchronous operations to be registered
"": ["query", "get_state"],
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"],
}
Puoi testare i metodi personalizzati chiamandoli direttamente sull'istanza locale dell'agente, in modo simile a come testeresti i metodi query
e stream_query
.
Fornire annotazioni di tipo
Puoi utilizzare le annotazioni di tipo per specificare i tipi di input e output previsti dei metodi dell'agente. Quando l'agente viene implementato, nell'input e nell'output delle operazioni supportate dall'agente sono supportati solo i tipi serializzabili in JSON. Gli
schemi degli input e degli output possono essere annotati utilizzando TypedDict
o modelli Pydantic.
Nell'esempio seguente, annotiamo l'input come TypedDict
e convertiamo
l'output non elaborato da .get_state
(che è un NamedTuple
) in un dizionario serializzabile
utilizzando il metodo ._asdict()
:
from typing import Any, Dict, TypedDict
# schemas.py
class RunnableConfig(TypedDict, total=False):
metadata: Dict[str, Any]
configurable: Dict[str, Any]
# agents.py
class AnnotatedAgent(CLASS_NAME):
def get_state(self, config: RunnableConfig) -> dict:
return self.graph.get_state(config=config)._asdict()
def register_operations(self):
return {"": ["query", "get_state"]}
Invio di tracce a Cloud Trace
Per inviare tracce a Cloud Trace con librerie di strumentazione che supportano OpenTelemetry, puoi importarle e inizializzarle nel metodo .set_up
. Per i framework di agenti comuni, potresti essere in grado di utilizzare l'integrazione di OpenTelemetry in combinazione con un framework di strumentazione come OpenInference o OpenLLMetry. Google Cloud
Ad esempio, il seguente modello è una modifica dell'esempio di base per esportare le tracce in Cloud Trace:
OpenInference
Per prima cosa, installa il pacchetto richiesto
utilizzando pip
eseguendo
pip install openinference-instrumentation-langchain==0.1.34
Poi importa e inizializza lo strumento:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from openinference.instrumentation.langchain import LangChainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangChainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
OpenLLMetry
Per prima cosa, installa il pacchetto richiesto
utilizzando pip
eseguendo
pip install opentelemetry-instrumentation-langchain==0.38.10
Poi importa e inizializza lo strumento:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.instrumentation.langchain import LangchainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangchainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Utilizzo delle variabili di ambiente
Per impostare le variabili di ambiente, assicurati che siano disponibili tramite os.environ
durante lo sviluppo e segui le istruzioni riportate in Definisci le variabili di ambiente
durante il deployment dell'agente.
Integrazione con Secret Manager
Per l'integrazione con Secret Manager:
Installa la libreria client eseguendo
pip install google-cloud-secret-manager
Segui le istruzioni riportate in Concedere ruoli per un agente di cui è stato eseguito il deployment per concedere all'account di servizio il ruolo "Secret Manager Secret Accessor" (
roles/secretmanager.secretAccessor
) tramite la console Google Cloud .Importa e inizializza il client nel metodo
.set_up
e ottieni il segreto corrispondente quando necessario. Ad esempio, il seguente modello è una modifica dell'esempio di base per utilizzare una chiave API perChatAnthropic
che è stata memorizzata in Secret Manager:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.secret_id = secret_id # <- new
def set_up(self):
from google.cloud import secretmanager
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
# Get the API Key from Secret Manager here.
self.secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = self.secret_manager_client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
# Use the API Key from Secret Manager here.
model = ChatAnthropic(
model_name=self.model_name,
model_kwargs={"api_key": secret_version.payload.data.decode()}, # <- new
)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Gestione delle credenziali
Quando l'agente viene implementato, potrebbe dover gestire diversi tipi di credenziali:
- Credenziali predefinite dell'applicazione (ADC), che derivano comunemente dai service account.
- OAuth, che si verifica comunemente a causa degli account utente.
- Provider di identità per le credenziali di account esterni (federazione delle identità per i carichi di lavoro).
Credenziali predefinite dell'applicazione
import google.auth
credentials, project = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
Può essere utilizzato nel codice nel seguente modo:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str = "meta/llama3-405b-instruct-maas",
tools: Sequence[Callable],
location: str,
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.endpoint = f"https://{location}-aiplatform.googleapis.com"
self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'
def query(self, **kwargs):
import google.auth
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# Note: the credential lives for 1 hour by default.
# After expiration, it must be refreshed.
creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
creds.refresh(google.auth.transport.requests.Request())
model = ChatOpenAI(
model=self.model_name,
base_url=self.base_url,
api_key=creds.token, # Use the token from the credentials here.
)
graph = create_react_agent(model, tools=self.tools)
return graph.invoke(**kwargs)
Per maggiori dettagli, consulta Come funzionano le credenziali predefinite dell'applicazione.
OAuth
Le credenziali utente vengono in genere ottenute utilizzando OAuth 2.0.
Se hai un token di accesso (ad es. da oauthlib
),
puoi creare un'istanza google.oauth2.credentials.Credentials
. Inoltre,
se ottieni un token di aggiornamento, puoi specificare anche l'URI del token di aggiornamento e del token
per consentire l'aggiornamento automatico delle credenziali:
credentials = google.oauth2.credentials.Credentials(
token="ACCESS_TOKEN",
refresh_token="REFRESH_TOKEN", # Optional
token_uri="TOKEN_URI", # E.g. "https://oauth2.googleapis.com/token"
client_id="CLIENT_ID", # Optional
client_secret="CLIENT_SECRET" # Optional
)
Qui, TOKEN_URI
, CLIENT_ID
e
CLIENT_SECRET
si basano su Creare una credenziale client OAuth.
Se non hai un token di accesso, puoi utilizzare google_auth_oauthlib.flow
per
eseguire il flusso di concessione dell'autorizzazione OAuth 2.0
per ottenere un'istanza google.oauth2.credentials.Credentials
corrispondente:
from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json
# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())
# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
client_config,
scopes=['https://www.googleapis.com/auth/cloud-platform'],
state="OAUTH_FLOW_STATE" # from flow.authorization_url(...)
)
# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials
# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai
vertexai.init(
project="PROJECT_ID",
location="LOCATION",
credentials=credentials, # specify the credentials here
)
Per maggiori dettagli, consulta la documentazione del modulo google_auth_oauthlib.flow
.
Provider di identità
Se vuoi autenticare gli utenti utilizzando email/password, numero di telefono, provider social come Google, Facebook o GitHub o un meccanismo di autenticazione personalizzato, puoi utilizzare Identity Platform o Firebase Authentication o qualsiasi provider di identità che supporti OpenID Connect (OIDC).
Per maggiori dettagli, visita la pagina Accesso alle risorse da un provider di identità OIDC.
Gestione degli errori
Per garantire che gli errori dell'API vengano restituiti in un formato JSON strutturato, ti consigliamo di implementare la gestione degli errori all'interno del codice dell'agente utilizzando un blocco try...except
, che può essere estratto in un decoratore.
Sebbene Vertex AI Agent Engine possa gestire internamente vari codici di stato, Python non dispone di un modo standardizzato per rappresentare gli errori con codici di stato HTTP associati in tutti i tipi di eccezione. Tentare di mappare tutte le possibili eccezioni Python agli stati HTTP all'interno del servizio sottostante sarebbe complesso e difficile da gestire.
Un approccio più scalabile consiste nell'intercettare esplicitamente le eccezioni pertinenti all'interno dei metodi dell'agente o utilizzando un decoratore riutilizzabile come error_wrapper
. Puoi quindi associare i codici di stato appropriati (ad esempio aggiungendo gli attributi code
e error
alle eccezioni personalizzate o gestendo in modo specifico le eccezioni standard) e formattare l'errore come dizionario JSON per il valore restituito. Ciò richiede modifiche minime al codice all'interno dei metodi dell'agente stesso, spesso è sufficiente aggiungere il decoratore.
Di seguito è riportato un esempio di come puoi implementare la gestione degli errori nel tuo agente:
from functools import wraps
import json
def error_wrapper(func):
@wraps(func) # Preserve original function metadata
def wrapper(*args, **kwargs):
try:
# Execute the original function with its arguments
return func(*args, **kwargs)
except Exception as err:
error_code = getattr(err, 'code')
error_message = getattr(err, 'error')
# Construct the error response dictionary
error_response = {
"error": {
"code": error_code,
"message": f"'{func.__name__}': {error_message}"
}
}
# Return the Python dictionary directly.
return error_response
return wrapper
# Example exception
class SessionNotFoundError(Exception):
def __init__(self, session_id, message="Session not found"):
self.code = 404
self.error = f"{message}: {session_id}"
super().__init__(self.error)
# Example Agent Class
class MyAgent:
@error_wrapper
def get_session(self, session_id: str):
# Simulate the condition where the session isn't found
raise SessionNotFoundError(session_id=session_id)
# Example Usage: Session Not Found
agent = MyAgent()
error_result = agent.get_session(session_id="nonexistent_session_123")
print(json.dumps(error_result, indent=2))
Il codice precedente genera il seguente output:
json
{
"error": {
"code": 404,
"message": "Invocation error in 'get_session': Session not found: nonexistent_session_123"
}
}
Passaggi successivi
- Valutare un agente.
- Esegui il deployment di un agente.
- Risolvi i problemi relativi allo sviluppo di un agente.
- Richiedere assistenza.