Os modelos de agentes no Agente Engine são definidos como classes Python. As etapas a seguir mostram como criar um modelo personalizado para instanciar agentes que podem ser implantados na Vertex AI:
- Exemplo básico
- (Opcional) Respostas de stream
- (Opcional) Registrar métodos personalizados
- (Opcional) Fornecer anotações de tipo
- (Opcional) Rastreamento usando o OpenTelemetry
- (Opcional) Como trabalhar com variáveis de ambiente
- (Opcional) Integração com o Secret Manager
- (Opcional) Processamento de credenciais
Exemplo básico
Para dar um exemplo básico, a classe Python a seguir é um modelo para
instanciar agentes que podem ser implantados na Vertex AI (é possível atribuir um valor como MyAgent
à variável CLASS_NAME
):
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Considerações sobre implantação
Ao escrever a classe Python, os três métodos a seguir são importantes:
__init__()
:- Use esse método apenas para parâmetros de configuração do agente. Por exemplo, é possível usar esse método para coletar os parâmetros do modelo e os atributos de segurança como argumentos de entrada dos usuários. Também é possível usar esse método para coletar parâmetros como o ID do projeto, a região, as credenciais de aplicativo e as chaves de API.
- O construtor retorna um objeto que precisa ser "serializável" para
ser implantado no Agent Engine. Portanto, inicialize
clientes de serviço e estabeleça conexões com bancos de dados no método
.set_up
em vez do método__init__
. - Esse método é opcional. Se não for especificado, a Vertex AI usará o construtor padrão do Python para a classe.
set_up()
:- Use esse método para definir a lógica de inicialização do agente. Por exemplo, use esse método para estabelecer conexões com bancos de dados ou serviços dependentes, importar pacotes dependentes ou pré-computar dados usados para atender a consultas.
- Esse método é opcional. Se não for especificado, a Vertex AI vai presumir
que o agente não precisa chamar um método
.set_up
antes de atender às consultas do usuário.
query()
/stream_query()
:- Use
query()
para retornar a resposta completa como um único resultado. - Use
stream_query()
para retornar a resposta em partes à medida que ela fica disponível, ativando uma experiência de streaming. O métodostream_query
precisa retornar um objeto iterável (por exemplo, um gerador) para ativar o streaming. - É possível implementar os dois métodos se você quiser oferecer suporte a interações de resposta única e de streaming com o agente.
- Forneça a esse método uma docstring clara que defina o que ele faz, documente os atributos e forneça anotações de tipo para as entradas.
Evite argumentos variáveis nos métodos
query
estream_query
.
- Use
Instanciar o agente localmente
É possível criar uma instância local do seu agente usando o seguinte código:
agent = CLASS_NAME(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Testar o método query
É possível testar o agente enviando consultas para a instância local:
response = agent.query(
input="What is the exchange rate from US dollars to Swedish currency?"
)
print(response)
A resposta é um dicionário semelhante a este:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Respostas de streaming
Para transmitir respostas a consultas, defina um método chamado stream_query
que gere respostas. Como exemplo, o modelo a seguir estende o exemplo básico
para transmitir respostas e pode ser implantado na Vertex AI:
from typing import Iterable
class StreamingAgent(CLASS_NAME):
def stream_query(self, **kwargs) -> Iterable:
from langchain.load.dump import dumpd
for chunk in self.graph.stream(**kwargs):
yield dumpd(chunk)
Confira alguns pontos importantes ao usar a API de streaming:
- Tempo limite máximo: o tempo limite máximo para respostas de streaming é de 10 minutos. Se o agente exigir tempos de processamento mais longos, considere dividir a tarefa em partes menores.
- Streaming de modelos e cadeias: a interface Runnable do LangChain oferece suporte a streaming, para que você possa fazer streaming de respostas não apenas de agentes, mas também de modelos e cadeias.
- Compatibilidade com LangChain: métodos assíncronos, como
o método
astream_event
do LangChain, não são compatíveis no momento. - Limitar a geração de conteúdo: se você encontrar problemas de contrapressão (em que o produtor gera dados mais rápido do que o consumidor pode processá-los), limite a taxa de geração de conteúdo. Isso pode ajudar a evitar transbordamentos de buffer e garantir uma experiência de streaming tranquila.
Testar o método stream_query
É possível testar a consulta de streaming localmente chamando o método stream_query
e iterando os resultados. Veja um exemplo:
import pprint
for chunk in agent.stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Esse código imprime cada bloco da resposta conforme é gerado. A saída pode parecer esta:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
Neste exemplo, cada bloco contém informações diferentes sobre a resposta, como as ações realizadas pelo agente, as mensagens trocadas e a saída final.
Como registrar métodos personalizados
Por padrão, os métodos query
e stream_query
são registrados como operações
no agente implantado.
É possível substituir o comportamento padrão e definir o conjunto de operações a serem
registradas usando o método register_operations
.
As operações podem ser registradas como modos de execução padrão (representado por uma string vazia
""
) ou de streaming ("stream"
).
Para registrar várias operações, defina um método chamado
register_operations
que liste os métodos que serão disponibilizados aos usuários quando
o agente for implantado. No exemplo de código abaixo, o método register_operations
resulta no agente implantado registrando query
e get_state
como
operações executadas de forma síncrona, e stream_query
e get_state_history
como
operações que transmitem as respostas:
from typing import Iterable
class CustomAgent(StreamingAgent):
def get_state(self) -> dict: # new synchronous method
return self.graph.get_state(**kwargs)._asdict()
def get_state_history(self) -> Iterable: # new streaming operation
for state_snapshot in self.graph.get_state_history(**kwargs):
yield state_snapshot._asdict()
def register_operations(self):
return {
# The list of synchronous operations to be registered
"": ["query", "get_state"],
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"],
}
É possível testar os métodos personalizados chamando-os diretamente na instância local
do agente, de forma semelhante a como você testaria os métodos query
e
stream_query
.
Como fornecer anotações de tipo
É possível usar anotações de tipo para especificar os tipos de entrada e saída esperados dos
métodos do agente. Quando o agente é implantado, apenas tipos serializáveis em JSON são
compatíveis com a entrada e a saída das operações aceitas pelo agente. Os
esquemas das entradas e saídas podem ser anotados usando modelos TypedDict
ou
Pydantic.
No exemplo abaixo, anotamos a entrada como um TypedDict
e convertemos
a saída bruta de .get_state
(que é um NamedTuple
) em um dicionário
serializável usando o método ._asdict()
:
from typing import Any, Dict, TypedDict
# schemas.py
class RunnableConfig(TypedDict, total=False):
metadata: Dict[str, Any]
configurable: Dict[str, Any]
# agents.py
class AnnotatedAgent(CLASS_NAME):
def get_state(self, config: RunnableConfig) -> dict:
return self.graph.get_state(config=config)._asdict()
def register_operations(self):
return {"": ["query", "get_state"]}
Rastreamento usando o OpenTelemetry
Para ativar o tracing com bibliotecas de instrumentação que oferecem suporte ao OpenTelemetry, é possível importá-las e inicializá-las no método .set_up
.
Como exemplo, o modelo a seguir é uma modificação do exemplo básico para exportar rastros para o Cloud Trace:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from openinference.instrumentation.langchain import LangChainInstrumentor
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(project_id=self.project)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangChainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Como trabalhar com variáveis de ambiente
Para definir variáveis de ambiente de uma maneira que funcione quando o agente for implantado,
defina-as dentro do método .set_up
, por exemplo:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
env_vars: dict[str, str], # <- new
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
self.env_vars = env_vars # <- new
def set_up(self):
# Code for setting the environment variables
import os
for env_var_name, env_var_value in self.env_vars.items():
os.environ[env_var_name] = env_var_value
# End of code for setting the environment variables
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Como integrar com o Secret Manager
Para integrar com o Secret Manager:
Instale a biblioteca de cliente executando
pip install google-cloud-secret-manager
Siga as instruções em Conceder papéis a um agente implantado para conceder ao agente de serviço do mecanismo de inferência o papel "Acessador de secret do Secret Manager" (
roles/secretmanager.secretAccessor
) no console do Google Cloud.Importe e inicialize o cliente no método
.set_up
e receba a chave secreta correspondente quando necessário. Como exemplo, o modelo a seguir é uma modificação do exemplo básico para usar uma chave de API paraChatAnthropic
que foi armazenada no Secret Manager:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.secret_id = secret_id # <- new
def set_up(self):
from google.cloud import secretmanager
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
# Get the API Key from Secret Manager here.
self.secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = self.secret_manager_client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
# Use the API Key from Secret Manager here.
model = ChatAnthropic(
model_name=self.model_name,
model_kwargs={"api_key": secret_version.payload.data.decode()}, # <- new
)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Como processar credenciais
Quando o agente é implantado, ele pode precisar processar diferentes tipos de credenciais:
- Credenciais padrão do aplicativo (ADC, na sigla em inglês), que geralmente vêm de contas de serviço;
- OAuth, que geralmente surge de contas de usuários;
- Provedores de identidade para credenciais de contas externas (federação de identidade da carga de trabalho).
Application Default Credentials
import google.auth
credentials, project = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
Ele pode ser usado no código da seguinte maneira:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str = "meta/llama3-405b-instruct-maas",
tools: Sequence[Callable],
location: str,
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.endpoint = f"https://{location}-aiplatform.googleapis.com"
self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'
def query(self, **kwargs):
import google.auth
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# Note: the credential lives for 1 hour by default.
# After expiration, it must be refreshed.
creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
creds.refresh(google.auth.transport.requests.Request())
model = ChatOpenAI(
model=self.model_name,
base_url=self.base_url,
api_key=creds.token, # Use the token from the credentials here.
)
graph = create_react_agent(model, tools=self.tools)
return graph.invoke(**kwargs)
Para mais detalhes, acesse Como o Application Default Credentials funciona.
OAuth
As credenciais do usuário geralmente são obtidas usando o OAuth 2.0.
Se você tiver um token de acesso (por exemplo, do oauthlib
),
é possível criar uma instância google.oauth2.credentials.Credentials
. Além disso,
se você receber um token de atualização, também poderá especificar o token de atualização e o URI
para permitir que as credenciais sejam atualizadas automaticamente:
credentials = google.oauth2.credentials.Credentials(
token="ACCESS_TOKEN",
refresh_token="REFRESH_TOKEN", # Optional
token_uri="TOKEN_URI", # E.g. "https://oauth2.googleapis.com/token"
client_id="CLIENT_ID", # Optional
client_secret="CLIENT_SECRET" # Optional
)
Aqui, TOKEN_URI
, CLIENT_ID
e
CLIENT_SECRET
são baseados em Criar uma credencial de cliente OAuth.
Se você não tiver um token de acesso, use google_auth_oauthlib.flow
para
realizar o fluxo de concessão de autorização do OAuth 2.0
para receber uma instância google.oauth2.credentials.Credentials
correspondente:
from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json
# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())
# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
client_config,
scopes=['https://www.googleapis.com/auth/cloud-platform'],
state="OAUTH_FLOW_STATE" # from flow.authorization_url(...)
)
# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials
# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai
vertexai.init(
project="PROJECT_ID",
location="LOCATION",
credentials=credentials, # specify the credentials here
)
Para mais detalhes, acesse a documentação do módulo google_auth_oauthlib.flow
.
Provedor de identidade
Se você quiser autenticar usuários usando e-mail/senha, número de telefone, provedores de redes sociais como Google, Facebook ou GitHub ou um mecanismo de autenticação personalizado, use o Identity Platform ou o Firebase Authentication, ou qualquer provedor de identidade compatível com o OpenID Connect (OIDC).
Para mais detalhes, acesse Como acessar recursos de um provedor de identidade OIDC.