Desenvolver um agente personalizado

Os modelos de agentes no Agente Engine são definidos como classes Python. As etapas a seguir mostram como criar um modelo personalizado para instanciar agentes que podem ser implantados na Vertex AI:

  1. Exemplo básico
  2. (Opcional) Respostas de stream
  3. (Opcional) Registrar métodos personalizados
  4. (Opcional) Fornecer anotações de tipo
  5. (Opcional) Rastreamento usando o OpenTelemetry
  6. (Opcional) Como trabalhar com variáveis de ambiente
  7. (Opcional) Integração com o Secret Manager
  8. (Opcional) Processamento de credenciais

Exemplo básico

Para dar um exemplo básico, a classe Python a seguir é um modelo para instanciar agentes que podem ser implantados na Vertex AI (é possível atribuir um valor como MyAgent à variável CLASS_NAME):

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Considerações sobre implantação

Ao escrever a classe Python, os três métodos a seguir são importantes:

  1. __init__():
    • Use esse método apenas para parâmetros de configuração do agente. Por exemplo, é possível usar esse método para coletar os parâmetros do modelo e os atributos de segurança como argumentos de entrada dos usuários. Também é possível usar esse método para coletar parâmetros como o ID do projeto, a região, as credenciais de aplicativo e as chaves de API.
    • O construtor retorna um objeto que precisa ser "serializável" para ser implantado no Agent Engine. Portanto, inicialize clientes de serviço e estabeleça conexões com bancos de dados no método .set_up em vez do método __init__.
    • Esse método é opcional. Se não for especificado, a Vertex AI usará o construtor padrão do Python para a classe.
  2. set_up():
    • Use esse método para definir a lógica de inicialização do agente. Por exemplo, use esse método para estabelecer conexões com bancos de dados ou serviços dependentes, importar pacotes dependentes ou pré-computar dados usados para atender a consultas.
    • Esse método é opcional. Se não for especificado, a Vertex AI vai presumir que o agente não precisa chamar um método .set_up antes de atender às consultas do usuário.
  3. query() / stream_query():
    • Use query() para retornar a resposta completa como um único resultado.
    • Use stream_query() para retornar a resposta em partes à medida que ela fica disponível, ativando uma experiência de streaming. O método stream_query precisa retornar um objeto iterável (por exemplo, um gerador) para ativar o streaming.
    • É possível implementar os dois métodos se você quiser oferecer suporte a interações de resposta única e de streaming com o agente.
    • Forneça a esse método uma docstring clara que defina o que ele faz, documente os atributos e forneça anotações de tipo para as entradas. Evite argumentos variáveis nos métodos query e stream_query.

Instanciar o agente localmente

É possível criar uma instância local do seu agente usando o seguinte código:

