Agent2Agent 에이전트 개발

이 페이지에서는 Agent2Agent(A2A) 에이전트를 개발하고 테스트하는 방법을 보여줍니다. A2A 프로토콜은 AI 에이전트 간의 원활한 통신과 협업을 지원하도록 설계된 개방형 표준입니다. 이 가이드에서는 배포 전에 에이전트의 기능을 정의하고 확인할 수 있는 로컬 워크플로에 중점을 둡니다.

핵심 워크플로에는 다음 단계가 포함됩니다.

  1. 주요 구성요소 정의
  2. 로컬 에이전트 만들기
  3. 로컬 에이전트 테스트

에이전트 구성요소 정의

A2A 에이전트를 만들려면 AgentCard, AgentExecutor, ADK LlmAgent와 같은 구성요소를 정의해야 합니다.

  • AgentCard에는 에이전트의 기능을 설명하는 메타데이터 문서가 포함되어 있습니다. AgentCard는 다른 상담사가 내 상담사가 할 수 있는 작업을 파악하는 데 사용할 수 있는 명함과 같습니다. 자세한 내용은 상담사 카드 사양을 참고하세요.
  • AgentExecutor에는 에이전트의 핵심 로직이 포함되어 있으며 작업을 처리하는 방법을 정의합니다. 여기에서 에이전트의 동작을 구현합니다. 자세한 내용은 A2A 프로토콜 사양을 참고하세요.
  • (선택사항) LlmAgent는 시스템 지침, 생성 모델, 도구를 비롯한 ADK 에이전트를 정의합니다.

AgentCard 정의

다음 코드 샘플은 환율 에이전트의 AgentCard을 정의합니다.

from a2a.types import AgentCard, AgentSkill
from vertexai.preview.reasoning_engines.templates.a2a import create_agent_card

# Define the skill for the CurrencyAgent
currency_skill = AgentSkill(
    id='get_exchange_rate',
    name='Get Currency Exchange Rate',
    description='Retrieves the exchange rate between two currencies on a specified date.',
    tags=['Finance', 'Currency', 'Exchange Rate'],
    examples=[
        'What is the exchange rate from USD to EUR?',
        'How many Japanese Yen is 1 US dollar worth today?',
    ],
)

# Create the agent card using the utility function
agent_card = create_agent_card(
    agent_name='Currency Exchange Agent',
    description='An agent that can provide currency exchange rates',
    skills=[currency_skill]
)

AgentExecutor 정의

다음 코드 예시에서는 통화 환율로 응답하는 AgentExecutor를 정의합니다. CurrencyAgent 인스턴스를 가져와 요청을 실행하도록 ADK 러너를 초기화합니다.

import requests
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, UnsupportedOperationError, Part
from a2a.utils import new_agent_text_message
from a2a.utils.errors import ServerError
from google.adk import Runner
from google.adk.agents import LlmAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.sessions import InMemorySessionService
from google.genai import types

class CurrencyAgentExecutorWithRunner(AgentExecutor):
    """Executor that takes an LlmAgent instance and initializes the ADK Runner internally."""

    def __init__(self, agent: LlmAgent):
        self.agent = agent
        self.runner = None

    def _init_adk(self):
        if not self.runner:
            self.runner = Runner(
                app_name=self.agent.name,
                agent=self.agent,
                artifact_service=InMemoryArtifactService(),
                session_service=InMemorySessionService(),
                memory_service=InMemoryMemoryService(),
            )

    async def cancel(self, context: RequestContext, event_queue: EventQueue):
        raise ServerError(error=UnsupportedOperationError())

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        self._init_adk() # Initialize on first execute call

        if not context.message:
            return

        user_id = context.message.metadata.get('user_id') if context.message and context.message.metadata else 'a2a_user'

        updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        if not context.current_task:
            await updater.submit()
        await updater.start_work()

        query = context.get_user_input()
        content = types.Content(role='user', parts=[types.Part(text=query)])

        try:
            session = await self.runner.session_service.get_session(
                app_name=self.runner.app_name,
                user_id=user_id,
                session_id=context.context_id,
            ) or await self.runner.session_service.create_session(
                app_name=self.runner.app_name,
                user_id=user_id,
                session_id=context.context_id,
            )

            final_event = None
            async for event in self.runner.run_async(
                session_id=session.id,
                user_id=user_id,
                new_message=content
            ):
                if event.is_final_response():
                    final_event = event

            if final_event and final_event.content and final_event.content.parts:
                response_text = "".join(
                    part.text for part in final_event.content.parts if hasattr(part, 'text') and part.text
                )
                if response_text:
                    await updater.add_artifact(
                        [TextPart(text=response_text)],
                        name='result',
                    )
                    await updater.complete()
                    return

            await updater.update_status(
                TaskState.failed,
                message=new_agent_text_message('Failed to generate a final response with text content.'),
                final=True
            )

        except Exception as e:
            await updater.update_status(
                TaskState.failed,
                message=new_agent_text_message(f"An error occurred: {str(e)}"),
                final=True,
            )

