本页面介绍了如何将双向流式传输与 Vertex AI Agent Engine 运行时搭配使用。
概览
双向流式传输可在应用与代理之间提供持久的双向通信渠道,让您摆脱基于回合的请求-响应模式。双向流式传输适用于代理需要持续处理信息并做出响应的应用场景,例如以低延迟与音频或视频输入进行互动。
借助 Vertex AI Agent Engine 运行时,双向流式传输支持交互式实时代理使用情形和多模态实时 API 的数据交换。所有框架都支持双向流式传输,并且可以通过注册自定义方法来使用自定义双向流式传输方法。您可以使用双向流式传输,通过 Vertex AI Agent Engine 上的智能体开发套件 (ADK) 与 Gemini Live API 进行交互。
仅支持通过 Google Gen AI SDK 部署具有双向查询方法的远程代理。检测到双向查询方法时,Gen AI SDK 会在调用 REST API 时自动设置 EXPERIMENTAL
代理服务器模式。
开发代理
在开发代理时,请按以下步骤实现双向流式传输:
定义双向流式传输查询方法
您可以定义一个 bidi_stream_query
方法,该方法以异步方式接受流式请求作为输入,并输出流式响应。例如,以下模板扩展了基本模板,以实现请求和响应的流式传输,并且可在 Agent Engine 上部署:
import asyncio
from typing import Any, AsyncIterable
class BidiStreamingAgent(StreamingAgent):
async def bidi_stream_query(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
from langchain.load.dump import dumpd
while True:
request = await request_queue.get()
# This is just an illustration, you're free to use any termination mechanism.
if request == "END":
break
for chunk in self.graph.stream(request):
yield dumpd(chunk)
agent = BidiStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
使用双向流式传输 API 时,请注意以下几点:
asyncio.Queue
:您可以将任何数据类型放入此请求队列中,等待发送到模型 API。最长超时时间:双向流式传输查询的最长超时时间为 10 分钟。如果您的代理需要更长的处理时间,请考虑将任务分解为更小的块,并使用会话或内存来保持状态。
限制内容消耗:从双向流中消耗内容时,请务必管理代理处理传入数据的速率。如果代理消耗数据的速度过慢,可能会导致服务器端出现延迟时间增加或内存压力过大等问题。实现相关机制,以便在代理准备好处理数据时主动拉取数据,并避免可能会停止消耗内容的阻塞操作。
限制内容生成:如果您遇到背压问题(即提供方生成数据的速度快于使用方处理数据的速度),则应限制内容生成速率。这有助于防止缓冲区溢出,并确保流畅的流式传输体验。
测试双向流式传输查询方法
您可以通过调用 bidi_stream_query
方法并迭代结果来在本地测试双向流式传输查询:
import asyncio
import pprint
import time
request_queue = asyncio.Queue()
async def generate_input():
# This is just an illustration, you're free to use any appropriate input generator.
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Swedish currency"}
)
time.sleep(5)
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Euro currency"}
)
time.sleep(5)
request_queue.put_nowait("END")
async def print_query_result():
async for chunk in agent.bidi_stream_query(request_queue):
pprint.pprint(chunk, depth=1)
input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())
await asyncio.gather(input_task, output_task, return_exceptions=True)
同一双向查询连接可以处理多个请求和响应。对于队列中的每个新请求,以下示例会生成一个包含有关响应的不同信息的数据块流:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}
(可选)注册自定义方法
操作可以注册为标准(以空字符串 ""
表示)、流式传输 (stream
) 或双向流式传输 (bidi_stream
) 执行模式。
from typing import AsyncIterable, Iterable
class CustomAgent(BidiStreamingAgent):
# ... same get_state and get_state_history function definition.
async def get_state_bidi_mode(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
while True:
request = await request_queue.get()
if request == "END":
break
yield self.graph.get_state(request)._asdict()
def register_operations(self):
return {
# The list of synchrounous operations to be registered
"": ["query", "get_state"]
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"]
# The list of bidi streaming operations to be registered
"bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
}
部署代理
将代理开发为 live_agent
后,您可以通过创建 Agent Engine 实例将代理部署到 Agent Engine。
请注意,使用 Gen AI SDK 时,所有部署配置(其他软件包和自定义资源控制)在创建Agent Engine实例时都会作为 config
的值进行分配。
初始化生成式 AI 客户端:
import vertexai
client = vertexai.Client(project=PROJECT, location=LOCATION)
将代理部署到 Agent Engine:
remote_live_agent = client.agent_engines.create(
agent=live_agent,
config={
"staging_bucket": STAGING_BUCKET,
"requirements": [
"google-cloud-aiplatform[agent_engines,adk]==1.88.0",
"cloudpickle==3.0",
"websockets"
],
},
)
如需了解部署期间在后台发生的步骤,请参阅创建 AgentEngine 实例。
获取智能体资源 ID:
remote_live_agent.api_resource.name
使用代理
如果您在开发代理时定义了 bidi_stream_query
操作,则可以使用 Python 版 Gen AI SDK 以异步方式对代理进行双向流式传输查询。
您可以根据代理可识别的任何数据修改以下示例,并使用适用于输入流和输出流的任何终止逻辑:
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={"class_method": "bidi_stream_query"}
) as connection:
while True:
#
input_str = input("Enter your question: ")
if input_str == "exit":
break
await connection.send({"input": input_str})
while True:
response = await connection.receive()
print(response)
if response["bidiStreamOutput"]["output"] == "end of turn":
break
Vertex AI Agent Engine 运行时会以迭代生成的对象序列的形式流式传输响应。例如,第一轮中的一组两个响应可能如下所示:
Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}
Enter your next question: exit
使用智能体开发套件智能体
如果您使用智能体开发套件 (ADK) 开发了智能体,则可以使用双向流式传输与 Gemini Live API 进行交互。
以下示例创建了一个对话代理,该代理接受用户提出的文本问题,并接收 Gemini Live API 响应音频数据:
import numpy as np
from google.adk.agents.live_request_queue improt LiveRequest
from google.adk.events import Event
from google.genai import types
def prepare_live_request(input_text: str) -> LiveRequest:
part = types.Part.from_text(text=input_text)
content = types.Content(parts=[part])
return LiveRequest(content=content)
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={
"class_method": "bidi_stream_query",
"input": {"input_str": "hello"},
}) as connection:
first_req = True
while True:
input_text = input("Enter your question: ")
if input_text = "exit":
break
if first_req:
await connection.send({
"user_id": USER_ID,
"live_request": prepare_live_request(input_text).dict()
})
first_req = False
else:
await connection.send(prepare_live_request(input_text).dict())
audio_data = []
while True:
async def receive():
return await connection.receive()
receiving = asyncio.Task(receive())
done, _ = await asyncio.wait([receiving])
if receiving not in done:
receiving.cancel()
break
event = Event.model_validate(receiving.result()["bidiStreamOutput"])
part = event.content and event.content.parts and event.content.parts[0]
if part.inline_data and part.inline_data.data:
chunk_data = part.inline_data.data
data = np.frombuffer(chunk_data, dtype=np.int16)
audio_data.append(data)
else:
print(part)
if audio_data:
concatenated_audio = np.concatenate(audio_data)
display(Audio(concatenated_audio, rate=24000, autoplay=True))
后续步骤
- 详细了解 Gemini Live API。