agent = CLASS_NAME(
    model=model,  # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Testar o método query

É possível testar o agente enviando consultas para a instância local:

response = agent.query(
    input="What is the exchange rate from US dollars to Swedish currency?"
)

print(response)

A resposta é um dicionário semelhante a este:

{"input": "What is the exchange rate from US dollars to Swedish currency?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

Respostas de streaming

Para transmitir respostas a consultas, defina um método chamado stream_query que gere respostas. Como exemplo, o modelo a seguir estende o exemplo básico para transmitir respostas e pode ser implantado na Vertex AI:

from typing import Iterable

class StreamingAgent(CLASS_NAME):

    def stream_query(self, **kwargs) -> Iterable:
        from langchain.load.dump import dumpd

        for chunk in self.graph.stream(**kwargs):
            yield dumpd(chunk)

Confira alguns pontos importantes ao usar a API de streaming:

  • Tempo limite máximo: o tempo limite máximo para respostas de streaming é de 10 minutos. Se o agente exigir tempos de processamento mais longos, considere dividir a tarefa em partes menores.
  • Streaming de modelos e cadeias: a interface Runnable do LangChain oferece suporte a streaming, para que você possa fazer streaming de respostas não apenas de agentes, mas também de modelos e cadeias.
  • Compatibilidade com LangChain: métodos assíncronos, como o método astream_event do LangChain, não são compatíveis no momento.
  • Limitar a geração de conteúdo: se você encontrar problemas de contrapressão (em que o produtor gera dados mais rápido do que o consumidor pode processá-los), limite a taxa de geração de conteúdo. Isso pode ajudar a evitar transbordamentos de buffer e garantir uma experiência de streaming tranquila.

Testar o método stream_query

É possível testar a consulta de streaming localmente chamando o método stream_query e iterando os resultados. Veja um exemplo:

import pprint

for chunk in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

Esse código imprime cada bloco da resposta conforme é gerado. A saída pode parecer esta:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}

Neste exemplo, cada bloco contém informações diferentes sobre a resposta, como as ações realizadas pelo agente, as mensagens trocadas e a saída final.

Como registrar métodos personalizados

Por padrão, os métodos query e stream_query são registrados como operações no agente implantado. É possível substituir o comportamento padrão e definir o conjunto de operações a serem registradas usando o método register_operations. As operações podem ser registradas como modos de execução padrão (representado por uma string vazia "") ou de streaming ("stream").

Para registrar várias operações, defina um método chamado register_operations que liste os métodos que serão disponibilizados aos usuários quando o agente for implantado. No exemplo de código abaixo, o método register_operations resulta no agente implantado registrando query e get_state como operações executadas de forma síncrona, e stream_query e get_state_history como operações que transmitem as respostas:

from typing import Iterable

class CustomAgent(StreamingAgent):

    def get_state(self) -> dict: # new synchronous method
        return self.graph.get_state(**kwargs)._asdict()

    def get_state_history(self) -> Iterable: # new streaming operation
        for state_snapshot in self.graph.get_state_history(**kwargs):
            yield state_snapshot._asdict()

    def register_operations(self):
        return {
            # The list of synchronous operations to be registered
            "": ["query", "get_state"],
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"],
        }

É possível testar os métodos personalizados chamando-os diretamente na instância local do agente, de forma semelhante a como você testaria os métodos query e stream_query.

Como fornecer anotações de tipo

É possível usar anotações de tipo para especificar os tipos de entrada e saída esperados dos métodos do agente. Quando o agente é implantado, apenas tipos serializáveis em JSON são compatíveis com a entrada e a saída das operações aceitas pelo agente. Os esquemas das entradas e saídas podem ser anotados usando modelos TypedDict ou Pydantic.

No exemplo abaixo, anotamos a entrada como um TypedDict e convertemos a saída bruta de .get_state (que é um NamedTuple) em um dicionário serializável usando o método ._asdict():

from typing import Any, Dict, TypedDict

# schemas.py
class RunnableConfig(TypedDict, total=False):
    metadata: Dict[str, Any]
    configurable: Dict[str, Any]

# agents.py
class AnnotatedAgent(CLASS_NAME):

    def get_state(self, config: RunnableConfig) -> dict:
        return self.graph.get_state(config=config)._asdict()

    def register_operations(self):
        return {"": ["query", "get_state"]}

Rastreamento usando o OpenTelemetry

Para ativar o tracing com bibliotecas de instrumentação que oferecem suporte ao OpenTelemetry, é possível importá-las e inicializá-las no método .set_up.

Como exemplo, o modelo a seguir é uma modificação do exemplo básico para exportar rastros para o Cloud Trace:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        # The additional code required for tracing instrumentation.
        from opentelemetry import trace
        from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
        from openinference.instrumentation.langchain import LangChainInstrumentor

        trace.set_tracer_provider(TracerProvider())
        cloud_trace_exporter = CloudTraceSpanExporter(project_id=self.project)
        trace.get_tracer_provider().add_span_processor(
            SimpleSpanProcessor(cloud_trace_exporter)
        )
        LangChainInstrumentor().instrument()
        # end of additional code required

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Como trabalhar com variáveis de ambiente

Para definir variáveis de ambiente de uma maneira que funcione quando o agente for implantado, defina-as dentro do método .set_up, por exemplo:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
        env_vars: dict[str, str], # <- new
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location
        self.env_vars = env_vars # <- new

    def set_up(self):
        # Code for setting the environment variables
        import os
        for env_var_name, env_var_value in self.env_vars.items():
            os.environ[env_var_name] = env_var_value
        # End of code for setting the environment variables

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Como integrar com o Secret Manager

Para integrar com o Secret Manager:

  1. Instale a biblioteca de cliente executando

    pip install google-cloud-secret-manager
  2. Siga as instruções em Conceder papéis a um agente implantado para conceder ao agente de serviço do mecanismo de inferência o papel "Acessador de secret do Secret Manager" (roles/secretmanager.secretAccessor) no console do Google Cloud.

  3. Importe e inicialize o cliente no método .set_up e receba a chave secreta correspondente quando necessário. Como exemplo, o modelo a seguir é uma modificação do exemplo básico para usar uma chave de API para ChatAnthropic que foi armazenada no Secret Manager:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.secret_id = secret_id # <- new

    def set_up(self):
        from google.cloud import secretmanager
        from langchain_anthropic import ChatAnthropic
        from langgraph.prebuilt import create_react_agent

        # Get the API Key from Secret Manager here.
        self.secret_manager_client = secretmanager.SecretManagerServiceClient()
        secret_version = self.secret_manager_client.access_secret_version(request={
            "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
        })
        # Use the API Key from Secret Manager here.
        model = ChatAnthropic(
            model_name=self.model_name,
            model_kwargs={"api_key": secret_version.payload.data.decode()},  # <- new
        )
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Como processar credenciais

Quando o agente é implantado, ele pode precisar processar diferentes tipos de credenciais:

  1. Credenciais padrão do aplicativo (ADC, na sigla em inglês), que geralmente vêm de contas de serviço;
  2. OAuth, que geralmente surge de contas de usuários;
  3. Provedores de identidade para credenciais de contas externas (federação de identidade da carga de trabalho).

Application Default Credentials

import google.auth

credentials, project = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

Ele pode ser usado no código da seguinte maneira:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str = "meta/llama3-405b-instruct-maas",
        tools: Sequence[Callable],
        location: str,
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.endpoint = f"https://{location}-aiplatform.googleapis.com"
        self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'

    def query(self, **kwargs):
        import google.auth
        from langchain_openai import ChatOpenAI
        from langgraph.prebuilt import create_react_agent

        # Note: the credential lives for 1 hour by default.
        # After expiration, it must be refreshed.
        creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
        creds.refresh(google.auth.transport.requests.Request())

        model = ChatOpenAI(
            model=self.model_name,
            base_url=self.base_url,
            api_key=creds.token,  # Use the token from the credentials here.
        )
        graph = create_react_agent(model, tools=self.tools)
        return graph.invoke(**kwargs)

Para mais detalhes, acesse Como o Application Default Credentials funciona.

OAuth

As credenciais do usuário geralmente são obtidas usando o OAuth 2.0.

Se você tiver um token de acesso (por exemplo, do oauthlib), é possível criar uma instância google.oauth2.credentials.Credentials. Além disso, se você receber um token de atualização, também poderá especificar o token de atualização e o URI para permitir que as credenciais sejam atualizadas automaticamente:

credentials = google.oauth2.credentials.Credentials(
    token="ACCESS_TOKEN",
    refresh_token="REFRESH_TOKEN",  # Optional
    token_uri="TOKEN_URI",          # E.g. "https://oauth2.googleapis.com/token"
    client_id="CLIENT_ID",          # Optional
    client_secret="CLIENT_SECRET"   # Optional
)

Aqui, TOKEN_URI, CLIENT_ID e CLIENT_SECRET são baseados em Criar uma credencial de cliente OAuth.

Se você não tiver um token de acesso, use google_auth_oauthlib.flow para realizar o fluxo de concessão de autorização do OAuth 2.0 para receber uma instância google.oauth2.credentials.Credentials correspondente:

from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json

# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
    "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())

# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
    client_config,
    scopes=['https://www.googleapis.com/auth/cloud-platform'],
    state="OAUTH_FLOW_STATE"  # from flow.authorization_url(...)
)

# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials

# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai

vertexai.init(
    project="PROJECT_ID",
    location="LOCATION",
    credentials=credentials, # specify the credentials here
)

Para mais detalhes, acesse a documentação do módulo google_auth_oauthlib.flow.

Provedor de identidade

Se você quiser autenticar usuários usando e-mail/senha, número de telefone, provedores de redes sociais como Google, Facebook ou GitHub ou um mecanismo de autenticação personalizado, use o Identity Platform ou o Firebase Authentication, ou qualquer provedor de identidade compatível com o OpenID Connect (OIDC).

Para mais detalhes, acesse Como acessar recursos de um provedor de identidade OIDC.

A seguir