LlmAgent 정의

먼저 LlmAgent에서 사용할 통화 환전 도구를 정의합니다.

def get_exchange_rate(
    currency_from: str = "USD",
    currency_to: str = "EUR",
    currency_date: str = "latest",
):
    """Retrieves the exchange rate between two currencies on a specified date.
    Uses the Frankfurter API (https://api.frankfurter.app/) to obtain
    exchange rate data.
    """
    try:
        response = requests.get(
            f"https://api.frankfurter.app/{currency_date}",
            params={"from": currency_from, "to": currency_to},
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        return {"error": str(e)}

그런 다음 도구를 사용하는 ADK LlmAgent를 정의합니다.

my_llm_agent = LlmAgent(
    model='gemini-2.0-flash',
    name='currency_exchange_agent',
    description='An agent that can provide currency exchange rates.',
    instruction="""You are a helpful currency exchange assistant.
                   Use the get_exchange_rate tool to answer user questions.
                   If the tool returns an error, inform the user about the error.""",
    tools=[get_exchange_rate],
)

로컬 에이전트 만들기

에이전트의 구성요소를 정의한 후 AgentCard, AgentExecutor, LlmAgent를 사용하여 로컬 테스트를 시작하는 A2aAgent 클래스의 인스턴스를 만듭니다.

from vertexai.preview.reasoning_engines import A2aAgent

a2a_agent = A2aAgent(
    agent_card=agent_card, # Assuming agent_card is defined
    agent_executor_builder=lambda: CurrencyAgentExecutorWithRunner(
        agent=my_llm_agent,
    )
)
a2a_agent.set_up()

A2A 에이전트 템플릿을 사용하면 A2A 규격 서비스를 만들 수 있습니다. 이 서비스는 변환 레이어를 추상화하는 래퍼 역할을 합니다.

로컬 에이전트 테스트

환율 에이전트는 다음 세 가지 방법을 지원합니다.

  • handle_authenticated_agent_card
  • on_message_send
  • on_get_task

handle_authenticated_agent_card 테스트

다음 코드는 에이전트의 기능을 설명하는 에이전트의 인증된 카드를 가져옵니다.

# Test the `authenticated_agent_card` endpoint.
response_get_card = await a2a_agent.handle_authenticated_agent_card(request=None, context=None)
print(response_get_card)

on_message_send 테스트

다음 코드는 상담사에게 새 메시지를 보내는 클라이언트를 시뮬레이션합니다. A2aAgent는 새 작업을 만들고 작업의 ID를 반환합니다.

import json
from starlette.requests import Request
import asyncio

# 1. Define the message payload you want to send.
message_data = {
    "message": {
        "messageId": "local-test-message-id",
        "content":[
              {
                  "text": "What is the exchange rate from USD to EUR today?"
              }
          ],
        "role": "ROLE_USER",
    },
}

# 2. Construct the request
scope = {
    "type": "http",
    "http_version": "1.1",
    "method": "POST",
    "headers": [(b"content-type", b"application/json")],
}

async def receive():
    byte_data = json.dumps(message_data).encode("utf-8")
    return {"type": "http.request", "body": byte_data, "more_body": False}

post_request = Request(scope, receive=receive)

# 3. Call the agent
send_message_response = await a2a_agent.on_message_send(request=post_request, context=None)

print(send_message_response)

on_get_task 테스트

다음 코드는 작업의 상태와 결과를 가져옵니다. 출력에는 작업이 완료되었으며 'Hello World' 응답 아티팩트가 포함되어 있습니다.


from starlette.requests import Request
import asyncio

# 1. Provide the task_id from the previous step.
# In a real application, you would store and retrieve this ID.
task_id_to_get = send_message_response['task']['id']

# 2. Define the path parameters for the request.
task_data = {"id": task_id_to_get}

# 3. Construct the starlette.requests.Request object directly.
scope = {
    "type": "http",
    "http_version": "1.1",
    "method": "GET",
    "headers": [],
    "query_string": b'',
    "path_params": task_data,
}

async def empty_receive():
    return {"type": "http.disconnect"}

get_request = Request(scope, empty_receive)

# 4. Call the agent's handler to get the task status.
task_status_response = await a2a_agent.on_get_task(request=get_request, context=None)

print(f"Successfully retrieved status for Task ID: {task_id_to_get}")
print("\nFull task status response:")
print(task_status_response)

다음 단계