Mengembangkan agen Agent2Agent

Halaman ini menunjukkan cara mengembangkan dan menguji agen Agent2Agent (A2A). Protokol A2A adalah standar terbuka yang dirancang untuk memungkinkan komunikasi dan kolaborasi yang lancar antara agen AI. Panduan ini berfokus pada alur kerja lokal, yang memungkinkan Anda menentukan dan memverifikasi fungsi agen sebelum deployment.

Alur kerja inti mencakup langkah-langkah berikut:

  1. Menentukan komponen utama
  2. Buat agen lokal
  3. Menguji agen lokal

Menentukan komponen agen

Untuk membuat agen A2A, Anda perlu menentukan komponen berikut: AgentCard, AgentExecutor, dan LlmAgent ADK.

  • AgentCard berisi dokumen metadata yang menjelaskan kemampuan agen Anda. AgentCard seperti kartu nama yang dapat digunakan agen lain untuk menemukan kemampuan agen Anda. Untuk mengetahui detail selengkapnya, lihat spesifikasi Kartu Agen.
  • AgentExecutor berisi logika inti agen dan menentukan cara agen menangani tugas. Di sinilah Anda menerapkan perilaku agen. Anda dapat membacanya lebih lanjut di spesifikasi protokol A2A.
  • (Opsional) LlmAgent menentukan agen ADK, termasuk petunjuk sistem, model generatif, dan alatnya.

Menentukan AgentCard

Contoh kode berikut menentukan AgentCard untuk agen nilai tukar mata uang:

from a2a.types import AgentCard, AgentSkill
from vertexai.preview.reasoning_engines.templates.a2a import create_agent_card

# Define the skill for the CurrencyAgent
currency_skill = AgentSkill(
    id='get_exchange_rate',
    name='Get Currency Exchange Rate',
    description='Retrieves the exchange rate between two currencies on a specified date.',
    tags=['Finance', 'Currency', 'Exchange Rate'],
    examples=[
        'What is the exchange rate from USD to EUR?',
        'How many Japanese Yen is 1 US dollar worth today?',
    ],
)

# Create the agent card using the utility function
agent_card = create_agent_card(
    agent_name='Currency Exchange Agent',
    description='An agent that can provide currency exchange rates',
    skills=[currency_skill]
)

Menentukan AgentExecutor

Contoh kode berikut menentukan AgentExecutor yang merespons dengan nilai tukar mata uang. Mengambil instance CurrencyAgent dan melakukan inisialisasi ADK Runner untuk mengeksekusi permintaan.

import requests
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, UnsupportedOperationError, Part
from a2a.utils import new_agent_text_message
from a2a.utils.errors import ServerError
from google.adk import Runner
from google.adk.agents import LlmAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.sessions import InMemorySessionService
from google.genai import types

class CurrencyAgentExecutorWithRunner(AgentExecutor):
    """Executor that takes an LlmAgent instance and initializes the ADK Runner internally."""

    def __init__(self, agent: LlmAgent):
        self.agent = agent
        self.runner = None

    def _init_adk(self):
        if not self.runner:
            self.runner = Runner(
                app_name=self.agent.name,
                agent=self.agent,
                artifact_service=InMemoryArtifactService(),
                session_service=InMemorySessionService(),
                memory_service=InMemoryMemoryService(),
            )

    async def cancel(self, context: RequestContext, event_queue: EventQueue):
        raise ServerError(error=UnsupportedOperationError())

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        self._init_adk() # Initialize on first execute call

        if not context.message:
            return

        user_id = context.message.metadata.get('user_id') if context.message and context.message.metadata else 'a2a_user'

        updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        if not context.current_task:
            await updater.submit()
        await updater.start_work()

        query = context.get_user_input()
        content = types.Content(role='user', parts=[types.Part(text=query)])

        try:
            session = await self.runner.session_service.get_session(
                app_name=self.runner.app_name,
                user_id=user_id,
                session_id=context.context_id,
            ) or await self.runner.session_service.create_session(
                app_name=self.runner.app_name,
                user_id=user_id,
                session_id=context.context_id,
            )

            final_event = None
            async for event in self.runner.run_async(
                session_id=session.id,
                user_id=user_id,
                new_message=content
            ):
                if event.is_final_response():
                    final_event = event

            if final_event and final_event.content and final_event.content.parts:
                response_text = "".join(
                    part.text for part in final_event.content.parts if hasattr(part, 'text') and part.text
                )
                if response_text:
                    await updater.add_artifact(
                        [TextPart(text=response_text)],
                        name='result',
                    )
                    await updater.complete()
                    return

            await updater.update_status(
                TaskState.failed,
                message=new_agent_text_message('Failed to generate a final response with text content.'),
                final=True
            )

        except Exception as e:
            await updater.update_status(
                TaskState.failed,
                message=new_agent_text_message(f"An error occurred: {str(e)}"),
                final=True,
            )

