Benutzerdefinierten Agent entwickeln

Agent-Vorlagen in der Agent Engine werden als Python-Klassen definiert. In den folgenden Schritten wird beschrieben, wie Sie eine benutzerdefinierte Vorlage zum Instanziieren von Agenten erstellen, die in Vertex AI bereitgestellt werden können:

  1. Einfaches Beispiel
  2. Optional: Streamantworten
  3. Optional: Benutzerdefinierte Methoden registrieren
  4. Optional: Typannotationen angeben
  5. Optional: Tracing mit OpenTelemetry
  6. Optional: Mit Umgebungsvariablen arbeiten
  7. Optional: Secret Manager einbinden
  8. (Optional) Anmeldedaten verarbeiten

Einfaches Beispiel

Als einfaches Beispiel dient die folgende Python-Klasse, die als Vorlage zum Instanziieren von Agenten dient, die in Vertex AI bereitgestellt werden können. Sie können der Variablen CLASS_NAME einen Wert wie MyAgent zuweisen:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Überlegungen zur Bereitstellung

Beim Schreiben Ihrer Python-Klasse sind die folgenden drei Methoden wichtig:

  1. __init__():
    • Verwenden Sie diese Methode nur für Parameter der Agentkonfiguration. Sie können mit dieser Methode beispielsweise die Modellparameter und Sicherheitsattribute als Eingabeargumente von Ihren Nutzern erfassen. Sie können diese Methode auch verwenden, um Parameter wie die Projekt-ID, die Region, die Anmeldedaten für die Anwendung und die API-Schlüssel zu erfassen.
    • Der Konstruktor gibt ein Objekt zurück, das „pickle-fähig“ sein muss, damit es für die Agent Engine bereitgestellt werden kann. Daher sollten Sie Dienstclients initialisieren und Verbindungen zu Datenbanken in der Methode .set_up anstelle der Methode __init__ herstellen.
    • Diese Methode ist optional. Wenn er nicht angegeben ist, verwendet Vertex AI den Standard-Python-Konstruktor für die Klasse.
  2. set_up():
    • Sie müssen diese Methode verwenden, um die Logik für die Kundenservicemitarbeiter-Initialisierung zu definieren. Sie können mit dieser Methode beispielsweise Verbindungen zu Datenbanken oder abhängigen Diensten herstellen, abhängige Pakete importieren oder Daten vorausberechnen, die zur Verarbeitung von Abfragen verwendet werden.
    • Diese Methode ist optional. Wenn sie nicht angegeben ist, geht Vertex AI davon aus, dass der Agent keine .set_up-Methode aufrufen muss, bevor Nutzerabfragen verarbeitet werden.
  3. query() / stream_query():
    • Mit query() wird die vollständige Antwort als einzelnes Ergebnis zurückgegeben.
    • Mit stream_query() können Sie die Antwort in Teilen zurückgeben, sobald sie verfügbar ist. Die stream_query-Methode muss ein iterierbares Objekt (z. B. einen Generator) zurückgeben, um Streaming zu ermöglichen.
    • Sie können beide Methoden implementieren, wenn Sie sowohl Antworten als auch Streaminginteraktionen mit Ihrem Kundenservicemitarbeiter unterstützen möchten.
    • Sie sollten dieser Methode einen eindeutigen Docstring zuweisen, der ihre Funktion definiert, ihre Attribute dokumentiert und Typenannotationen für ihre Eingaben bereitstellt. Vermeiden Sie Variablenargumente in den Methoden query und stream_query.

Agent lokal instanziieren

Mit dem folgenden Code können Sie eine lokale Instanz Ihres Agents erstellen:

