使用 Vertex AI Agent Engine 執行階段進行雙向串流

本頁說明如何搭配 Vertex AI Agent Engine Runtime 使用雙向串流。

總覽

雙向串流會在應用程式和代理程式之間建立持續的雙向通訊管道,讓您擺脫以回合為基礎的要求/回應模式。雙向串流適用於需要持續處理資訊並回應的用途,例如與低延遲音訊或視訊輸入內容互動。

Vertex AI Agent Engine Runtime 的雙向串流功能支援互動式即時虛擬服務專員用途,以及多模態即時 API 的資料交換。所有架構都支援雙向串流,您也可以透過註冊自訂方法,使用自訂雙向串流方法。您可以使用雙向串流,透過 Vertex AI Agent Engine 上的 Agent Development Kit (ADK),與 Gemini Live API 互動。

如要部署具有雙向查詢方法的遠端代理程式,只能透過 Google Gen AI SDK 進行。偵測到雙向查詢方法時,Gen AI SDK 會在呼叫 REST API 時自動設定 EXPERIMENTAL 代理程式伺服器模式。

開發代理

開發代理程式時,請按照下列步驟實作雙向串流:

定義雙向串流查詢方法

您可以定義 bidi_stream_query 方法,以非同步方式將串流要求做為輸入內容,並輸出串流回應。舉例來說,下列範本會擴充基本範本,以便串流要求和回應,並可部署在 Agent Engine 上:

import asyncio
from typing import Any, AsyncIterable

class BidiStreamingAgent(StreamingAgent):

    async def bidi_stream_query(
        self,
        request_queue: asyncio.Queue[Any]
    ) -> AsyncIterable[Any]:
        from langchain.load.dump import dumpd

        while True:
            request = await request_queue.get()
            # This is just an illustration, you're free to use any termination mechanism.
            if request == "END":
                break
            for chunk in self.graph.stream(request):
                yield dumpd(chunk)

agent = BidiStreamingAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],      # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

使用雙向串流 API 時,請注意下列事項:

  • asyncio.Queue:您可以在這個要求佇列中放入任何資料類型,等待傳送至模型 API。

  • 逾時時間上限:雙向串流查詢的逾時時間上限為 10 分鐘。如果代理程式需要較長的處理時間,請考慮將工作分解成較小的區塊,並使用工作階段或記憶體來保留狀態。

  • 節流內容消耗:從雙向串流消耗內容時,請務必管理代理程式處理傳入資料的速率。如果代理程式消耗資料的速度太慢,可能會導致延遲時間增加或伺服器端記憶體壓力等問題。實作機制,在代理程式準備好處理資料時主動提取資料,並避免封鎖可能導致內容停止使用的作業。

  • 節流內容生成:如果遇到背壓問題 (生產者生成資料的速度比消費者處理資料的速度快),請節流內容生成速率。這有助於避免緩衝區溢位,確保串流體驗順暢。

測試雙向串流查詢方法

您可以呼叫 bidi_stream_query 方法並疊代結果,在本機測試雙向串流查詢:

import asyncio
import pprint
import time

request_queue = asyncio.Queue()

async def generate_input():
    # This is just an illustration, you're free to use any appropriate input generator.
    request_queue.put_nowait(
        {"input": "What is the exchange rate from US dolloars to Swedish currency"}
    )
    time.sleep(5)
    request_queue.put_nowait(
        {"input": "What is the exchange rate from US dolloars to Euro currency"}
    )
    time.sleep(5)
    request_queue.put_nowait("END")

async def print_query_result():
    async for chunk in agent.bidi_stream_query(request_queue):
        pprint.pprint(chunk, depth=1)

input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())

await asyncio.gather(input_task, output_task, return_exceptions=True)

同一個雙向查詢連線可以處理多個要求和回應。針對佇列中的每項新要求,以下範例會產生包含不同回應資訊的區塊串流:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}

(選用) 註冊自訂方法

作業可註冊為標準 (以空字串 "" 表示)、串流 (stream) 或雙向串流 (bidi_stream) 執行模式。

from typing import AsyncIterable, Iterable