Menentukan LlmAgent

Pertama, tentukan alat penukaran mata uang yang akan digunakan LlmAgent:

def get_exchange_rate(
    currency_from: str = "USD",
    currency_to: str = "EUR",
    currency_date: str = "latest",
):
    """Retrieves the exchange rate between two currencies on a specified date.
    Uses the Frankfurter API (https://api.frankfurter.app/) to obtain
    exchange rate data.
    """
    try:
        response = requests.get(
            f"https://api.frankfurter.app/{currency_date}",
            params={"from": currency_from, "to": currency_to},
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        return {"error": str(e)}

Kemudian, tentukan LlmAgent ADK yang menggunakan alat tersebut.

my_llm_agent = LlmAgent(
    model='gemini-2.0-flash',
    name='currency_exchange_agent',
    description='An agent that can provide currency exchange rates.',
    instruction="""You are a helpful currency exchange assistant.
                   Use the get_exchange_rate tool to answer user questions.
                   If the tool returns an error, inform the user about the error.""",
    tools=[get_exchange_rate],
)

Membuat agen lokal

Setelah menentukan komponen agen, buat instance class A2aAgent yang menggunakan AgentCard, AgentExecutor, dan LlmAgent untuk memulai pengujian lokal.

from vertexai.preview.reasoning_engines import A2aAgent

a2a_agent = A2aAgent(
    agent_card=agent_card, # Assuming agent_card is defined
    agent_executor_builder=lambda: CurrencyAgentExecutorWithRunner(
        agent=my_llm_agent,
    )
)
a2a_agent.set_up()

Template Agen A2A membantu Anda membuat layanan yang kompatibel dengan A2A. Layanan ini bertindak sebagai wrapper, yang mengabstraksi lapisan konversi dari Anda.

Menguji agen lokal

Agen nilai tukar mata uang mendukung tiga metode berikut:

  • handle_authenticated_agent_card
  • on_message_send
  • on_get_task

Uji handle_authenticated_agent_card

Kode berikut mengambil kartu yang diautentikasi agen, yang menjelaskan kemampuan agen.

# Test the `authenticated_agent_card` endpoint.
response_get_card = await a2a_agent.handle_authenticated_agent_card(request=None, context=None)
print(response_get_card)

Uji on_message_send

Kode berikut menyimulasikan klien yang mengirim pesan baru ke agen. A2aAgent membuat tugas baru dan menampilkan ID tugas.

import json
from starlette.requests import Request
import asyncio

# 1. Define the message payload you want to send.
message_data = {
    "message": {
        "messageId": "local-test-message-id",
        "content":[
              {
                  "text": "What is the exchange rate from USD to EUR today?"
              }
          ],
        "role": "ROLE_USER",
    },
}

# 2. Construct the request
scope = {
    "type": "http",
    "http_version": "1.1",
    "method": "POST",
    "headers": [(b"content-type", b"application/json")],
}

async def receive():
    byte_data = json.dumps(message_data).encode("utf-8")
    return {"type": "http.request", "body": byte_data, "more_body": False}

post_request = Request(scope, receive=receive)

# 3. Call the agent
send_message_response = await a2a_agent.on_message_send(request=post_request, context=None)

print(send_message_response)

Uji on_get_task

Kode berikut mengambil status dan hasil tugas. Output menunjukkan bahwa tugas telah selesai dan menyertakan artefak respons "Hello World".


from starlette.requests import Request
import asyncio

# 1. Provide the task_id from the previous step.
# In a real application, you would store and retrieve this ID.
task_id_to_get = send_message_response['task']['id']

# 2. Define the path parameters for the request.
task_data = {"id": task_id_to_get}

# 3. Construct the starlette.requests.Request object directly.
scope = {
    "type": "http",
    "http_version": "1.1",
    "method": "GET",
    "headers": [],
    "query_string": b'',
    "path_params": task_data,
}

async def empty_receive():
    return {"type": "http.disconnect"}

get_request = Request(scope, empty_receive)

# 4. Call the agent's handler to get the task status.
task_status_response = await a2a_agent.on_get_task(request=get_request, context=None)

print(f"Successfully retrieved status for Task ID: {task_id_to_get}")
print("\nFull task status response:")
print(task_status_response)

Langkah berikutnya