Questa pagina descrive come utilizzare lo streaming bidirezionale con Vertex AI Agent Engine Runtime.
Panoramica
Lo streaming bidirezionale fornisce un canale di comunicazione bidirezionale persistente tra l'applicazione e l'agente, consentendoti di andare oltre i pattern di richiesta-risposta a turni. Lo streaming bidirezionale è utile per i casi d'uso in cui l'agente deve elaborare le informazioni e rispondere continuamente, ad esempio interagire con input audio o video a bassa latenza.
Lo streaming bidirezionale con Vertex AI Agent Engine Runtime supporta casi d'uso di agent interattivi in tempo reale e lo scambio di dati per API live multimodali. Lo streaming bidirezionale è supportato per tutti i framework e i metodi di streaming bidirezionale personalizzati sono disponibili tramite la registrazione di metodi personalizzati. Puoi utilizzare lo streaming bidirezionale per interagire con l'API Gemini Live utilizzando l'Agent Development Kit (ADK) su Vertex AI Agent Engine.
Il deployment di un agente remoto con metodi di query bidirezionali è supportato solo tramite Google Gen AI SDK. Quando vengono rilevati metodi di query bidirezionali, l'SDK Gen AI imposta automaticamente la modalità server agent EXPERIMENTAL
quando chiama l'API REST.
Sviluppare un agente
Durante lo sviluppo di un agente, segui questi passaggi per implementare lo streaming bidirezionale:
Registra metodi personalizzati (facoltativo)
Definisci un metodo di query di streaming bidirezionale
Puoi definire un metodo bidi_stream_query
che accetta in modo asincrono le richieste di stream come input e restituisce risposte di streaming. Ad esempio, il seguente modello estende il modello di base per trasmettere in streaming richieste e risposte ed è implementabile su Agent Engine:
import asyncio
from typing import Any, AsyncIterable
class BidiStreamingAgent(StreamingAgent):
async def bidi_stream_query(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
from langchain.load.dump import dumpd
while True:
request = await request_queue.get()
# This is just an illustration, you're free to use any termination mechanism.
if request == "END":
break
for chunk in self.graph.stream(request):
yield dumpd(chunk)
agent = BidiStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Quando utilizzi l'API di streaming bidirezionale, tieni presente quanto segue:
asyncio.Queue
: puoi inserire qualsiasi tipo di dati in questa coda di richieste in attesa di essere inviati all'API del modello.Timeout massimo: il timeout massimo per la query di streaming bidirezionale è di 10 minuti. Se l'agente richiede tempi di elaborazione più lunghi, valuta la possibilità di suddividere l'attività in blocchi più piccoli e utilizza la sessione o la memoria per conservare gli stati.
Limitare il consumo di contenuti: quando consumi contenuti da un flusso bidirezionale, è importante gestire la velocità con cui l'agente elabora i dati in entrata. Se l'agente consuma i dati troppo lentamente, possono verificarsi problemi come aumento della latenza o pressione sulla memoria lato server. Implementa meccanismi per recuperare attivamente i dati quando l'agente è pronto a elaborarli ed evita di bloccare le operazioni che potrebbero interrompere il consumo di contenuti.
Limita la generazione di contenuti: se si verificano problemi di contropressione (il produttore genera dati più velocemente di quanto il consumer possa elaborarli), devi limitare la velocità di generazione dei contenuti. In questo modo, puoi evitare i buffer overflow e garantire un'esperienza di streaming fluida.
Testare il metodo di query di streaming bidirezionale
Puoi testare la query di streaming bidirezionale localmente chiamando il metodo bidi_stream_query
e scorrendo i risultati:
import asyncio
import pprint
import time
request_queue = asyncio.Queue()
async def generate_input():
# This is just an illustration, you're free to use any appropriate input generator.
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Swedish currency"}
)
time.sleep(5)
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Euro currency"}
)
time.sleep(5)
request_queue.put_nowait("END")
async def print_query_result():
async for chunk in agent.bidi_stream_query(request_queue):
pprint.pprint(chunk, depth=1)
input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())
await asyncio.gather(input_task, output_task, return_exceptions=True)
La stessa connessione di query bidirezionale può gestire più richieste e risposte. Per ogni nuova richiesta dalla coda, il seguente esempio genera un flusso di blocchi contenenti informazioni diverse sulla risposta:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}
(Facoltativo) Registrare metodi personalizzati
Le operazioni possono essere registrate come modalità di esecuzione standard (rappresentata da una stringa vuota ""
), in streaming (stream
) o in streaming bidirezionale (bidi_stream
).
from typing import AsyncIterable, Iterable
class CustomAgent(BidiStreamingAgent):
# ... same get_state and get_state_history function definition.
async def get_state_bidi_mode(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
while True:
request = await request_queue.get()
if request == "END":
break
yield self.graph.get_state(request)._asdict()
def register_operations(self):
return {
# The list of synchrounous operations to be registered
"": ["query", "get_state"]
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"]
# The list of bidi streaming operations to be registered
"bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
}
Esegui il deployment di un agente
Una volta sviluppato l'agente come live_agent
, puoi eseguirne il deployment in Agent Engine creando un'istanza di Agent Engine.
Tieni presente che con l'SDK Gen AI, tutte le configurazioni di deployment (pacchetti aggiuntivi e controlli delle risorse personalizzati) vengono assegnate come valore di config
durante la creazione dell'istanza di Agent Engine.
Inizializza il client AI generativa:
import vertexai
client = vertexai.Client(project=PROJECT, location=LOCATION)
Esegui il deployment dell'agente in Agent Engine:
remote_live_agent = client.agent_engines.create(
agent=live_agent,
config={
"staging_bucket": STAGING_BUCKET,
"requirements": [
"google-cloud-aiplatform[agent_engines,adk]==1.88.0",
"cloudpickle==3.0",
"websockets"
],
},
)
Per informazioni sui passaggi che avvengono in background durante il deployment, consulta Creare un'istanza di AgentEngine.
Recupera l'ID risorsa dell'agente:
remote_live_agent.api_resource.name
Utilizzare un agente
Se hai definito un'operazione bidi_stream_query
durante lo sviluppo dell'agente, puoi eseguire query di streaming bidirezionali sull'agente in modo asincrono utilizzando l'SDK Gen AI per Python.
Puoi modificare il seguente esempio con qualsiasi dato riconoscibile dal tuo agente, utilizzando qualsiasi logica di terminazione applicabile per il flusso di input e il flusso di output:
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={"class_method": "bidi_stream_query"}
) as connection:
while True:
#
input_str = input("Enter your question: ")
if input_str == "exit":
break
await connection.send({"input": input_str})
while True:
response = await connection.receive()
print(response)
if response["bidiStreamOutput"]["output"] == "end of turn":
break
Vertex AI Agent Engine Runtime trasmette le risposte come una sequenza di oggetti generati in modo iterativo. Ad esempio, un insieme di due risposte nel primo turno potrebbe avere il seguente aspetto:
Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}
Enter your next question: exit
Utilizzare un agente Agent Development Kit
Se hai sviluppato il tuo agente utilizzando l'Agent Development Kit (ADK), puoi utilizzare lo streaming bidirezionale per interagire con l'API Gemini Live.
L'esempio seguente crea un agente conversazionale che riceve domande di testo dell'utente e dati audio di risposta dell'API Gemini Live:
import numpy as np
from google.adk.agents.live_request_queue improt LiveRequest
from google.adk.events import Event
from google.genai import types
def prepare_live_request(input_text: str) -> LiveRequest:
part = types.Part.from_text(text=input_text)
content = types.Content(parts=[part])
return LiveRequest(content=content)
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={
"class_method": "bidi_stream_query",
"input": {"input_str": "hello"},
}) as connection:
first_req = True
while True:
input_text = input("Enter your question: ")
if input_text = "exit":
break
if first_req:
await connection.send({
"user_id": USER_ID,
"live_request": prepare_live_request(input_text).dict()
})
first_req = False
else:
await connection.send(prepare_live_request(input_text).dict())
audio_data = []
while True:
async def receive():
return await connection.receive()
receiving = asyncio.Task(receive())
done, _ = await asyncio.wait([receiving])
if receiving not in done:
receiving.cancel()
break
event = Event.model_validate(receiving.result()["bidiStreamOutput"])
part = event.content and event.content.parts and event.content.parts[0]
if part.inline_data and part.inline_data.data:
chunk_data = part.inline_data.data
data = np.frombuffer(chunk_data, dtype=np.int16)
audio_data.append(data)
else:
print(part)
if audio_data:
concatenated_audio = np.concatenate(audio_data)
display(Audio(concatenated_audio, rate=24000, autoplay=True))
Passaggi successivi
- Scopri di più sull'API Gemini Live.