Développer un agent personnalisé

Dans Agent Engine, les modèles d'agent sont définis comme des classes Python. Les étapes suivantes vous expliquent comment créer un modèle personnalisé pour instancier des agents pouvant être déployés sur Vertex AI:

  1. Exemple de base
  2. (Facultatif) Diffuser les réponses
  3. (Facultatif) Enregistrer des méthodes personnalisées
  4. (Facultatif) Fournir des annotations de type
  5. (Facultatif) Traçage à l'aide d'OpenTelemetry
  6. (Facultatif) Utiliser des variables d'environnement
  7. (Facultatif) Intégration à Secret Manager
  8. (Facultatif) Gérer les identifiants

Exemple de base

Pour vous donner un exemple de base, la classe Python suivante est un modèle d'instanciation d'agents déployables sur Vertex AI (vous pouvez attribuer à la variable CLASS_NAME une valeur telle que MyAgent):

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Remarques relatives au déploiement

Lorsque vous écrivez votre classe Python, les trois méthodes suivantes sont importantes:

  1. __init__() :
    • N'utilisez cette méthode que pour les paramètres de configuration de l'agent. Par exemple, vous pouvez utiliser cette méthode pour collecter les paramètres du modèle et les attributs de sécurité en tant qu'arguments d'entrée de vos utilisateurs. Vous pouvez également utiliser cette méthode pour collecter des paramètres tels que l'ID de projet, la région, les identifiants d'application et les clés API.
    • Le constructeur renvoie un objet qui doit pouvoir être "picklé" pour qu'il puisse être déployé sur le moteur d'agent. Par conséquent, vous devez initialiser les clients de service et établir des connexions aux bases de données dans la méthode .set_up plutôt que dans la méthode __init__.
    • Cette méthode est facultative. Si elle n'est pas spécifiée, Vertex AI utilise le constructeur Python par défaut de la classe.
  2. set_up() :
    • Vous devez utiliser cette méthode pour définir la logique d'initialisation de l'agent. Par exemple, cette méthode vous permet d'établir des connexions à des bases de données ou de services dépendants, d'importer des packages dépendants ou de précalculer des données utilisées pour diffuser des requêtes.
    • Cette méthode est facultative. Si elle n'est pas spécifiée, Vertex AI suppose que l'agent n'a pas besoin d'appeler une méthode .set_up avant de diffuser les requêtes utilisateur.
  3. query() / stream_query() :
    • Utilisez query() pour renvoyer la réponse complète en tant que résultat unique.
    • Utilisez stream_query() pour renvoyer la réponse par blocs à mesure qu'elle devient disponible, ce qui permet une expérience de streaming. La méthode stream_query doit renvoyer un objet itérable (par exemple, un générateur) pour activer le streaming.
    • Vous pouvez implémenter les deux méthodes si vous souhaitez prendre en charge les interactions en streaming et à réponse unique avec votre agent.
    • Vous devez fournir à cette méthode un docstring clair qui définit ce qu'elle fait, documente ses attributs et fournit des annotations de type pour ses entrées. Évitez les arguments de variables dans les méthodes query et stream_query.

Instancier l'agent en local

Vous pouvez créer une instance locale de votre agent à l'aide du code suivant:

