Nesta página, descrevemos como usar o streaming bidirecional com o ambiente de execução do Vertex AI Agent Engine.
Visão geral
O streaming bidirecional oferece um canal de comunicação persistente e bidirecional entre seu aplicativo e o agente, permitindo que você vá além dos padrões de solicitação-resposta baseados em turnos. O streaming bidirecional funciona para casos de uso em que o agente precisa processar informações e responder continuamente, como interagir com entradas de áudio ou vídeo com baixa latência.
O streaming bidirecional com o tempo de execução do Vertex AI Agent Engine oferece suporte a casos de uso de agentes interativos e em tempo real, além de troca de dados para APIs multimodais dinâmicas. O streaming bidirecional é compatível com todos os frameworks, e métodos personalizados estão disponíveis registrando métodos personalizados. É possível usar o streaming bidirecional para interagir com a API Gemini Live usando o Kit de desenvolvimento de agentes (ADK) no Vertex AI Agent Engine.
A implantação de um agente remoto com métodos de consulta bidirecional é compatível apenas com o SDK da IA generativa do Google. Quando métodos de consulta bidirecional são detectados, o SDK de IA generativa define automaticamente o modo de servidor do agente EXPERIMENTAL
ao chamar a API REST.
Desenvolver um agente
Ao desenvolver um agente, siga estas etapas para implementar o streaming bidirecional:
Registrar métodos personalizados (opcional)
Definir um método de consulta de streaming bidirecional
É possível definir um método bidi_stream_query
que recebe solicitações de stream de forma assíncrona como entrada e gera respostas de streaming. Por exemplo, o modelo a seguir estende o modelo básico para transmitir solicitações e respostas e pode ser implantado no Agent Engine:
import asyncio
from typing import Any, AsyncIterable
class BidiStreamingAgent(StreamingAgent):
async def bidi_stream_query(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
from langchain.load.dump import dumpd
while True:
request = await request_queue.get()
# This is just an illustration, you're free to use any termination mechanism.
if request == "END":
break
for chunk in self.graph.stream(request):
yield dumpd(chunk)
agent = BidiStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Considere o seguinte ao usar a API de streaming bidirecional:
asyncio.Queue
: você pode colocar qualquer tipo de dados nessa fila de solicitações para aguardar o envio à API do modelo.Tempo limite máximo: o tempo limite máximo para consultas de streaming bidirecional é de 10 minutos. Se o agente precisar de mais tempo de processamento, divida a tarefa em partes menores e use sessão ou memória para manter os estados.
Limitar o consumo de conteúdo: ao consumir conteúdo de um fluxo bidirecional, é importante gerenciar a taxa em que o agente processa os dados recebidos. Se o agente consumir dados muito lentamente, isso poderá causar problemas como aumento da latência ou pressão de memória no lado do servidor. Implemente mecanismos para extrair dados ativamente quando seu agente estiver pronto para processá-los e evite bloquear operações que possam interromper o consumo de conteúdo.
Limitar a geração de conteúdo: se você encontrar problemas de contrapressão (em que o produtor gera dados mais rápido do que o consumidor consegue processar), limite a taxa de geração de conteúdo. Isso pode ajudar a evitar estouros de buffer e garantir uma experiência de streaming tranquila.
Testar o método de consulta de streaming bidirecional
Teste a consulta de streaming bidirecional localmente chamando o método bidi_stream_query
e iterando os resultados:
import asyncio
import pprint
import time
request_queue = asyncio.Queue()
async def generate_input():
# This is just an illustration, you're free to use any appropriate input generator.
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Swedish currency"}
)
time.sleep(5)
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Euro currency"}
)
time.sleep(5)
request_queue.put_nowait("END")
async def print_query_result():
async for chunk in agent.bidi_stream_query(request_queue):
pprint.pprint(chunk, depth=1)
input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())
await asyncio.gather(input_task, output_task, return_exceptions=True)
A mesma conexão de consulta bidirecional pode processar várias solicitações e respostas. Para cada nova solicitação da fila, o exemplo a seguir gera um fluxo de partes que contêm informações diferentes sobre a resposta:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}
(Opcional) Registrar métodos personalizados
As operações podem ser registradas como modos de execução padrão (representados por uma string vazia ""
), streaming (stream
) ou streaming bidirecional (bidi_stream
).
from typing import AsyncIterable, Iterable
class CustomAgent(BidiStreamingAgent):
# ... same get_state and get_state_history function definition.
async def get_state_bidi_mode(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
while True:
request = await request_queue.get()
if request == "END":
break
yield self.graph.get_state(request)._asdict()
def register_operations(self):
return {
# The list of synchrounous operations to be registered
"": ["query", "get_state"]
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"]
# The list of bidi streaming operations to be registered
"bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
}
Implantar um agente
Depois de desenvolver seu agente como live_agent
, crie uma instância do Agent Engine para implantá-lo.
Com o SDK de IA generativa, todas as configurações de implantação (pacotes adicionais e controles de recursos personalizados) são atribuídas como um valor de config
ao criar a instância do Agent Engine.
Inicialize o cliente de IA generativa:
import vertexai
client = vertexai.Client(project=PROJECT, location=LOCATION)
Implante o agente no Agent Engine:
remote_live_agent = client.agent_engines.create(
agent=live_agent,
config={
"staging_bucket": STAGING_BUCKET,
"requirements": [
"google-cloud-aiplatform[agent_engines,adk]==1.88.0",
"cloudpickle==3.0",
"websockets"
],
},
)
Para informações sobre as etapas que acontecem em segundo plano durante a implantação, consulte Criar uma instância do AgentEngine.
Consiga o ID do recurso do agente:
remote_live_agent.api_resource.name
Usar um agente
Se você definiu uma operação bidi_stream_query
ao desenvolver seu agente, é possível consultar o agente de forma assíncrona usando o SDK de IA generativa para Python.
Você pode modificar o exemplo a seguir com qualquer dado reconhecido pelo seu agente, usando qualquer lógica de encerramento aplicável para fluxo de entrada e fluxo de saída:
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={"class_method": "bidi_stream_query"}
) as connection:
while True:
#
input_str = input("Enter your question: ")
if input_str == "exit":
break
await connection.send({"input": input_str})
while True:
response = await connection.receive()
print(response)
if response["bidiStreamOutput"]["output"] == "end of turn":
break
O ambiente de execução do Vertex AI Agent Engine transmite respostas como uma sequência de objetos gerados de forma iterativa. Por exemplo, um conjunto de duas respostas na primeira vez pode ser assim:
Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}
Enter your next question: exit
Usar um agente do Agent Development Kit
Se você desenvolveu seu agente usando o Kit de desenvolvimento de agentes (ADK, na sigla em inglês), é possível usar o streaming bidirecional para interagir com a API Gemini Live.
O exemplo a seguir cria um agente de conversa que recebe perguntas de texto do usuário e dados de áudio de resposta da API Gemini Live:
import numpy as np
from google.adk.agents.live_request_queue improt LiveRequest
from google.adk.events import Event
from google.genai import types
def prepare_live_request(input_text: str) -> LiveRequest:
part = types.Part.from_text(text=input_text)
content = types.Content(parts=[part])
return LiveRequest(content=content)
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={
"class_method": "bidi_stream_query",
"input": {"input_str": "hello"},
}) as connection:
first_req = True
while True:
input_text = input("Enter your question: ")
if input_text = "exit":
break
if first_req:
await connection.send({
"user_id": USER_ID,
"live_request": prepare_live_request(input_text).dict()
})
first_req = False
else:
await connection.send(prepare_live_request(input_text).dict())
audio_data = []
while True:
async def receive():
return await connection.receive()
receiving = asyncio.Task(receive())
done, _ = await asyncio.wait([receiving])
if receiving not in done:
receiving.cancel()
break
event = Event.model_validate(receiving.result()["bidiStreamOutput"])
part = event.content and event.content.parts and event.content.parts[0]
if part.inline_data and part.inline_data.data:
chunk_data = part.inline_data.data
data = np.frombuffer(chunk_data, dtype=np.int16)
audio_data.append(data)
else:
print(part)
if audio_data:
concatenated_audio = np.concatenate(audio_data)
display(Audio(concatenated_audio, rate=24000, autoplay=True))
A seguir
- Saiba mais sobre a API Gemini Live.