I modelli di agenti in Agent Engine sono definiti come classi Python. I seguenti passaggi mostrano come creare un modello personalizzato per l'istanziazione di agenti che possono essere dipiazzati su Vertex AI:
- Esempio di base
- (Facoltativo) Risposte dinamiche
- (Facoltativo) Registra i metodi personalizzati
- (Facoltativo) Fornisci annotazioni del tipo
- (Facoltativo) Monitoraggio tramite OpenTelemetry
- (Facoltativo) Utilizzo delle variabili di ambiente
- (Facoltativo) Integrazione con Secret Manager
- (Facoltativo) Gestione delle credenziali
Esempio di base
Per fare un esempio di base, la seguente classe Python è un modello per l'inizializzazione di agenti di cui è possibile eseguire il deployment su Vertex AI (puoi assegnare alla variabile CLASS_NAME
un valore come MyAgent
):
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Considerazioni sul deployment
Quando scrivi la classe Python, sono importanti i seguenti tre metodi:
__init__()
:- Utilizza questo metodo solo per i parametri di configurazione dell'agente. Ad esempio, puoi utilizzare questo metodo per raccogliere i parametri del modello e gli attributi di sicurezza come argomenti di input dagli utenti. Puoi anche utilizzare questo metodo per raccogliere parametri come l'ID progetto, la regione, le credenziali dell'applicazione e le chiavi API.
- Il costruttore restituisce un oggetto che deve essere "pickle-able" per poter essere dipiegato in Agent Engine. Pertanto, devi inizializzare i client di servizio e stabilire connessioni ai database nel metodo
.set_up
anziché nel metodo__init__
. - Questo metodo è facoltativo. Se non è specificato, Vertex AI utilizza il costruttore Python predefinito per la classe.
set_up()
:- Devi utilizzare questo metodo per definire la logica di inizializzazione dell'agente. Ad esempio, lo utilizzi per stabilire connessioni a database o servizi dipendenti, importare pacchetti dipendenti o precompilare i dati utilizzati per la pubblicazione di query.
- Questo metodo è facoltativo. Se non è specificato, Vertex AI assume
che l'agente non debba chiamare un metodo
.set_up
prima di gestire le query degli utenti.
query()
/stream_query()
:- Utilizza
query()
per restituire la risposta completa come singolo risultato. - Utilizza
stream_query()
per restituire la risposta in blocchi man mano che diventano disponibili, in modo da offrire un'esperienza di streaming. Il metodostream_query
deve restituire un oggetto iterabile (ad esempio un generatore) per abilitare lo streaming. - Puoi implementare entrambi i metodi se vuoi supportare sia le interazioni con risposta singola sia quelle in streaming con l'agente.
- Devi fornire a questo metodo una docstring chiara che ne definisce la funzionalità, ne documenta gli attributi e fornisce annotazioni di tipo per gli input.
Evita gli argomenti variabili nel metodo
query
estream_query
.
- Utilizza
Crea un'istanza dell'agente localmente
Puoi creare un'istanza locale dell'agente utilizzando il seguente codice:
agent = CLASS_NAME(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Testa il metodo query
Puoi testare l'agente inviando query all'istanza locale:
response = agent.query(
input="What is the exchange rate from US dollars to Swedish currency?"
)
print(response)
La risposta è un dizionario simile al seguente:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Risposte dinamiche
Per eseguire lo streaming delle risposte alle query, puoi definire un metodo denominato stream_query
che genera le risposte. Ad esempio, il seguente modello estende l'esempio di base per lo streaming delle risposte ed è implementabile su Vertex AI:
from typing import Iterable
class StreamingAgent(CLASS_NAME):
def stream_query(self, **kwargs) -> Iterable:
from langchain.load.dump import dumpd
for chunk in self.graph.stream(**kwargs):
yield dumpd(chunk)
Di seguito sono riportati alcuni aspetti chiave da tenere presente quando utilizzi l'API di streaming:
- Timeout massimo: il timeout massimo per le risposte in streaming è di 10 minuti. Se il tuo agente richiede tempi di elaborazione più lunghi, ti consigliamo di suddividere l'attività in parti più piccole.
- Modelli e catene in streaming: l'interfaccia Runnable di LangChain supporta lo streaming, quindi puoi trasmettere in streaming le risposte non solo dagli agenti, ma anche da modelli e catene.
- Compatibilità con LangChain: tieni presente che i metodi asincroni come il metodo
astream_event
di LangChain non sono attualmente supportati. - Rallenta la generazione di contenuti: se riscontri problemi di backpressure (in cui il produttore genera dati più velocemente di quanto il consumatore possa elaborarli), devi rallentare la frequenza di generazione dei contenuti. In questo modo puoi evitare gli overflow del buffer e garantire un'esperienza di streaming fluida.
Testa il metodo stream_query
Puoi testare la query di streaming localmente chiamando il metodo stream_query
e iterando i risultati. Ecco un esempio:
import pprint
for chunk in agent.stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Questo codice stampa ogni blocco della risposta man mano che viene generato. L'output potrebbe avere il seguente aspetto:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
In questo esempio, ogni chunk contiene informazioni diverse sulla risposta, come le azioni intraprese dall'agente, i messaggi scambiati e l'output finale.
Registrazione di metodi personalizzati
Per impostazione predefinita, i metodi query
e stream_query
sono registrati come operazioni
nell'agente di cui è stato eseguito il deployment.
Puoi ignorare il comportamento predefinito e definire l'insieme di operazioni da registrare utilizzando il metodo register_operations
.
Le operazioni possono essere registrate come modalità di esecuzione standard (rappresentata da una stringa vuota
""
) o in streaming ("stream"
).
Per registrare più operazioni, puoi definire un metodo denominato
register_operations
che elenca i metodi da rendere disponibili agli utenti al momento del deployment dell'agente. Nel seguente codice di esempio, il metodo register_operations
porterà l'agente di cui è stato eseguito il deployment a registrare query
e get_state
come operazioni eseguite in modo sincrono e stream_query
e get_state_history
come operazioni che trasmettono le risposte in streaming:
from typing import Iterable
class CustomAgent(StreamingAgent):
def get_state(self) -> dict: # new synchronous method
return self.graph.get_state(**kwargs)._asdict()
def get_state_history(self) -> Iterable: # new streaming operation
for state_snapshot in self.graph.get_state_history(**kwargs):
yield state_snapshot._asdict()
def register_operations(self):
return {
# The list of synchronous operations to be registered
"": ["query", "get_state"],
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"],
}
Puoi testare i metodi personalizzati chiamandoli direttamente nell'istanza locale dell'agente, in modo simile a come testare i metodi query
e stream_query
.
Fornire annotazioni di tipo
Puoi utilizzare le annotazioni del tipo per specificare i tipi di input e output previsti per i metodi dell'agente. Quando l'agente viene disegnato, nell'input e nell'output delle operazioni supportate dall'agente sono supportati solo i tipi serializzabili in JSON. Gli schemi di input e output possono essere annotati utilizzando modelli TypedDict
o Pydantic.
Nel seguente esempio, annottiamo l'input come TypedDict
e convertiamo
l'output non elaborato di .get_state
(che è un NamedTuple
) in un dizionario
serializzabile utilizzando il relativo metodo ._asdict()
:
from typing import Any, Dict, TypedDict
# schemas.py
class RunnableConfig(TypedDict, total=False):
metadata: Dict[str, Any]
configurable: Dict[str, Any]
# agents.py
class AnnotatedAgent(CLASS_NAME):
def get_state(self, config: RunnableConfig) -> dict:
return self.graph.get_state(config=config)._asdict()
def register_operations(self):
return {"": ["query", "get_state"]}
Monitoraggio con OpenTelemetry
Per attivare il monitoraggio con librerie di strumenti che supportano OpenTelemetry, puoi importarle e inizializzarle nel metodo .set_up
.
Ad esempio, il seguente modello è una modifica dell'esempio di base per esportare le tracce in Cloud Trace:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from openinference.instrumentation.langchain import LangChainInstrumentor
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(project_id=self.project)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangChainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Utilizzo delle variabili di ambiente
Per impostare le variabili di ambiente in modo che funzionino al momento del deployment dell'agente,
devi impostarle all'interno del metodo .set_up
, ad esempio:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
env_vars: dict[str, str], # <- new
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
self.env_vars = env_vars # <- new
def set_up(self):
# Code for setting the environment variables
import os
for env_var_name, env_var_value in self.env_vars.items():
os.environ[env_var_name] = env_var_value
# End of code for setting the environment variables
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Integrazione con Secret Manager
Per eseguire l'integrazione con Secret Manager:
Installa la libreria client eseguendo
pip install google-cloud-secret-manager
Segui le istruzioni riportate in Concedere i ruoli per un agente di cui è stato eseguito il deployment per concedere all'agente di servizio del motore di ragionamento il ruolo "Accesso ai secret di Secret Manager" (
roles/secretmanager.secretAccessor
) tramite la console Google Cloud.Importa e inizializza il client nel metodo
.set_up
e recupera il segreto corrispondente quando necessario. Ad esempio, il seguente modello è una modifica dell'esempio di base per utilizzare una chiave API perChatAnthropic
archiviata in Secret Manager:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.secret_id = secret_id # <- new
def set_up(self):
from google.cloud import secretmanager
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
# Get the API Key from Secret Manager here.
self.secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = self.secret_manager_client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
# Use the API Key from Secret Manager here.
model = ChatAnthropic(
model_name=self.model_name,
model_kwargs={"api_key": secret_version.payload.data.decode()}, # <- new
)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Gestione delle credenziali
Quando l'agente viene disegnato, potrebbe dover gestire diversi tipi di credenziali:
- Credenziali predefinite dell'applicazione (ADC) che in genere derivano dagli account di servizio,
- OAuth che in genere si verificano negli account utente e
- Provider di identità per le credenziali di account esterni (Federazione delle identità per i carichi di lavoro).
Credenziali predefinite dell'applicazione
import google.auth
credentials, project = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
Può essere utilizzato nel codice nel seguente modo:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str = "meta/llama3-405b-instruct-maas",
tools: Sequence[Callable],
location: str,
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.endpoint = f"https://{location}-aiplatform.googleapis.com"
self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'
def query(self, **kwargs):
import google.auth
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# Note: the credential lives for 1 hour by default.
# After expiration, it must be refreshed.
creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
creds.refresh(google.auth.transport.requests.Request())
model = ChatOpenAI(
model=self.model_name,
base_url=self.base_url,
api_key=creds.token, # Use the token from the credentials here.
)
graph = create_react_agent(model, tools=self.tools)
return graph.invoke(**kwargs)
Per maggiori dettagli, vedi Come funzionano le credenziali predefinite dell'applicazione.
OAuth
In genere, le credenziali utente vengono ottenute utilizzando OAuth 2.0.
Se hai un token di accesso (ad es. da oauthlib
),
puoi creare un'istanza google.oauth2.credentials.Credentials
. Inoltre, se ottieni un token di aggiornamento, puoi anche specificare il token di aggiornamento e l'URI del token per consentire l'aggiornamento automatico delle credenziali:
credentials = google.oauth2.credentials.Credentials(
token="ACCESS_TOKEN",
refresh_token="REFRESH_TOKEN", # Optional
token_uri="TOKEN_URI", # E.g. "https://oauth2.googleapis.com/token"
client_id="CLIENT_ID", # Optional
client_secret="CLIENT_SECRET" # Optional
)
Qui, TOKEN_URI
, CLIENT_ID
e
CLIENT_SECRET
si basano su Creare una credenziale client OAuth.
Se non hai un token di accesso, puoi utilizzare google_auth_oauthlib.flow
per eseguire il flusso di concessione dell'autorizzazione OAuth 2.0 per ottenere un'istanza google.oauth2.credentials.Credentials
corrispondente:
from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json
# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())
# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
client_config,
scopes=['https://www.googleapis.com/auth/cloud-platform'],
state="OAUTH_FLOW_STATE" # from flow.authorization_url(...)
)
# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials
# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai
vertexai.init(
project="PROJECT_ID",
location="LOCATION",
credentials=credentials, # specify the credentials here
)
Per maggiori dettagli, consulta la documentazione del modulo google_auth_oauthlib.flow
.
Provider di identità
Se vuoi autenticare gli utenti utilizzando email/password, numero di telefono, provider social come Google, Facebook o GitHub o un meccanismo di autenticazione personalizzato, puoi utilizzare Identity Platform o Firebase Authentication o qualsiasi provider di identità che supporti OpenID Connect (OIDC).
Per maggiori dettagli, consulta Accesso alle risorse da un provider di identità OIDC.