이 페이지에서는 Vertex AI Agent Engine Runtime에서 양방향 스트리밍을 사용하는 방법을 설명합니다.
개요
양방향 스트리밍은 애플리케이션과 에이전트 간에 지속적인 양방향 통신 채널을 제공하여 턴 기반 요청-응답 패턴을 넘어설 수 있도록 해줍니다. 양방향 스트리밍은 에이전트가 정보를 처리하고 지속적으로 응답해야 하는 사용 사례에 적합합니다(예: 지연 시간이 짧은 오디오 또는 동영상 입력과 상호작용).
Vertex AI Agent Engine 런타임의 양방향 스트리밍은 멀티모달 라이브 API를 위한 대화형 실시간 에이전트 사용 사례와 데이터 교환을 지원합니다. 양방향 스트리밍은 모든 프레임워크에서 지원되며 커스텀 메서드를 등록하여 커스텀 양방향 스트리밍 메서드를 사용할 수 있습니다. Vertex AI Agent Engine에서 에이전트 개발 키트(ADK)를 사용하여 양방향 스트리밍으로 Gemini Live API와 상호작용할 수 있습니다.
양방향 쿼리 메서드를 사용하여 원격 에이전트를 배포하는 것은 Google Gen AI SDK를 통해서만 지원됩니다. 양방향 쿼리 메서드가 감지되면 Gen AI SDK는 REST API를 호출할 때 EXPERIMENTAL
에이전트 서버 모드를 자동으로 설정합니다.
에이전트 개발
에이전트를 개발하는 동안 다음 단계에 따라 양방향 스트리밍을 구현하세요.
양방향 스트리밍 쿼리 메서드 정의
스트림 요청을 비동기적으로 입력으로 가져오고 스트리밍 응답을 출력하는 bidi_stream_query
메서드를 정의할 수 있습니다. 예를 들어 다음 템플릿은 기본 템플릿을 확장하여 요청과 응답을 스트리밍하며 에이전트 엔진에 배포할 수 있습니다.
import asyncio
from typing import Any, AsyncIterable
class BidiStreamingAgent(StreamingAgent):
async def bidi_stream_query(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
from langchain.load.dump import dumpd
while True:
request = await request_queue.get()
# This is just an illustration, you're free to use any termination mechanism.
if request == "END":
break
for chunk in self.graph.stream(request):
yield dumpd(chunk)
agent = BidiStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
양방향 스트리밍 API를 사용할 때는 다음 사항에 유의하세요.
asyncio.Queue
: 모델 API로 전송되기를 기다리는 요청 큐에 모든 데이터 유형을 넣을 수 있습니다.최대 제한 시간: 양방향 스트리밍 쿼리의 최대 제한 시간은 10분입니다. 에이전트에 더 긴 처리 시간이 필요한 경우 태스크를 더 작은 청크로 나누고 세션이나 메모리를 사용하여 상태를 유지하는 것이 좋습니다.
콘텐츠 소비 제한: 양방향 스트림에서 콘텐츠를 소비할 때는 에이전트가 수신 데이터를 처리하는 속도를 관리하는 것이 중요합니다. 에이전트가 데이터를 너무 느리게 소비하면 서버 측에서 지연 시간이 증가하거나 메모리 부족과 같은 문제가 발생할 수 있습니다. 에이전트가 데이터를 처리할 준비가 되면 데이터를 적극적으로 가져오는 메커니즘을 구현하고 콘텐츠 소비를 중단할 수 있는 작업을 차단하지 마세요.
콘텐츠 생성 제한: 백프레셔 문제(생산자가 소비자가 처리할 수 있는 것보다 더 빠르게 데이터를 생성하는 경우)가 발생하면 콘텐츠 생성 속도를 제한해야 합니다. 이렇게 하면 버퍼 오버플로를 방지하고 원활한 스트리밍 환경을 보장할 수 있습니다.
양방향 스트리밍 쿼리 메서드 테스트
bidi_stream_query
메서드를 호출하고 결과를 반복하여 양방향 스트리밍 쿼리를 로컬에서 테스트할 수 있습니다.
import asyncio
import pprint
import time
request_queue = asyncio.Queue()
async def generate_input():
# This is just an illustration, you're free to use any appropriate input generator.
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Swedish currency"}
)
time.sleep(5)
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Euro currency"}
)
time.sleep(5)
request_queue.put_nowait("END")
async def print_query_result():
async for chunk in agent.bidi_stream_query(request_queue):
pprint.pprint(chunk, depth=1)
input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())
await asyncio.gather(input_task, output_task, return_exceptions=True)
동일한 양방향 쿼리 연결은 여러 요청과 응답을 처리할 수 있습니다. 다음 예시에서는 큐의 각 새 요청에 대해 응답에 관한 다양한 정보가 포함된 청크 스트림을 생성합니다.
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}
(선택사항) 커스텀 메서드 등록
작업은 표준(빈 문자열 ""
로 표시됨), 스트리밍(stream
) 또는 양방향 스트리밍(bidi_stream
) 실행 모드로 등록할 수 있습니다.
from typing import AsyncIterable, Iterable
class CustomAgent(BidiStreamingAgent):
# ... same get_state and get_state_history function definition.
async def get_state_bidi_mode(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
while True:
request = await request_queue.get()
if request == "END":
break
yield self.graph.get_state(request)._asdict()
def register_operations(self):
return {
# The list of synchrounous operations to be registered
"": ["query", "get_state"]
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"]
# The list of bidi streaming operations to be registered
"bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
}
에이전트 배포
에이전트를 live_agent
로 개발한 후 Agent Engine 인스턴스를 만들어 에이전트를 Agent Engine에 배포할 수 있습니다.
Gen AI SDK를 사용하면 에이전트 엔진 인스턴스를 만들 때 모든 배포 구성(추가 패키지 및 맞춤설정된 리소스 제어)이 config
값으로 할당됩니다.
생성형 AI 클라이언트를 초기화합니다.
import vertexai
client = vertexai.Client(project=PROJECT, location=LOCATION)
Agent Engine에 에이전트를 배포합니다.
remote_live_agent = client.agent_engines.create(
agent=live_agent,
config={
"staging_bucket": STAGING_BUCKET,
"requirements": [
"google-cloud-aiplatform[agent_engines,adk]==1.88.0",
"cloudpickle==3.0",
"websockets"
],
},
)
배포 중에 백그라운드에서 발생하는 단계에 대한 자세한 내용은 AgentEngine 인스턴스 만들기를 참조하세요.
에이전트 리소스 ID를 가져옵니다.
remote_live_agent.api_resource.name
에이전트 사용
에이전트를 개발할 때 bidi_stream_query
작업을 정의한 경우 Python용 Gen AI SDK를 사용하여 에이전트를 비동기적으로 양방향 스트리밍 쿼리할 수 있습니다.
입력 스트림과 출력 스트림에 적용 가능한 종료 로직을 사용하여 에이전트가 인식할 수 있는 데이터로 다음 예시를 수정할 수 있습니다.
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={"class_method": "bidi_stream_query"}
) as connection:
while True:
#
input_str = input("Enter your question: ")
if input_str == "exit":
break
await connection.send({"input": input_str})
while True:
response = await connection.receive()
print(response)
if response["bidiStreamOutput"]["output"] == "end of turn":
break
Vertex AI Agent Engine 런타임은 반복적으로 생성된 객체의 시퀀스로 응답을 스트리밍합니다. 예를 들어 첫 번째 턴의 두 응답 세트는 다음과 같을 수 있습니다.
Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}
Enter your next question: exit
에이전트 개발 키트 에이전트 사용
에이전트 개발 키트(ADK)를 사용하여 에이전트를 개발한 경우 양방향 스트리밍을 사용하여 Gemini Live API와 상호작용할 수 있습니다.
다음 예시에서는 사용자 텍스트 질문을 받아 Gemini Live API 응답 오디오 데이터를 수신하는 대화 에이전트를 만듭니다.
import numpy as np
from google.adk.agents.live_request_queue improt LiveRequest
from google.adk.events import Event
from google.genai import types
def prepare_live_request(input_text: str) -> LiveRequest:
part = types.Part.from_text(text=input_text)
content = types.Content(parts=[part])
return LiveRequest(content=content)
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={
"class_method": "bidi_stream_query",
"input": {"input_str": "hello"},
}) as connection:
first_req = True
while True:
input_text = input("Enter your question: ")
if input_text = "exit":
break
if first_req:
await connection.send({
"user_id": USER_ID,
"live_request": prepare_live_request(input_text).dict()
})
first_req = False
else:
await connection.send(prepare_live_request(input_text).dict())
audio_data = []
while True:
async def receive():
return await connection.receive()
receiving = asyncio.Task(receive())
done, _ = await asyncio.wait([receiving])
if receiving not in done:
receiving.cancel()
break
event = Event.model_validate(receiving.result()["bidiStreamOutput"])
part = event.content and event.content.parts and event.content.parts[0]
if part.inline_data and part.inline_data.data:
chunk_data = part.inline_data.data
data = np.frombuffer(chunk_data, dtype=np.int16)
audio_data.append(data)
else:
print(part)
if audio_data:
concatenated_audio = np.concatenate(audio_data)
display(Audio(concatenated_audio, rate=24000, autoplay=True))
다음 단계
- Gemini Live API 자세히 알아보기