Desarrolla un agente personalizado

Las plantillas de agentes en Agent Engine se definen como clases de Python. En los siguientes pasos, se muestra cómo crear una plantilla personalizada para crear instancias de agentes que se pueden implementar en Vertex AI:

  1. Ejemplo básico
  2. Respuestas de transmisión (opcional)
  3. Registra métodos personalizados (opcional)
  4. Proporciona anotaciones de tipo (opcional)
  5. Seguimiento con OpenTelemetry (opcional)
  6. Trabaja con variables de entorno (opcional)
  7. Integración con Secret Manager (opcional)
  8. Cómo controlar las credenciales (opcional)

Ejemplo básico

Para dar un ejemplo básico, la siguiente clase de Python es una plantilla para crear instancias de agentes que se pueden implementar en Vertex AI (puedes asignar a la variable CLASS_NAME un valor como MyAgent):

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Consideraciones sobre la implementación

Cuando escribes tu clase de Python, los siguientes tres métodos son importantes:

  1. __init__():
    • Usa este método solo para los parámetros de configuración del agente. Por ejemplo, puedes usar este método para recopilar los parámetros del modelo y los atributos de seguridad como argumentos de entrada de tus usuarios. También puedes usar este método para recopilar parámetros, como el ID del proyecto, la región, las credenciales de la aplicación y las claves de API.
    • El constructor muestra un objeto que debe ser "pickle" para que se pueda implementar en el motor de agentes. Por lo tanto, debes inicializar los clientes de servicio y establecer conexiones con las bases de datos en el método .set_up, en lugar de hacerlo en el método __init__.
    • Este método es opcional. Si no se especifica, Vertex AI usa el constructor de Python predeterminado para la clase.
  2. set_up():
    • Debes usar este método para definir la lógica de inicialización del agente. Por ejemplo, usas este método para establecer conexiones con bases de datos o servicios dependientes, importar paquetes dependientes o datos de procesamiento previo que se usan para entregar consultas.
    • Este método es opcional. Si no se especifica, Vertex AI supone que el agente no necesita llamar a un método .set_up antes de entregar consultas de los usuarios.
  3. query() / stream_query():
    • Usa query() para mostrar la respuesta completa como un solo resultado.
    • Usa stream_query() para mostrar la respuesta en fragmentos a medida que esté disponible, lo que habilita una experiencia de transmisión. El método stream_query debe mostrar un objeto iterable (por ejemplo, un generador) para habilitar la transmisión.
    • Puedes implementar ambos métodos si deseas admitir interacciones de transmisión y de respuesta única con tu agente.
    • Debes proporcionar a este método una docstring clara que defina lo que hace, documenta sus atributos y proporciona anotaciones de tipo para sus entradas. Evita los argumentos variables en los métodos query y stream_query.

Cómo crear una instancia del agente de forma local

Puedes crear una instancia local de tu agente con el siguiente código:

