Streaming dua arah dengan Runtime Agent Engine Vertex AI

Halaman ini menjelaskan cara menggunakan streaming dua arah dengan Vertex AI Agent Engine Runtime.

Ringkasan

Streaming dua arah menyediakan saluran komunikasi dua arah yang persisten antara aplikasi Anda dan agen, sehingga Anda dapat melampaui pola permintaan-respons berbasis giliran. Streaming dua arah berfungsi untuk kasus penggunaan saat agen Anda perlu memproses informasi dan merespons secara berkelanjutan, seperti berinteraksi dengan input audio atau video dengan latensi rendah.

Streaming dua arah dengan Vertex AI Agent Engine Runtime mendukung kasus penggunaan agen interaktif dan real-time serta pertukaran data untuk API live multimodal. Streaming dua arah didukung untuk semua framework, dan metode streaming dua arah kustom tersedia melalui pendaftaran metode kustom. Anda dapat menggunakan streaming dua arah untuk berinteraksi dengan Gemini Live API menggunakan Agent Development Kit (ADK) di Vertex AI Agent Engine.

Men-deploy agen jarak jauh dengan metode kueri dua arah hanya didukung melalui Google Gen AI SDK. Saat metode kueri dua arah terdeteksi, Gen AI SDK akan otomatis menetapkan mode server agen EXPERIMENTAL saat memanggil REST API.

Mengembangkan agen

Saat mengembangkan agen, gunakan langkah-langkah berikut untuk menerapkan streaming dua arah:

Menentukan metode kueri streaming dua arah

Anda dapat menentukan metode bidi_stream_query yang secara asinkron mengambil permintaan streaming sebagai input dan menghasilkan respons streaming. Sebagai contoh, template berikut memperluas template dasar untuk melakukan streaming permintaan dan respons serta dapat di-deploy di Agent Engine:

import asyncio
from typing import Any, AsyncIterable

class BidiStreamingAgent(StreamingAgent):

    async def bidi_stream_query(
        self,
        request_queue: asyncio.Queue[Any]
    ) -> AsyncIterable[Any]:
        from langchain.load.dump import dumpd

        while True:
            request = await request_queue.get()
            # This is just an illustration, you're free to use any termination mechanism.
            if request == "END":
                break
            for chunk in self.graph.stream(request):
                yield dumpd(chunk)

agent = BidiStreamingAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],      # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Perhatikan hal-hal berikut saat menggunakan API streaming dua arah:

  • asyncio.Queue: Anda dapat memasukkan jenis data apa pun dalam antrean permintaan ini untuk menunggu dikirim ke API model.

  • Waktu tunggu maksimum: Waktu tunggu maksimum untuk kueri streaming dua arah adalah 10 menit. Jika agen Anda memerlukan waktu pemrosesan yang lebih lama, pertimbangkan untuk memecah tugas menjadi bagian yang lebih kecil dan menggunakan sesi atau memori untuk mempertahankan status.

  • Membatasi penggunaan konten: Saat menggunakan konten dari aliran dua arah, penting untuk mengelola kecepatan pemrosesan data masuk oleh agen Anda. Jika agen Anda menggunakan data terlalu lambat, hal ini dapat menyebabkan masalah seperti peningkatan latensi atau tekanan memori di sisi server. Terapkan mekanisme untuk secara aktif menarik data saat agen Anda siap memprosesnya, dan hindari pemblokiran operasi yang dapat menghentikan konsumsi konten.

  • Membatasi pembuatan konten: Jika Anda mengalami masalah tekanan balik (ketika produsen menghasilkan data lebih cepat daripada yang dapat diproses konsumen), Anda harus membatasi kecepatan pembuatan konten. Tindakan ini dapat membantu mencegah buffer overflow dan memastikan pengalaman streaming yang lancar.

Menguji metode kueri streaming dua arah

Anda dapat menguji kueri streaming dua arah secara lokal dengan memanggil metode bidi_stream_query dan melakukan iterasi pada hasilnya:

import asyncio
import pprint
import time

request_queue = asyncio.Queue()

async def generate_input():
    # This is just an illustration, you're free to use any appropriate input generator.
    request_queue.put_nowait(
        {"input": "What is the exchange rate from US dolloars to Swedish currency"}
    )
    time.sleep(5)
    request_queue.put_nowait(
        {"input": "What is the exchange rate from US dolloars to Euro currency"}
    )
    time.sleep(5)
    request_queue.put_nowait("END")

async def print_query_result():
    async for chunk in agent.bidi_stream_query(request_queue):
        pprint.pprint(chunk, depth=1)

input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())

await asyncio.gather(input_task, output_task, return_exceptions=True)

Koneksi kueri dua arah yang sama dapat menangani beberapa permintaan dan respons. Untuk setiap permintaan baru dari antrean, contoh berikut menghasilkan aliran potongan yang berisi informasi berbeda tentang respons:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}

(Opsional) Mendaftarkan metode kustom

