开发自定义代理

Agent Engine 中的代理模板定义为 Python 类。以下步骤介绍了如何创建自定义模板,以实例化可在 Vertex AI 上部署的代理:

  1. 基本示例
  2. (可选)流式传输响应
  3. (可选)注册自定义方法
  4. (可选)提供类型注解
  5. (可选)使用 OpenTelemetry 进行跟踪
  6. (可选)使用环境变量
  7. (可选)与 Secret Manager 集成
  8. (可选)处理凭据

基本示例

举个基本示例,以下 Python 类是用于实例化可在 Vertex AI 上部署的代理的模板(您可以为 CLASS_NAME 变量赋值,例如 MyAgent):

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

部署考虑事项

编写 Python 类时,以下三个方法非常重要:

  1. __init__()
    • 此方法仅适用于代理配置参数。例如,您可以使用此方法从用户那里收集模型参数和安全属性作为输入参数。您还可以使用此方法收集项目 ID、区域、应用凭据和 API 密钥等参数。
    • 构造函数返回的对象必须是“可压缩”的,才能部署到 Agent Engine。因此,您应在 .set_up 方法(而非 __init__ 方法)中初始化服务客户端并建立与数据库的连接。
    • 此方法为可选方法。如果未指定,Vertex AI 会使用该类的默认 Python 构造函数。
  2. set_up()
    • 您必须使用此方法来定义代理初始化逻辑。例如,您可以使用此方法建立与数据库或依赖服务的连接、导入依赖软件包或预计算用于处理查询的数据。
    • 此方法为可选方法。如果未指定,则 Vertex AI 会假定代理无需在处理用户查询前调用 .set_up 方法。
  3. query() / stream_query()
    • 使用 query() 可将完整响应作为单个结果返回。
    • 使用 stream_query() 在响应可用时以分块的形式返回响应,从而实现流式传输体验。stream_query 方法必须返回可迭代的对象(例如生成器),才能启用流式传输。
    • 如果您希望同时支持与客服人员进行单次响应和流式互动,可以同时实现这两种方法。
    • 您应为此方法提供一个清晰的文档字符串,用于定义它的作用、记录其属性以及为其输入提供类型注释。避免在 querystream_query 方法中使用可变参数。

在本地实例化代理

您可以使用以下代码创建代理的本地实例:

