開發自訂代理程式

Vertex AI Agent Engine 中的代理範本定義為 Python 類別。下列步驟說明如何建立自訂範本,用於例項化可在 Vertex AI 部署的代理程式:

  1. 基本範例
  2. (選用) 逐句回覆
  3. (選用) 註冊自訂方法
  4. (選用) 提供類型註解
  5. (選用) 將追蹤記錄傳送至 Cloud Trace
  6. (選用) 使用環境變數
  7. (選用) 與 Secret Manager 整合
  8. (選用) 處理憑證
  9. (選用) 處理錯誤

基本範例

舉例來說,下列 Python 類別是例項化代理程式的範本,可部署在 Vertex AI 上 (您可以為 CLASS_NAME 變數提供值,例如 MyAgent):

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

部署考量事項

編寫 Python 類別時,下列三種方法非常重要:

  1. __init__()
    • 這個方法僅適用於代理程式設定參數。舉例來說,您可以使用這個方法,從使用者收集模型參數和安全屬性做為輸入引數。您也可以使用這個方法收集專案 ID、區域、應用程式憑證和 API 金鑰等參數。
    • 建構函式會傳回物件,該物件必須「可 Pickle 化」,才能部署至 Vertex AI Agent Engine。因此,您應該在 .set_up 方法中初始化服務用戶端,並建立與資料庫的連線,而不是在 __init__ 方法中執行這項操作。
    • 此為選用方法。如未指定,Vertex AI 會使用該類別的預設 Python 建構函式。
  2. set_up()
    • 您必須使用這個方法定義代理程式初始化邏輯。舉例來說,您可以使用這個方法建立資料庫或相依服務的連線、匯入相依套件,或預先計算用於提供查詢服務的資料。
    • 此為選用方法。如未指定,Vertex AI 會假設代理程式不需要在處理使用者查詢前呼叫 .set_up 方法。
  3. query() / stream_query()
    • 使用 query() 將完整的回應做為單一結果傳回。
    • 使用 stream_query() 可在回應可用時以區塊形式傳回,提供串流體驗。stream_query 方法必須傳回可疊代物件 (例如產生器),才能啟用串流。
    • 如要支援代理程式的單一回應和串流互動,可以同時導入這兩種方法。
    • 您應為這個方法提供清楚的說明字串,定義其用途、記錄其屬性,並為輸入內容提供型別註解。避免在 querystream_query 方法中使用變數引數。

在本機上建立代理程式例項

您可以使用下列程式碼,建立代理程式的本機執行個體:

