カスタム エージェントを開発する

Agent Engine のエージェント テンプレートは、Python クラスとして定義されます。次の手順では、Vertex AI にデプロイ可能なエージェントをインスタンス化するためのカスタム テンプレートを作成する方法を示します。

  1. 基本的な例
  2. (省略可)レスポンスをストリーミングする
  3. (省略可)カスタム メソッドを登録する
  4. (省略可)型アノテーションを指定する
  5. (省略可)OpenTelemetry を使用したトレース
  6. (省略可)環境変数の操作
  7. (省略可)Secret Manager との統合
  8. (省略可)認証情報の処理

基本的な例

基本的な例として、次の Python クラスは、Vertex AI にデプロイ可能なエージェントをインスタンス化するテンプレートです(CLASS_NAME 変数に MyAgent などの値を指定できます)。

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

デプロイに関する考慮事項

Python クラスを作成する場合、次の 3 つのメソッドが重要になります。

  1. __init__():
    • このメソッドは、エージェント構成パラメータにのみ使用します。たとえば、このメソッドを使用して、モデル パラメータと安全性属性をユーザーの入力引数として収集できます。このメソッドを使用して、プロジェクト ID、リージョン、アプリケーション認証情報、API キーなどのパラメータを収集することもできます。
    • コンストラクタは、エージェント エンジンにデプロイできるように「pickle 対応」である必要があるオブジェクトを返します。そのため、サービス クライアントを初期化し、データベースへの接続を確立する際は、__init__ メソッドではなく .set_up メソッドを使用する必要があります。
    • このメソッドは省略可能です。指定しない場合、Vertex AI はクラスのデフォルトの Python コンストラクタを使用します。
  2. set_up():
    • このメソッドを使用して、エージェントの初期化ロジックを定義する必要があります。たとえば、このメソッドを使用して、データベースまたは依存サービスへの接続を確立したり、依存パッケージをインポートしたり、クエリの処理に使用するデータの事前計算を行うことができます。
    • このメソッドは省略可能です。指定しない場合、Vertex AI は、ユーザークエリを処理する前にエージェントで .set_up メソッドを呼び出す必要がないと見なします。
  3. query() / stream_query():
    • query() を使用すると、完全なレスポンスを 1 つの結果として返すことができます。
    • stream_query() を使用して、レスポンスが利用可能になり次第チャンクで返すことで、ストリーミング エクスペリエンスを実現します。ストリーミングを有効にするには、stream_query メソッドがイテレラブルなオブジェクト(ジェネレータなど)を返す必要があります。
    • エージェントとの単一レスポンスとストリーミング インタラクションの両方をサポートする場合は、両方のメソッドを実装できます。
    • このメソッドには、処理の内容を定義して属性を文書化し、入力に型アノテーションを提供する明確な docstring を指定する必要があります。query メソッドと stream_query メソッドで変数引数を使用しないでください。

エージェントをローカルでインスタンス化する

エージェントのローカル インスタンスを作成するには、次のコードを使用します。

