使用 Vertex AI Agent Engine 运行时进行双向流式传输

本页面介绍了如何将双向流式传输与 Vertex AI Agent Engine 运行时搭配使用。

概览

双向流式传输可在应用与代理之间提供持久的双向通信渠道,让您摆脱基于回合的请求-响应模式。双向流式传输适用于代理需要持续处理信息并做出响应的应用场景,例如以低延迟与音频或视频输入进行互动。

借助 Vertex AI Agent Engine 运行时,双向流式传输支持交互式实时代理使用情形和多模态实时 API 的数据交换。所有框架都支持双向流式传输,并且可以通过注册自定义方法来使用自定义双向流式传输方法。您可以使用双向流式传输,通过 Vertex AI Agent Engine 上的智能体开发套件 (ADK) 与 Gemini Live API 进行交互。

仅支持通过 Google Gen AI SDK 部署具有双向查询方法的远程代理。检测到双向查询方法时,Gen AI SDK 会在调用 REST API 时自动设置 EXPERIMENTAL 代理服务器模式。

开发代理

在开发代理时,请按以下步骤实现双向流式传输:

定义双向流式传输查询方法

您可以定义一个 bidi_stream_query 方法,该方法以异步方式接受流式请求作为输入,并输出流式响应。例如,以下模板扩展了基本模板,以实现请求和响应的流式传输,并且可在 Agent Engine 上部署:

import asyncio
from typing import Any, AsyncIterable

class BidiStreamingAgent(StreamingAgent):

    async def bidi_stream_query(
        self,
        request_queue: asyncio.Queue[Any]
    ) -> AsyncIterable[Any]:
        from langchain.load.dump import dumpd

        while True:
            request = await request_queue.get()
            # This is just an illustration, you're free to use any termination mechanism.
            if request == "END":
                break
            for chunk in self.graph.stream(request):
                yield dumpd(chunk)

agent = BidiStreamingAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],      # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

使用双向流式传输 API 时,请注意以下几点:

  • asyncio.Queue:您可以将任何数据类型放入此请求队列中,等待发送到模型 API。

  • 最长超时时间:双向流式传输查询的最长超时时间为 10 分钟。如果您的代理需要更长的处理时间,请考虑将任务分解为更小的块,并使用会话或内存来保持状态。

  • 限制内容消耗:从双向流中消耗内容时,请务必管理代理处理传入数据的速率。如果代理消耗数据的速度过慢,可能会导致服务器端出现延迟时间增加或内存压力过大等问题。实现相关机制,以便在代理准备好处理数据时主动拉取数据,并避免可能会停止消耗内容的阻塞操作。

  • 限制内容生成:如果您遇到背压问题(即提供方生成数据的速度快于使用方处理数据的速度),则应限制内容生成速率。这有助于防止缓冲区溢出,并确保流畅的流式传输体验。

测试双向流式传输查询方法

您可以通过调用 bidi_stream_query 方法并迭代结果来在本地测试双向流式传输查询:

import asyncio
import pprint
import time

request_queue = asyncio.Queue()

async def generate_input():
    # This is just an illustration, you're free to use any appropriate input generator.
    request_queue.put_nowait(
        {"input": "What is the exchange rate from US dolloars to Swedish currency"}
    )
    time.sleep(5)
    request_queue.put_nowait(
        {"input": "What is the exchange rate from US dolloars to Euro currency"}
    )
    time.sleep(5)
    request_queue.put_nowait("END")

async def print_query_result():
    async for chunk in agent.bidi_stream_query(request_queue):
        pprint.pprint(chunk, depth=1)

input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())

await asyncio.gather(input_task, output_task, return_exceptions=True)

同一双向查询连接可以处理多个请求和响应。对于队列中的每个新请求,以下示例会生成一个包含有关响应的不同信息的数据块流:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}

(可选)注册自定义方法

操作可以注册为标准(以空字符串 "" 表示)、流式传输 (stream) 或双向流式传输 (bidi_stream) 执行模式。

from typing import AsyncIterable, Iterable

