Vertex AI Agent Engine のエージェント テンプレートは、Python クラスとして定義されます。次の手順では、Vertex AI にデプロイ可能なエージェントをインスタンス化するためのカスタム テンプレートを作成する方法について説明します。
- 基本的な例
- (省略可)レスポンスをストリーミングする
- (省略可)カスタム メソッドを登録する
- (省略可)型アノテーションを指定する
- (省略可)トレースを Cloud Trace に送信する
- (省略可)環境変数の操作
- (省略可)Secret Manager との統合
- (省略可)認証情報の処理
- (省略可)エラー処理
基本的な例
基本的な例として、次の Python クラスは、Vertex AI にデプロイ可能なエージェントをインスタンス化するためのテンプレートです(CLASS_NAME
変数に MyAgent
などの値を指定できます)。
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
デプロイに関する考慮事項
Python クラスを作成する場合、次の 3 つのメソッドが重要になります。
__init__()
:- このメソッドは、エージェント構成パラメータにのみ使用します。たとえば、このメソッドを使用して、モデル パラメータと安全性属性をユーザーの入力引数として収集できます。このメソッドを使用して、プロジェクト ID、リージョン、アプリケーション認証情報、API キーなどのパラメータを収集することもできます。
- コンストラクタは、Vertex AI Agent Engine にデプロイできるように「pickle 対応」である必要があるオブジェクトを返します。そのため、サービス クライアントを初期化し、データベースへの接続を確立する際は、
__init__
メソッドではなく.set_up
メソッドを使用する必要があります。 - このメソッドは省略可能です。指定しない場合、Vertex AI はクラスのデフォルトの Python コンストラクタを使用します。
set_up()
:- このメソッドを使用して、エージェントの初期化ロジックを定義する必要があります。たとえば、このメソッドを使用して、データベースまたは依存サービスへの接続を確立したり、依存パッケージをインポートしたり、クエリの処理に使用するデータの事前計算を行うことができます。
- このメソッドは省略可能です。指定しない場合、Vertex AI は、ユーザークエリを処理する前にエージェントで
.set_up
メソッドを呼び出す必要がないと見なします。
query()
/stream_query()
:query()
を使用して、完全なレスポンスを単一の結果として返します。stream_query()
を使用すると、レスポンスが利用可能になったときにチャンク単位で返され、ストリーミング エクスペリエンスが可能になります。stream_query
メソッドは、ストリーミングを有効にするために、反復可能なオブジェクト(ジェネレータなど)を返す必要があります。- エージェントとの単一レスポンスとストリーミングの両方のインタラクションをサポートする場合は、両方のメソッドを実装できます。
- このメソッドには、処理の内容を定義して属性を文書化し、入力に型アノテーションを提供する明確な docstring を指定する必要があります。
query
メソッドとstream_query
メソッドで変数引数を使用しないでください。
エージェントをローカルでインスタンス化する
次のコードを使用して、エージェントのローカル インスタンスを作成できます。
agent = CLASS_NAME(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
query
メソッドをテストする
ローカル インスタンスにクエリを送信して、エージェントをテストできます。
response = agent.query(
input="What is the exchange rate from US dollars to Swedish currency?"
)
print(response)
レスポンスは、次のようなディクショナリ形式になります。
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
非同期クエリ
クエリに非同期で応答するには、Python コルーチンを返すメソッド(async_query
など)を定義します。たとえば、次のテンプレートは、非同期で応答するように基本例を拡張したもので、Vertex AI にデプロイできます。
class AsyncAgent(CLASS_NAME):
async def async_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.ainvoke(**kwargs):
yield dumpd(chunk)
agent = AsyncAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
async_query
メソッドをテストする
async_query
メソッドを呼び出すことで、エージェントをローカルでテストできます。例:
response = await agent.async_query(
input="What is the exchange rate from US dollars to Swedish Krona today?"
)
print(response)
レスポンスは、次のようなディクショナリ形式になります。
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
ストリーミング レスポンス
クエリに対するレスポンスをストリーミングするには、レスポンスを生成する stream_query
という名前のメソッドを定義します。たとえば、次のテンプレートは、レスポンスをストリーミングするように基本例を拡張したもので、Vertex AI にデプロイできます。
from typing import Iterable
class StreamingAgent(CLASS_NAME):
def stream_query(self, **kwargs) -> Iterable:
from langchain.load.dump import dumpd
for chunk in self.graph.stream(**kwargs):
yield dumpd(chunk)
agent = StreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
ストリーミング API を使用する際は、次の点にご留意ください。
- 最大タイムアウト: ストリーミング レスポンスの最大タイムアウトは 10 分です。エージェントで処理時間が長くなる場合は、タスクを小さなチャンクに分割することを検討してください。
- ストリーミング モデルとチェーン: LangChain の Runnable インターフェースはストリーミングをサポートしているため、エージェントだけでなく、モデルやチェーンからもレスポンスをストリーミングできます。
- LangChain の互換性: 現在、LangChain の
astream_event
メソッドなどの非同期メソッドはサポートされていません。 - コンテンツ生成をスロットリングする: バックプレッシャーの問題(プロデューサーがコンシューマーが処理できるよりも速い速度でデータを生成する)が発生した場合は、コンテンツ生成率をスロットリングする必要があります。これにより、バッファ オーバーフローを防ぎ、スムーズなストリーミングを実現できます。
stream_query
メソッドをテストする
stream_query
メソッドを呼び出して結果を反復処理することで、ストリーミング クエリをローカルでテストできます。次に例を示します。
import pprint
for chunk in agent.stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
このコードは、レスポンスの各チャンクが生成されるたびに出力します。出力は次のようになります。
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
この例では、各チャンクには、エージェントが実行したアクション、交換されたメッセージ、最終出力など、レスポンスに関するさまざまな情報が含まれています。
レスポンスを非同期でストリーミングする
レスポンスを非同期でストリーミングするには、非同期ジェネレータを返すメソッド(async_stream_query
など)を定義します。たとえば、次のテンプレートは、非同期でレスポンスをストリーミングするように基本例を拡張したもので、Vertex AI にデプロイできます。
class AsyncStreamingAgent(CLASS_NAME):
async def async_stream_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.astream(**kwargs):
yield dumpd(chunk)
agent = AsyncStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
async_stream_query
メソッドをテストする
ストリーミング クエリのテストコードと同様に、async_stream_query
メソッドを呼び出して結果を反復処理することで、エージェントをローカルでテストできます。次に例を示します。
import pprint
async for chunk in agent.async_stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
このコードは、レスポンスの各チャンクが生成されるたびに出力します。出力は次のようになります。
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
カスタム メソッドの登録
デフォルトでは、メソッド query
と stream_query
は、デプロイされたエージェントのオペレーションとして登録されます。デフォルトの動作をオーバーライドし、登録するオペレーションのセットを定義するには、register_operations
メソッドを使用します。オペレーションは、標準(空の文字列 ""
で表される)またはストリーミング("stream"
)の実行モードとして登録できます。
複数のオペレーションを登録するには、エージェントのデプロイ時にユーザーが使用できるメソッドを一覧表示する register_operations
という名前のメソッドを定義します。次のコード例では、register_operations
メソッドにより、デプロイされたエージェントが query
と get_state
を同期的に実行されるオペレーションとして登録し、stream_query
と get_state_history
をレスポンスをストリーミングするオペレーションとして登録します。
from typing import Iterable
class CustomAgent(StreamingAgent):
def get_state(self) -> dict: # new synchronous method
return self.graph.get_state(**kwargs)._asdict()
def get_state_history(self) -> Iterable: # new streaming operation
for state_snapshot in self.graph.get_state_history(**kwargs):
yield state_snapshot._asdict()
def register_operations(self):
return {
# The list of synchronous operations to be registered
"": ["query", "get_state"],
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"],
}
query
メソッドと stream_query
メソッドをテストする場合と同様に、エージェントのローカル インスタンスでカスタム メソッドを直接呼び出してテストできます。
型アノテーションの提供
型アノテーションを使用して、エージェント メソッドの想定される入力型と出力型を指定できます。エージェントがデプロイされると、エージェントがサポートするオペレーションの入力と出力で JSON シリアル化可能な型のみがサポートされます。入力と出力のスキーマには、TypedDict
または Pydantic モデルを使用してアノテーションを付けることができます。
次の例では、入力を TypedDict
としてアノテーションし、.get_state
(NamedTuple
)からの生の出力を ._asdict()
メソッドを使用してシリアル化可能な辞書に変換します。
from typing import Any, Dict, TypedDict
# schemas.py
class RunnableConfig(TypedDict, total=False):
metadata: Dict[str, Any]
configurable: Dict[str, Any]
# agents.py
class AnnotatedAgent(CLASS_NAME):
def get_state(self, config: RunnableConfig) -> dict:
return self.graph.get_state(config=config)._asdict()
def register_operations(self):
return {"": ["query", "get_state"]}
Cloud Trace へのトレースの送信
OpenTelemetry をサポートするインストルメンテーション ライブラリを使用してトレースを Cloud Trace に送信するには、.set_up
メソッドでライブラリをインポートして初期化します。一般的なエージェント フレームワークでは、Open Telemetry Google Cloud インテグレーションと、OpenInference や OpenLLMetry などの計測化フレームワークを組み合わせて使用できる場合があります。
たとえば、次のテンプレートは、トレースを Cloud Trace にエクスポートするように 基本的な例を変更したものです。
OpenInference
まず、pip
を使用して必要なパッケージをインストールします。
pip install openinference-instrumentation-langchain==0.1.34
次に、インストルメンターをインポートして初期化します。
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from openinference.instrumentation.langchain import LangChainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangChainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
OpenLLMetry
まず、pip
を使用して必要なパッケージをインストールします。
pip install opentelemetry-instrumentation-langchain==0.38.10
次に、インストルメンターをインポートして初期化します。
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.instrumentation.langchain import LangchainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangchainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
環境変数の操作
環境変数を設定するには、開発中に os.environ
で使用できることを確認し、エージェントをデプロイするときに環境変数を定義するの手順に沿って操作します。
Secret Manager との統合
Secret Manager と統合するには:
次のコマンドを実行して、クライアント ライブラリをインストールします。
pip install google-cloud-secret-manager
デプロイされたエージェントのロールを付与するの手順に沿って、 Google Cloud コンソールで サービス アカウントに「Secret Manager のシークレット アクセサー」ロール(
roles/secretmanager.secretAccessor
)を付与します。.set_up
メソッドでクライアントをインポートして初期化し、必要に応じて対応するシークレットを取得します。たとえば、次のテンプレートは、Secret Manager に保存されたChatAnthropic
の API キーを使用するように 基本例を変更したものです。
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.secret_id = secret_id # <- new
def set_up(self):
from google.cloud import secretmanager
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
# Get the API Key from Secret Manager here.
self.secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = self.secret_manager_client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
# Use the API Key from Secret Manager here.
model = ChatAnthropic(
model_name=self.model_name,
model_kwargs={"api_key": secret_version.payload.data.decode()}, # <- new
)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
認証情報の処理
エージェントをデプロイするときに、さまざまなタイプの認証情報を処理する必要がある場合があります。
- サービス アカウントから発生するアプリケーションのデフォルト認証情報(ADC)。
- ユーザー アカウントから発生する OAuth
- 外部アカウント(Workload Identity 連携)の認証情報のID プロバイダ。
アプリケーションのデフォルト認証情報
import google.auth
credentials, project = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
コードでは次のように使用できます。
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str = "meta/llama3-405b-instruct-maas",
tools: Sequence[Callable],
location: str,
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.endpoint = f"https://{location}-aiplatform.googleapis.com"
self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'
def query(self, **kwargs):
import google.auth
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# Note: the credential lives for 1 hour by default.
# After expiration, it must be refreshed.
creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
creds.refresh(google.auth.transport.requests.Request())
model = ChatOpenAI(
model=self.model_name,
base_url=self.base_url,
api_key=creds.token, # Use the token from the credentials here.
)
graph = create_react_agent(model, tools=self.tools)
return graph.invoke(**kwargs)
詳細については、アプリケーションのデフォルト認証情報の仕組みをご覧ください。
OAuth
ユーザー認証情報は通常、OAuth 2.0 を使用して取得されます。
アクセス トークン(oauthlib
など)がある場合は、google.oauth2.credentials.Credentials
インスタンスを作成できます。また、更新トークンを取得した場合は、更新トークンとトークン URI を指定して、認証情報を自動的に更新することもできます。
credentials = google.oauth2.credentials.Credentials(
token="ACCESS_TOKEN",
refresh_token="REFRESH_TOKEN", # Optional
token_uri="TOKEN_URI", # E.g. "https://oauth2.googleapis.com/token"
client_id="CLIENT_ID", # Optional
client_secret="CLIENT_SECRET" # Optional
)
ここで、TOKEN_URI
、CLIENT_ID
、CLIENT_SECRET
は、OAuth クライアント認証情報を作成するに基づいています。
アクセス トークンがない場合は、google_auth_oauthlib.flow
を使用して OAuth 2.0 認可付与フローを実行し、対応する google.oauth2.credentials.Credentials
インスタンスを取得できます。
from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json
# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())
# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
client_config,
scopes=['https://www.googleapis.com/auth/cloud-platform'],
state="OAUTH_FLOW_STATE" # from flow.authorization_url(...)
)
# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials
# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai
vertexai.init(
project="PROJECT_ID",
location="LOCATION",
credentials=credentials, # specify the credentials here
)
詳細については、google_auth_oauthlib.flow
モジュールのドキュメントをご覧ください。
ID プロバイダ
メールアドレスとパスワード、電話番号、Google、Facebook、GitHub などのソーシャル プロバイダ、カスタム認証メカニズムを使用してユーザーの認証を行う場合は、Identity Platform、Firebase Authentication、または OpenID Connect(OIDC)をサポートする任意の ID プロバイダを使用できます。
詳細については、OIDC ID プロバイダからリソースにアクセスするをご覧ください。
エラー処理
API エラーが構造化された JSON 形式で返されるようにするには、try...except
ブロックを使用してエージェント コード内でエラー処理を実装することをおすすめします。このブロックはデコレータに抽象化できます。
Vertex AI Agent Engine はさまざまなステータス コードを内部で処理できますが、Python には、すべての例外タイプで関連する HTTP ステータス コードを使用してエラーを表す標準化された方法がありません。基盤となるサービス内のすべての可能な Python 例外を HTTP ステータスにマッピングしようとすると、複雑になり、保守が困難になります。
よりスケーラブルなアプローチは、エージェント メソッド内で関連する例外を明示的にキャッチするか、error_wrapper
などの再利用可能なデコレータを使用することです。次に、適切なステータス コードを関連付け(たとえば、カスタム例外に code
属性と error
属性を追加するか、標準例外を明示的に処理するなど)、エラーを JSON 辞書として戻り値の形式にします。これにより、エージェント メソッド自体内のコード変更は最小限に抑えられます。多くの場合、デコレータを追加するだけで済みます。
エージェントでエラー処理を実装する方法の例を次に示します。
from functools import wraps
import json
def error_wrapper(func):
@wraps(func) # Preserve original function metadata
def wrapper(*args, **kwargs):
try:
# Execute the original function with its arguments
return func(*args, **kwargs)
except Exception as err:
error_code = getattr(err, 'code')
error_message = getattr(err, 'error')
# Construct the error response dictionary
error_response = {
"error": {
"code": error_code,
"message": f"'{func.__name__}': {error_message}"
}
}
# Return the Python dictionary directly.
return error_response
return wrapper
# Example exception
class SessionNotFoundError(Exception):
def __init__(self, session_id, message="Session not found"):
self.code = 404
self.error = f"{message}: {session_id}"
super().__init__(self.error)
# Example Agent Class
class MyAgent:
@error_wrapper
def get_session(self, session_id: str):
# Simulate the condition where the session isn't found
raise SessionNotFoundError(session_id=session_id)
# Example Usage: Session Not Found
agent = MyAgent()
error_result = agent.get_session(session_id="nonexistent_session_123")
print(json.dumps(error_result, indent=2))
上記のコードを実行すると、次の出力が得られます。
json
{
"error": {
"code": 404,
"message": "Invocation error in 'get_session': Session not found: nonexistent_session_123"
}
}