Dans Vertex AI Agent Engine, les modèles d'agents sont définis comme des classes Python. Les étapes suivantes vous montrent comment créer un modèle personnalisé pour instancier des agents déployables sur Vertex AI :
- Exemple de base
- (Facultatif) Diffuser les réponses
- (Facultatif) Enregistrer des méthodes personnalisées
- (Facultatif) Fournir des annotations de type
- (Facultatif) Envoyer des traces à Cloud Trace
- (Facultatif) Utiliser des variables d'environnement
- (Facultatif) Intégration à Secret Manager
- (Facultatif) Gestion des identifiants
- (Facultatif) Gestion des erreurs
Exemple de base
Pour vous donner un exemple de base, la classe Python suivante est un modèle pour instancier des agents déployables sur Vertex AI (vous pouvez attribuer à la variable CLASS_NAME
une valeur telle que MyAgent
) :
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Remarques relatives au déploiement
Lorsque vous écrivez votre classe Python, les trois méthodes suivantes sont importantes :
__init__()
:- N'utilisez cette méthode que pour les paramètres de configuration de l'agent. Par exemple, vous pouvez utiliser cette méthode pour collecter les paramètres du modèle et les attributs de sécurité en tant qu'arguments d'entrée de vos utilisateurs. Vous pouvez également utiliser cette méthode pour collecter des paramètres tels que l'ID de projet, la région, les identifiants d'application et les clés API.
- Le constructeur renvoie un objet qui doit pouvoir être "picklé" pour qu'il puisse être déployé sur Vertex AI Agent Engine. Par conséquent, vous devez initialiser les clients de service et établir des connexions aux bases de données dans la méthode
.set_up
plutôt que dans la méthode__init__
. - Cette méthode est facultative. Si elle n'est pas spécifiée, Vertex AI utilise le constructeur Python par défaut de la classe.
set_up()
:- Vous devez utiliser cette méthode pour définir la logique d'initialisation de l'agent. Par exemple, cette méthode vous permet d'établir des connexions à des bases de données ou de services dépendants, d'importer des packages dépendants ou de précalculer des données utilisées pour diffuser des requêtes.
- Cette méthode est facultative. Si elle n'est pas spécifiée, Vertex AI suppose que l'agent n'a pas besoin d'appeler une méthode
.set_up
avant de diffuser les requêtes utilisateur.
query()
/stream_query()
:- Utilisez
query()
pour renvoyer la réponse complète sous forme de résultat unique. - Utilisez
stream_query()
pour renvoyer la réponse par blocs à mesure qu'elle est disponible, ce qui permet une expérience de streaming. La méthodestream_query
doit renvoyer un objet itérable (par exemple, un générateur) pour activer le streaming. - Vous pouvez implémenter les deux méthodes si vous souhaitez prendre en charge les interactions à réponse unique et en flux continu avec votre agent.
- Vous devez fournir à cette méthode un docstring clair qui définit ce qu'elle fait, documente ses attributs et fournit des annotations de type pour ses entrées.
Évitez les arguments de variables dans les méthodes
query
etstream_query
.
- Utilisez
Instancier l'agent localement
Vous pouvez créer une instance locale de votre agent à l'aide du code suivant :
agent = CLASS_NAME(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Tester la méthode query
Vous pouvez tester l'agent en envoyant des requêtes à l'instance locale :
response = agent.query(
input="What is the exchange rate from US dollars to Swedish currency?"
)
print(response)
La réponse est un dictionnaire semblable à celui-ci :
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Interroger de manière asynchrone
Pour répondre aux requêtes de manière asynchrone, vous pouvez définir une méthode (telle que async_query
) qui renvoie une coroutine Python. Par exemple, le modèle suivant étend l'exemple de base pour répondre de manière asynchrone et peut être déployé sur Vertex AI :
class AsyncAgent(CLASS_NAME):
async def async_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.ainvoke(**kwargs):
yield dumpd(chunk)
agent = AsyncAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Tester la méthode async_query
Vous pouvez tester l'agent localement en appelant la méthode async_query
. Exemple :
response = await agent.async_query(
input="What is the exchange rate from US dollars to Swedish Krona today?"
)
print(response)
La réponse est un dictionnaire semblable à celui-ci :
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Réponses en streaming
Pour diffuser des réponses aux requêtes, vous pouvez définir une méthode nommée stream_query
qui génère des réponses. Par exemple, le modèle suivant étend l'exemple de base pour diffuser des réponses et peut être déployé sur Vertex AI :
from typing import Iterable
class StreamingAgent(CLASS_NAME):
def stream_query(self, **kwargs) -> Iterable:
from langchain.load.dump import dumpd
for chunk in self.graph.stream(**kwargs):
yield dumpd(chunk)
agent = StreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Voici quelques points clés à retenir lorsque vous utilisez l'API de streaming :
- Délai avant expiration maximal : le délai avant expiration maximal pour les réponses en streaming est de 10 minutes. Si votre agent nécessite des temps de traitement plus longs, envisagez de diviser la tâche en plus petites parties.
- Modèles et chaînes de streaming : l'interface Runnable de LangChain est compatible avec le streaming. Vous pouvez donc diffuser des réponses non seulement à partir d'agents, mais aussi de modèles et de chaînes.
- Compatibilité avec LangChain : notez que les méthodes asynchrones telles que la méthode
astream_event
de LangChain ne sont pas prises en charge pour le moment. - Limitez la génération de contenu : si vous rencontrez des problèmes de contre-pression (le producteur génère des données plus rapidement que le consommateur ne peut les traiter), vous devez limiter votre taux de génération de contenu. Cela peut aider à éviter les dépassements de mémoire tampon et à garantir une expérience de streaming fluide.
Tester la méthode stream_query
Vous pouvez tester la requête de flux en local en appelant la méthode stream_query
et en parcourant les résultats. Exemple :
import pprint
for chunk in agent.stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Ce code imprime chaque bloc de la réponse à mesure qu'il est généré. Le résultat peut ressembler à ceci :
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
Dans cet exemple, chaque bloc contient différentes informations sur la réponse, comme les actions effectuées par l'agent, les messages échangés et le résultat final.
Diffuser des réponses de manière asynchrone
Pour diffuser des réponses de manière asynchrone, vous pouvez définir une méthode (par exemple, async_stream_query
) qui renvoie un générateur asynchrone. Par exemple, le modèle suivant étend l'exemple de base pour diffuser les réponses de manière asynchrone et est déployable sur Vertex AI :
class AsyncStreamingAgent(CLASS_NAME):
async def async_stream_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.astream(**kwargs):
yield dumpd(chunk)
agent = AsyncStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Tester la méthode async_stream_query
Comme pour le code de test des requêtes de streaming, vous pouvez tester l'agent localement en appelant la méthode async_stream_query
et en parcourant les résultats. Exemple :
import pprint
async for chunk in agent.async_stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Ce code imprime chaque bloc de la réponse à mesure qu'il est généré. Le résultat peut ressembler à ceci :
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
Enregistrer des méthodes personnalisées
Par défaut, les méthodes query
et stream_query
sont enregistrées en tant qu'opérations dans l'agent déployé.
Vous pouvez remplacer le comportement par défaut et définir l'ensemble des opérations à enregistrer à l'aide de la méthode register_operations
.
Les opérations peuvent être enregistrées en tant que modes d'exécution standard (représentés par une chaîne vide ""
) ou de streaming ("stream"
).
Pour enregistrer plusieurs opérations, vous pouvez définir une méthode nommée register_operations
qui liste les méthodes à mettre à la disposition des utilisateurs lorsque l'agent est déployé. Dans l'exemple de code suivant, la méthode register_operations
entraînera l'enregistrement de query
et get_state
en tant qu'opérations s'exécutant de manière synchrone, et de stream_query
et get_state_history
en tant qu'opérations diffusant les réponses :
from typing import Iterable
class CustomAgent(StreamingAgent):
def get_state(self) -> dict: # new synchronous method
return self.graph.get_state(**kwargs)._asdict()
def get_state_history(self) -> Iterable: # new streaming operation
for state_snapshot in self.graph.get_state_history(**kwargs):
yield state_snapshot._asdict()
def register_operations(self):
return {
# The list of synchronous operations to be registered
"": ["query", "get_state"],
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"],
}
Vous pouvez tester les méthodes personnalisées en les appelant directement sur l'instance locale de l'agent, de la même manière que vous testeriez les méthodes query
et stream_query
.
Fournir des annotations de type
Vous pouvez utiliser des annotations de type pour spécifier les types d'entrée et de sortie attendus de vos méthodes d'agent. Lorsque l'agent est déployé, seuls les types sérialisables au format JSON sont acceptés dans les entrées et les sorties des opérations qu'il prend en charge. Les schémas des entrées et des sorties peuvent être annotés à l'aide de modèles TypedDict
ou Pydantic.
Dans l'exemple suivant, nous annotons l'entrée en tant que TypedDict
et convertissons la sortie brute de .get_state
(qui est un NamedTuple
) en dictionnaire sérialisable à l'aide de sa méthode ._asdict()
:
from typing import Any, Dict, TypedDict
# schemas.py
class RunnableConfig(TypedDict, total=False):
metadata: Dict[str, Any]
configurable: Dict[str, Any]
# agents.py
class AnnotatedAgent(CLASS_NAME):
def get_state(self, config: RunnableConfig) -> dict:
return self.graph.get_state(config=config)._asdict()
def register_operations(self):
return {"": ["query", "get_state"]}
Envoyer des traces à Cloud Trace
Pour envoyer des traces à Cloud Trace avec des bibliothèques d'instrumentation compatibles avec OpenTelemetry, vous pouvez les importer et les initialiser dans la méthode .set_up
. Pour les frameworks d'agents courants, vous pouvez utiliser l'intégration Open Telemetry Google Cloud en combinaison avec un framework d'instrumentation tel qu'OpenInference ou OpenLLMetry.
Par exemple, le modèle suivant est une modification de l'exemple de base pour exporter les traces vers Cloud Trace :
OpenInference
Commencez par installer le package requis à l'aide de pip
en exécutant
pip install openinference-instrumentation-langchain==0.1.34
Ensuite, importez et initialisez l'instrumenteur :
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from openinference.instrumentation.langchain import LangChainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangChainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
OpenLLMetry
Commencez par installer le package requis à l'aide de pip
en exécutant
pip install opentelemetry-instrumentation-langchain==0.38.10
Ensuite, importez et initialisez l'instrumenteur :
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.instrumentation.langchain import LangchainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangchainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Utiliser des variables d'environnement
Pour définir des variables d'environnement, assurez-vous qu'elles sont disponibles via os.environ
pendant le développement et suivez les instructions de la section Définir des variables d'environnement lors du déploiement de l'agent.
Intégrer une API à Secret Manager
Pour intégrer Secret Manager :
Installez la bibliothèque cliente en exécutant
pip install google-cloud-secret-manager
Suivez les instructions de la section Attribuer des rôles à un agent déployé pour accorder le rôle "Accesseur de secrets Secret Manager" (
roles/secretmanager.secretAccessor
) au compte de service via la console Google Cloud .Importez et initialisez le client dans la méthode
.set_up
, puis obtenez le code secret correspondant si nécessaire. Par exemple, le modèle suivant est une modification de l'exemple de base pour utiliser une clé API pourChatAnthropic
qui a été stockée dans Secret Manager :
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.secret_id = secret_id # <- new
def set_up(self):
from google.cloud import secretmanager
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
# Get the API Key from Secret Manager here.
self.secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = self.secret_manager_client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
# Use the API Key from Secret Manager here.
model = ChatAnthropic(
model_name=self.model_name,
model_kwargs={"api_key": secret_version.payload.data.decode()}, # <- new
)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Gérer les identifiants
Une fois l'agent déployé, il peut avoir besoin de gérer différents types d'identifiants :
- Identifiants par défaut de l'application (ADC) provenant généralement de comptes de service
- OAuth provenant généralement de comptes utilisateur
- Fournisseurs d'identité pour les identifiants de comptes externes (fédération d'identité de charge de travail).
Identifiants par défaut de l'application
import google.auth
credentials, project = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
Il peut être utilisé dans le code de la manière suivante :
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str = "meta/llama3-405b-instruct-maas",
tools: Sequence[Callable],
location: str,
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.endpoint = f"https://{location}-aiplatform.googleapis.com"
self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'
def query(self, **kwargs):
import google.auth
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# Note: the credential lives for 1 hour by default.
# After expiration, it must be refreshed.
creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
creds.refresh(google.auth.transport.requests.Request())
model = ChatOpenAI(
model=self.model_name,
base_url=self.base_url,
api_key=creds.token, # Use the token from the credentials here.
)
graph = create_react_agent(model, tools=self.tools)
return graph.invoke(**kwargs)
Pour en savoir plus, consultez Fonctionnement des identifiants par défaut de l'application.
OAuth
Les identifiants utilisateur sont généralement obtenus à l'aide d'OAuth 2.0.
Si vous disposez d'un jeton d'accès (par exemple, à partir de oauthlib
), vous pouvez créer une instance google.oauth2.credentials.Credentials
. De plus, si vous obtenez un jeton d'actualisation, vous pouvez également spécifier le jeton d'actualisation et l'URI du jeton pour permettre l'actualisation automatique des identifiants :
credentials = google.oauth2.credentials.Credentials(
token="ACCESS_TOKEN",
refresh_token="REFRESH_TOKEN", # Optional
token_uri="TOKEN_URI", # E.g. "https://oauth2.googleapis.com/token"
client_id="CLIENT_ID", # Optional
client_secret="CLIENT_SECRET" # Optional
)
Ici, TOKEN_URI
, CLIENT_ID
et CLIENT_SECRET
sont basés sur Créer un identifiant client OAuth.
Si vous ne disposez pas de jeton d'accès, vous pouvez utiliser google_auth_oauthlib.flow
pour effectuer le flux d'attribution d'autorisation OAuth 2.0 afin d'obtenir une instance google.oauth2.credentials.Credentials
correspondante :
from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json
# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())
# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
client_config,
scopes=['https://www.googleapis.com/auth/cloud-platform'],
state="OAUTH_FLOW_STATE" # from flow.authorization_url(...)
)
# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials
# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai
vertexai.init(
project="PROJECT_ID",
location="LOCATION",
credentials=credentials, # specify the credentials here
)
Pour en savoir plus, consultez la documentation du module google_auth_oauthlib.flow
.
Fournisseur d'identité
Si vous souhaitez authentifier les utilisateurs à l'aide d'une combinaison adresse e-mail/mot de passe, d'un numéro de téléphone, d'un compte de réseau social tel que Google, Facebook ou GitHub, ou d'un mécanisme d'authentification personnalisé, vous pouvez utiliser Identity Platform, Firebase Authentication ou tout fournisseur d'identité compatible avec OpenID Connect (OIDC).
Pour en savoir plus, consultez Accéder aux ressources depuis un fournisseur d'identité OIDC.
Traiter les erreurs
Pour vous assurer que les erreurs d'API sont renvoyées dans un format JSON structuré, nous vous recommandons d'implémenter la gestion des exceptions dans le code de votre agent à l'aide d'un bloc try...except
, qui peut être abstrait dans un décorateur.
Bien que Vertex AI Agent Engine puisse gérer différents codes d'état en interne, Python ne dispose pas d'une méthode standardisée pour représenter les erreurs avec les codes d'état HTTP associés pour tous les types d'exception. Tenter de mapper toutes les exceptions Python possibles aux états HTTP dans le service sous-jacent serait complexe et difficile à gérer.
Une approche plus évolutive consiste à intercepter explicitement les exceptions pertinentes dans les méthodes de votre agent ou à l'aide d'un décorateur réutilisable tel que error_wrapper
. Vous pouvez ensuite associer les codes d'état appropriés (par exemple, en ajoutant des attributs code
et error
aux exceptions personnalisées ou en gérant spécifiquement les exceptions standards) et mettre en forme l'erreur en tant que dictionnaire JSON pour la valeur renvoyée. Cela nécessite une modification minimale du code dans les méthodes de l'agent lui-même, souvent en ajoutant simplement le décorateur.
Voici un exemple d'implémentation de la gestion des exceptions dans votre agent :
from functools import wraps
import json
def error_wrapper(func):
@wraps(func) # Preserve original function metadata
def wrapper(*args, **kwargs):
try:
# Execute the original function with its arguments
return func(*args, **kwargs)
except Exception as err:
error_code = getattr(err, 'code')
error_message = getattr(err, 'error')
# Construct the error response dictionary
error_response = {
"error": {
"code": error_code,
"message": f"'{func.__name__}': {error_message}"
}
}
# Return the Python dictionary directly.
return error_response
return wrapper
# Example exception
class SessionNotFoundError(Exception):
def __init__(self, session_id, message="Session not found"):
self.code = 404
self.error = f"{message}: {session_id}"
super().__init__(self.error)
# Example Agent Class
class MyAgent:
@error_wrapper
def get_session(self, session_id: str):
# Simulate the condition where the session isn't found
raise SessionNotFoundError(session_id=session_id)
# Example Usage: Session Not Found
agent = MyAgent()
error_result = agent.get_session(session_id="nonexistent_session_123")
print(json.dumps(error_result, indent=2))
Le code précédent génère le résultat suivant :
json
{
"error": {
"code": 404,
"message": "Invocation error in 'get_session': Session not found: nonexistent_session_123"
}
}
Étapes suivantes
- Évaluez un agent.
- Déployez un agent.
- Résolvez les problèmes liés au développement d'un agent.
- Accédez à l'assistance.