Vertex AI Agent Engine 中的代理範本定義為 Python 類別。下列步驟說明如何建立自訂範本,用於例項化可在 Vertex AI 部署的代理程式:
- 基本範例
- (選用) 逐句回覆
- (選用) 註冊自訂方法
- (選用) 提供類型註解
- (選用) 將追蹤記錄傳送至 Cloud Trace
- (選用) 使用環境變數
- (選用) 與 Secret Manager 整合
- (選用) 處理憑證
- (選用) 處理錯誤
基本範例
舉例來說,下列 Python 類別是例項化代理程式的範本,可部署在 Vertex AI 上 (您可以為 CLASS_NAME
變數提供值,例如 MyAgent
):
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
部署考量事項
編寫 Python 類別時,下列三種方法非常重要:
__init__()
:- 這個方法僅適用於代理程式設定參數。舉例來說,您可以使用這個方法,從使用者收集模型參數和安全屬性做為輸入引數。您也可以使用這個方法收集專案 ID、區域、應用程式憑證和 API 金鑰等參數。
- 建構函式會傳回物件,該物件必須「可 Pickle 化」,才能部署至 Vertex AI Agent Engine。因此,您應該在
.set_up
方法中初始化服務用戶端,並建立與資料庫的連線,而不是在__init__
方法中執行這項操作。 - 此為選用方法。如未指定,Vertex AI 會使用該類別的預設 Python 建構函式。
set_up()
:- 您必須使用這個方法定義代理程式初始化邏輯。舉例來說,您可以使用這個方法建立資料庫或相依服務的連線、匯入相依套件,或預先計算用於提供查詢服務的資料。
- 此為選用方法。如未指定,Vertex AI 會假設代理程式不需要在處理使用者查詢前呼叫
.set_up
方法。
query()
/stream_query()
:- 使用
query()
將完整的回應做為單一結果傳回。 - 使用
stream_query()
可在回應可用時以區塊形式傳回,提供串流體驗。stream_query
方法必須傳回可疊代物件 (例如產生器),才能啟用串流。 - 如要支援代理程式的單一回應和串流互動,可以同時導入這兩種方法。
- 您應為這個方法提供清楚的說明字串,定義其用途、記錄其屬性,並為輸入內容提供型別註解。避免在
query
和stream_query
方法中使用變數引數。
- 使用
在本機上建立代理程式例項
您可以使用下列程式碼,建立代理程式的本機執行個體:
agent = CLASS_NAME(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
測試 query
方法
您可以將查詢傳送到本機執行個體,測試代理程式:
response = agent.query(
input="What is the exchange rate from US dollars to Swedish currency?"
)
print(response)
回應是類似下列內容的字典:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
非同步查詢
如要非同步回應查詢,可以定義傳回 Python 協同程式的方法 (例如 async_query
)。舉例來說,下列範本會擴充基本範例,以非同步方式回應,並可部署在 Vertex AI 上:
class AsyncAgent(CLASS_NAME):
async def async_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.ainvoke(**kwargs):
yield dumpd(chunk)
agent = AsyncAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
測試 async_query
方法
您可以呼叫 async_query
方法,在本機測試代理程式。範例如下:
response = await agent.async_query(
input="What is the exchange rate from US dollars to Swedish Krona today?"
)
print(response)
回應是類似下列內容的字典:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
串流回應
如要將查詢的回應串流化,可以定義名為 stream_query
的方法,產生回應。舉例來說,下列範本會擴充基本範例來串流回應,並可部署在 Vertex AI 上:
from typing import Iterable
class StreamingAgent(CLASS_NAME):
def stream_query(self, **kwargs) -> Iterable:
from langchain.load.dump import dumpd
for chunk in self.graph.stream(**kwargs):
yield dumpd(chunk)
agent = StreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
使用串流 API 時,請注意下列幾項要點:
- 逾時時間上限:串流回應的逾時時間上限為 10 分鐘。如果代理程式需要較長的處理時間,請考慮將工作拆成多個小任務。
- 串流模型和鏈結:LangChain 的 Runnable 介面支援串流,因此您不僅可以串流代理程式的回覆,也可以串流模型和鏈結的回覆。
- LangChain 相容性:請注意,目前不支援非同步方法,例如 LangChain 的
astream_event
方法。 - 節流內容生成作業:如果遇到背壓問題 (即生產者生成資料的速度比消費者處理資料的速度快),請節流內容生成速率。這有助於避免緩衝區溢位,確保串流體驗順暢。
測試 stream_query
方法
您可以呼叫 stream_query
方法並疊代結果,在本機測試串流查詢。範例如下:
import pprint
for chunk in agent.stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
這段程式碼會在生成回覆的每個區塊時,列印該區塊。輸出內容可能如下所示:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
在這個範例中,每個區塊都包含不同的回應資訊,例如代理程式執行的動作、交換的訊息和最終輸出內容。
以非同步方式逐句顯示回覆
如要以非同步方式串流回應,可以定義傳回非同步產生器的方法 (例如 async_stream_query
)。舉例來說,下列範本會擴充基本範例,以非同步方式串流回應,並可部署在 Vertex AI 上:
class AsyncStreamingAgent(CLASS_NAME):
async def async_stream_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.astream(**kwargs):
yield dumpd(chunk)
agent = AsyncStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
測試 async_stream_query
方法
與測試串流查詢的程式碼類似,您可以呼叫 async_stream_query
方法並逐一查看結果,在本機測試代理程式。範例如下:
import pprint
async for chunk in agent.async_stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
這段程式碼會在生成回覆的每個區塊時,列印該區塊。輸出內容可能如下所示:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
註冊自訂方法
根據預設,方法 query
和 stream_query
會註冊為已部署代理程式中的作業。您可以覆寫預設行為,並使用 register_operations
方法定義要註冊的一組作業。作業可註冊為標準 (以空字串 ""
表示) 或串流 ("stream"
) 執行模式。
如要註冊多項作業,可以定義名為 register_operations
的方法,列出代理程式部署後可供使用者使用的方法。在下列程式碼範例中,register_operations
方法會導致已部署的代理程式將 query
和 get_state
註冊為同步執行的作業,並將 stream_query
和 get_state_history
註冊為串流回應的作業:
from typing import Iterable
class CustomAgent(StreamingAgent):
def get_state(self) -> dict: # new synchronous method
return self.graph.get_state(**kwargs)._asdict()
def get_state_history(self) -> Iterable: # new streaming operation
for state_snapshot in self.graph.get_state_history(**kwargs):
yield state_snapshot._asdict()
def register_operations(self):
return {
# The list of synchronous operations to be registered
"": ["query", "get_state"],
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"],
}
您可以直接在服務專員的本機例項上呼叫自訂方法,測試這些方法,做法與測試 query
和 stream_query
方法類似。
提供型別註解
您可以使用型別註解,指定代理程式方法的預期輸入和輸出型別。部署代理程式後,代理程式支援的作業輸入和輸出內容,就只支援可序列化為 JSON 的型別。您可以使用 TypedDict
或 Pydantic 模型,為輸入和輸出內容的結構定義加上註解。
在以下範例中,我們會將輸入內容註解為 TypedDict
,並使用 ._asdict()
方法,將 .get_state
的原始輸出內容 (即 NamedTuple
) 轉換為可序列化的字典:
from typing import Any, Dict, TypedDict
# schemas.py
class RunnableConfig(TypedDict, total=False):
metadata: Dict[str, Any]
configurable: Dict[str, Any]
# agents.py
class AnnotatedAgent(CLASS_NAME):
def get_state(self, config: RunnableConfig) -> dict:
return self.graph.get_state(config=config)._asdict()
def register_operations(self):
return {"": ["query", "get_state"]}
將追蹤記錄傳送至 Cloud Trace
如要使用支援 OpenTelemetry 的檢測程式庫將追蹤記錄傳送至 Cloud Trace,您可以在 .set_up
方法中匯入並初始化這些程式庫。對於常見的代理程式架構,您或許可以搭配使用 Open Telemetry Google Cloud 整合服務和檢測架構,例如 OpenInference 或 OpenLLMetry。
舉例來說,下列範本是基本範例的修改版本,可將追蹤記錄匯出至 Cloud Trace:
OpenInference
首先,請執行以下指令,使用 pip
安裝必要套件:
pip install openinference-instrumentation-langchain==0.1.34
接著,匯入並初始化檢測工具:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from openinference.instrumentation.langchain import LangChainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangChainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
OpenLLMetry
首先,請執行以下指令,使用 pip
安裝必要套件:
pip install opentelemetry-instrumentation-langchain==0.38.10
接著,匯入並初始化檢測工具:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.instrumentation.langchain import LangchainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangchainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
使用環境變數
如要設定環境變數,請確保開發期間可透過 os.environ
存取這些變數,並在部署代理程式時按照「定義環境變數」一節的說明操作。
與 Secret Manager 整合
如要與 Secret Manager 整合,請按照下列步驟操作:
執行下列指令,安裝用戶端程式庫:
pip install google-cloud-secret-manager
請按照「為已部署的代理程式授予角色」一文中的操作說明,透過 Google Cloud 控制台,將「Secret Manager 密鑰存取者」角色 (
roles/secretmanager.secretAccessor
) 授予服務帳戶。在
.set_up
方法中匯入並初始化用戶端,並視需要取得對應的密鑰。舉例來說,下列範本是基本範例的修改版本,會使用儲存在 Secret Manager 中的ChatAnthropic
API 金鑰:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.secret_id = secret_id # <- new
def set_up(self):
from google.cloud import secretmanager
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
# Get the API Key from Secret Manager here.
self.secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = self.secret_manager_client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
# Use the API Key from Secret Manager here.
model = ChatAnthropic(
model_name=self.model_name,
model_kwargs={"api_key": secret_version.payload.data.decode()}, # <- new
)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
處理憑證
部署代理程式時,可能需要處理不同類型的憑證:
- 應用程式預設憑證 (ADC),通常來自服務帳戶。
- OAuth,通常是使用者帳戶所致,以及
- 外部帳戶憑證的身分識別提供者 (工作負載身分聯盟)。
應用程式預設憑證
import google.auth
credentials, project = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
您可以在程式碼中以下列方式使用:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str = "meta/llama3-405b-instruct-maas",
tools: Sequence[Callable],
location: str,
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.endpoint = f"https://{location}-aiplatform.googleapis.com"
self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'
def query(self, **kwargs):
import google.auth
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# Note: the credential lives for 1 hour by default.
# After expiration, it must be refreshed.
creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
creds.refresh(google.auth.transport.requests.Request())
model = ChatOpenAI(
model=self.model_name,
base_url=self.base_url,
api_key=creds.token, # Use the token from the credentials here.
)
graph = create_react_agent(model, tools=self.tools)
return graph.invoke(**kwargs)
詳情請參閱「應用程式預設憑證的運作方式」。
OAuth
使用者憑證通常是透過 OAuth 2.0 取得。
如果您有存取權杖 (例如來自 oauthlib
),可以建立 google.oauth2.credentials.Credentials
執行個體。此外,如果您取得更新權杖,也可以指定更新權杖和權杖 URI,讓憑證自動更新:
credentials = google.oauth2.credentials.Credentials(
token="ACCESS_TOKEN",
refresh_token="REFRESH_TOKEN", # Optional
token_uri="TOKEN_URI", # E.g. "https://oauth2.googleapis.com/token"
client_id="CLIENT_ID", # Optional
client_secret="CLIENT_SECRET" # Optional
)
這裡的 TOKEN_URI
、CLIENT_ID
和 CLIENT_SECRET
是根據「建立 OAuth 用戶端憑證」而來。
如果沒有存取權杖,可以使用 google_auth_oauthlib.flow
執行 OAuth 2.0 授權授予流程,取得對應的 google.oauth2.credentials.Credentials
執行個體:
from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json
# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())
# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
client_config,
scopes=['https://www.googleapis.com/auth/cloud-platform'],
state="OAUTH_FLOW_STATE" # from flow.authorization_url(...)
)
# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials
# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai
vertexai.init(
project="PROJECT_ID",
location="LOCATION",
credentials=credentials, # specify the credentials here
)
詳情請參閱 google_auth_oauthlib.flow
模組的說明文件。
識別資訊提供者
如要使用電子郵件/密碼、電話號碼、社交媒體供應商 (如 Google、Facebook 或 GitHub) 或是自訂的驗證機制來驗證使用者,您可以使用 Identity Platform、Firebase 驗證,或是任何支援 OpenID Connect (OIDC) 的身分識別提供者。
詳情請參閱「從 OIDC 識別資訊提供者存取資源」。
處理錯誤
為確保 API 錯誤以結構化 JSON 格式傳回,建議您使用 try...except
區塊在代理程式碼中實作錯誤處理機制,這可以抽象化為裝飾器。
雖然 Vertex AI Agent Engine 可以在內部處理各種狀態碼,但 Python 缺少標準化方式,可針對所有例外狀況類型,以相關聯的 HTTP 狀態碼表示錯誤。嘗試將所有可能的 Python 例外狀況對應至基礎服務中的 HTTP 狀態,會很複雜且難以維護。
更具擴充性的做法是在代理程式方法中明確擷取相關例外狀況,或使用 error_wrapper
等可重複使用的裝飾器。然後,您可以關聯適當的狀態碼 (例如,將 code
和 error
屬性新增至自訂例外狀況,或特別處理標準例外狀況),並將錯誤格式化為 JSON 字典,做為傳回值。這項作業只需要在代理程式方法中進行極少的程式碼變更,通常只要新增裝飾器即可。
以下範例說明如何在代理程式中實作錯誤處理:
from functools import wraps
import json
def error_wrapper(func):
@wraps(func) # Preserve original function metadata
def wrapper(*args, **kwargs):
try:
# Execute the original function with its arguments
return func(*args, **kwargs)
except Exception as err:
error_code = getattr(err, 'code')
error_message = getattr(err, 'error')
# Construct the error response dictionary
error_response = {
"error": {
"code": error_code,
"message": f"'{func.__name__}': {error_message}"
}
}
# Return the Python dictionary directly.
return error_response
return wrapper
# Example exception
class SessionNotFoundError(Exception):
def __init__(self, session_id, message="Session not found"):
self.code = 404
self.error = f"{message}: {session_id}"
super().__init__(self.error)
# Example Agent Class
class MyAgent:
@error_wrapper
def get_session(self, session_id: str):
# Simulate the condition where the session isn't found
raise SessionNotFoundError(session_id=session_id)
# Example Usage: Session Not Found
agent = MyAgent()
error_result = agent.get_session(session_id="nonexistent_session_123")
print(json.dumps(error_result, indent=2))
上述程式碼會產生下列輸出內容:
json
{
"error": {
"code": 404,
"message": "Invocation error in 'get_session': Session not found: nonexistent_session_123"
}
}
後續步驟
- 評估代理程式。
- 部署代理程式。
- 排解開發代理程式的問題。
- 取得支援。