本頁說明如何搭配 Vertex AI Agent Engine Runtime 使用雙向串流。
總覽
雙向串流會在應用程式和代理程式之間建立持續的雙向通訊管道,讓您擺脫以回合為基礎的要求/回應模式。雙向串流適用於需要持續處理資訊並回應的用途,例如與低延遲音訊或視訊輸入內容互動。
Vertex AI Agent Engine Runtime 的雙向串流功能支援互動式即時虛擬服務專員用途,以及多模態即時 API 的資料交換。所有架構都支援雙向串流,您也可以透過註冊自訂方法,使用自訂雙向串流方法。您可以使用雙向串流,透過 Vertex AI Agent Engine 上的 Agent Development Kit (ADK),與 Gemini Live API 互動。
如要部署具有雙向查詢方法的遠端代理程式,只能透過 Google Gen AI SDK 進行。偵測到雙向查詢方法時,Gen AI SDK 會在呼叫 REST API 時自動設定 EXPERIMENTAL
代理程式伺服器模式。
開發代理
開發代理程式時,請按照下列步驟實作雙向串流:
註冊自訂方法 (選用)
定義雙向串流查詢方法
您可以定義 bidi_stream_query
方法,以非同步方式將串流要求做為輸入內容,並輸出串流回應。舉例來說,下列範本會擴充基本範本,以便串流要求和回應,並可部署在 Agent Engine 上:
import asyncio
from typing import Any, AsyncIterable
class BidiStreamingAgent(StreamingAgent):
async def bidi_stream_query(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
from langchain.load.dump import dumpd
while True:
request = await request_queue.get()
# This is just an illustration, you're free to use any termination mechanism.
if request == "END":
break
for chunk in self.graph.stream(request):
yield dumpd(chunk)
agent = BidiStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
使用雙向串流 API 時,請注意下列事項:
asyncio.Queue
:您可以在這個要求佇列中放入任何資料類型,等待傳送至模型 API。逾時時間上限:雙向串流查詢的逾時時間上限為 10 分鐘。如果代理程式需要較長的處理時間,請考慮將工作分解成較小的區塊,並使用工作階段或記憶體來保留狀態。
節流內容消耗:從雙向串流消耗內容時,請務必管理代理程式處理傳入資料的速率。如果代理程式消耗資料的速度太慢,可能會導致延遲時間增加或伺服器端記憶體壓力等問題。實作機制,在代理程式準備好處理資料時主動提取資料,並避免封鎖可能導致內容停止使用的作業。
節流內容生成:如果遇到背壓問題 (生產者生成資料的速度比消費者處理資料的速度快),請節流內容生成速率。這有助於避免緩衝區溢位,確保串流體驗順暢。
測試雙向串流查詢方法
您可以呼叫 bidi_stream_query
方法並疊代結果,在本機測試雙向串流查詢:
import asyncio
import pprint
import time
request_queue = asyncio.Queue()
async def generate_input():
# This is just an illustration, you're free to use any appropriate input generator.
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Swedish currency"}
)
time.sleep(5)
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Euro currency"}
)
time.sleep(5)
request_queue.put_nowait("END")
async def print_query_result():
async for chunk in agent.bidi_stream_query(request_queue):
pprint.pprint(chunk, depth=1)
input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())
await asyncio.gather(input_task, output_task, return_exceptions=True)
同一個雙向查詢連線可以處理多個要求和回應。針對佇列中的每項新要求,以下範例會產生包含不同回應資訊的區塊串流:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}
(選用) 註冊自訂方法
作業可註冊為標準 (以空字串 ""
表示)、串流 (stream
) 或雙向串流 (bidi_stream
) 執行模式。
from typing import AsyncIterable, Iterable
class CustomAgent(BidiStreamingAgent):
# ... same get_state and get_state_history function definition.
async def get_state_bidi_mode(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
while True:
request = await request_queue.get()
if request == "END":
break
yield self.graph.get_state(request)._asdict()
def register_operations(self):
return {
# The list of synchrounous operations to be registered
"": ["query", "get_state"]
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"]
# The list of bidi streaming operations to be registered
"bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
}
部署代理程式
將代理程式開發為 live_agent
後,建立 Agent Engine 執行個體即可將代理程式部署至 Agent Engine。
請注意,使用 Gen AI SDK 時,建立 Agent Engine 執行個體時,所有部署設定 (額外套件和自訂資源控制項) 都會指派為 config
的值。
初始化 Gen AI 用戶端:
import vertexai
client = vertexai.Client(project=PROJECT, location=LOCATION)
將代理部署至 Agent Engine:
remote_live_agent = client.agent_engines.create(
agent=live_agent,
config={
"staging_bucket": STAGING_BUCKET,
"requirements": [
"google-cloud-aiplatform[agent_engines,adk]==1.88.0",
"cloudpickle==3.0",
"websockets"
],
},
)
如要瞭解部署期間在背景執行的步驟,請參閱「建立 AgentEngine 執行個體」。
取得代理程式資源 ID:
remote_live_agent.api_resource.name
使用代理程式
如果您在開發代理程式時定義了 bidi_stream_query
作業,可以使用 Gen AI SDK for Python,以非同步方式雙向串流查詢代理程式。
您可以修改下列範例,使用代理程式可辨識的任何資料,並為輸入串流和輸出串流套用任何適用的終止邏輯:
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={"class_method": "bidi_stream_query"}
) as connection:
while True:
#
input_str = input("Enter your question: ")
if input_str == "exit":
break
await connection.send({"input": input_str})
while True:
response = await connection.receive()
print(response)
if response["bidiStreamOutput"]["output"] == "end of turn":
break
Vertex AI Agent Engine Runtime 會以疊代生成的物件序列形式,串流傳輸回覆內容。舉例來說,第一輪的兩組回覆可能如下所示:
Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}
Enter your next question: exit
使用 Agent Development Kit 代理程式
如果您使用 Agent Development Kit (ADK) 開發代理程式,可以透過雙向串流與 Gemini Live API 互動。
下列範例會建立對話代理程式,接收使用者文字問題並取得 Gemini Live API 回應音訊資料:
import numpy as np
from google.adk.agents.live_request_queue improt LiveRequest
from google.adk.events import Event
from google.genai import types
def prepare_live_request(input_text: str) -> LiveRequest:
part = types.Part.from_text(text=input_text)
content = types.Content(parts=[part])
return LiveRequest(content=content)
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={
"class_method": "bidi_stream_query",
"input": {"input_str": "hello"},
}) as connection:
first_req = True
while True:
input_text = input("Enter your question: ")
if input_text = "exit":
break
if first_req:
await connection.send({
"user_id": USER_ID,
"live_request": prepare_live_request(input_text).dict()
})
first_req = False
else:
await connection.send(prepare_live_request(input_text).dict())
audio_data = []
while True:
async def receive():
return await connection.receive()
receiving = asyncio.Task(receive())
done, _ = await asyncio.wait([receiving])
if receiving not in done:
receiving.cancel()
break
event = Event.model_validate(receiving.result()["bidiStreamOutput"])
part = event.content and event.content.parts and event.content.parts[0]
if part.inline_data and part.inline_data.data:
chunk_data = part.inline_data.data
data = np.frombuffer(chunk_data, dtype=np.int16)
audio_data.append(data)
else:
print(part)
if audio_data:
concatenated_audio = np.concatenate(audio_data)
display(Audio(concatenated_audio, rate=24000, autoplay=True))
後續步驟
- 進一步瞭解 Gemini Live API。