agent = CLASS_NAME(
    model=model,  # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Prueba el método query

Para probar el agente, envía consultas a la instancia local:

response = agent.query(
    input="What is the exchange rate from US dollars to Swedish currency?"
)

print(response)

La respuesta es un diccionario similar al siguiente:

{"input": "What is the exchange rate from US dollars to Swedish currency?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

Respuestas de transmisión

Para transmitir respuestas a las consultas, puedes definir un método llamado stream_query que genere respuestas. A modo de ejemplo, la siguiente plantilla extiende el ejemplo básico para transmitir respuestas y se puede implementar en Vertex AI:

from typing import Iterable

class StreamingAgent(CLASS_NAME):

    def stream_query(self, **kwargs) -> Iterable:
        from langchain.load.dump import dumpd

        for chunk in self.graph.stream(**kwargs):
            yield dumpd(chunk)

A continuación, se incluyen algunos aspectos clave que debes tener en cuenta cuando uses la API de transmisión:

  • Tiempo de espera máximo: El tiempo de espera máximo para las respuestas de transmisión es de 10 minutos. Si tu agente requiere tiempos de procesamiento más largos, considera dividir la tarea en partes más pequeñas.
  • Modelos y cadenas de transmisión: La interfaz Runnable de LangChain admite la transmisión, por lo que puedes transmitir respuestas no solo de agentes, sino también de modelos y cadenas.
  • Compatibilidad con LangChain: Ten en cuenta que, por el momento, no se admiten métodos asíncronos, como el método astream_event de LangChain.
  • Limita la generación de contenido: Si tienes problemas de contrapresión (en los que el productor genera datos más rápido de lo que el consumidor puede procesarlos), debes limitar la tasa de generación de contenido. Esto puede ayudar a evitar desbordamientos del búfer y garantizar una experiencia de transmisión fluida.

Prueba el método stream_query

Para probar la consulta de transmisión de forma local, llama al método stream_query y itera por los resultados. Por ejemplo:

import pprint

for chunk in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

Este código imprime cada fragmento de la respuesta a medida que se genera. El resultado podría verse de la siguiente manera:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}

En este ejemplo, cada fragmento contiene información diferente sobre la respuesta, como las acciones que realizó el agente, los mensajes que se intercambiaron y el resultado final.

Registra métodos personalizados

De forma predeterminada, los métodos query y stream_query se registran como operaciones en el agente implementado. Puedes anular el comportamiento predeterminado y definir el conjunto de operaciones que se registrarán con el método register_operations. Las operaciones se pueden registrar como modos de ejecución estándar (representados por una cadena vacía "") o de transmisión ("stream").

Para registrar varias operaciones, puedes definir un método llamado register_operations que enumere los métodos que se pondrán a disposición de los usuarios cuando se implemente el agente. En el siguiente código de ejemplo, el método register_operations hará que el agente implementado registre query y get_state como operaciones que se ejecutan de forma síncrona, y stream_query y get_state_history como operaciones que transmiten las respuestas:

from typing import Iterable

class CustomAgent(StreamingAgent):

    def get_state(self) -> dict: # new synchronous method
        return self.graph.get_state(**kwargs)._asdict()

    def get_state_history(self) -> Iterable: # new streaming operation
        for state_snapshot in self.graph.get_state_history(**kwargs):
            yield state_snapshot._asdict()

    def register_operations(self):
        return {
            # The list of synchronous operations to be registered
            "": ["query", "get_state"],
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"],
        }

Para probar los métodos personalizados, llámalos directamente en la instancia local del agente, de manera similar a como probarías los métodos query y stream_query.

Cómo proporcionar anotaciones de tipo

Puedes usar anotaciones de tipo para especificar los tipos de entrada y salida esperados de los métodos de tu agente. Cuando se implementa el agente, solo se admiten tipos serializables en JSON en la entrada y la salida de las operaciones que admite el agente. Los esquemas de las entradas y salidas se pueden anotar con modelos TypedDict o Pydantic.

En el siguiente ejemplo, anotamos la entrada como TypedDict y convertimos el resultado sin procesar de .get_state (que es un NamedTuple) en un diccionario serializable con su método ._asdict():

from typing import Any, Dict, TypedDict

# schemas.py
class RunnableConfig(TypedDict, total=False):
    metadata: Dict[str, Any]
    configurable: Dict[str, Any]

# agents.py
class AnnotatedAgent(CLASS_NAME):

    def get_state(self, config: RunnableConfig) -> dict:
        return self.graph.get_state(config=config)._asdict()

    def register_operations(self):
        return {"": ["query", "get_state"]}

Realiza un seguimiento con OpenTelemetry

Para habilitar el seguimiento con bibliotecas de instrumentación que admiten OpenTelemetry, puedes importarlas e inicializarlas en el método .set_up.

A modo de ejemplo, la siguiente plantilla es una modificación del ejemplo básico para exportar seguimientos a Cloud Trace:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        # The additional code required for tracing instrumentation.
        from opentelemetry import trace
        from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
        from openinference.instrumentation.langchain import LangChainInstrumentor

        trace.set_tracer_provider(TracerProvider())
        cloud_trace_exporter = CloudTraceSpanExporter(project_id=self.project)
        trace.get_tracer_provider().add_span_processor(
            SimpleSpanProcessor(cloud_trace_exporter)
        )
        LangChainInstrumentor().instrument()
        # end of additional code required

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Cómo trabajar con variables de entorno

Para configurar las variables de entorno de una manera que funcione cuando se implemente el agente, las establecerás dentro del método .set_up, por ejemplo:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
        env_vars: dict[str, str], # <- new
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location
        self.env_vars = env_vars # <- new

    def set_up(self):
        # Code for setting the environment variables
        import os
        for env_var_name, env_var_value in self.env_vars.items():
            os.environ[env_var_name] = env_var_value
        # End of code for setting the environment variables

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Integración con Secret Manager

Para integrarlo con Secret Manager, haz lo siguiente:

  1. Para instalar la biblioteca cliente, ejecuta

    pip install google-cloud-secret-manager
  2. Sigue las instrucciones en Otorga roles a un agente implementado para otorgar al agente de servicio de Reasoning Engine el rol de "Administrador y descriptor de acceso a secretos" (roles/secretmanager.secretAccessor) a través de la consola de Google Cloud.

  3. Importa e inicializa el cliente en el método .set_up y obtén el secreto correspondiente cuando sea necesario. A modo de ejemplo, la siguiente plantilla es una modificación del ejemplo básico para usar una clave de API para ChatAnthropic que se almacena en Secret Manager:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.secret_id = secret_id # <- new

    def set_up(self):
        from google.cloud import secretmanager
        from langchain_anthropic import ChatAnthropic
        from langgraph.prebuilt import create_react_agent

        # Get the API Key from Secret Manager here.
        self.secret_manager_client = secretmanager.SecretManagerServiceClient()
        secret_version = self.secret_manager_client.access_secret_version(request={
            "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
        })
        # Use the API Key from Secret Manager here.
        model = ChatAnthropic(
            model_name=self.model_name,
            model_kwargs={"api_key": secret_version.payload.data.decode()},  # <- new
        )
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Cómo controlar las credenciales

Cuando se implementa el agente, es posible que deba controlar diferentes tipos de credenciales:

  1. Credenciales predeterminadas de la aplicación (ADC) que suelen provenir de cuentas de servicio
  2. OAuth, que suele provenir de cuentas de usuario
  3. Proveedores de identidad para credenciales de cuentas externas (federación de identidades para cargas de trabajo).

Credenciales predeterminadas de la aplicación

import google.auth

credentials, project = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

Se puede usar en el código de la siguiente manera:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str = "meta/llama3-405b-instruct-maas",
        tools: Sequence[Callable],
        location: str,
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.endpoint = f"https://{location}-aiplatform.googleapis.com"
        self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'

    def query(self, **kwargs):
        import google.auth
        from langchain_openai import ChatOpenAI
        from langgraph.prebuilt import create_react_agent

        # Note: the credential lives for 1 hour by default.
        # After expiration, it must be refreshed.
        creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
        creds.refresh(google.auth.transport.requests.Request())

        model = ChatOpenAI(
            model=self.model_name,
            base_url=self.base_url,
            api_key=creds.token,  # Use the token from the credentials here.
        )
        graph = create_react_agent(model, tools=self.tools)
        return graph.invoke(**kwargs)

Para obtener más información, consulta Cómo funcionan las credenciales predeterminadas de la aplicación.

OAuth

Por lo general, las credenciales del usuario se obtienen con OAuth 2.0.

Si tienes un token de acceso (p.ej., de oauthlib), puedes crear una instancia de google.oauth2.credentials.Credentials. Además, si obtienes un token de actualización, también puedes especificar el token de actualización y el URI del token para permitir que las credenciales se actualicen automáticamente:

credentials = google.oauth2.credentials.Credentials(
    token="ACCESS_TOKEN",
    refresh_token="REFRESH_TOKEN",  # Optional
    token_uri="TOKEN_URI",          # E.g. "https://oauth2.googleapis.com/token"
    client_id="CLIENT_ID",          # Optional
    client_secret="CLIENT_SECRET"   # Optional
)

Aquí, TOKEN_URI, CLIENT_ID y CLIENT_SECRET se basan en Crea una credencial de cliente de OAuth.

Si no tienes un token de acceso, puedes usar google_auth_oauthlib.flow para realizar el flujo de otorgamiento de autorización de OAuth 2.0 y obtener una instancia de google.oauth2.credentials.Credentials correspondiente:

from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json

# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
    "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())

# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
    client_config,
    scopes=['https://www.googleapis.com/auth/cloud-platform'],
    state="OAUTH_FLOW_STATE"  # from flow.authorization_url(...)
)

# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials

# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai

vertexai.init(
    project="PROJECT_ID",
    location="LOCATION",
    credentials=credentials, # specify the credentials here
)

Para obtener más información, consulta la documentación del módulo google_auth_oauthlib.flow.

Proveedor de identidad

Si quieres autenticar usuarios mediante correo electrónico/contraseña, número de teléfono, proveedores de redes sociales como Google, Facebook o GitHub, o un mecanismo de autenticación personalizado, puedes usar Identity Platform, Firebase Authentication o cualquier proveedor de identidad que admita OpenID Connect (OIDC).

Para obtener más información, consulta Cómo acceder a los recursos desde un proveedor de identidad de OIDC.

¿Qué sigue?