Auf dieser Seite wird beschrieben, wie Sie bidirektionales Streaming mit der Vertex AI Agent Engine-Laufzeit verwenden.
Übersicht
Bidirektionales Streaming bietet einen persistenten, bidirektionalen Kommunikationskanal zwischen Ihrer Anwendung und dem Agenten. So können Sie über turnbasierte Anfrage-Antwort-Muster hinausgehen. Bidirektionales Streaming eignet sich für Anwendungsfälle, in denen Ihr Agent Informationen verarbeiten und kontinuierlich antworten muss, z. B. bei der Interaktion mit Audio- oder Videoeingaben mit niedriger Latenz.
Das bidirektionale Streaming mit der Vertex AI Agent Engine-Laufzeit unterstützt interaktive Echtzeit-Anwendungsfälle für Agents und den Datenaustausch für multimodale Live-APIs. Bidirektionales Streaming wird für alle Frameworks unterstützt. Benutzerdefinierte bidirektionale Streamingmethoden sind durch Registrieren benutzerdefinierter Methoden verfügbar. Sie können bidirektionales Streaming verwenden, um über das Agent Development Kit (ADK) in Vertex AI Agent Engine mit der Gemini Live API zu interagieren.
Die Bereitstellung eines Remote-Agents mit bidirektionalen Abfragemethoden wird nur über das Google Gen AI SDK unterstützt. Wenn bidirektionale Abfragemethoden erkannt werden, legt das Gen AI SDK beim Aufrufen der REST API automatisch den EXPERIMENTAL
-Agent-Servermodus fest.
Agent entwickeln
Führen Sie die folgenden Schritte aus, um bidirektionales Streaming zu implementieren, wenn Sie einen Agent entwickeln:
Benutzerdefinierte Methoden registrieren (optional)
Bidirektionale Streaming-Abfragemethode definieren
Sie können eine bidi_stream_query
-Methode definieren, die asynchron Streamanfragen als Eingabe entgegennimmt und Streamingantworten ausgibt. Das folgende Beispiel zeigt, wie die einfache Vorlage erweitert wird, um Anfragen und Antworten zu streamen. Sie kann in Agent Engine bereitgestellt werden:
import asyncio
from typing import Any, AsyncIterable
class BidiStreamingAgent(StreamingAgent):
async def bidi_stream_query(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
from langchain.load.dump import dumpd
while True:
request = await request_queue.get()
# This is just an illustration, you're free to use any termination mechanism.
if request == "END":
break
for chunk in self.graph.stream(request):
yield dumpd(chunk)
agent = BidiStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Beachten Sie bei der Verwendung der bidirektionalen Streaming-API Folgendes:
asyncio.Queue
: Sie können beliebige Datentypen in diese Anfragewarteschlange einfügen, die dann darauf warten, an die Modell-API gesendet zu werden.Maximale Zeitüberschreitung: Die maximale Zeitüberschreitung für bidirektionale Streamingabfragen beträgt 10 Minuten. Wenn Ihr Agent längere Verarbeitungszeiten benötigt, sollten Sie die Aufgabe in kleinere Teile aufteilen und Sitzungen oder den Arbeitsspeicher verwenden, um die Status beizubehalten.
Inhaltsverbrauch drosseln: Wenn Sie Inhalte aus einem bidirektionalen Stream nutzen, ist es wichtig, die Geschwindigkeit zu steuern, mit der Ihr Agent eingehende Daten verarbeitet. Wenn Ihr Agent Daten zu langsam verarbeitet, kann dies zu Problemen wie erhöhter Latenz oder Arbeitsspeicherbelastung auf dem Server führen. Implementieren Sie Mechanismen, um Daten aktiv abzurufen, wenn Ihr Agent bereit ist, sie zu verarbeiten. Vermeiden Sie außerdem blockierende Vorgänge, die die Nutzung von Inhalten unterbrechen könnten.
Generierung von Inhalten drosseln: Wenn Sie auf Probleme mit dem Rückstau stoßen (bei denen der Producer Daten schneller generiert, als der Consumer sie verarbeiten kann), sollten Sie die Rate der Inhaltsgenerierung drosseln. So können Pufferüberläufe verhindert und ein reibungsloses Streaming ermöglicht werden.
Bidirektionale Streaming-Anfragemethode testen
Sie können die bidirektionale Streamingabfrage lokal testen, indem Sie die Methode bidi_stream_query
aufrufen und die Ergebnisse durchlaufen:
import asyncio
import pprint
import time
request_queue = asyncio.Queue()
async def generate_input():
# This is just an illustration, you're free to use any appropriate input generator.
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Swedish currency"}
)
time.sleep(5)
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Euro currency"}
)
time.sleep(5)
request_queue.put_nowait("END")
async def print_query_result():
async for chunk in agent.bidi_stream_query(request_queue):
pprint.pprint(chunk, depth=1)
input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())
await asyncio.gather(input_task, output_task, return_exceptions=True)
Über dieselbe bidirektionale Abfrageverbindung können mehrere Anfragen und Antworten verarbeitet werden. Für jede neue Anfrage aus der Warteschlange wird im folgenden Beispiel ein Stream von Chunks generiert, der verschiedene Informationen zur Antwort enthält:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}
Optional: Benutzerdefinierte Methoden registrieren
Vorgänge können als Standard- (durch einen leeren String ""
dargestellt), Streaming- (stream
) oder bidirektionale Streaming-Ausführungsmodi (bidi_stream
) registriert werden.
from typing import AsyncIterable, Iterable
class CustomAgent(BidiStreamingAgent):
# ... same get_state and get_state_history function definition.
async def get_state_bidi_mode(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
while True:
request = await request_queue.get()
if request == "END":
break
yield self.graph.get_state(request)._asdict()
def register_operations(self):
return {
# The list of synchrounous operations to be registered
"": ["query", "get_state"]
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"]
# The list of bidi streaming operations to be registered
"bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
}
Agent bereitstellen
Nachdem Sie Ihren Agent als live_agent
entwickelt haben, können Sie ihn in Agent Engine bereitstellen, indem Sie eine Agent Engine-Instanz erstellen.
Mit dem Gen AI SDK werden alle Bereitstellungskonfigurationen (zusätzliche Pakete und benutzerdefinierte Ressourcenkontrollen) beim Erstellen der Agent Engine-Instanz als Wert von config
zugewiesen.
Initialisieren Sie den Client für generative KI:
import vertexai
client = vertexai.Client(project=PROJECT, location=LOCATION)
Stellen Sie den Agent in Agent Engine bereit:
remote_live_agent = client.agent_engines.create(
agent=live_agent,
config={
"staging_bucket": STAGING_BUCKET,
"requirements": [
"google-cloud-aiplatform[agent_engines,adk]==1.88.0",
"cloudpickle==3.0",
"websockets"
],
},
)
Informationen zu den Schritten, die während der Bereitstellung im Hintergrund ausgeführt werden, finden Sie unter AgentEngine-Instanz erstellen.
Agent-Ressourcen-ID abrufen:
remote_live_agent.api_resource.name
KI‑Agenten verwenden
Wenn Sie beim Entwickeln Ihres Agents einen bidi_stream_query
-Vorgang definiert haben, können Sie den Agent mit dem Gen AI SDK for Python asynchron bidirektional streamen.
Sie können das folgende Beispiel mit beliebigen Daten ändern, die von Ihrem Agent erkannt werden, und dabei eine beliebige anwendbare Beendigungslogik für den Eingabe- und Ausgabestream verwenden:
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={"class_method": "bidi_stream_query"}
) as connection:
while True:
#
input_str = input("Enter your question: ")
if input_str == "exit":
break
await connection.send({"input": input_str})
while True:
response = await connection.receive()
print(response)
if response["bidiStreamOutput"]["output"] == "end of turn":
break
Die Vertex AI Agent Engine-Laufzeit streamt Antworten als Sequenz von iterativ generierten Objekten. Ein Satz mit zwei Antworten im ersten Zug könnte beispielsweise so aussehen:
Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}
Enter your next question: exit
Agent Development Kit-Agent verwenden
Wenn Sie Ihren Agent mit dem Agent Development Kit (ADK) entwickelt haben, können Sie bidirektionales Streaming verwenden, um mit der Gemini Live API zu interagieren.
Im folgenden Beispiel wird ein Konversations-Agent erstellt, der Textfragen von Nutzern entgegennimmt und Audiodaten als Antwort von der Gemini Live API erhält:
import numpy as np
from google.adk.agents.live_request_queue improt LiveRequest
from google.adk.events import Event
from google.genai import types
def prepare_live_request(input_text: str) -> LiveRequest:
part = types.Part.from_text(text=input_text)
content = types.Content(parts=[part])
return LiveRequest(content=content)
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={
"class_method": "bidi_stream_query",
"input": {"input_str": "hello"},
}) as connection:
first_req = True
while True:
input_text = input("Enter your question: ")
if input_text = "exit":
break
if first_req:
await connection.send({
"user_id": USER_ID,
"live_request": prepare_live_request(input_text).dict()
})
first_req = False
else:
await connection.send(prepare_live_request(input_text).dict())
audio_data = []
while True:
async def receive():
return await connection.receive()
receiving = asyncio.Task(receive())
done, _ = await asyncio.wait([receiving])
if receiving not in done:
receiving.cancel()
break
event = Event.model_validate(receiving.result()["bidiStreamOutput"])
part = event.content and event.content.parts and event.content.parts[0]
if part.inline_data and part.inline_data.data:
chunk_data = part.inline_data.data
data = np.frombuffer(chunk_data, dtype=np.int16)
audio_data.append(data)
else:
print(part)
if audio_data:
concatenated_audio = np.concatenate(audio_data)
display(Audio(concatenated_audio, rate=24000, autoplay=True))