Streaming bidirezionale con il runtime di Vertex AI Agent Engine

Questa pagina descrive come utilizzare lo streaming bidirezionale con Vertex AI Agent Engine Runtime.

Panoramica

Lo streaming bidirezionale fornisce un canale di comunicazione bidirezionale persistente tra l'applicazione e l'agente, consentendoti di andare oltre i pattern di richiesta-risposta a turni. Lo streaming bidirezionale è utile per i casi d'uso in cui l'agente deve elaborare le informazioni e rispondere continuamente, ad esempio interagire con input audio o video a bassa latenza.

Lo streaming bidirezionale con Vertex AI Agent Engine Runtime supporta casi d'uso di agent interattivi in tempo reale e lo scambio di dati per API live multimodali. Lo streaming bidirezionale è supportato per tutti i framework e i metodi di streaming bidirezionale personalizzati sono disponibili tramite la registrazione di metodi personalizzati. Puoi utilizzare lo streaming bidirezionale per interagire con l'API Gemini Live utilizzando l'Agent Development Kit (ADK) su Vertex AI Agent Engine.

Il deployment di un agente remoto con metodi di query bidirezionali è supportato solo tramite Google Gen AI SDK. Quando vengono rilevati metodi di query bidirezionali, l'SDK Gen AI imposta automaticamente la modalità server agent EXPERIMENTAL quando chiama l'API REST.

Sviluppare un agente

Durante lo sviluppo di un agente, segui questi passaggi per implementare lo streaming bidirezionale:

Definisci un metodo di query di streaming bidirezionale

Puoi definire un metodo bidi_stream_query che accetta in modo asincrono le richieste di stream come input e restituisce risposte di streaming. Ad esempio, il seguente modello estende il modello di base per trasmettere in streaming richieste e risposte ed è implementabile su Agent Engine:

import asyncio
from typing import Any, AsyncIterable

class BidiStreamingAgent(StreamingAgent):

    async def bidi_stream_query(
        self,
        request_queue: asyncio.Queue[Any]
    ) -> AsyncIterable[Any]:
        from langchain.load.dump import dumpd

        while True:
            request = await request_queue.get()
            # This is just an illustration, you're free to use any termination mechanism.
            if request == "END":
                break
            for chunk in self.graph.stream(request):
                yield dumpd(chunk)

agent = BidiStreamingAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],      # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Quando utilizzi l'API di streaming bidirezionale, tieni presente quanto segue:

  • asyncio.Queue: puoi inserire qualsiasi tipo di dati in questa coda di richieste in attesa di essere inviati all'API del modello.

  • Timeout massimo: il timeout massimo per la query di streaming bidirezionale è di 10 minuti. Se l'agente richiede tempi di elaborazione più lunghi, valuta la possibilità di suddividere l'attività in blocchi più piccoli e utilizza la sessione o la memoria per conservare gli stati.

  • Limitare il consumo di contenuti: quando consumi contenuti da un flusso bidirezionale, è importante gestire la velocità con cui l'agente elabora i dati in entrata. Se l'agente consuma i dati troppo lentamente, possono verificarsi problemi come aumento della latenza o pressione sulla memoria lato server. Implementa meccanismi per recuperare attivamente i dati quando l'agente è pronto a elaborarli ed evita di bloccare le operazioni che potrebbero interrompere il consumo di contenuti.

  • Limita la generazione di contenuti: se si verificano problemi di contropressione (il produttore genera dati più velocemente di quanto il consumer possa elaborarli), devi limitare la velocità di generazione dei contenuti. In questo modo, puoi evitare i buffer overflow e garantire un'esperienza di streaming fluida.

Testare il metodo di query di streaming bidirezionale

Puoi testare la query di streaming bidirezionale localmente chiamando il metodo bidi_stream_query e scorrendo i risultati:

import asyncio
import pprint
import time

request_queue = asyncio.Queue()

async def generate_input():
    # This is just an illustration, you're free to use any appropriate input generator.
    request_queue.put_nowait(
        {"input": "What is the exchange rate from US dolloars to Swedish currency"}
    )
    time.sleep(5)
    request_queue.put_nowait(
        {"input": "What is the exchange rate from US dolloars to Euro currency"}
    )
    time.sleep(5)
    request_queue.put_nowait("END")

async def print_query_result():
    async for chunk in agent.bidi_stream_query(request_queue):
        pprint.pprint(chunk, depth=1)

input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())

await asyncio.gather(input_task, output_task, return_exceptions=True)

La stessa connessione di query bidirezionale può gestire più richieste e risposte. Per ogni nuova richiesta dalla coda, il seguente esempio genera un flusso di blocchi contenenti informazioni diverse sulla risposta:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}

(Facoltativo) Registrare metodi personalizzati