agent = CLASS_NAME(
    model=model,  # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

測試 query 方法

您可以將查詢傳送到本機執行個體,測試代理程式:

response = agent.query(
    input="What is the exchange rate from US dollars to Swedish currency?"
)

print(response)

回應是類似下列內容的字典:

{"input": "What is the exchange rate from US dollars to Swedish currency?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

非同步查詢

如要非同步回應查詢,可以定義傳回 Python 協同程式的方法 (例如 async_query)。舉例來說,下列範本會擴充基本範例,以非同步方式回應,並可部署在 Vertex AI 上:

class AsyncAgent(CLASS_NAME):

    async def async_query(self, **kwargs):
        from langchain.load.dump import dumpd

        for chunk in self.graph.ainvoke(**kwargs):
            yield dumpd(chunk)

agent = AsyncAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

測試 async_query 方法

您可以呼叫 async_query 方法,在本機測試代理程式。範例如下:

response = await agent.async_query(
    input="What is the exchange rate from US dollars to Swedish Krona today?"
)
print(response)

回應是類似下列內容的字典:

{"input": "What is the exchange rate from US dollars to Swedish currency?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

串流回應

如要將查詢的回應串流化,可以定義名為 stream_query 的方法,產生回應。舉例來說,下列範本會擴充基本範例來串流回應,並可部署在 Vertex AI 上:

from typing import Iterable

class StreamingAgent(CLASS_NAME):

    def stream_query(self, **kwargs) -> Iterable:
        from langchain.load.dump import dumpd

        for chunk in self.graph.stream(**kwargs):
            yield dumpd(chunk)

agent = StreamingAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

使用串流 API 時,請注意下列幾項要點:

  • 逾時時間上限:串流回應的逾時時間上限為 10 分鐘。如果代理程式需要較長的處理時間,請考慮將工作拆成多個小任務。
  • 串流模型和鏈結:LangChain 的 Runnable 介面支援串流,因此您不僅可以串流代理程式的回覆,也可以串流模型和鏈結的回覆。
  • LangChain 相容性:請注意,目前不支援非同步方法,例如 LangChain 的 astream_event 方法。
  • 節流內容生成作業:如果遇到背壓問題 (即生產者生成資料的速度比消費者處理資料的速度快),請節流內容生成速率。這有助於避免緩衝區溢位,確保串流體驗順暢。

測試 stream_query 方法

您可以呼叫 stream_query 方法並疊代結果,在本機測試串流查詢。範例如下:

import pprint

for chunk in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

這段程式碼會在生成回覆的每個區塊時,列印該區塊。輸出內容可能如下所示:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}

在這個範例中,每個區塊都包含不同的回應資訊,例如代理程式執行的動作、交換的訊息和最終輸出內容。

以非同步方式逐句顯示回覆

如要以非同步方式串流回應,可以定義傳回非同步產生器的方法 (例如 async_stream_query)。舉例來說,下列範本會擴充基本範例,以非同步方式串流回應,並可部署在 Vertex AI 上:

class AsyncStreamingAgent(CLASS_NAME):

    async def async_stream_query(self, **kwargs):
        from langchain.load.dump import dumpd

        for chunk in self.graph.astream(**kwargs):
            yield dumpd(chunk)

agent = AsyncStreamingAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

測試 async_stream_query 方法

與測試串流查詢的程式碼類似,您可以呼叫 async_stream_query 方法並逐一查看結果,在本機測試代理程式。範例如下:

import pprint

async for chunk in agent.async_stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

這段程式碼會在生成回覆的每個區塊時,列印該區塊。輸出內容可能如下所示:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}

註冊自訂方法

根據預設,方法 querystream_query 會註冊為已部署代理程式中的作業。您可以覆寫預設行為,並使用 register_operations 方法定義要註冊的一組作業。作業可註冊為標準 (以空字串 "" 表示) 或串流 ("stream") 執行模式。

如要註冊多項作業,可以定義名為 register_operations 的方法,列出代理程式部署後可供使用者使用的方法。在下列程式碼範例中,register_operations 方法會導致已部署的代理程式將 queryget_state 註冊為同步執行的作業,並將 stream_queryget_state_history 註冊為串流回應的作業:

from typing import Iterable

class CustomAgent(StreamingAgent):

    def get_state(self) -> dict: # new synchronous method
        return self.graph.get_state(**kwargs)._asdict()

    def get_state_history(self) -> Iterable: # new streaming operation
        for state_snapshot in self.graph.get_state_history(**kwargs):
            yield state_snapshot._asdict()

    def register_operations(self):
        return {
            # The list of synchronous operations to be registered
            "": ["query", "get_state"],
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"],
        }

您可以直接在服務專員的本機例項上呼叫自訂方法,測試這些方法,做法與測試 querystream_query 方法類似。

提供型別註解

您可以使用型別註解,指定代理程式方法的預期輸入和輸出型別。部署代理程式後,代理程式支援的作業輸入和輸出內容,就只支援可序列化為 JSON 的型別。您可以使用 TypedDict 或 Pydantic 模型,為輸入和輸出內容的結構定義加上註解。

在以下範例中,我們會將輸入內容註解為 TypedDict,並使用 ._asdict() 方法,將 .get_state 的原始輸出內容 (即 NamedTuple) 轉換為可序列化的字典:

from typing import Any, Dict, TypedDict

# schemas.py
class RunnableConfig(TypedDict, total=False):
    metadata: Dict[str, Any]
    configurable: Dict[str, Any]

# agents.py
class AnnotatedAgent(CLASS_NAME):

    def get_state(self, config: RunnableConfig) -> dict:
        return self.graph.get_state(config=config)._asdict()

    def register_operations(self):
        return {"": ["query", "get_state"]}

將追蹤記錄傳送至 Cloud Trace

如要使用支援 OpenTelemetry 的檢測程式庫將追蹤記錄傳送至 Cloud Trace,您可以在 .set_up 方法中匯入並初始化這些程式庫。對於常見的代理程式架構,您或許可以搭配使用 Open Telemetry Google Cloud 整合服務和檢測架構,例如 OpenInferenceOpenLLMetry

舉例來說,下列範本是基本範例的修改版本,可將追蹤記錄匯出至 Cloud Trace:

OpenInference

首先,請執行以下指令,使用 pip 安裝必要套件

pip install openinference-instrumentation-langchain==0.1.34

接著,匯入並初始化檢測工具:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        # The additional code required for tracing instrumentation.
        from opentelemetry import trace
        from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
        from openinference.instrumentation.langchain import LangChainInstrumentor
        import google.cloud.trace_v2 as cloud_trace_v2
        import google.auth

        credentials, _ = google.auth.default()

        trace.set_tracer_provider(TracerProvider())
        cloud_trace_exporter = CloudTraceSpanExporter(
            project_id=self.project,
            client=cloud_trace_v2.TraceServiceClient(
                credentials=credentials.with_quota_project(self.project),
            ),
        )
        trace.get_tracer_provider().add_span_processor(
            SimpleSpanProcessor(cloud_trace_exporter)
        )
        LangChainInstrumentor().instrument()
        # end of additional code required

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

OpenLLMetry

首先,請執行以下指令,使用 pip 安裝必要套件

pip install opentelemetry-instrumentation-langchain==0.38.10

接著,匯入並初始化檢測工具:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        # The additional code required for tracing instrumentation.
        from opentelemetry import trace
        from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
        from opentelemetry.instrumentation.langchain import LangchainInstrumentor
        import google.cloud.trace_v2 as cloud_trace_v2
        import google.auth

        credentials, _ = google.auth.default()

        trace.set_tracer_provider(TracerProvider())
        cloud_trace_exporter = CloudTraceSpanExporter(
            project_id=self.project,
            client=cloud_trace_v2.TraceServiceClient(
                credentials=credentials.with_quota_project(self.project),
            ),
        )
        trace.get_tracer_provider().add_span_processor(
            SimpleSpanProcessor(cloud_trace_exporter)
        )
        LangchainInstrumentor().instrument()
        # end of additional code required

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

使用環境變數

如要設定環境變數,請確保開發期間可透過 os.environ 存取這些變數,並在部署代理程式時按照「定義環境變數」一節的說明操作。

與 Secret Manager 整合

如要與 Secret Manager 整合,請按照下列步驟操作:

  1. 執行下列指令,安裝用戶端程式庫

    pip install google-cloud-secret-manager
  2. 請按照「為已部署的代理程式授予角色」一文中的操作說明,透過 Google Cloud 控制台,將「Secret Manager 密鑰存取者」角色 (roles/secretmanager.secretAccessor) 授予服務帳戶

  3. .set_up 方法中匯入並初始化用戶端,並視需要取得對應的密鑰。舉例來說,下列範本是基本範例的修改版本,會使用儲存在 Secret Manager 中的 ChatAnthropic API 金鑰:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.secret_id = secret_id # <- new

    def set_up(self):
        from google.cloud import secretmanager
        from langchain_anthropic import ChatAnthropic
        from langgraph.prebuilt import create_react_agent

        # Get the API Key from Secret Manager here.
        self.secret_manager_client = secretmanager.SecretManagerServiceClient()
        secret_version = self.secret_manager_client.access_secret_version(request={
            "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
        })
        # Use the API Key from Secret Manager here.
        model = ChatAnthropic(
            model_name=self.model_name,
            model_kwargs={"api_key": secret_version.payload.data.decode()},  # <- new
        )
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

處理憑證

部署代理程式時,可能需要處理不同類型的憑證:

  1. 應用程式預設憑證 (ADC),通常來自服務帳戶。
  2. OAuth,通常是使用者帳戶所致,以及
  3. 外部帳戶憑證的身分識別提供者 (工作負載身分聯盟)。

應用程式預設憑證

import google.auth

credentials, project = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

您可以在程式碼中以下列方式使用:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str = "meta/llama3-405b-instruct-maas",
        tools: Sequence[Callable],
        location: str,
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.endpoint = f"https://{location}-aiplatform.googleapis.com"
        self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'

    def query(self, **kwargs):
        import google.auth
        from langchain_openai import ChatOpenAI
        from langgraph.prebuilt import create_react_agent

        # Note: the credential lives for 1 hour by default.
        # After expiration, it must be refreshed.
        creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
        creds.refresh(google.auth.transport.requests.Request())

        model = ChatOpenAI(
            model=self.model_name,
            base_url=self.base_url,
            api_key=creds.token,  # Use the token from the credentials here.
        )
        graph = create_react_agent(model, tools=self.tools)
        return graph.invoke(**kwargs)

詳情請參閱「應用程式預設憑證的運作方式」。

OAuth

使用者憑證通常是透過 OAuth 2.0 取得。

如果您有存取權杖 (例如來自 oauthlib),可以建立 google.oauth2.credentials.Credentials 執行個體。此外,如果您取得更新權杖,也可以指定更新權杖和權杖 URI,讓憑證自動更新:

credentials = google.oauth2.credentials.Credentials(
    token="ACCESS_TOKEN",
    refresh_token="REFRESH_TOKEN",  # Optional
    token_uri="TOKEN_URI",          # E.g. "https://oauth2.googleapis.com/token"
    client_id="CLIENT_ID",          # Optional
    client_secret="CLIENT_SECRET"   # Optional
)

這裡的 TOKEN_URICLIENT_IDCLIENT_SECRET 是根據「建立 OAuth 用戶端憑證」而來。

如果沒有存取權杖,可以使用 google_auth_oauthlib.flow 執行 OAuth 2.0 授權授予流程,取得對應的 google.oauth2.credentials.Credentials 執行個體:

from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json

# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
    "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())

# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
    client_config,
    scopes=['https://www.googleapis.com/auth/cloud-platform'],
    state="OAUTH_FLOW_STATE"  # from flow.authorization_url(...)
)

# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials

# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai

vertexai.init(
    project="PROJECT_ID",
    location="LOCATION",
    credentials=credentials, # specify the credentials here
)

詳情請參閱 google_auth_oauthlib.flow 模組的說明文件

識別資訊提供者

如要使用電子郵件/密碼、電話號碼、社交媒體供應商 (如 Google、Facebook 或 GitHub) 或是自訂的驗證機制來驗證使用者,您可以使用 Identity PlatformFirebase 驗證,或是任何支援 OpenID Connect (OIDC) 的身分識別提供者。

詳情請參閱「從 OIDC 識別資訊提供者存取資源」。

處理錯誤

為確保 API 錯誤以結構化 JSON 格式傳回,建議您使用 try...except 區塊在代理程式碼中實作錯誤處理機制,這可以抽象化為裝飾器。

雖然 Vertex AI Agent Engine 可以在內部處理各種狀態碼,但 Python 缺少標準化方式,可針對所有例外狀況類型,以相關聯的 HTTP 狀態碼表示錯誤。嘗試將所有可能的 Python 例外狀況對應至基礎服務中的 HTTP 狀態,會很複雜且難以維護。

更具擴充性的做法是在代理程式方法中明確擷取相關例外狀況,或使用 error_wrapper 等可重複使用的裝飾器。然後,您可以關聯適當的狀態碼 (例如,將 codeerror 屬性新增至自訂例外狀況,或特別處理標準例外狀況),並將錯誤格式化為 JSON 字典,做為傳回值。這項作業只需要在代理程式方法中進行極少的程式碼變更,通常只要新增裝飾器即可。

