En esta página, se describe cómo usar la transmisión bidireccional con el entorno de ejecución de Vertex AI Agent Engine.
Descripción general
La transmisión bidireccional proporciona un canal de comunicación persistente y bidireccional entre tu aplicación y el agente, lo que te permite ir más allá de los patrones basados en turnos y de solicitud-respuesta. La transmisión bidireccional funciona para los casos de uso en los que tu agente necesita procesar información y responder de forma continua, como interactuar con entradas de audio o video con baja latencia.
La transmisión bidireccional con el tiempo de ejecución de Vertex AI Agent Engine admite casos de uso interactivos y en tiempo real de agentes, y el intercambio de datos para las APIs multimodales en vivo. La transmisión bidireccional es compatible con todos los frameworks, y los métodos de transmisión bidireccional personalizados están disponibles a través del registro de métodos personalizados. Puedes usar la transmisión bidireccional para interactuar con la API de Gemini Live con el Agent Development Kit (ADK) en Vertex AI Agent Engine.
La implementación de un agente remoto con métodos de consulta bidireccionales solo se admite a través del SDK de IA generativa de Google. Cuando se detectan métodos de consulta bidireccionales, el SDK de IA generativa establece automáticamente el modo de servidor del agente EXPERIMENTAL
cuando se llama a la API de REST.
Desarrolla un agente
Cuando desarrolles un agente, sigue estos pasos para implementar la transmisión bidireccional:
Registra métodos personalizados (opcional)
Define un método de consulta de transmisión bidireccional
Puedes definir un método bidi_stream_query
que tome de forma asíncrona solicitudes de transmisión como entrada y genere respuestas de transmisión como salida. Por ejemplo, la siguiente plantilla extiende la plantilla básica para transmitir solicitudes y respuestas, y se puede implementar en Agent Engine:
import asyncio
from typing import Any, AsyncIterable
class BidiStreamingAgent(StreamingAgent):
async def bidi_stream_query(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
from langchain.load.dump import dumpd
while True:
request = await request_queue.get()
# This is just an illustration, you're free to use any termination mechanism.
if request == "END":
break
for chunk in self.graph.stream(request):
yield dumpd(chunk)
agent = BidiStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Ten en cuenta lo siguiente cuando uses la API de transmisión bidireccional:
asyncio.Queue
: Puedes colocar cualquier tipo de datos en esta cola de solicitudes para que espere a ser enviada a la API del modelo.Tiempo de espera máximo: El tiempo de espera máximo para la consulta de transmisión bidireccional es de 10 minutos. Si tu agente requiere tiempos de procesamiento más largos, considera dividir la tarea en partes más pequeñas y usar la sesión o la memoria para mantener los estados.
Limita el consumo de contenido: Cuando consumes contenido de una transmisión bidireccional, es importante administrar la velocidad a la que tu agente procesa los datos entrantes. Si tu agente consume datos con demasiada lentitud, puede generar problemas como una mayor latencia o presión de memoria en el servidor. Implementa mecanismos para extraer datos de forma activa cuando tu agente esté listo para procesarlos y evita bloquear operaciones que podrían detener el consumo de contenido.
Limita la generación de contenido: Si tienes problemas de contrapresión (en los que el productor genera datos más rápido de lo que el consumidor puede procesarlos), debes limitar la tasa de generación de contenido. Esto puede ayudar a evitar desbordamientos de búfer y garantizar una experiencia de transmisión fluida.
Prueba el método de consulta de transmisión bidireccional
Puedes probar la consulta de transmisión bidireccional de forma local llamando al método bidi_stream_query
y, luego, iterando los resultados:
import asyncio
import pprint
import time
request_queue = asyncio.Queue()
async def generate_input():
# This is just an illustration, you're free to use any appropriate input generator.
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Swedish currency"}
)
time.sleep(5)
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Euro currency"}
)
time.sleep(5)
request_queue.put_nowait("END")
async def print_query_result():
async for chunk in agent.bidi_stream_query(request_queue):
pprint.pprint(chunk, depth=1)
input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())
await asyncio.gather(input_task, output_task, return_exceptions=True)
La misma conexión de consulta bidireccional puede controlar varias solicitudes y respuestas. En el siguiente ejemplo, para cada solicitud nueva de la cola, se genera un flujo de fragmentos que contiene información diferente sobre la respuesta:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}
(Opcional) Registra métodos personalizados
Las operaciones se pueden registrar como modos de ejecución estándar (representados por una cadena vacía ""
), de transmisión (stream
) o de transmisión bidireccional (bidi_stream
).
from typing import AsyncIterable, Iterable
class CustomAgent(BidiStreamingAgent):
# ... same get_state and get_state_history function definition.
async def get_state_bidi_mode(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
while True:
request = await request_queue.get()
if request == "END":
break
yield self.graph.get_state(request)._asdict()
def register_operations(self):
return {
# The list of synchrounous operations to be registered
"": ["query", "get_state"]
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"]
# The list of bidi streaming operations to be registered
"bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
}
Implementa un agente
Una vez que desarrolles tu agente como live_agent
, puedes implementarlo en Agent Engine creando una instancia de Agent Engine.
Ten en cuenta que, con el SDK de IA generativa, todas las configuraciones de implementación (paquetes adicionales y controles de recursos personalizados) se asignan como un valor de config
cuando se crea la instancia de Agent Engine.
Inicializa el cliente de IA generativa:
import vertexai
client = vertexai.Client(project=PROJECT, location=LOCATION)
Implementa el agente en Agent Engine:
remote_live_agent = client.agent_engines.create(
agent=live_agent,
config={
"staging_bucket": STAGING_BUCKET,
"requirements": [
"google-cloud-aiplatform[agent_engines,adk]==1.88.0",
"cloudpickle==3.0",
"websockets"
],
},
)
Para obtener información sobre los pasos que se realizan en segundo plano durante la implementación, consulta Crea una instancia de AgentEngine.
Obtén el ID de recurso del agente:
remote_live_agent.api_resource.name
Cómo usar un agente
Si definiste una operación bidi_stream_query
cuando desarrollaste tu agente, puedes transmitir consultas de forma bidireccional al agente de manera asíncrona con el SDK de Gen AI para Python.
Puedes modificar el siguiente ejemplo con cualquier dato que reconozca tu agente y usar cualquier lógica de finalización aplicable para el flujo de entrada y el flujo de salida:
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={"class_method": "bidi_stream_query"}
) as connection:
while True:
#
input_str = input("Enter your question: ")
if input_str == "exit":
break
await connection.send({"input": input_str})
while True:
response = await connection.receive()
print(response)
if response["bidiStreamOutput"]["output"] == "end of turn":
break
El entorno de ejecución de Vertex AI Agent Engine transmite las respuestas como una secuencia de objetos generados de forma iterativa. Por ejemplo, un conjunto de dos respuestas en el primer turno podría verse de la siguiente manera:
Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}
Enter your next question: exit
Cómo usar un agente del Kit de desarrollo de agentes
Si desarrollaste tu agente con el kit de desarrollo de agentes (ADK), puedes usar la transmisión bidireccional para interactuar con la API de Gemini Live.
En el siguiente ejemplo, se crea un agente de conversación que toma preguntas de texto del usuario y recibe datos de audio de respuesta de la API de Gemini Live:
import numpy as np
from google.adk.agents.live_request_queue improt LiveRequest
from google.adk.events import Event
from google.genai import types
def prepare_live_request(input_text: str) -> LiveRequest:
part = types.Part.from_text(text=input_text)
content = types.Content(parts=[part])
return LiveRequest(content=content)
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={
"class_method": "bidi_stream_query",
"input": {"input_str": "hello"},
}) as connection:
first_req = True
while True:
input_text = input("Enter your question: ")
if input_text = "exit":
break
if first_req:
await connection.send({
"user_id": USER_ID,
"live_request": prepare_live_request(input_text).dict()
})
first_req = False
else:
await connection.send(prepare_live_request(input_text).dict())
audio_data = []
while True:
async def receive():
return await connection.receive()
receiving = asyncio.Task(receive())
done, _ = await asyncio.wait([receiving])
if receiving not in done:
receiving.cancel()
break
event = Event.model_validate(receiving.result()["bidiStreamOutput"])
part = event.content and event.content.parts and event.content.parts[0]
if part.inline_data and part.inline_data.data:
chunk_data = part.inline_data.data
data = np.frombuffer(chunk_data, dtype=np.int16)
audio_data.append(data)
else:
print(part)
if audio_data:
concatenated_audio = np.concatenate(audio_data)
display(Audio(concatenated_audio, rate=24000, autoplay=True))
¿Qué sigue?
- Obtén más información sobre la API de Gemini Live.