agent = CLASS_NAME(
    model=model,  # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

测试 query 方法

您可以将查询发送到本地实例以测试代理:

response = agent.query(
    input="What is the exchange rate from US dollars to Swedish currency?"
)

print(response)

响应是一个类似于以下内容的字典:

{"input": "What is the exchange rate from US dollars to Swedish currency?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

流式响应

如需对查询进行流式响应,您可以定义一个名为 stream_query 的方法来生成响应。例如,以下模板会将基本示例扩展为流式传输响应,并且可在 Vertex AI 上部署:

from typing import Iterable

class StreamingAgent(CLASS_NAME):

    def stream_query(self, **kwargs) -> Iterable:
        from langchain.load.dump import dumpd

        for chunk in self.graph.stream(**kwargs):
            yield dumpd(chunk)

在使用流式传输 API 时,请注意以下几点:

  • 超时上限:流式响应的超时上限为 10 分钟。如果您的代理需要更长时间的处理时间,请考虑将任务拆分成更小的部分。
  • 流式传输模型和链:LangChain 的 Runnable 接口支持流式传输,因此您不仅可以流式传输代理的回答,还可以流式传输模型和链的回答。
  • LangChain 兼容性:请注意,目前不支持 LangChain 的 astream_event 方法等异步方法。
  • 节流内容生成:如果您遇到回压问题(生产方生成数据的速度比使用方处理数据的速度快),则应节流内容生成速率。这有助于防止缓冲区溢出,并确保流式传输体验顺畅。

测试 stream_query 方法

您可以通过调用 stream_query 方法并迭代结果,在本地测试流式传输查询。示例如下:

import pprint

for chunk in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

此代码会在响应生成时输出响应的每个分块。输出可能如下所示:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}

在此示例中,每个分块都包含与响应相关的不同信息,例如客服人员执行的操作、交换的消息和最终输出。

注册自定义方法

默认情况下,方法 querystream_query 会在部署的代理中注册为操作。您可以使用 register_operations 方法替换默认行为并定义要注册的一组操作。操作可以注册为标准(由空字符串 "" 表示)或流式传输("stream")执行模式。

如需注册多个操作,您可以定义一个名为 register_operations 的方法,其中列出在部署代理时要向用户提供的方法。在以下示例代码中,register_operations 方法将导致部署的代理将 queryget_state 注册为同步运行的操作,并将 stream_queryget_state_history 注册为串流响应的操作:

from typing import Iterable

class CustomAgent(StreamingAgent):

    def get_state(self) -> dict: # new synchronous method
        return self.graph.get_state(**kwargs)._asdict()

    def get_state_history(self) -> Iterable: # new streaming operation
        for state_snapshot in self.graph.get_state_history(**kwargs):
            yield state_snapshot._asdict()

    def register_operations(self):
        return {
            # The list of synchronous operations to be registered
            "": ["query", "get_state"],
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"],
        }

您可以通过直接在代理的本地实例上调用自定义方法来测试这些方法,这与测试 querystream_query 方法类似。

提供类型注解

您可以使用类型注解指定代理方法的预期输入和输出类型。部署代理后,代理支持的操作的输入和输出中仅支持 JSON 可序列化类型。您可以使用 TypedDict 或 Pydantic 模型为输入和输出的架构添加注解。

在以下示例中,我们将输入注解为 TypedDict,并使用 .get_state(即 NamedTuple)的 ._asdict() 方法将其原始输出转换为可序列化字典:

from typing import Any, Dict, TypedDict

# schemas.py
class RunnableConfig(TypedDict, total=False):
    metadata: Dict[str, Any]
    configurable: Dict[str, Any]

# agents.py
class AnnotatedAgent(CLASS_NAME):

    def get_state(self, config: RunnableConfig) -> dict:
        return self.graph.get_state(config=config)._asdict()

    def register_operations(self):
        return {"": ["query", "get_state"]}

使用 OpenTelemetry 进行跟踪

如需使用支持 OpenTelemetry 的插桩库启用跟踪,您可以在 .set_up 方法中导入并初始化这些库。

例如,以下模板是对基本示例的修改,用于将轨迹导出到 Cloud Trace:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        # The additional code required for tracing instrumentation.
        from opentelemetry import trace
        from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
        from openinference.instrumentation.langchain import LangChainInstrumentor

        trace.set_tracer_provider(TracerProvider())
        cloud_trace_exporter = CloudTraceSpanExporter(project_id=self.project)
        trace.get_tracer_provider().add_span_processor(
            SimpleSpanProcessor(cloud_trace_exporter)
        )
        LangChainInstrumentor().instrument()
        # end of additional code required

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

使用环境变量

如需以在部署代理时有效的方式设置环境变量,您需要在 .set_up 方法内进行设置,例如:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
        env_vars: dict[str, str], # <- new
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location
        self.env_vars = env_vars # <- new

    def set_up(self):
        # Code for setting the environment variables
        import os
        for env_var_name, env_var_value in self.env_vars.items():
            os.environ[env_var_name] = env_var_value
        # End of code for setting the environment variables

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

与 Secret Manager 集成

如需与 Secret Manager 集成,请执行以下操作:

  1. 运行以下命令安装客户端库

    pip install google-cloud-secret-manager
  2. 按照为已部署的代理授予角色中的说明,通过 Google Cloud 控制台向 Reasoning Engine 服务代理授予“Secret Manager Secret Accessor”角色 (roles/secretmanager.secretAccessor)。

  3. .set_up 方法中导入并初始化客户端,并在需要时获取相应的密钥。例如,以下模板是对基本示例的修改,用于为存储在 Secret Manager 中的 ChatAnthropic 使用 API 密钥:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.secret_id = secret_id # <- new

    def set_up(self):
        from google.cloud import secretmanager
        from langchain_anthropic import ChatAnthropic
        from langgraph.prebuilt import create_react_agent

        # Get the API Key from Secret Manager here.
        self.secret_manager_client = secretmanager.SecretManagerServiceClient()
        secret_version = self.secret_manager_client.access_secret_version(request={
            "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
        })
        # Use the API Key from Secret Manager here.
        model = ChatAnthropic(
            model_name=self.model_name,
            model_kwargs={"api_key": secret_version.payload.data.decode()},  # <- new
        )
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

处理凭据

部署代理时,代理可能需要处理不同类型的凭据:

  1. 应用默认凭据 (ADC),通常由服务账号生成,
  2. OAuth(通常由用户账号引起)和
  3. 用于外部账号凭据的身份提供方(工作负载身份联合)。

应用默认凭据

import google.auth

credentials, project = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

您可以在代码中按如下方式使用它:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str = "meta/llama3-405b-instruct-maas",
        tools: Sequence[Callable],
        location: str,
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.endpoint = f"https://{location}-aiplatform.googleapis.com"
        self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'

    def query(self, **kwargs):
        import google.auth
        from langchain_openai import ChatOpenAI
        from langgraph.prebuilt import create_react_agent

        # Note: the credential lives for 1 hour by default.
        # After expiration, it must be refreshed.
        creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
        creds.refresh(google.auth.transport.requests.Request())

        model = ChatOpenAI(
            model=self.model_name,
            base_url=self.base_url,
            api_key=creds.token,  # Use the token from the credentials here.
        )
        graph = create_react_agent(model, tools=self.tools)
        return graph.invoke(**kwargs)

如需了解详情,请参阅应用默认凭据的工作原理

OAuth

用户凭据通常是使用 OAuth 2.0 获取的。

如果您有访问令牌(例如通过 oauthlib 获取的令牌),则可以创建 google.oauth2.credentials.Credentials 实例。此外,如果您获取了刷新令牌,还可以指定刷新令牌和令牌 URI,以允许自动刷新凭据:

credentials = google.oauth2.credentials.Credentials(
    token="ACCESS_TOKEN",
    refresh_token="REFRESH_TOKEN",  # Optional
    token_uri="TOKEN_URI",          # E.g. "https://oauth2.googleapis.com/token"
    client_id="CLIENT_ID",          # Optional
    client_secret="CLIENT_SECRET"   # Optional
)

在这里,TOKEN_URICLIENT_IDCLIENT_SECRET 基于创建 OAuth 客户端凭据

如果您没有访问令牌,可以使用 google_auth_oauthlib.flow 执行 OAuth 2.0 授权授予流程,以获取相应的 google.oauth2.credentials.Credentials 实例:

from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json

# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
    "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())

# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
    client_config,
    scopes=['https://www.googleapis.com/auth/cloud-platform'],
    state="OAUTH_FLOW_STATE"  # from flow.authorization_url(...)
)

# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials

# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai

vertexai.init(
    project="PROJECT_ID",
    location="LOCATION",
    credentials=credentials, # specify the credentials here
)

如需了解详情,请参阅 google_auth_oauthlib.flow 模块的文档

身份提供方

如果要使用电子邮件/密码、电话号码、Google、Facebook 或 GitHub 等社交服务提供方或自定义身份验证机制对用户进行身份验证,则可以使用 Identity PlatformFirebase Authentication,或任何支持 OpenID Connect (OIDC) 的身份提供方。

如需了解详情,请参阅从 OIDC 身份提供商访问资源

后续步骤