Le operazioni possono essere registrate come modalità di esecuzione standard (rappresentata da una stringa vuota ""), in streaming (stream) o in streaming bidirezionale (bidi_stream).

from typing import AsyncIterable, Iterable

class CustomAgent(BidiStreamingAgent):

    # ... same get_state and get_state_history function definition.

    async def get_state_bidi_mode(
        self,
        request_queue: asyncio.Queue[Any]
    ) -> AsyncIterable[Any]:
        while True:
            request = await request_queue.get()
            if request == "END":
                break
            yield self.graph.get_state(request)._asdict()

    def register_operations(self):
        return {
            # The list of synchrounous operations to be registered
            "": ["query", "get_state"]
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"]
            # The list of bidi streaming operations to be registered
            "bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
        }

Esegui il deployment di un agente

Una volta sviluppato l'agente come live_agent, puoi eseguirne il deployment in Agent Engine creando un'istanza di Agent Engine.

Tieni presente che con l'SDK Gen AI, tutte le configurazioni di deployment (pacchetti aggiuntivi e controlli delle risorse personalizzati) vengono assegnate come valore di config durante la creazione dell'istanza di Agent Engine.

Inizializza il client AI generativa:

import vertexai

client = vertexai.Client(project=PROJECT, location=LOCATION)

Esegui il deployment dell'agente in Agent Engine:

remote_live_agent = client.agent_engines.create(
    agent=live_agent,
    config={
        "staging_bucket": STAGING_BUCKET,
        "requirements": [
            "google-cloud-aiplatform[agent_engines,adk]==1.88.0",
            "cloudpickle==3.0",
            "websockets"
        ],
    },
)

Per informazioni sui passaggi che avvengono in background durante il deployment, consulta Creare un'istanza di AgentEngine.

Recupera l'ID risorsa dell'agente:

remote_live_agent.api_resource.name

Utilizzare un agente

Se hai definito un'operazione bidi_stream_query durante lo sviluppo dell'agente, puoi eseguire query di streaming bidirezionali sull'agente in modo asincrono utilizzando l'SDK Gen AI per Python.

Puoi modificare il seguente esempio con qualsiasi dato riconoscibile dal tuo agente, utilizzando qualsiasi logica di terminazione applicabile per il flusso di input e il flusso di output:

async with client.aio.live.agent_engines.connect(
        agent_engine=remote_live_agent.api_resource.name,
        config={"class_method": "bidi_stream_query"}
        ) as connection:
    while True:
        #
        input_str = input("Enter your question: ")
        if input_str == "exit":
            break
        await connection.send({"input": input_str})

        while True:
            response = await connection.receive()
            print(response)
            if response["bidiStreamOutput"]["output"] == "end of turn":
                break

Vertex AI Agent Engine Runtime trasmette le risposte come una sequenza di oggetti generati in modo iterativo. Ad esempio, un insieme di due risposte nel primo turno potrebbe avere il seguente aspetto:

Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}

Enter your next question: exit

Utilizzare un agente Agent Development Kit

Se hai sviluppato il tuo agente utilizzando l'Agent Development Kit (ADK), puoi utilizzare lo streaming bidirezionale per interagire con l'API Gemini Live.

L'esempio seguente crea un agente conversazionale che riceve domande di testo dell'utente e dati audio di risposta dell'API Gemini Live:

import numpy as np
from google.adk.agents.live_request_queue improt LiveRequest
from google.adk.events import Event
from google.genai import types

def prepare_live_request(input_text: str) -> LiveRequest:
    part = types.Part.from_text(text=input_text)
    content = types.Content(parts=[part])
    return LiveRequest(content=content)

async with client.aio.live.agent_engines.connect(
        agent_engine=remote_live_agent.api_resource.name,
        config={
            "class_method": "bidi_stream_query",
            "input": {"input_str": "hello"},
        }) as connection:
    first_req = True
    while True:
        input_text = input("Enter your question: ")
        if input_text = "exit":
            break
        if first_req:
            await connection.send({
                "user_id": USER_ID,
                "live_request": prepare_live_request(input_text).dict()
            })
            first_req = False
        else:
            await connection.send(prepare_live_request(input_text).dict())
        audio_data = []
        while True:
            async def receive():
                return await connection.receive()

            receiving = asyncio.Task(receive())
            done, _ = await asyncio.wait([receiving])
            if receiving not in done:
                receiving.cancel()
                break
            event = Event.model_validate(receiving.result()["bidiStreamOutput"])
            part = event.content and event.content.parts and event.content.parts[0]

            if part.inline_data and part.inline_data.data:
                chunk_data = part.inline_data.data
                data = np.frombuffer(chunk_data, dtype=np.int16)
                audio_data.append(data)
            else:
                print(part)
        if audio_data:
            concatenated_audio = np.concatenate(audio_data)
            display(Audio(concatenated_audio, rate=24000, autoplay=True))

Passaggi successivi