以下範例說明如何在代理程式中實作錯誤處理:

from functools import wraps
import json

def error_wrapper(func):
    @wraps(func)  # Preserve original function metadata
    def wrapper(*args, **kwargs):
        try:
            # Execute the original function with its arguments
            return func(*args, **kwargs)
        except Exception as err:
            error_code = getattr(err, 'code')
            error_message = getattr(err, 'error')

            # Construct the error response dictionary
            error_response = {
                "error": {
                    "code": error_code,
                    "message": f"'{func.__name__}': {error_message}"
                }
            }
            # Return the Python dictionary directly.
            return error_response

    return wrapper

# Example exception
class SessionNotFoundError(Exception):
    def __init__(self, session_id, message="Session not found"):
        self.code = 404
        self.error = f"{message}: {session_id}"
        super().__init__(self.error)

# Example Agent Class
class MyAgent:
    @error_wrapper
    def get_session(self, session_id: str):
        # Simulate the condition where the session isn't found
        raise SessionNotFoundError(session_id=session_id)


# Example Usage: Session Not Found
agent = MyAgent()
error_result = agent.get_session(session_id="nonexistent_session_123")
print(json.dumps(error_result, indent=2))

上述程式碼會產生下列輸出內容: json { "error": { "code": 404, "message": "Invocation error in 'get_session': Session not found: nonexistent_session_123" } }

後續步驟