class CustomAgent(BidiStreamingAgent):

    # ... same get_state and get_state_history function definition.

    async def get_state_bidi_mode(
        self,
        request_queue: asyncio.Queue[Any]
    ) -> AsyncIterable[Any]:
        while True:
            request = await request_queue.get()
            if request == "END":
                break
            yield self.graph.get_state(request)._asdict()

    def register_operations(self):
        return {
            # The list of synchrounous operations to be registered
            "": ["query", "get_state"]
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"]
            # The list of bidi streaming operations to be registered
            "bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
        }

部署代理

将代理开发为 live_agent 后,您可以通过创建 Agent Engine 实例将代理部署到 Agent Engine。

请注意,使用 Gen AI SDK 时,所有部署配置(其他软件包和自定义资源控制)在创建Agent Engine实例时都会作为 config 的值进行分配。

初始化生成式 AI 客户端:

import vertexai

client = vertexai.Client(project=PROJECT, location=LOCATION)

将代理部署到 Agent Engine:

remote_live_agent = client.agent_engines.create(
    agent=live_agent,
    config={
        "staging_bucket": STAGING_BUCKET,
        "requirements": [
            "google-cloud-aiplatform[agent_engines,adk]==1.88.0",
            "cloudpickle==3.0",
            "websockets"
        ],
    },
)

如需了解部署期间在后台发生的步骤,请参阅创建 AgentEngine 实例

获取智能体资源 ID:

remote_live_agent.api_resource.name

使用代理

如果您在开发代理时定义了 bidi_stream_query 操作,则可以使用 Python 版 Gen AI SDK 以异步方式对代理进行双向流式传输查询。

您可以根据代理可识别的任何数据修改以下示例,并使用适用于输入流和输出流的任何终止逻辑:

async with client.aio.live.agent_engines.connect(
        agent_engine=remote_live_agent.api_resource.name,
        config={"class_method": "bidi_stream_query"}
        ) as connection:
    while True:
        #
        input_str = input("Enter your question: ")
        if input_str == "exit":
            break
        await connection.send({"input": input_str})

        while True:
            response = await connection.receive()
            print(response)
            if response["bidiStreamOutput"]["output"] == "end of turn":
                break

Vertex AI Agent Engine 运行时会以迭代生成的对象序列的形式流式传输响应。例如,第一轮中的一组两个响应可能如下所示:

Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}

Enter your next question: exit

使用智能体开发套件智能体

如果您使用智能体开发套件 (ADK) 开发了智能体,则可以使用双向流式传输与 Gemini Live API 进行交互。

以下示例创建了一个对话代理,该代理接受用户提出的文本问题,并接收 Gemini Live API 响应音频数据:

import numpy as np
from google.adk.agents.live_request_queue improt LiveRequest
from google.adk.events import Event
from google.genai import types

def prepare_live_request(input_text: str) -> LiveRequest:
    part = types.Part.from_text(text=input_text)
    content = types.Content(parts=[part])
    return LiveRequest(content=content)

async with client.aio.live.agent_engines.connect(
        agent_engine=remote_live_agent.api_resource.name,
        config={
            "class_method": "bidi_stream_query",
            "input": {"input_str": "hello"},
        }) as connection:
    first_req = True
    while True:
        input_text = input("Enter your question: ")
        if input_text = "exit":
            break
        if first_req:
            await connection.send({
                "user_id": USER_ID,
                "live_request": prepare_live_request(input_text).dict()
            })
            first_req = False
        else:
            await connection.send(prepare_live_request(input_text).dict())
        audio_data = []
        while True:
            async def receive():
                return await connection.receive()

            receiving = asyncio.Task(receive())
            done, _ = await asyncio.wait([receiving])
            if receiving not in done:
                receiving.cancel()
                break
            event = Event.model_validate(receiving.result()["bidiStreamOutput"])
            part = event.content and event.content.parts and event.content.parts[0]

            if part.inline_data and part.inline_data.data:
                chunk_data = part.inline_data.data
                data = np.frombuffer(chunk_data, dtype=np.int16)
                audio_data.append(data)
            else:
                print(part)
        if audio_data:
            concatenated_audio = np.concatenate(audio_data)
            display(Audio(concatenated_audio, rate=24000, autoplay=True))

后续步骤