agent = CLASS_NAME(
    model=model,  # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

query-Methode testen

Sie können den Agenten testen, indem Sie Abfragen an die lokale Instanz senden:

response = agent.query(
    input="What is the exchange rate from US dollars to Swedish currency?"
)

print(response)

Die Antwort ist ein Wörterbuch, das in etwa so aussieht:

{"input": "What is the exchange rate from US dollars to Swedish currency?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

Streamingantworten

Wenn Sie Antworten auf Abfragen streamen möchten, können Sie eine Methode namens stream_query definieren, die Antworten liefert. In der folgenden Vorlage wird das grundlegende Beispiel beispielsweise um gestreamte Antworten erweitert und kann in Vertex AI bereitgestellt werden:

from typing import Iterable

class StreamingAgent(CLASS_NAME):

    def stream_query(self, **kwargs) -> Iterable:
        from langchain.load.dump import dumpd

        for chunk in self.graph.stream(**kwargs):
            yield dumpd(chunk)

Bei der Verwendung der Streaming API ist Folgendes zu beachten:

  • Maximale Zeitüberschreitung: Die maximale Zeitüberschreitung für Streamingantworten beträgt 10 Minuten. Wenn Ihr Kundenservicemitarbeiter längere Verarbeitungszeiten benötigt, sollten Sie die Aufgabe in kleinere Teile aufteilen.
  • Streaming von Modellen und Ketten: Die Runnable-Schnittstelle von LangChain unterstützt Streaming. So können Sie nicht nur Antworten von Agents, sondern auch von Modellen und Ketten streamen.
  • LangChain-Kompatibilität: Asynchrone Methoden wie die astream_event-Methode von LangChain werden derzeit nicht unterstützt.
  • Inhaltserstellung drosseln: Wenn Backpressure-Probleme auftreten (d. h. der Produzent generiert Daten schneller, als der Verbraucher sie verarbeiten kann), sollten Sie die Inhaltserstellungsrate drosseln. So lassen sich Pufferüberläufe vermeiden und ein reibungsloses Streaming ermöglichen.

stream_query-Methode testen

Sie können die Streamingabfrage lokal testen, indem Sie die Methode stream_query aufrufen und die Ergebnisse durchgehen. Beispiel:

import pprint

for chunk in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

Mit diesem Code wird jeder Teil der Antwort ausgegeben, sobald er generiert wird. Die Ausgabe sieht in etwa so aus:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}

In diesem Beispiel enthält jeder Teil verschiedene Informationen zur Antwort, z. B. die vom Kundenservicemitarbeiter ausgeführten Aktionen, die ausgetauschten Nachrichten und die endgültige Ausgabe.

Benutzerdefinierte Methoden registrieren

Die Methoden query und stream_query sind standardmäßig als Vorgänge im bereitgestellten Agenten registriert. Sie können das Standardverhalten überschreiben und die zu registrierenden Vorgänge mit der Methode register_operations definieren. Vorgänge können als Standard (durch einen leeren String "" dargestellt) oder Streaming ("stream") ausgeführt werden.

Wenn Sie mehrere Vorgänge registrieren möchten, können Sie eine Methode namens register_operations definieren, in der die Methoden aufgeführt sind, die Nutzern bei der Bereitstellung des Agents zur Verfügung gestellt werden sollen. Im folgenden Beispielcode führt die Methode register_operations dazu, dass der bereitgestellte Agent query und get_state als synchron ausgeführte Vorgänge und stream_query und get_state_history als Vorgänge registriert, die die Antworten streamen:

from typing import Iterable

class CustomAgent(StreamingAgent):

    def get_state(self) -> dict: # new synchronous method
        return self.graph.get_state(**kwargs)._asdict()

    def get_state_history(self) -> Iterable: # new streaming operation
        for state_snapshot in self.graph.get_state_history(**kwargs):
            yield state_snapshot._asdict()

    def register_operations(self):
        return {
            # The list of synchronous operations to be registered
            "": ["query", "get_state"],
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"],
        }

Sie können die benutzerdefinierten Methoden testen, indem Sie sie direkt in der lokalen Instanz des Agents aufrufen, ähnlich wie Sie die Methoden query und stream_query testen würden.

Typannotationen bereitstellen

Mithilfe von Typanmerkungen können Sie die erwarteten Eingabe- und Ausgabetypen Ihrer Agentenmethoden angeben. Nach der Bereitstellung des Agents werden bei der Eingabe und Ausgabe der vom Agenten unterstützten Vorgänge nur JSON-serialisierbare Typen unterstützt. Die Schemas der Eingaben und Ausgaben können mit TypedDict- oder Pydantic-Modellen annotiert werden.

Im folgenden Beispiel wird die Eingabe als TypedDict annotiert und die Rohausgabe von .get_state (eine NamedTuple) mithilfe der ._asdict()-Methode in ein serialisierbares Wörterbuch umgewandelt:

from typing import Any, Dict, TypedDict

# schemas.py
class RunnableConfig(TypedDict, total=False):
    metadata: Dict[str, Any]
    configurable: Dict[str, Any]

# agents.py
class AnnotatedAgent(CLASS_NAME):

    def get_state(self, config: RunnableConfig) -> dict:
        return self.graph.get_state(config=config)._asdict()

    def register_operations(self):
        return {"": ["query", "get_state"]}

Tracing mit OpenTelemetry

Wenn Sie Tracing mit Instrumentierungsbibliotheken aktivieren möchten, die OpenTelemetry unterstützen, können Sie sie in der Methode .set_up importieren und initialisieren.

Die folgende Vorlage ist beispielsweise eine Modifikation des einfachen Beispiels zum Exportieren von Traces nach Cloud Trace:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        # The additional code required for tracing instrumentation.
        from opentelemetry import trace
        from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
        from openinference.instrumentation.langchain import LangChainInstrumentor

        trace.set_tracer_provider(TracerProvider())
        cloud_trace_exporter = CloudTraceSpanExporter(project_id=self.project)
        trace.get_tracer_provider().add_span_processor(
            SimpleSpanProcessor(cloud_trace_exporter)
        )
        LangChainInstrumentor().instrument()
        # end of additional code required

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Mit Umgebungsvariablen arbeiten

Wenn Sie Umgebungsvariablen so festlegen möchten, dass sie beim Bereitstellen des Agents funktionieren, legen Sie sie in der .set_up-Methode fest, z. B. so:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
        env_vars: dict[str, str], # <- new
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location
        self.env_vars = env_vars # <- new

    def set_up(self):
        # Code for setting the environment variables
        import os
        for env_var_name, env_var_value in self.env_vars.items():
            os.environ[env_var_name] = env_var_value
        # End of code for setting the environment variables

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Integration mit Secret Manager

So integrieren Sie Secret Manager:

  1. Installieren Sie die Clientbibliothek mit dem Befehl

    pip install google-cloud-secret-manager
  2. Folgen Sie der Anleitung unter Rollen für einen bereitgestellten Agenten gewähren, um dem Dienst-Agenten der Reasoning Engine über die Google Cloud Console die Rolle „Secret Manager Secret Accessor“ (roles/secretmanager.secretAccessor) zuzuweisen.

  3. Importieren und initialisieren Sie den Client in der .set_up-Methode und rufen Sie bei Bedarf das entsprechende Secret ab. Die folgende Vorlage ist beispielsweise eine Modifikation des Grundbeispiels, bei der ein API-Schlüssel für ChatAnthropic verwendet wird, der in Secret Manager gespeichert wurde:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.secret_id = secret_id # <- new

    def set_up(self):
        from google.cloud import secretmanager
        from langchain_anthropic import ChatAnthropic
        from langgraph.prebuilt import create_react_agent

        # Get the API Key from Secret Manager here.
        self.secret_manager_client = secretmanager.SecretManagerServiceClient()
        secret_version = self.secret_manager_client.access_secret_version(request={
            "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
        })
        # Use the API Key from Secret Manager here.
        model = ChatAnthropic(
            model_name=self.model_name,
            model_kwargs={"api_key": secret_version.payload.data.decode()},  # <- new
        )
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Umgang mit Anmeldedaten

Bei der Bereitstellung des Agents müssen möglicherweise verschiedene Arten von Anmeldedaten verarbeitet werden:

  1. Standardanmeldedaten für Anwendungen (Application Default Credentials, ADC), die in der Regel aus Dienstkonten stammen,
  2. OAuth, die häufig aus Nutzerkonten stammen, und
  3. Identitätsanbieter für Anmeldedaten aus externen Konten (Workload Identity Federation).

Standardanmeldedaten für Anwendungen

import google.auth

credentials, project = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

Sie kann so im Code verwendet werden:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str = "meta/llama3-405b-instruct-maas",
        tools: Sequence[Callable],
        location: str,
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.endpoint = f"https://{location}-aiplatform.googleapis.com"
        self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'

    def query(self, **kwargs):
        import google.auth
        from langchain_openai import ChatOpenAI
        from langgraph.prebuilt import create_react_agent

        # Note: the credential lives for 1 hour by default.
        # After expiration, it must be refreshed.
        creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
        creds.refresh(google.auth.transport.requests.Request())

        model = ChatOpenAI(
            model=self.model_name,
            base_url=self.base_url,
            api_key=creds.token,  # Use the token from the credentials here.
        )
        graph = create_react_agent(model, tools=self.tools)
        return graph.invoke(**kwargs)

Weitere Informationen finden Sie unter Funktionsweise von Standardanmeldedaten für Anwendungen.

OAuth

Nutzeranmeldedaten werden in der Regel mit OAuth 2.0 abgerufen.

Wenn Sie ein Zugriffstoken haben (z.B. von oauthlib), können Sie eine google.oauth2.credentials.Credentials-Instanz erstellen. Wenn Sie ein Aktualisierungstoken erhalten, können Sie außerdem das Aktualisierungstoken und die Token-URI angeben, damit die Anmeldedaten automatisch aktualisiert werden:

credentials = google.oauth2.credentials.Credentials(
    token="ACCESS_TOKEN",
    refresh_token="REFRESH_TOKEN",  # Optional
    token_uri="TOKEN_URI",          # E.g. "https://oauth2.googleapis.com/token"
    client_id="CLIENT_ID",          # Optional
    client_secret="CLIENT_SECRET"   # Optional
)

Hier basieren TOKEN_URI, CLIENT_ID und CLIENT_SECRET auf OAuth-Client-Anmeldedaten erstellen.

Wenn du kein Zugriffstoken hast, kannst du mit google_auth_oauthlib.flow den OAuth 2.0-Autorisierungsvorgang ausführen, um eine entsprechende google.oauth2.credentials.Credentials-Instanz abzurufen:

from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json

# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
    "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())

# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
    client_config,
    scopes=['https://www.googleapis.com/auth/cloud-platform'],
    state="OAUTH_FLOW_STATE"  # from flow.authorization_url(...)
)

# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials

# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai

vertexai.init(
    project="PROJECT_ID",
    location="LOCATION",
    credentials=credentials, # specify the credentials here
)

Weitere Informationen finden Sie in der Dokumentation zum google_auth_oauthlib.flow-Modul.

Identitätsanbieter

Wenn Sie Nutzer über E-Mail/Passwort, Telefonnummer, soziale Netzwerke wie Google, Facebook oder GitHub oder einen benutzerdefinierten Authentifizierungsmechanismus authentifizieren möchten, können Sie Identity Platform, Firebase Authentication oder einen Identitätsanbieter verwenden, der OpenID Connect (OIDC) unterstützt.

Weitere Informationen finden Sie unter Über einen OIDC-Identitätsanbieter auf Ressourcen zugreifen.

Nächste Schritte