agent = CLASS_NAME(
    model=model,  # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Tester la méthode query

Vous pouvez tester l'agent en envoyant des requêtes à l'instance locale:

response = agent.query(
    input="What is the exchange rate from US dollars to Swedish currency?"
)

print(response)

La réponse est un dictionnaire semblable à celui-ci:

{"input": "What is the exchange rate from US dollars to Swedish currency?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

Réponses en streaming

Pour diffuser des réponses aux requêtes, vous pouvez définir une méthode nommée stream_query qui renvoie des réponses. Par exemple, le modèle suivant étend l'exemple de base pour diffuser des réponses et peut être déployé sur Vertex AI:

from typing import Iterable

class StreamingAgent(CLASS_NAME):

    def stream_query(self, **kwargs) -> Iterable:
        from langchain.load.dump import dumpd

        for chunk in self.graph.stream(**kwargs):
            yield dumpd(chunk)

Voici quelques points importants à garder à l'esprit lorsque vous utilisez l'API de streaming:

  • Délai avant expiration maximal: le délai avant expiration maximal pour les réponses en streaming est de 10 minutes. Si votre agent nécessite des temps de traitement plus longs, envisagez de diviser la tâche en plus petits morceaux.
  • Streaming de modèles et de chaînes: l'interface Runnable de LangChain est compatible avec le streaming. Vous pouvez donc diffuser des réponses provenant non seulement d'agents, mais aussi de modèles et de chaînes.
  • Compatibilité avec LangChain: notez que les méthodes asynchrones telles que la méthode astream_event de LangChain ne sont pas prises en charge pour le moment.
  • Limitez la génération de contenu: si vous rencontrez des problèmes de contre-pression (lorsque le producteur génère des données plus rapidement que le consommateur ne peut les traiter), vous devez limiter votre taux de génération de contenu. Cela peut aider à éviter les débordements de mémoire tampon et à garantir une expérience de streaming fluide.

Tester la méthode stream_query

Vous pouvez tester la requête de streaming en local en appelant la méthode stream_query et en itérant les résultats. Exemple :

import pprint

for chunk in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

Ce code imprime chaque segment de la réponse à mesure qu'il est généré. Le résultat peut ressembler à ceci:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}

Dans cet exemple, chaque bloc contient différentes informations sur la réponse, telles que les actions effectuées par l'agent, les messages échangés et la sortie finale.

Enregistrer des méthodes personnalisées

Par défaut, les méthodes query et stream_query sont enregistrées en tant qu'opérations dans l'agent déployé. Vous pouvez remplacer le comportement par défaut et définir l'ensemble d'opérations à enregistrer à l'aide de la méthode register_operations. Les opérations peuvent être enregistrées en tant que modes d'exécution standard (représentés par une chaîne vide "") ou en streaming ("stream").

Pour enregistrer plusieurs opérations, vous pouvez définir une méthode nommée register_operations qui liste les méthodes à mettre à la disposition des utilisateurs lorsque l'agent est déployé. Dans l'exemple de code suivant, la méthode register_operations entraîne l'enregistrement de query et get_state par l'agent déployé en tant qu'opérations exécutées de manière synchrone, et de stream_query et get_state_history en tant qu'opérations qui transmettent les réponses en streaming:

from typing import Iterable

class CustomAgent(StreamingAgent):

    def get_state(self) -> dict: # new synchronous method
        return self.graph.get_state(**kwargs)._asdict()

    def get_state_history(self) -> Iterable: # new streaming operation
        for state_snapshot in self.graph.get_state_history(**kwargs):
            yield state_snapshot._asdict()

    def register_operations(self):
        return {
            # The list of synchronous operations to be registered
            "": ["query", "get_state"],
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"],
        }

Vous pouvez tester les méthodes personnalisées en les appelant directement sur l'instance locale de l'agent, comme vous le feriez pour les méthodes query et stream_query.

Fournir des annotations de type

Vous pouvez utiliser des annotations de type pour spécifier les types d'entrée et de sortie attendus de vos méthodes d'agent. Lorsque l'agent est déployé, seuls les types sérialisables au format JSON sont acceptés à l'entrée et à la sortie des opérations compatibles avec l'agent. Les schémas des entrées et des sorties peuvent être annotés à l'aide de modèles TypedDict ou Pydantic.

Dans l'exemple suivant, nous annotons l'entrée en tant que TypedDict et convertissons la sortie brute de .get_state (qui est un NamedTuple) en dictionnaire sérialisable à l'aide de sa méthode ._asdict():

from typing import Any, Dict, TypedDict

# schemas.py
class RunnableConfig(TypedDict, total=False):
    metadata: Dict[str, Any]
    configurable: Dict[str, Any]

# agents.py
class AnnotatedAgent(CLASS_NAME):

    def get_state(self, config: RunnableConfig) -> dict:
        return self.graph.get_state(config=config)._asdict()

    def register_operations(self):
        return {"": ["query", "get_state"]}

Traçage à l'aide d'OpenTelemetry

Pour activer le traçage avec des bibliothèques d'instrumentation compatibles avec OpenTelemetry, vous pouvez les importer et les initialiser dans la méthode .set_up.

Par exemple, le modèle suivant est une modification de l'exemple de base pour exporter des traces vers Cloud Trace:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        # The additional code required for tracing instrumentation.
        from opentelemetry import trace
        from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
        from openinference.instrumentation.langchain import LangChainInstrumentor

        trace.set_tracer_provider(TracerProvider())
        cloud_trace_exporter = CloudTraceSpanExporter(project_id=self.project)
        trace.get_tracer_provider().add_span_processor(
            SimpleSpanProcessor(cloud_trace_exporter)
        )
        LangChainInstrumentor().instrument()
        # end of additional code required

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Utiliser des variables d'environnement

Pour définir des variables d'environnement de manière à ce qu'elles fonctionnent lorsque l'agent est déployé, vous devez les définir dans la méthode .set_up, par exemple:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
        env_vars: dict[str, str], # <- new
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location
        self.env_vars = env_vars # <- new

    def set_up(self):
        # Code for setting the environment variables
        import os
        for env_var_name, env_var_value in self.env_vars.items():
            os.environ[env_var_name] = env_var_value
        # End of code for setting the environment variables

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Intégrer Secret Manager

Pour intégrer Secret Manager:

  1. Installez la bibliothèque cliente en exécutant

    pip install google-cloud-secret-manager
  2. Suivez les instructions de la section Attribuer des rôles à un agent déployé pour accorder au service agent du moteur de raisonnement le rôle "Accesseur de secrets du gestionnaire de secrets" (roles/secretmanager.secretAccessor) via la console Google Cloud.

  3. Importez et initialisez le client dans la méthode .set_up, puis obtenez le secret correspondant si nécessaire. Par exemple, le modèle suivant est une modification de l'exemple de base pour utiliser une clé API pour ChatAnthropic qui a été stockée dans Secret Manager:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.secret_id = secret_id # <- new

    def set_up(self):
        from google.cloud import secretmanager
        from langchain_anthropic import ChatAnthropic
        from langgraph.prebuilt import create_react_agent

        # Get the API Key from Secret Manager here.
        self.secret_manager_client = secretmanager.SecretManagerServiceClient()
        secret_version = self.secret_manager_client.access_secret_version(request={
            "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
        })
        # Use the API Key from Secret Manager here.
        model = ChatAnthropic(
            model_name=self.model_name,
            model_kwargs={"api_key": secret_version.payload.data.decode()},  # <- new
        )
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Gérer les identifiants

Lorsque l'agent est déployé, il peut être amené à gérer différents types d'identifiants:

  1. Identifiants par défaut de l'application (ADC) provenant généralement de comptes de service ;
  2. OAuth, qui provient généralement des comptes utilisateur ;
  3. Fournisseurs d'identité pour les identifiants de comptes externes (fédération d'identité de charge de travail).

Identifiants par défaut de l'application

import google.auth

credentials, project = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

Vous pouvez l'utiliser dans le code comme suit:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str = "meta/llama3-405b-instruct-maas",
        tools: Sequence[Callable],
        location: str,
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.endpoint = f"https://{location}-aiplatform.googleapis.com"
        self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'

    def query(self, **kwargs):
        import google.auth
        from langchain_openai import ChatOpenAI
        from langgraph.prebuilt import create_react_agent

        # Note: the credential lives for 1 hour by default.
        # After expiration, it must be refreshed.
        creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
        creds.refresh(google.auth.transport.requests.Request())

        model = ChatOpenAI(
            model=self.model_name,
            base_url=self.base_url,
            api_key=creds.token,  # Use the token from the credentials here.
        )
        graph = create_react_agent(model, tools=self.tools)
        return graph.invoke(**kwargs)

Pour en savoir plus, consultez Fonctionnement des identifiants par défaut de l'application.

OAuth

Les identifiants utilisateur sont généralement obtenus à l'aide d'OAuth 2.0.

Si vous disposez d'un jeton d'accès (par exemple, à partir de oauthlib), vous pouvez créer une instance google.oauth2.credentials.Credentials. De plus, si vous obtenez un jeton d'actualisation, vous pouvez également spécifier le jeton d'actualisation et l'URI du jeton pour autoriser l'actualisation automatique des identifiants:

credentials = google.oauth2.credentials.Credentials(
    token="ACCESS_TOKEN",
    refresh_token="REFRESH_TOKEN",  # Optional
    token_uri="TOKEN_URI",          # E.g. "https://oauth2.googleapis.com/token"
    client_id="CLIENT_ID",          # Optional
    client_secret="CLIENT_SECRET"   # Optional
)

Ici, TOKEN_URI, CLIENT_ID et CLIENT_SECRET sont basés sur Créer un identifiant client OAuth.

Si vous ne disposez pas d'un jeton d'accès, vous pouvez utiliser google_auth_oauthlib.flow pour effectuer le flux d'attribution d'autorisation OAuth 2.0 afin d'obtenir une instance google.oauth2.credentials.Credentials correspondante:

from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json

# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
    "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())

# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
    client_config,
    scopes=['https://www.googleapis.com/auth/cloud-platform'],
    state="OAUTH_FLOW_STATE"  # from flow.authorization_url(...)
)

# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials

# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai

vertexai.init(
    project="PROJECT_ID",
    location="LOCATION",
    credentials=credentials, # specify the credentials here
)

Pour en savoir plus, consultez la documentation du module google_auth_oauthlib.flow.

Fournisseur d'identité

Si vous souhaitez authentifier les utilisateurs à l'aide d'une combinaison adresse e-mail/mot de passe, d'un numéro de téléphone, d'un compte de réseau social tel que Google, Facebook ou GitHub, ou d'un mécanisme d'authentification personnalisé, vous pouvez utiliser Identity Platform ou Firebase Authentication, ou tout fournisseur d'identité compatible avec OpenID Connect (OIDC).

Pour en savoir plus, consultez Accéder aux ressources depuis un fournisseur d'identité OIDC.

Étape suivante