Agent Engine のエージェント テンプレートは、Python クラスとして定義されます。次の手順では、Vertex AI にデプロイ可能なエージェントをインスタンス化するためのカスタム テンプレートを作成する方法を示します。
- 基本的な例
- (省略可)レスポンスをストリーミングする
- (省略可)カスタム メソッドを登録する
- (省略可)型アノテーションを指定する
- (省略可)OpenTelemetry を使用したトレース
- (省略可)環境変数の操作
- (省略可)Secret Manager との統合
- (省略可)認証情報の処理
基本的な例
基本的な例として、次の Python クラスは、Vertex AI にデプロイ可能なエージェントをインスタンス化するテンプレートです(CLASS_NAME
変数に MyAgent
などの値を指定できます)。
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
デプロイに関する考慮事項
Python クラスを作成する場合、次の 3 つのメソッドが重要になります。
__init__()
:- このメソッドは、エージェント構成パラメータにのみ使用します。たとえば、このメソッドを使用して、モデル パラメータと安全性属性をユーザーの入力引数として収集できます。このメソッドを使用して、プロジェクト ID、リージョン、アプリケーション認証情報、API キーなどのパラメータを収集することもできます。
- コンストラクタは、エージェント エンジンにデプロイできるように「pickle 対応」である必要があるオブジェクトを返します。そのため、サービス クライアントを初期化し、データベースへの接続を確立する際は、
__init__
メソッドではなく.set_up
メソッドを使用する必要があります。 - このメソッドは省略可能です。指定しない場合、Vertex AI はクラスのデフォルトの Python コンストラクタを使用します。
set_up()
:- このメソッドを使用して、エージェントの初期化ロジックを定義する必要があります。たとえば、このメソッドを使用して、データベースまたは依存サービスへの接続を確立したり、依存パッケージをインポートしたり、クエリの処理に使用するデータの事前計算を行うことができます。
- このメソッドは省略可能です。指定しない場合、Vertex AI は、ユーザークエリを処理する前にエージェントで
.set_up
メソッドを呼び出す必要がないと見なします。
query()
/stream_query()
:query()
を使用すると、完全なレスポンスを 1 つの結果として返すことができます。stream_query()
を使用して、レスポンスが利用可能になり次第チャンクで返すことで、ストリーミング エクスペリエンスを実現します。ストリーミングを有効にするには、stream_query
メソッドがイテレラブルなオブジェクト(ジェネレータなど)を返す必要があります。- エージェントとの単一レスポンスとストリーミング インタラクションの両方をサポートする場合は、両方のメソッドを実装できます。
- このメソッドには、処理の内容を定義して属性を文書化し、入力に型アノテーションを提供する明確な docstring を指定する必要があります。
query
メソッドとstream_query
メソッドで変数引数を使用しないでください。
エージェントをローカルでインスタンス化する
エージェントのローカル インスタンスを作成するには、次のコードを使用します。
agent = CLASS_NAME(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
query
メソッドをテストする
ローカル インスタンスにクエリを送信して、エージェントをテストできます。
response = agent.query(
input="What is the exchange rate from US dollars to Swedish currency?"
)
print(response)
レスポンスは、次のようなディクショナリ形式になります。
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
ストリーミング レスポンス
クエリのレスポンスをストリーミングするには、レスポンスを生成する stream_query
という名前のメソッドを定義します。たとえば、次のテンプレートは、基本的な例を拡張してレスポンスをストリーミングし、Vertex AI にデプロイできます。
from typing import Iterable
class StreamingAgent(CLASS_NAME):
def stream_query(self, **kwargs) -> Iterable:
from langchain.load.dump import dumpd
for chunk in self.graph.stream(**kwargs):
yield dumpd(chunk)
ストリーミング API を使用する際は、次の点にご注意ください。
- 最大タイムアウト: ストリーミング レスポンスの最大タイムアウトは 10 分です。エージェントに長い処理時間が必要な場合は、タスクを小さなチャンクに分割することを検討してください。
- モデルとチェーンのストリーミング: LangChain の Runnable インターフェースはストリーミングをサポートしているため、エージェントだけでなく、モデルとチェーンからもレスポンスをストリーミングできます。
- LangChain の互換性: LangChain の
astream_event
メソッドなどの非同期メソッドは現在サポートされていません。 - コンテンツ生成をスロットリングする: バックプレッシャーの問題(コンシューマがデータを処理するよりも速くプロデューサーがデータを生成する)が発生した場合は、コンテンツ生成レートをスロットリングする必要があります。これにより、バッファ オーバーフローを防ぎ、スムーズなストリーミングを実現できます。
stream_query
メソッドをテストする
stream_query
メソッドを呼び出して結果を反復処理することで、ストリーミング クエリをローカルでテストできます。次に例を示します。
import pprint
for chunk in agent.stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
このコードは、レスポンスの各チャンクが生成されると、そのチャンクを出力します。出力は次のようになります。
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
この例では、各チャンクには、エージェントが行ったアクション、交換されたメッセージ、最終的な出力など、レスポンスに関するさまざまな情報が含まれています。
カスタム メソッドの登録
デフォルトでは、メソッド query
と stream_query
は、デプロイされたエージェントでオペレーションとして登録されます。デフォルトの動作をオーバーライドし、register_operations
メソッドを使用して登録するオペレーションのセットを定義できます。オペレーションは、標準(空の文字列 ""
で表される)またはストリーミング("stream"
)の実行モードとして登録できます。
複数のオペレーションを登録するには、register_operations
という名前のメソッドを定義します。このメソッドには、エージェントがデプロイされたときにユーザーが使用できるメソッドを一覧表示します。次のサンプルコードでは、register_operations
メソッドにより、デプロイされたエージェントが query
と get_state
を同期実行オペレーションとして、stream_query
と get_state_history
をレスポンスをストリーミングするオペレーションとして登録します。
from typing import Iterable
class CustomAgent(StreamingAgent):
def get_state(self) -> dict: # new synchronous method
return self.graph.get_state(**kwargs)._asdict()
def get_state_history(self) -> Iterable: # new streaming operation
for state_snapshot in self.graph.get_state_history(**kwargs):
yield state_snapshot._asdict()
def register_operations(self):
return {
# The list of synchronous operations to be registered
"": ["query", "get_state"],
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"],
}
カスタム メソッドは、query
メソッドと stream_query
メソッドをテストする場合と同様に、エージェントのローカル インスタンスで直接呼び出してテストできます。
型アノテーションを指定する
型アノテーションを使用すると、エージェント メソッドの想定される入力と出力の型を指定できます。エージェントがデプロイされている場合、エージェントがサポートするオペレーションの入力と出力では、JSON シリアル化可能な型のみがサポートされます。入力と出力のスキーマには、TypedDict
モデルまたは Pydantic モデルを使用してアノテーションを付けることができます。
次の例では、入力に TypedDict
としてアノテーションを付け、.get_state
(NamedTuple
)の生の出力を ._asdict()
メソッドを使用してシリアル化可能な辞書に変換します。
from typing import Any, Dict, TypedDict
# schemas.py
class RunnableConfig(TypedDict, total=False):
metadata: Dict[str, Any]
configurable: Dict[str, Any]
# agents.py
class AnnotatedAgent(CLASS_NAME):
def get_state(self, config: RunnableConfig) -> dict:
return self.graph.get_state(config=config)._asdict()
def register_operations(self):
return {"": ["query", "get_state"]}
OpenTelemetry を使用したトレース
OpenTelemetry をサポートする計測ライブラリでトレースを有効にするには、.set_up
メソッドでインポートして初期化します。
たとえば、次のテンプレートは、トレースを Cloud Trace にエクスポートするように 基本的な例を変更したものです。
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from openinference.instrumentation.langchain import LangChainInstrumentor
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(project_id=self.project)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangChainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
環境変数の操作
エージェントがデプロイされたときに機能するように環境変数を設定するには、.set_up
メソッド内に設定します。次に例を示します。
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
env_vars: dict[str, str], # <- new
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
self.env_vars = env_vars # <- new
def set_up(self):
# Code for setting the environment variables
import os
for env_var_name, env_var_value in self.env_vars.items():
os.environ[env_var_name] = env_var_value
# End of code for setting the environment variables
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Secret Manager との統合
Secret Manager と統合するには:
次のコマンドを実行してクライアント ライブラリをインストールします。
pip install google-cloud-secret-manager
デプロイされたエージェントにロールを付与するの手順に沿って、Google Cloud コンソールから Reasoning Engine サービス エージェントに「Secret Manager シークレット アクセサー」ロール(
roles/secretmanager.secretAccessor
)を付与します。.set_up
メソッドでクライアントをインポートして初期化し、必要に応じて対応するシークレットを取得します。たとえば、次のテンプレートは、Secret Manager に保存されているChatAnthropic
の API キーを使用するように、基本的な例を変更したものです。
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.secret_id = secret_id # <- new
def set_up(self):
from google.cloud import secretmanager
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
# Get the API Key from Secret Manager here.
self.secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = self.secret_manager_client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
# Use the API Key from Secret Manager here.
model = ChatAnthropic(
model_name=self.model_name,
model_kwargs={"api_key": secret_version.payload.data.decode()}, # <- new
)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
認証情報の処理
エージェントをデプロイするときに、さまざまな種類の認証情報を処理する必要がある場合があります。
- アプリケーションのデフォルト認証情報(ADC)(通常はサービス アカウントから取得)
- ユーザー アカウントから発生する一般的な OAuth
- 外部アカウントの認証情報(Workload Identity 連携)のID プロバイダ。
アプリケーションのデフォルト認証情報
import google.auth
credentials, project = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
コードでは次のように使用できます。
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str = "meta/llama3-405b-instruct-maas",
tools: Sequence[Callable],
location: str,
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.endpoint = f"https://{location}-aiplatform.googleapis.com"
self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'
def query(self, **kwargs):
import google.auth
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# Note: the credential lives for 1 hour by default.
# After expiration, it must be refreshed.
creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
creds.refresh(google.auth.transport.requests.Request())
model = ChatOpenAI(
model=self.model_name,
base_url=self.base_url,
api_key=creds.token, # Use the token from the credentials here.
)
graph = create_react_agent(model, tools=self.tools)
return graph.invoke(**kwargs)
詳細については、アプリケーションのデフォルト認証情報の仕組みをご覧ください。
OAuth
通常、ユーザー認証情報は OAuth 2.0 を使用して取得されます。
アクセス トークン(oauthlib
など)がある場合は、google.oauth2.credentials.Credentials
インスタンスを作成できます。また、更新トークンを取得した場合は、更新トークンとトークン URI を指定して、認証情報を自動的に更新することもできます。
credentials = google.oauth2.credentials.Credentials(
token="ACCESS_TOKEN",
refresh_token="REFRESH_TOKEN", # Optional
token_uri="TOKEN_URI", # E.g. "https://oauth2.googleapis.com/token"
client_id="CLIENT_ID", # Optional
client_secret="CLIENT_SECRET" # Optional
)
ここで、TOKEN_URI
、CLIENT_ID
、CLIENT_SECRET
は OAuth クライアント認証情報を作成するに基づいています。
アクセス トークンがない場合は、google_auth_oauthlib.flow
を使用して OAuth 2.0 認可付与フローを実行し、対応する google.oauth2.credentials.Credentials
インスタンスを取得できます。
from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json
# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())
# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
client_config,
scopes=['https://www.googleapis.com/auth/cloud-platform'],
state="OAUTH_FLOW_STATE" # from flow.authorization_url(...)
)
# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials
# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai
vertexai.init(
project="PROJECT_ID",
location="LOCATION",
credentials=credentials, # specify the credentials here
)
詳細については、google_auth_oauthlib.flow
モジュールのドキュメントをご覧ください。
ID プロバイダ
メールとパスワード、電話番号、Google、Facebook、GitHub などのソーシャル プロバイダ、またはカスタム認証メカニズムを使用してユーザーの認証を行う場合は、Identity Platform または Firebase Authentication、または OpenID Connect(OIDC)をサポートする任意の ID プロバイダを使用できます。
詳細については、OIDC ID プロバイダからリソースにアクセスするをご覧ください。