Cette page explique comment utiliser le streaming bidirectionnel avec le runtime Vertex AI Agent Engine.
Présentation
Le streaming bidirectionnel fournit un canal de communication bidirectionnel et persistant entre votre application et l'agent, ce qui vous permet de dépasser les modèles de requête-réponse par tour. Le streaming bidirectionnel fonctionne pour les cas d'utilisation où votre agent doit traiter des informations et répondre en continu, par exemple en interagissant avec des entrées audio ou vidéo à faible latence.
Le streaming bidirectionnel avec le moteur d'exécution Vertex AI Agent Engine est compatible avec les cas d'utilisation d'agents interactifs en temps réel et l'échange de données pour les API multimodales en direct. Le streaming bidirectionnel est compatible avec tous les frameworks. Des méthodes de streaming bidirectionnel personnalisées sont disponibles en enregistrant des méthodes personnalisées. Vous pouvez utiliser le streaming bidirectionnel pour interagir avec l'API Gemini Live à l'aide de l'Agent Development Kit (ADK) sur Vertex AI Agent Engine.
Le déploiement d'un agent à distance avec des méthodes de requête bidirectionnelles n'est compatible qu'avec le SDK Google Gen AI. Lorsque des méthodes de requête bidirectionnelles sont détectées, le SDK Gen AI définit automatiquement le mode serveur d'agent EXPERIMENTAL
lors de l'appel de l'API REST.
Développer un agent
Lorsque vous développez un agent, suivez les étapes ci-dessous pour implémenter le streaming bidirectionnel :
Enregistrer des méthodes personnalisées (facultatif)
Définir une méthode de requête de streaming bidirectionnel
Vous pouvez définir une méthode bidi_stream_query
qui accepte de manière asynchrone les requêtes de flux en entrée et génère des réponses de flux en sortie. Par exemple, le modèle suivant étend le modèle de base pour diffuser des requêtes et des réponses, et peut être déployé sur Agent Engine :
import asyncio
from typing import Any, AsyncIterable
class BidiStreamingAgent(StreamingAgent):
async def bidi_stream_query(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
from langchain.load.dump import dumpd
while True:
request = await request_queue.get()
# This is just an illustration, you're free to use any termination mechanism.
if request == "END":
break
for chunk in self.graph.stream(request):
yield dumpd(chunk)
agent = BidiStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Gardez les points suivants à l'esprit lorsque vous utilisez l'API de streaming bidirectionnel :
asyncio.Queue
: vous pouvez placer n'importe quel type de données dans cette file d'attente de requêtes pour qu'elles soient envoyées à l'API du modèle.Délai avant expiration maximal : le délai avant expiration maximal pour une requête de streaming bidirectionnel est de 10 minutes. Si votre agent nécessite des temps de traitement plus longs, envisagez de diviser la tâche en plus petites parties et d'utiliser la session ou la mémoire pour conserver les états.
Limiter la consommation de contenu : lorsque vous consommez du contenu à partir d'un flux bidirectionnel, il est important de gérer la vitesse à laquelle votre agent traite les données entrantes. Si votre agent consomme les données trop lentement, cela peut entraîner des problèmes tels qu'une latence accrue ou une pression sur la mémoire côté serveur. Implémentez des mécanismes pour extraire activement les données lorsque votre agent est prêt à les traiter, et évitez les opérations de blocage qui pourraient interrompre la consommation de contenu.
Limitez la génération de contenu : si vous rencontrez des problèmes de contre-pression (le producteur génère des données plus rapidement que le consommateur ne peut les traiter), vous devez limiter votre taux de génération de contenu. Cela peut aider à éviter les dépassements de mémoire tampon et à garantir une expérience de streaming fluide.
Tester la méthode de requête de streaming bidirectionnel
Vous pouvez tester la requête de streaming bidirectionnel en local en appelant la méthode bidi_stream_query
et en parcourant les résultats :
import asyncio
import pprint
import time
request_queue = asyncio.Queue()
async def generate_input():
# This is just an illustration, you're free to use any appropriate input generator.
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Swedish currency"}
)
time.sleep(5)
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Euro currency"}
)
time.sleep(5)
request_queue.put_nowait("END")
async def print_query_result():
async for chunk in agent.bidi_stream_query(request_queue):
pprint.pprint(chunk, depth=1)
input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())
await asyncio.gather(input_task, output_task, return_exceptions=True)
La même connexion de requête bidirectionnelle peut gérer plusieurs requêtes et réponses. Pour chaque nouvelle requête de la file d'attente, l'exemple suivant génère un flux de blocs contenant différentes informations sur la réponse :
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}
(Facultatif) Enregistrer des méthodes personnalisées
Les opérations peuvent être enregistrées en tant que modes d'exécution standard (représentés par une chaîne vide ""
), de streaming (stream
) ou de streaming bidirectionnel (bidi_stream
).
from typing import AsyncIterable, Iterable
class CustomAgent(BidiStreamingAgent):
# ... same get_state and get_state_history function definition.
async def get_state_bidi_mode(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
while True:
request = await request_queue.get()
if request == "END":
break
yield self.graph.get_state(request)._asdict()
def register_operations(self):
return {
# The list of synchrounous operations to be registered
"": ["query", "get_state"]
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"]
# The list of bidi streaming operations to be registered
"bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
}
Déployer un agent
Une fois que vous avez développé votre agent en tant que live_agent
, vous pouvez le déployer dans Agent Engine en créant une instance Agent Engine.
Notez qu'avec le SDK Gen AI, toutes les configurations de déploiement (packages supplémentaires et contrôles de ressources personnalisés) sont attribuées en tant que valeur de config
lors de la création de l'instance Agent Engine.
Initialisez le client Gen AI :
import vertexai
client = vertexai.Client(project=PROJECT, location=LOCATION)
Déployez l'agent dans Agent Engine :
remote_live_agent = client.agent_engines.create(
agent=live_agent,
config={
"staging_bucket": STAGING_BUCKET,
"requirements": [
"google-cloud-aiplatform[agent_engines,adk]==1.88.0",
"cloudpickle==3.0",
"websockets"
],
},
)
Pour en savoir plus sur les étapes qui se déroulent en arrière-plan lors du déploiement, consultez Créer une instance AgentEngine.
Obtenez l'ID de ressource de l'agent :
remote_live_agent.api_resource.name
Utiliser un agent
Si vous avez défini une opération bidi_stream_query
lors du développement de votre agent, vous pouvez interroger l'agent de manière asynchrone en streaming bidirectionnel à l'aide du SDK Gen AI pour Python.
Vous pouvez modifier l'exemple suivant avec n'importe quelles données reconnaissables par votre agent, en utilisant toute logique de fin applicable pour le flux d'entrée et le flux de sortie :
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={"class_method": "bidi_stream_query"}
) as connection:
while True:
#
input_str = input("Enter your question: ")
if input_str == "exit":
break
await connection.send({"input": input_str})
while True:
response = await connection.receive()
print(response)
if response["bidiStreamOutput"]["output"] == "end of turn":
break
L'environnement d'exécution Vertex AI Agent Engine diffuse les réponses sous forme de séquence d'objets générés de manière itérative. Par exemple, un ensemble de deux réponses au premier tour peut se présenter comme suit :
Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}
Enter your next question: exit
Utiliser un agent Agent Development Kit
Si vous avez développé votre agent à l'aide de l'Agent Development Kit (ADK), vous pouvez utiliser le streaming bidirectionnel pour interagir avec l'API Gemini Live.
L'exemple suivant crée un agent conversationnel qui prend en compte les questions textuelles des utilisateurs et reçoit des données audio de réponse de l'API Gemini Live :
import numpy as np
from google.adk.agents.live_request_queue improt LiveRequest
from google.adk.events import Event
from google.genai import types
def prepare_live_request(input_text: str) -> LiveRequest:
part = types.Part.from_text(text=input_text)
content = types.Content(parts=[part])
return LiveRequest(content=content)
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={
"class_method": "bidi_stream_query",
"input": {"input_str": "hello"},
}) as connection:
first_req = True
while True:
input_text = input("Enter your question: ")
if input_text = "exit":
break
if first_req:
await connection.send({
"user_id": USER_ID,
"live_request": prepare_live_request(input_text).dict()
})
first_req = False
else:
await connection.send(prepare_live_request(input_text).dict())
audio_data = []
while True:
async def receive():
return await connection.receive()
receiving = asyncio.Task(receive())
done, _ = await asyncio.wait([receiving])
if receiving not in done:
receiving.cancel()
break
event = Event.model_validate(receiving.result()["bidiStreamOutput"])
part = event.content and event.content.parts and event.content.parts[0]
if part.inline_data and part.inline_data.data:
chunk_data = part.inline_data.data
data = np.frombuffer(chunk_data, dtype=np.int16)
audio_data.append(data)
else:
print(part)
if audio_data:
concatenated_audio = np.concatenate(audio_data)
display(Audio(concatenated_audio, rate=24000, autoplay=True))
Étapes suivantes
- En savoir plus sur l'API Gemini Live