Vertex AI Agent Engine ランタイムでの双方向ストリーミング

このページでは、Vertex AI Agent Engine Runtime で双方向ストリーミングを使用する方法について説明します。

概要

双方向ストリーミングは、アプリケーションとエージェント間の永続的な双方向通信チャネルを提供し、ターンベースのリクエスト / レスポンス パターンを超えた通信を可能にします。双方向ストリーミングは、エージェントが情報を処理して継続的に応答する必要があるユースケース(低レイテンシで音声入力や動画入力とやり取りするなど)に適しています。

Vertex AI Agent Engine Runtime の双方向ストリーミングは、マルチモーダル ライブ API のインタラクティブなリアルタイム エージェントのユースケースとデータ交換をサポートしています。双方向ストリーミングはすべてのフレームワークでサポートされており、カスタム双方向ストリーミング メソッドはカスタム メソッドを登録することで使用できます。双方向ストリーミングを使用すると、Vertex AI Agent Engine の Agent Development Kit(ADK)を使用して Gemini Live API を操作できます。

双方向クエリメソッドを使用したリモート エージェントのデプロイは、Google 生成 AI SDK でのみサポートされています。双方向クエリメソッドが検出されると、生成 AI SDK は REST API を呼び出すときに EXPERIMENTAL エージェント サーバーモードを自動的に設定します。

エージェントを開発する

エージェントの開発中に双方向ストリーミングを実装する手順は次のとおりです。

双方向ストリーミング クエリメソッドを定義する

ストリーム リクエストを非同期で入力として受け取り、ストリーミング レスポンスを出力する bidi_stream_query メソッドを定義できます。たとえば、次のテンプレートは、リクエストとレスポンスをストリーミングするように基本テンプレートを拡張したもので、Agent Engine にデプロイできます。

import asyncio
from typing import Any, AsyncIterable

class BidiStreamingAgent(StreamingAgent):

    async def bidi_stream_query(
        self,
        request_queue: asyncio.Queue[Any]
    ) -> AsyncIterable[Any]:
        from langchain.load.dump import dumpd

        while True:
            request = await request_queue.get()
            # This is just an illustration, you're free to use any termination mechanism.
            if request == "END":
                break
            for chunk in self.graph.stream(request):
                yield dumpd(chunk)

agent = BidiStreamingAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],      # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

双方向ストリーミング API を使用する場合は、次の点に注意してください。

  • asyncio.Queue: このリクエスト キューには、モデル API に送信されるまで待機する任意のデータ型を配置できます。

  • 最大タイムアウト: 双方向ストリーミング クエリの最大タイムアウトは 10 分です。エージェントで処理時間が長くなる場合は、タスクを小さなチャンクに分割し、セッションまたはメモリを使用して状態を保持することを検討してください。

  • コンテンツ消費をスロットリングする: 双方向ストリームからコンテンツを消費する場合は、エージェントが受信データを処理するレートを管理することが重要です。エージェントによるデータの消費が遅すぎると、レイテンシの増加やサーバー側のメモリ負荷の増加などの問題が発生する可能性があります。エージェントがデータを処理する準備ができたら、データを積極的に pull するメカニズムを実装し、コンテンツの消費を停止させる可能性のあるオペレーションのブロックを回避します。

  • コンテンツ生成をスロットリングする: バックプレッシャーの問題(消費者が処理できるよりも速い速度でプロデューサーがデータを生成する)が発生した場合は、コンテンツ生成率をスロットリングする必要があります。これにより、バッファ オーバーフローを防ぎ、スムーズなストリーミングを実現できます。

双方向ストリーミング クエリメソッドをテストする

bidi_stream_query メソッドを呼び出して結果を反復処理することで、双方向ストリーミング クエリをローカルでテストできます。

import asyncio
import pprint
import time

request_queue = asyncio.Queue()

async def generate_input():
    # This is just an illustration, you're free to use any appropriate input generator.
    request_queue.put_nowait(
        {"input": "What is the exchange rate from US dolloars to Swedish currency"}
    )
    time.sleep(5)
    request_queue.put_nowait(
        {"input": "What is the exchange rate from US dolloars to Euro currency"}
    )
    time.sleep(5)
    request_queue.put_nowait("END")

async def print_query_result():
    async for chunk in agent.bidi_stream_query(request_queue):
        pprint.pprint(chunk, depth=1)

input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())

await asyncio.gather(input_task, output_task, return_exceptions=True)

同じ双方向クエリ接続で複数のリクエストとレスポンスを処理できます。次の例では、キューからの新しいリクエストごとに、レスポンスに関するさまざまな情報を含むチャンクのストリームを生成します。

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}

(省略可)カスタム メソッドを登録する

