Streaming bidireccional con el tiempo de ejecución de Vertex AI Agent Engine

En esta página se describe cómo usar el streaming bidireccional con el tiempo de ejecución de Vertex AI Agent Engine.

Información general

El streaming bidireccional proporciona un canal de comunicación bidireccional persistente entre tu aplicación y el agente, lo que te permite ir más allá de los patrones de solicitud-respuesta basados en turnos. El streaming bidireccional se usa en casos en los que tu agente necesita procesar información y responder continuamente, como interactuar con entradas de audio o vídeo con baja latencia.

El streaming bidireccional con el tiempo de ejecución de Vertex AI Agent Engine admite casos prácticos de agentes interactivos y en tiempo real, así como el intercambio de datos para APIs activas multimodales. El streaming bidireccional se admite en todos los frameworks y se pueden usar métodos de streaming bidireccional personalizados registrando métodos personalizados. Puedes usar el streaming bidireccional para interactuar con la API Gemini Live mediante Agent Development Kit (ADK) en Vertex AI Agent Engine.

Solo se puede implementar un agente remoto con métodos de consulta bidireccionales a través del SDK de IA generativa de Google. Cuando se detectan métodos de consulta bidireccionales, el SDK de IA generativa define automáticamente el modo de servidor del agente EXPERIMENTAL al llamar a la API REST.

Desarrollar un agente

Durante el desarrollo de un agente, sigue estos pasos para implementar el streaming bidireccional:

Definir un método de consulta de streaming bidireccional

Puedes definir un método bidi_stream_query que tome de forma asíncrona solicitudes de streaming como entrada y genere respuestas de streaming. Por ejemplo, la siguiente plantilla amplía la plantilla básica para transmitir solicitudes y respuestas, y se puede implementar en Agent Engine:

import asyncio
from typing import Any, AsyncIterable

class BidiStreamingAgent(StreamingAgent):

    async def bidi_stream_query(
        self,
        request_queue: asyncio.Queue[Any]
    ) -> AsyncIterable[Any]:
        from langchain.load.dump import dumpd

        while True:
            request = await request_queue.get()
            # This is just an illustration, you're free to use any termination mechanism.
            if request == "END":
                break
            for chunk in self.graph.stream(request):
                yield dumpd(chunk)

agent = BidiStreamingAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],      # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Cuando utilices la API de streaming bidireccional, ten en cuenta lo siguiente:

  • asyncio.Queue: puedes poner cualquier tipo de datos en esta cola de solicitudes para que espere a enviarse a la API del modelo.

  • Tiempo de espera máximo: el tiempo de espera máximo para las consultas de streaming bidireccional es de 10 minutos. Si tu agente requiere tiempos de procesamiento más largos, considera la posibilidad de dividir la tarea en partes más pequeñas y usar sesiones o memoria para mantener los estados.

  • Limitar el consumo de contenido: cuando se consume contenido de un flujo bidireccional, es importante gestionar la velocidad a la que tu agente procesa los datos entrantes. Si tu agente consume datos demasiado lentamente, puede provocar problemas como un aumento de la latencia o de la presión de memoria en el lado del servidor. Implementa mecanismos para extraer datos de forma activa cuando tu agente esté listo para procesarlos y evita las operaciones de bloqueo que puedan detener el consumo de contenido.

  • Limitar la generación de contenido: si tienes problemas de contrapresión (cuando el productor genera datos más rápido de lo que el consumidor puede procesarlos), debes limitar la velocidad de generación de contenido. Esto puede ayudar a evitar desbordamientos de búfer y garantizar una experiencia de streaming fluida.

Probar el método de consulta de streaming bidireccional

Puedes probar la consulta de streaming bidireccional de forma local llamando al método bidi_stream_query e iterando los resultados:

import asyncio
import pprint
import time

request_queue = asyncio.Queue()

async def generate_input():
    # This is just an illustration, you're free to use any appropriate input generator.
    request_queue.put_nowait(
        {"input": "What is the exchange rate from US dolloars to Swedish currency"}
    )
    time.sleep(5)
    request_queue.put_nowait(
        {"input": "What is the exchange rate from US dolloars to Euro currency"}
    )
    time.sleep(5)
    request_queue.put_nowait("END")

async def print_query_result():
    async for chunk in agent.bidi_stream_query(request_queue):
        pprint.pprint(chunk, depth=1)

input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())

await asyncio.gather(input_task, output_task, return_exceptions=True)

La misma conexión de consulta bidireccional puede gestionar varias solicitudes y respuestas. En cada nueva solicitud de la cola, el siguiente ejemplo genera un flujo de fragmentos que contiene información diferente sobre la respuesta:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}

(Opcional) Registrar métodos personalizados