agent = CLASS_NAME(
    model=model,  # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

query メソッドをテストする

ローカル インスタンスにクエリを送信して、エージェントをテストできます。

response = agent.query(
    input="What is the exchange rate from US dollars to Swedish currency?"
)

print(response)

レスポンスは、次のようなディクショナリ形式になります。

{"input": "What is the exchange rate from US dollars to Swedish currency?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

ストリーミング レスポンス

クエリのレスポンスをストリーミングするには、レスポンスを生成する stream_query という名前のメソッドを定義します。たとえば、次のテンプレートは、基本的な例を拡張してレスポンスをストリーミングし、Vertex AI にデプロイできます。

from typing import Iterable

class StreamingAgent(CLASS_NAME):

    def stream_query(self, **kwargs) -> Iterable:
        from langchain.load.dump import dumpd

        for chunk in self.graph.stream(**kwargs):
            yield dumpd(chunk)

ストリーミング API を使用する際は、次の点にご注意ください。

  • 最大タイムアウト: ストリーミング レスポンスの最大タイムアウトは 10 分です。エージェントに長い処理時間が必要な場合は、タスクを小さなチャンクに分割することを検討してください。
  • モデルとチェーンのストリーミング: LangChain の Runnable インターフェースはストリーミングをサポートしているため、エージェントだけでなく、モデルとチェーンからもレスポンスをストリーミングできます。
  • LangChain の互換性: LangChain の astream_event メソッドなどの非同期メソッドは現在サポートされていません。
  • コンテンツ生成をスロットリングする: バックプレッシャーの問題(コンシューマがデータを処理するよりも速くプロデューサーがデータを生成する)が発生した場合は、コンテンツ生成レートをスロットリングする必要があります。これにより、バッファ オーバーフローを防ぎ、スムーズなストリーミングを実現できます。

stream_query メソッドをテストする

stream_query メソッドを呼び出して結果を反復処理することで、ストリーミング クエリをローカルでテストできます。次に例を示します。

import pprint

for chunk in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

このコードは、レスポンスの各チャンクが生成されると、そのチャンクを出力します。出力は次のようになります。

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}

この例では、各チャンクには、エージェントが行ったアクション、交換されたメッセージ、最終的な出力など、レスポンスに関するさまざまな情報が含まれています。

カスタム メソッドの登録

デフォルトでは、メソッド querystream_query は、デプロイされたエージェントでオペレーションとして登録されます。デフォルトの動作をオーバーライドし、register_operations メソッドを使用して登録するオペレーションのセットを定義できます。オペレーションは、標準(空の文字列 "" で表される)またはストリーミング("stream")の実行モードとして登録できます。

複数のオペレーションを登録するには、register_operations という名前のメソッドを定義します。このメソッドには、エージェントがデプロイされたときにユーザーが使用できるメソッドを一覧表示します。次のサンプルコードでは、register_operations メソッドにより、デプロイされたエージェントが queryget_state を同期実行オペレーションとして、stream_queryget_state_history をレスポンスをストリーミングするオペレーションとして登録します。

from typing import Iterable

class CustomAgent(StreamingAgent):

    def get_state(self) -> dict: # new synchronous method
        return self.graph.get_state(**kwargs)._asdict()

    def get_state_history(self) -> Iterable: # new streaming operation
        for state_snapshot in self.graph.get_state_history(**kwargs):
            yield state_snapshot._asdict()

    def register_operations(self):
        return {
            # The list of synchronous operations to be registered
            "": ["query", "get_state"],
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"],
        }

カスタム メソッドは、query メソッドと stream_query メソッドをテストする場合と同様に、エージェントのローカル インスタンスで直接呼び出してテストできます。

型アノテーションを指定する

型アノテーションを使用すると、エージェント メソッドの想定される入力と出力の型を指定できます。エージェントがデプロイされている場合、エージェントがサポートするオペレーションの入力と出力では、JSON シリアル化可能な型のみがサポートされます。入力と出力のスキーマには、TypedDict モデルまたは Pydantic モデルを使用してアノテーションを付けることができます。

次の例では、入力に TypedDict としてアノテーションを付け、.get_stateNamedTuple)の生の出力を ._asdict() メソッドを使用してシリアル化可能な辞書に変換します。

from typing import Any, Dict, TypedDict

# schemas.py
class RunnableConfig(TypedDict, total=False):
    metadata: Dict[str, Any]
    configurable: Dict[str, Any]

# agents.py
class AnnotatedAgent(CLASS_NAME):

    def get_state(self, config: RunnableConfig) -> dict:
        return self.graph.get_state(config=config)._asdict()

    def register_operations(self):
        return {"": ["query", "get_state"]}

OpenTelemetry を使用したトレース

OpenTelemetry をサポートする計測ライブラリでトレースを有効にするには、.set_up メソッドでインポートして初期化します。

たとえば、次のテンプレートは、トレースを Cloud Trace にエクスポートするように 基本的な例を変更したものです。

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        # The additional code required for tracing instrumentation.
        from opentelemetry import trace
        from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
        from openinference.instrumentation.langchain import LangChainInstrumentor

        trace.set_tracer_provider(TracerProvider())
        cloud_trace_exporter = CloudTraceSpanExporter(project_id=self.project)
        trace.get_tracer_provider().add_span_processor(
            SimpleSpanProcessor(cloud_trace_exporter)
        )
        LangChainInstrumentor().instrument()
        # end of additional code required

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

環境変数の操作

エージェントがデプロイされたときに機能するように環境変数を設定するには、.set_up メソッド内に設定します。次に例を示します。

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
        env_vars: dict[str, str], # <- new
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location
        self.env_vars = env_vars # <- new

    def set_up(self):
        # Code for setting the environment variables
        import os
        for env_var_name, env_var_value in self.env_vars.items():
            os.environ[env_var_name] = env_var_value
        # End of code for setting the environment variables

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Secret Manager との統合

