Las plantillas de agentes en Agent Engine se definen como clases de Python. En los siguientes pasos, se muestra cómo crear una plantilla personalizada para crear instancias de agentes que se pueden implementar en Vertex AI:
- Ejemplo básico
- Respuestas de transmisión (opcional)
- Registra métodos personalizados (opcional)
- Proporciona anotaciones de tipo (opcional)
- Seguimiento con OpenTelemetry (opcional)
- Trabaja con variables de entorno (opcional)
- Integración con Secret Manager (opcional)
- Cómo controlar las credenciales (opcional)
Ejemplo básico
Para dar un ejemplo básico, la siguiente clase de Python es una plantilla para
crear instancias de agentes que se pueden implementar en Vertex AI (puedes asignar
a la variable CLASS_NAME
un valor como MyAgent
):
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Consideraciones sobre la implementación
Cuando escribes tu clase de Python, los siguientes tres métodos son importantes:
__init__()
:- Usa este método solo para los parámetros de configuración del agente. Por ejemplo, puedes usar este método para recopilar los parámetros del modelo y los atributos de seguridad como argumentos de entrada de tus usuarios. También puedes usar este método para recopilar parámetros, como el ID del proyecto, la región, las credenciales de la aplicación y las claves de API.
- El constructor muestra un objeto que debe ser "pickle" para
que se pueda implementar en el motor de agentes. Por lo tanto, debes inicializar
los clientes de servicio y establecer conexiones con las bases de datos en el método
.set_up
, en lugar de hacerlo en el método__init__
. - Este método es opcional. Si no se especifica, Vertex AI usa el constructor de Python predeterminado para la clase.
set_up()
:- Debes usar este método para definir la lógica de inicialización del agente. Por ejemplo, usas este método para establecer conexiones con bases de datos o servicios dependientes, importar paquetes dependientes o datos de procesamiento previo que se usan para entregar consultas.
- Este método es opcional. Si no se especifica, Vertex AI supone
que el agente no necesita llamar a un método
.set_up
antes de entregar consultas de los usuarios.
query()
/stream_query()
:- Usa
query()
para mostrar la respuesta completa como un solo resultado. - Usa
stream_query()
para mostrar la respuesta en fragmentos a medida que esté disponible, lo que habilita una experiencia de transmisión. El métodostream_query
debe mostrar un objeto iterable (por ejemplo, un generador) para habilitar la transmisión. - Puedes implementar ambos métodos si deseas admitir interacciones de transmisión y de respuesta única con tu agente.
- Debes proporcionar a este método una docstring clara que defina lo que hace,
documenta sus atributos y proporciona anotaciones de tipo para sus entradas.
Evita los argumentos variables en los métodos
query
ystream_query
.
- Usa
Cómo crear una instancia del agente de forma local
Puedes crear una instancia local de tu agente con el siguiente código:
agent = CLASS_NAME(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Prueba el método query
Para probar el agente, envía consultas a la instancia local:
response = agent.query(
input="What is the exchange rate from US dollars to Swedish currency?"
)
print(response)
La respuesta es un diccionario similar al siguiente:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Respuestas de transmisión
Para transmitir respuestas a las consultas, puedes definir un método llamado stream_query
que genere respuestas. A modo de ejemplo, la siguiente plantilla extiende el ejemplo básico para transmitir respuestas y se puede implementar en Vertex AI:
from typing import Iterable
class StreamingAgent(CLASS_NAME):
def stream_query(self, **kwargs) -> Iterable:
from langchain.load.dump import dumpd
for chunk in self.graph.stream(**kwargs):
yield dumpd(chunk)
A continuación, se incluyen algunos aspectos clave que debes tener en cuenta cuando uses la API de transmisión:
- Tiempo de espera máximo: El tiempo de espera máximo para las respuestas de transmisión es de 10 minutos. Si tu agente requiere tiempos de procesamiento más largos, considera dividir la tarea en partes más pequeñas.
- Modelos y cadenas de transmisión: La interfaz Runnable de LangChain admite la transmisión, por lo que puedes transmitir respuestas no solo de agentes, sino también de modelos y cadenas.
- Compatibilidad con LangChain: Ten en cuenta que, por el momento, no se admiten métodos asíncronos, como el método
astream_event
de LangChain. - Limita la generación de contenido: Si tienes problemas de contrapresión (en los que el productor genera datos más rápido de lo que el consumidor puede procesarlos), debes limitar la tasa de generación de contenido. Esto puede ayudar a evitar desbordamientos del búfer y garantizar una experiencia de transmisión fluida.
Prueba el método stream_query
Para probar la consulta de transmisión de forma local, llama al método stream_query
y itera por los resultados. Por ejemplo:
import pprint
for chunk in agent.stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Este código imprime cada fragmento de la respuesta a medida que se genera. El resultado podría verse de la siguiente manera:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
En este ejemplo, cada fragmento contiene información diferente sobre la respuesta, como las acciones que realizó el agente, los mensajes que se intercambiaron y el resultado final.
Registra métodos personalizados
De forma predeterminada, los métodos query
y stream_query
se registran como operaciones en el agente implementado.
Puedes anular el comportamiento predeterminado y definir el conjunto de operaciones que se registrarán con el método register_operations
.
Las operaciones se pueden registrar como modos de ejecución estándar (representados por una cadena vacía ""
) o de transmisión ("stream"
).
Para registrar varias operaciones, puedes definir un método llamado register_operations
que enumere los métodos que se pondrán a disposición de los usuarios cuando se implemente el agente. En el siguiente código de ejemplo, el método register_operations
hará que el agente implementado registre query
y get_state
como operaciones que se ejecutan de forma síncrona, y stream_query
y get_state_history
como operaciones que transmiten las respuestas:
from typing import Iterable
class CustomAgent(StreamingAgent):
def get_state(self) -> dict: # new synchronous method
return self.graph.get_state(**kwargs)._asdict()
def get_state_history(self) -> Iterable: # new streaming operation
for state_snapshot in self.graph.get_state_history(**kwargs):
yield state_snapshot._asdict()
def register_operations(self):
return {
# The list of synchronous operations to be registered
"": ["query", "get_state"],
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"],
}
Para probar los métodos personalizados, llámalos directamente en la instancia local
del agente, de manera similar a como probarías los métodos query
y
stream_query
.
Cómo proporcionar anotaciones de tipo
Puedes usar anotaciones de tipo para especificar los tipos de entrada y salida esperados de los métodos de tu agente. Cuando se implementa el agente, solo se admiten tipos serializables en JSON en la entrada y la salida de las operaciones que admite el agente. Los esquemas de las entradas y salidas se pueden anotar con modelos TypedDict
o Pydantic.
En el siguiente ejemplo, anotamos la entrada como TypedDict
y convertimos el resultado sin procesar de .get_state
(que es un NamedTuple
) en un diccionario serializable con su método ._asdict()
:
from typing import Any, Dict, TypedDict
# schemas.py
class RunnableConfig(TypedDict, total=False):
metadata: Dict[str, Any]
configurable: Dict[str, Any]
# agents.py
class AnnotatedAgent(CLASS_NAME):
def get_state(self, config: RunnableConfig) -> dict:
return self.graph.get_state(config=config)._asdict()
def register_operations(self):
return {"": ["query", "get_state"]}
Realiza un seguimiento con OpenTelemetry
Para habilitar el seguimiento con bibliotecas de instrumentación que admiten OpenTelemetry, puedes importarlas e inicializarlas en el método .set_up
.
A modo de ejemplo, la siguiente plantilla es una modificación del ejemplo básico para exportar seguimientos a Cloud Trace:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from openinference.instrumentation.langchain import LangChainInstrumentor
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(project_id=self.project)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangChainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Cómo trabajar con variables de entorno
Para configurar las variables de entorno de una manera que funcione cuando se implemente el agente,
las establecerás dentro del método .set_up
, por ejemplo:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
env_vars: dict[str, str], # <- new
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
self.env_vars = env_vars # <- new
def set_up(self):
# Code for setting the environment variables
import os
for env_var_name, env_var_value in self.env_vars.items():
os.environ[env_var_name] = env_var_value
# End of code for setting the environment variables
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Integración con Secret Manager
Para integrarlo con Secret Manager, haz lo siguiente:
Para instalar la biblioteca cliente, ejecuta
pip install google-cloud-secret-manager
Sigue las instrucciones en Otorga roles a un agente implementado para otorgar al agente de servicio de Reasoning Engine el rol de "Administrador y descriptor de acceso a secretos" (
roles/secretmanager.secretAccessor
) a través de la consola de Google Cloud.Importa e inicializa el cliente en el método
.set_up
y obtén el secreto correspondiente cuando sea necesario. A modo de ejemplo, la siguiente plantilla es una modificación del ejemplo básico para usar una clave de API paraChatAnthropic
que se almacena en Secret Manager:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.secret_id = secret_id # <- new
def set_up(self):
from google.cloud import secretmanager
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
# Get the API Key from Secret Manager here.
self.secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = self.secret_manager_client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
# Use the API Key from Secret Manager here.
model = ChatAnthropic(
model_name=self.model_name,
model_kwargs={"api_key": secret_version.payload.data.decode()}, # <- new
)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Cómo controlar las credenciales
Cuando se implementa el agente, es posible que deba controlar diferentes tipos de credenciales:
- Credenciales predeterminadas de la aplicación (ADC) que suelen provenir de cuentas de servicio
- OAuth, que suele provenir de cuentas de usuario
- Proveedores de identidad para credenciales de cuentas externas (federación de identidades para cargas de trabajo).
Credenciales predeterminadas de la aplicación
import google.auth
credentials, project = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
Se puede usar en el código de la siguiente manera:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str = "meta/llama3-405b-instruct-maas",
tools: Sequence[Callable],
location: str,
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.endpoint = f"https://{location}-aiplatform.googleapis.com"
self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'
def query(self, **kwargs):
import google.auth
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# Note: the credential lives for 1 hour by default.
# After expiration, it must be refreshed.
creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
creds.refresh(google.auth.transport.requests.Request())
model = ChatOpenAI(
model=self.model_name,
base_url=self.base_url,
api_key=creds.token, # Use the token from the credentials here.
)
graph = create_react_agent(model, tools=self.tools)
return graph.invoke(**kwargs)
Para obtener más información, consulta Cómo funcionan las credenciales predeterminadas de la aplicación.
OAuth
Por lo general, las credenciales del usuario se obtienen con OAuth 2.0.
Si tienes un token de acceso (p.ej., de oauthlib
), puedes crear una instancia de google.oauth2.credentials.Credentials
. Además,
si obtienes un token de actualización, también puedes especificar el token de actualización y el URI
del token para permitir que las credenciales se actualicen automáticamente:
credentials = google.oauth2.credentials.Credentials(
token="ACCESS_TOKEN",
refresh_token="REFRESH_TOKEN", # Optional
token_uri="TOKEN_URI", # E.g. "https://oauth2.googleapis.com/token"
client_id="CLIENT_ID", # Optional
client_secret="CLIENT_SECRET" # Optional
)
Aquí, TOKEN_URI
, CLIENT_ID
y CLIENT_SECRET
se basan en Crea una credencial de cliente de OAuth.
Si no tienes un token de acceso, puedes usar google_auth_oauthlib.flow
para realizar el flujo de otorgamiento de autorización de OAuth 2.0 y obtener una instancia de google.oauth2.credentials.Credentials
correspondiente:
from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json
# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())
# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
client_config,
scopes=['https://www.googleapis.com/auth/cloud-platform'],
state="OAUTH_FLOW_STATE" # from flow.authorization_url(...)
)
# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials
# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai
vertexai.init(
project="PROJECT_ID",
location="LOCATION",
credentials=credentials, # specify the credentials here
)
Para obtener más información, consulta la documentación del módulo google_auth_oauthlib.flow
.
Proveedor de identidad
Si quieres autenticar usuarios mediante correo electrónico/contraseña, número de teléfono, proveedores de redes sociales como Google, Facebook o GitHub, o un mecanismo de autenticación personalizado, puedes usar Identity Platform, Firebase Authentication o cualquier proveedor de identidad que admita OpenID Connect (OIDC).
Para obtener más información, consulta Cómo acceder a los recursos desde un proveedor de identidad de OIDC.