Operasi dapat didaftarkan sebagai mode eksekusi standar (diwakili oleh string kosong ""), streaming (stream), atau streaming dua arah (bidi_stream).

from typing import AsyncIterable, Iterable

class CustomAgent(BidiStreamingAgent):

    # ... same get_state and get_state_history function definition.

    async def get_state_bidi_mode(
        self,
        request_queue: asyncio.Queue[Any]
    ) -> AsyncIterable[Any]:
        while True:
            request = await request_queue.get()
            if request == "END":
                break
            yield self.graph.get_state(request)._asdict()

    def register_operations(self):
        return {
            # The list of synchrounous operations to be registered
            "": ["query", "get_state"]
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"]
            # The list of bidi streaming operations to be registered
            "bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
        }

Men-deploy agen

Setelah mengembangkan agen sebagai live_agent, Anda dapat men-deploy agen ke Agent Engine dengan membuat instance Agent Engine.

Perhatikan bahwa dengan Gen AI SDK, semua konfigurasi deployment (paket tambahan dan kontrol resource yang disesuaikan) ditetapkan sebagai nilai config saat membuat instance Agent Engine.

Lakukan inisialisasi klien AI Generatif:

import vertexai

client = vertexai.Client(project=PROJECT, location=LOCATION)

Deploy agen ke Agent Engine:

remote_live_agent = client.agent_engines.create(
    agent=live_agent,
    config={
        "staging_bucket": STAGING_BUCKET,
        "requirements": [
            "google-cloud-aiplatform[agent_engines,adk]==1.88.0",
            "cloudpickle==3.0",
            "websockets"
        ],
    },
)

Untuk mengetahui informasi tentang langkah-langkah yang terjadi di latar belakang selama deployment, lihat Membuat instance AgentEngine.

Dapatkan ID resource agen:

remote_live_agent.api_resource.name

Menggunakan agen

Jika Anda menentukan operasi bidi_stream_query saat mengembangkan agen, Anda dapat melakukan kueri streaming dua arah ke agen secara asinkron menggunakan Gen AI SDK untuk Python.

Anda dapat mengubah contoh berikut dengan data apa pun yang dapat dikenali oleh agen Anda, menggunakan logika penghentian yang berlaku untuk aliran input dan aliran output:

async with client.aio.live.agent_engines.connect(
        agent_engine=remote_live_agent.api_resource.name,
        config={"class_method": "bidi_stream_query"}
        ) as connection:
    while True:
        #
        input_str = input("Enter your question: ")
        if input_str == "exit":
            break
        await connection.send({"input": input_str})

        while True:
            response = await connection.receive()
            print(response)
            if response["bidiStreamOutput"]["output"] == "end of turn":
                break

Vertex AI Agent Engine Runtime mengalirkan respons sebagai urutan objek yang dihasilkan secara iteratif. Misalnya, sekumpulan dua respons pada giliran pertama mungkin terlihat seperti berikut:

Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}

Enter your next question: exit

Menggunakan agen Agent Development Kit

Jika Anda mengembangkan agen menggunakan Agent Development Kit (ADK), Anda dapat menggunakan streaming dua arah untuk berinteraksi dengan Gemini Live API.

Contoh berikut membuat agen percakapan yang menerima pertanyaan teks pengguna dan menerima data audio respons Gemini Live API:

import numpy as np
from google.adk.agents.live_request_queue improt LiveRequest
from google.adk.events import Event
from google.genai import types

def prepare_live_request(input_text: str) -> LiveRequest:
    part = types.Part.from_text(text=input_text)
    content = types.Content(parts=[part])
    return LiveRequest(content=content)

async with client.aio.live.agent_engines.connect(
        agent_engine=remote_live_agent.api_resource.name,
        config={
            "class_method": "bidi_stream_query",
            "input": {"input_str": "hello"},
        }) as connection:
    first_req = True
    while True:
        input_text = input("Enter your question: ")
        if input_text = "exit":
            break
        if first_req:
            await connection.send({
                "user_id": USER_ID,
                "live_request": prepare_live_request(input_text).dict()
            })
            first_req = False
        else:
            await connection.send(prepare_live_request(input_text).dict())
        audio_data = []
        while True:
            async def receive():
                return await connection.receive()

            receiving = asyncio.Task(receive())
            done, _ = await asyncio.wait([receiving])
            if receiving not in done:
                receiving.cancel()
                break
            event = Event.model_validate(receiving.result()["bidiStreamOutput"])
            part = event.content and event.content.parts and event.content.parts[0]

            if part.inline_data and part.inline_data.data:
                chunk_data = part.inline_data.data
                data = np.frombuffer(chunk_data, dtype=np.int16)
                audio_data.append(data)
            else:
                print(part)
        if audio_data:
            concatenated_audio = np.concatenate(audio_data)
            display(Audio(concatenated_audio, rate=24000, autoplay=True))

Langkah berikutnya