Las operaciones se pueden registrar como modos de ejecución estándar (representado por una cadena vacía ""), de streaming (stream) o de streaming bidireccional (bidi_stream).

from typing import AsyncIterable, Iterable

class CustomAgent(BidiStreamingAgent):

    # ... same get_state and get_state_history function definition.

    async def get_state_bidi_mode(
        self,
        request_queue: asyncio.Queue[Any]
    ) -> AsyncIterable[Any]:
        while True:
            request = await request_queue.get()
            if request == "END":
                break
            yield self.graph.get_state(request)._asdict()

    def register_operations(self):
        return {
            # The list of synchrounous operations to be registered
            "": ["query", "get_state"]
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"]
            # The list of bidi streaming operations to be registered
            "bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
        }

Desplegar un agente

Una vez que hayas desarrollado tu agente como live_agent, puedes implementarlo en Agent Engine creando una instancia de Agent Engine.

Tenga en cuenta que, con el SDK de IA generativa, todas las configuraciones de implementación (paquetes adicionales y controles de recursos personalizados) se asignan como valor de config al crear la instancia de Agent Engine.

Inicializa el cliente de IA generativa:

import vertexai

client = vertexai.Client(project=PROJECT, location=LOCATION)

Implementa el agente en Agent Engine:

remote_live_agent = client.agent_engines.create(
    agent=live_agent,
    config={
        "staging_bucket": STAGING_BUCKET,
        "requirements": [
            "google-cloud-aiplatform[agent_engines,adk]==1.88.0",
            "cloudpickle==3.0",
            "websockets"
        ],
    },
)

Para obtener información sobre los pasos que se llevan a cabo en segundo plano durante la implementación, consulta Crear una instancia de AgentEngine.

Obtén el ID de recurso del agente:

remote_live_agent.api_resource.name

Usar un agente

Si has definido una operación bidi_stream_query al desarrollar tu agente, puedes transmitir bidireccionalmente consultas al agente de forma asíncrona con el SDK de Gen AI para Python.

Puede modificar el siguiente ejemplo con cualquier dato que reconozca su agente, utilizando la lógica de finalización aplicable para el flujo de entrada y el flujo de salida:

async with client.aio.live.agent_engines.connect(
        agent_engine=remote_live_agent.api_resource.name,
        config={"class_method": "bidi_stream_query"}
        ) as connection:
    while True:
        #
        input_str = input("Enter your question: ")
        if input_str == "exit":
            break
        await connection.send({"input": input_str})

        while True:
            response = await connection.receive()
            print(response)
            if response["bidiStreamOutput"]["output"] == "end of turn":
                break

El tiempo de ejecución de Vertex AI Agent Engine transmite las respuestas como una secuencia de objetos generados de forma iterativa. Por ejemplo, un conjunto de dos respuestas en la primera conversación podría tener el siguiente aspecto:

Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}

Enter your next question: exit

Usar un agente de Agent Development Kit

Si has desarrollado tu agente con Agent Development Kit (ADK), puedes usar el streaming bidireccional para interactuar con la API Gemini Live.

En el siguiente ejemplo se crea un agente de conversación que recibe preguntas de texto de los usuarios y datos de audio de la respuesta de la API Gemini Live:

import numpy as np
from google.adk.agents.live_request_queue improt LiveRequest
from google.adk.events import Event
from google.genai import types

def prepare_live_request(input_text: str) -> LiveRequest:
    part = types.Part.from_text(text=input_text)
    content = types.Content(parts=[part])
    return LiveRequest(content=content)

async with client.aio.live.agent_engines.connect(
        agent_engine=remote_live_agent.api_resource.name,
        config={
            "class_method": "bidi_stream_query",
            "input": {"input_str": "hello"},
        }) as connection:
    first_req = True
    while True:
        input_text = input("Enter your question: ")
        if input_text = "exit":
            break
        if first_req:
            await connection.send({
                "user_id": USER_ID,
                "live_request": prepare_live_request(input_text).dict()
            })
            first_req = False
        else:
            await connection.send(prepare_live_request(input_text).dict())
        audio_data = []
        while True:
            async def receive():
                return await connection.receive()

            receiving = asyncio.Task(receive())
            done, _ = await asyncio.wait([receiving])
            if receiving not in done:
                receiving.cancel()
                break
            event = Event.model_validate(receiving.result()["bidiStreamOutput"])
            part = event.content and event.content.parts and event.content.parts[0]

            if part.inline_data and part.inline_data.data:
                chunk_data = part.inline_data.data
                data = np.frombuffer(chunk_data, dtype=np.int16)
                audio_data.append(data)
            else:
                print(part)
        if audio_data:
            concatenated_audio = np.concatenate(audio_data)
            display(Audio(concatenated_audio, rate=24000, autoplay=True))

Siguientes pasos