オペレーションは、標準(空の文字列 "" で表される)、ストリーミング(stream)、双方向ストリーミング(bidi_stream)のいずれかの実行モードとして登録できます。

from typing import AsyncIterable, Iterable

class CustomAgent(BidiStreamingAgent):

    # ... same get_state and get_state_history function definition.

    async def get_state_bidi_mode(
        self,
        request_queue: asyncio.Queue[Any]
    ) -> AsyncIterable[Any]:
        while True:
            request = await request_queue.get()
            if request == "END":
                break
            yield self.graph.get_state(request)._asdict()

    def register_operations(self):
        return {
            # The list of synchrounous operations to be registered
            "": ["query", "get_state"]
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"]
            # The list of bidi streaming operations to be registered
            "bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
        }

エージェントをデプロイする

エージェントを live_agent として開発したら、Agent Engine インスタンスを作成してエージェントを Agent Engine にデプロイできます。

生成 AI SDK では、Agent Engine インスタンスの作成時に、すべてのデプロイ構成(追加パッケージとカスタマイズされたリソース制御)が config の値として割り当てられます。

生成 AI クライアントを初期化します。

import vertexai

client = vertexai.Client(project=PROJECT, location=LOCATION)

エージェントを Agent Engine にデプロイします。

remote_live_agent = client.agent_engines.create(
    agent=live_agent,
    config={
        "staging_bucket": STAGING_BUCKET,
        "requirements": [
            "google-cloud-aiplatform[agent_engines,adk]==1.88.0",
            "cloudpickle==3.0",
            "websockets"
        ],
    },
)

デプロイ中にバックグラウンドで実行される手順については、AgentEngine インスタンスを作成するをご覧ください。

エージェントのリソース ID を取得します。

remote_live_agent.api_resource.name

エージェントを使用する

エージェントの開発時に bidi_stream_query オペレーションを定義した場合は、生成 AI SDK for Python を使用してエージェントに非同期で双方向ストリーミング クエリを実行できます。

次の例は、エージェントが認識できる任意のデータで変更できます。入力ストリームと出力ストリームには、適用可能な終了ロジックを使用します。

async with client.aio.live.agent_engines.connect(
        agent_engine=remote_live_agent.api_resource.name,
        config={"class_method": "bidi_stream_query"}
        ) as connection:
    while True:
        #
        input_str = input("Enter your question: ")
        if input_str == "exit":
            break
        await connection.send({"input": input_str})

        while True:
            response = await connection.receive()
            print(response)
            if response["bidiStreamOutput"]["output"] == "end of turn":
                break

Vertex AI Agent Engine Runtime は、反復的に生成されたオブジェクトのシーケンスとしてレスポンスをストリーミングします。たとえば、最初のターンの 2 つのレスポンスのセットは次のようになります。

Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}

Enter your next question: exit

Agent Development Kit エージェントを使用する

Agent Development Kit(ADK)を使用してエージェントを開発した場合は、双方向ストリーミングを使用して Gemini Live API とやり取りできます。

次の例では、ユーザーのテキストによる質問を受け取り、Gemini Live API レスポンスの音声データを受け取る会話エージェントを作成します。

import numpy as np
from google.adk.agents.live_request_queue improt LiveRequest
from google.adk.events import Event
from google.genai import types

def prepare_live_request(input_text: str) -> LiveRequest:
    part = types.Part.from_text(text=input_text)
    content = types.Content(parts=[part])
    return LiveRequest(content=content)

async with client.aio.live.agent_engines.connect(
        agent_engine=remote_live_agent.api_resource.name,
        config={
            "class_method": "bidi_stream_query",
            "input": {"input_str": "hello"},
        }) as connection:
    first_req = True
    while True:
        input_text = input("Enter your question: ")
        if input_text = "exit":
            break
        if first_req:
            await connection.send({
                "user_id": USER_ID,
                "live_request": prepare_live_request(input_text).dict()
            })
            first_req = False
        else:
            await connection.send(prepare_live_request(input_text).dict())
        audio_data = []
        while True:
            async def receive():
                return await connection.receive()

            receiving = asyncio.Task(receive())
            done, _ = await asyncio.wait([receiving])
            if receiving not in done:
                receiving.cancel()
                break
            event = Event.model_validate(receiving.result()["bidiStreamOutput"])
            part = event.content and event.content.parts and event.content.parts[0]

            if part.inline_data and part.inline_data.data:
                chunk_data = part.inline_data.data
                data = np.frombuffer(chunk_data, dtype=np.int16)
                audio_data.append(data)
            else:
                print(part)
        if audio_data:
            concatenated_audio = np.concatenate(audio_data)
            display(Audio(concatenated_audio, rate=24000, autoplay=True))

次のステップ