Secret Manager と統合するには:

  1. 次のコマンドを実行してクライアント ライブラリをインストールします。

    pip install google-cloud-secret-manager
  2. デプロイされたエージェントにロールを付与するの手順に沿って、Google Cloud コンソールから Reasoning Engine サービス エージェントに「Secret Manager シークレット アクセサー」ロール(roles/secretmanager.secretAccessor)を付与します。

  3. .set_up メソッドでクライアントをインポートして初期化し、必要に応じて対応するシークレットを取得します。たとえば、次のテンプレートは、Secret Manager に保存されている ChatAnthropic の API キーを使用するように、基本的な例を変更したものです。

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.secret_id = secret_id # <- new

    def set_up(self):
        from google.cloud import secretmanager
        from langchain_anthropic import ChatAnthropic
        from langgraph.prebuilt import create_react_agent

        # Get the API Key from Secret Manager here.
        self.secret_manager_client = secretmanager.SecretManagerServiceClient()
        secret_version = self.secret_manager_client.access_secret_version(request={
            "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
        })
        # Use the API Key from Secret Manager here.
        model = ChatAnthropic(
            model_name=self.model_name,
            model_kwargs={"api_key": secret_version.payload.data.decode()},  # <- new
        )
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

認証情報の処理

エージェントをデプロイするときに、さまざまな種類の認証情報を処理する必要がある場合があります。

  1. アプリケーションのデフォルト認証情報(ADC)(通常はサービス アカウントから取得)
  2. ユーザー アカウントから発生する一般的な OAuth
  3. 外部アカウントの認証情報(Workload Identity 連携)のID プロバイダ

アプリケーションのデフォルト認証情報

import google.auth

credentials, project = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

コードでは次のように使用できます。

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str = "meta/llama3-405b-instruct-maas",
        tools: Sequence[Callable],
        location: str,
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.endpoint = f"https://{location}-aiplatform.googleapis.com"
        self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'

    def query(self, **kwargs):
        import google.auth
        from langchain_openai import ChatOpenAI
        from langgraph.prebuilt import create_react_agent

        # Note: the credential lives for 1 hour by default.
        # After expiration, it must be refreshed.
        creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
        creds.refresh(google.auth.transport.requests.Request())

        model = ChatOpenAI(
            model=self.model_name,
            base_url=self.base_url,
            api_key=creds.token,  # Use the token from the credentials here.
        )
        graph = create_react_agent(model, tools=self.tools)
        return graph.invoke(**kwargs)

詳細については、アプリケーションのデフォルト認証情報の仕組みをご覧ください。

OAuth

通常、ユーザー認証情報は OAuth 2.0 を使用して取得されます。

アクセス トークン(oauthlib など)がある場合は、google.oauth2.credentials.Credentials インスタンスを作成できます。また、更新トークンを取得した場合は、更新トークンとトークン URI を指定して、認証情報を自動的に更新することもできます。

credentials = google.oauth2.credentials.Credentials(
    token="ACCESS_TOKEN",
    refresh_token="REFRESH_TOKEN",  # Optional
    token_uri="TOKEN_URI",          # E.g. "https://oauth2.googleapis.com/token"
    client_id="CLIENT_ID",          # Optional
    client_secret="CLIENT_SECRET"   # Optional
)

ここで、TOKEN_URICLIENT_IDCLIENT_SECRETOAuth クライアント認証情報を作成するに基づいています。

アクセス トークンがない場合は、google_auth_oauthlib.flow を使用して OAuth 2.0 認可付与フローを実行し、対応する google.oauth2.credentials.Credentials インスタンスを取得できます。

from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json

# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
    "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())

# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
    client_config,
    scopes=['https://www.googleapis.com/auth/cloud-platform'],
    state="OAUTH_FLOW_STATE"  # from flow.authorization_url(...)
)

# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials

# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai

vertexai.init(
    project="PROJECT_ID",
    location="LOCATION",
    credentials=credentials, # specify the credentials here
)

詳細については、google_auth_oauthlib.flow モジュールのドキュメントをご覧ください。

ID プロバイダ

メールとパスワード、電話番号、Google、Facebook、GitHub などのソーシャル プロバイダ、またはカスタム認証メカニズムを使用してユーザーの認証を行う場合は、Identity Platform または Firebase Authentication、または OpenID Connect(OIDC)をサポートする任意の ID プロバイダを使用できます。

詳細については、OIDC ID プロバイダからリソースにアクセスするをご覧ください。

次のステップ