class CustomAgent(BidiStreamingAgent):

    # ... same get_state and get_state_history function definition.

    async def get_state_bidi_mode(
        self,
        request_queue: asyncio.Queue[Any]
    ) -> AsyncIterable[Any]:
        while True:
            request = await request_queue.get()
            if request == "END":
                break
            yield self.graph.get_state(request)._asdict()

    def register_operations(self):
        return {
            # The list of synchrounous operations to be registered
            "": ["query", "get_state"]
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"]
            # The list of bidi streaming operations to be registered
            "bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
        }

部署代理程式

將代理程式開發為 live_agent 後,建立 Agent Engine 執行個體即可將代理程式部署至 Agent Engine。

請注意,使用 Gen AI SDK 時,建立 Agent Engine 執行個體時,所有部署設定 (額外套件和自訂資源控制項) 都會指派為 config 的值。

初始化 Gen AI 用戶端:

import vertexai

client = vertexai.Client(project=PROJECT, location=LOCATION)

將代理部署至 Agent Engine:

remote_live_agent = client.agent_engines.create(
    agent=live_agent,
    config={
        "staging_bucket": STAGING_BUCKET,
        "requirements": [
            "google-cloud-aiplatform[agent_engines,adk]==1.88.0",
            "cloudpickle==3.0",
            "websockets"
        ],
    },
)

如要瞭解部署期間在背景執行的步驟,請參閱「建立 AgentEngine 執行個體」。

取得代理程式資源 ID:

remote_live_agent.api_resource.name

使用代理程式

如果您在開發代理程式時定義了 bidi_stream_query 作業,可以使用 Gen AI SDK for Python,以非同步方式雙向串流查詢代理程式。

您可以修改下列範例,使用代理程式可辨識的任何資料,並為輸入串流和輸出串流套用任何適用的終止邏輯:

async with client.aio.live.agent_engines.connect(
        agent_engine=remote_live_agent.api_resource.name,
        config={"class_method": "bidi_stream_query"}
        ) as connection:
    while True:
        #
        input_str = input("Enter your question: ")
        if input_str == "exit":
            break
        await connection.send({"input": input_str})

        while True:
            response = await connection.receive()
            print(response)
            if response["bidiStreamOutput"]["output"] == "end of turn":
                break

Vertex AI Agent Engine Runtime 會以疊代生成的物件序列形式,串流傳輸回覆內容。舉例來說,第一輪的兩組回覆可能如下所示:

Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}

Enter your next question: exit

使用 Agent Development Kit 代理程式

如果您使用 Agent Development Kit (ADK) 開發代理程式,可以透過雙向串流與 Gemini Live API 互動。

下列範例會建立對話代理程式,接收使用者文字問題並取得 Gemini Live API 回應音訊資料:

import numpy as np
from google.adk.agents.live_request_queue improt LiveRequest
from google.adk.events import Event
from google.genai import types

def prepare_live_request(input_text: str) -> LiveRequest:
    part = types.Part.from_text(text=input_text)
    content = types.Content(parts=[part])
    return LiveRequest(content=content)

async with client.aio.live.agent_engines.connect(
        agent_engine=remote_live_agent.api_resource.name,
        config={
            "class_method": "bidi_stream_query",
            "input": {"input_str": "hello"},
        }) as connection:
    first_req = True
    while True:
        input_text = input("Enter your question: ")
        if input_text = "exit":
            break
        if first_req:
            await connection.send({
                "user_id": USER_ID,
                "live_request": prepare_live_request(input_text).dict()
            })
            first_req = False
        else:
            await connection.send(prepare_live_request(input_text).dict())
        audio_data = []
        while True:
            async def receive():
                return await connection.receive()

            receiving = asyncio.Task(receive())
            done, _ = await asyncio.wait([receiving])
            if receiving not in done:
                receiving.cancel()
                break
            event = Event.model_validate(receiving.result()["bidiStreamOutput"])
            part = event.content and event.content.parts and event.content.parts[0]

            if part.inline_data and part.inline_data.data:
                chunk_data = part.inline_data.data
                data = np.frombuffer(chunk_data, dtype=np.int16)
                audio_data.append(data)
            else:
                print(part)
        if audio_data:
            concatenated_audio = np.concatenate(audio_data)
            display(Audio(concatenated_audio, rate=24000, autoplay=True))

後續步驟