이 페이지에서는 Agent2Agent(A2A) 에이전트를 개발하고 테스트하는 방법을 보여줍니다. A2A 프로토콜은 AI 에이전트 간의 원활한 통신과 협업을 지원하도록 설계된 개방형 표준입니다. 이 가이드에서는 배포 전에 에이전트의 기능을 정의하고 확인할 수 있는 로컬 워크플로에 중점을 둡니다.
핵심 워크플로에는 다음 단계가 포함됩니다.
에이전트 구성요소 정의
A2A 에이전트를 만들려면 AgentCard
, AgentExecutor
, ADK LlmAgent
와 같은 구성요소를 정의해야 합니다.
AgentCard
에는 에이전트의 기능을 설명하는 메타데이터 문서가 포함되어 있습니다.AgentCard
는 다른 상담사가 내 상담사가 할 수 있는 작업을 파악하는 데 사용할 수 있는 명함과 같습니다. 자세한 내용은 상담사 카드 사양을 참고하세요.AgentExecutor
에는 에이전트의 핵심 로직이 포함되어 있으며 작업을 처리하는 방법을 정의합니다. 여기에서 에이전트의 동작을 구현합니다. 자세한 내용은 A2A 프로토콜 사양을 참고하세요.- (선택사항)
LlmAgent
는 시스템 지침, 생성 모델, 도구를 비롯한 ADK 에이전트를 정의합니다.
AgentCard
정의
다음 코드 샘플은 환율 에이전트의 AgentCard
을 정의합니다.
from a2a.types import AgentCard, AgentSkill
from vertexai.preview.reasoning_engines.templates.a2a import create_agent_card
# Define the skill for the CurrencyAgent
currency_skill = AgentSkill(
id='get_exchange_rate',
name='Get Currency Exchange Rate',
description='Retrieves the exchange rate between two currencies on a specified date.',
tags=['Finance', 'Currency', 'Exchange Rate'],
examples=[
'What is the exchange rate from USD to EUR?',
'How many Japanese Yen is 1 US dollar worth today?',
],
)
# Create the agent card using the utility function
agent_card = create_agent_card(
agent_name='Currency Exchange Agent',
description='An agent that can provide currency exchange rates',
skills=[currency_skill]
)
AgentExecutor
정의
다음 코드 예시에서는 통화 환율로 응답하는 AgentExecutor
를 정의합니다. CurrencyAgent
인스턴스를 가져와 요청을 실행하도록 ADK 러너를 초기화합니다.
import requests
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, UnsupportedOperationError, Part
from a2a.utils import new_agent_text_message
from a2a.utils.errors import ServerError
from google.adk import Runner
from google.adk.agents import LlmAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.sessions import InMemorySessionService
from google.genai import types
class CurrencyAgentExecutorWithRunner(AgentExecutor):
"""Executor that takes an LlmAgent instance and initializes the ADK Runner internally."""
def __init__(self, agent: LlmAgent):
self.agent = agent
self.runner = None
def _init_adk(self):
if not self.runner:
self.runner = Runner(
app_name=self.agent.name,
agent=self.agent,
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
)
async def cancel(self, context: RequestContext, event_queue: EventQueue):
raise ServerError(error=UnsupportedOperationError())
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
self._init_adk() # Initialize on first execute call
if not context.message:
return
user_id = context.message.metadata.get('user_id') if context.message and context.message.metadata else 'a2a_user'
updater = TaskUpdater(event_queue, context.task_id, context.context_id)
if not context.current_task:
await updater.submit()
await updater.start_work()
query = context.get_user_input()
content = types.Content(role='user', parts=[types.Part(text=query)])
try:
session = await self.runner.session_service.get_session(
app_name=self.runner.app_name,
user_id=user_id,
session_id=context.context_id,
) or await self.runner.session_service.create_session(
app_name=self.runner.app_name,
user_id=user_id,
session_id=context.context_id,
)
final_event = None
async for event in self.runner.run_async(
session_id=session.id,
user_id=user_id,
new_message=content
):
if event.is_final_response():
final_event = event
if final_event and final_event.content and final_event.content.parts:
response_text = "".join(
part.text for part in final_event.content.parts if hasattr(part, 'text') and part.text
)
if response_text:
await updater.add_artifact(
[TextPart(text=response_text)],
name='result',
)
await updater.complete()
return
await updater.update_status(
TaskState.failed,
message=new_agent_text_message('Failed to generate a final response with text content.'),
final=True
)
except Exception as e:
await updater.update_status(
TaskState.failed,
message=new_agent_text_message(f"An error occurred: {str(e)}"),
final=True,
)
LlmAgent
정의
먼저 LlmAgent
에서 사용할 통화 환전 도구를 정의합니다.
def get_exchange_rate(
currency_from: str = "USD",
currency_to: str = "EUR",
currency_date: str = "latest",
):
"""Retrieves the exchange rate between two currencies on a specified date.
Uses the Frankfurter API (https://api.frankfurter.app/) to obtain
exchange rate data.
"""
try:
response = requests.get(
f"https://api.frankfurter.app/{currency_date}",
params={"from": currency_from, "to": currency_to},
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
return {"error": str(e)}
그런 다음 도구를 사용하는 ADK LlmAgent
를 정의합니다.
my_llm_agent = LlmAgent(
model='gemini-2.0-flash',
name='currency_exchange_agent',
description='An agent that can provide currency exchange rates.',
instruction="""You are a helpful currency exchange assistant.
Use the get_exchange_rate tool to answer user questions.
If the tool returns an error, inform the user about the error.""",
tools=[get_exchange_rate],
)
로컬 에이전트 만들기
에이전트의 구성요소를 정의한 후 AgentCard
, AgentExecutor
, LlmAgent
를 사용하여 로컬 테스트를 시작하는 A2aAgent
클래스의 인스턴스를 만듭니다.
from vertexai.preview.reasoning_engines import A2aAgent
a2a_agent = A2aAgent(
agent_card=agent_card, # Assuming agent_card is defined
agent_executor_builder=lambda: CurrencyAgentExecutorWithRunner(
agent=my_llm_agent,
)
)
a2a_agent.set_up()
A2A 에이전트 템플릿을 사용하면 A2A 규격 서비스를 만들 수 있습니다. 이 서비스는 변환 레이어를 추상화하는 래퍼 역할을 합니다.
로컬 에이전트 테스트
환율 에이전트는 다음 세 가지 방법을 지원합니다.
handle_authenticated_agent_card
on_message_send
on_get_task
handle_authenticated_agent_card
테스트
다음 코드는 에이전트의 기능을 설명하는 에이전트의 인증된 카드를 가져옵니다.
# Test the `authenticated_agent_card` endpoint.
response_get_card = await a2a_agent.handle_authenticated_agent_card(request=None, context=None)
print(response_get_card)
on_message_send
테스트
다음 코드는 상담사에게 새 메시지를 보내는 클라이언트를 시뮬레이션합니다. A2aAgent
는 새 작업을 만들고 작업의 ID를 반환합니다.
import json
from starlette.requests import Request
import asyncio
# 1. Define the message payload you want to send.
message_data = {
"message": {
"messageId": "local-test-message-id",
"content":[
{
"text": "What is the exchange rate from USD to EUR today?"
}
],
"role": "ROLE_USER",
},
}
# 2. Construct the request
scope = {
"type": "http",
"http_version": "1.1",
"method": "POST",
"headers": [(b"content-type", b"application/json")],
}
async def receive():
byte_data = json.dumps(message_data).encode("utf-8")
return {"type": "http.request", "body": byte_data, "more_body": False}
post_request = Request(scope, receive=receive)
# 3. Call the agent
send_message_response = await a2a_agent.on_message_send(request=post_request, context=None)
print(send_message_response)
on_get_task
테스트
다음 코드는 작업의 상태와 결과를 가져옵니다. 출력에는 작업이 완료되었으며 'Hello World' 응답 아티팩트가 포함되어 있습니다.
from starlette.requests import Request
import asyncio
# 1. Provide the task_id from the previous step.
# In a real application, you would store and retrieve this ID.
task_id_to_get = send_message_response['task']['id']
# 2. Define the path parameters for the request.
task_data = {"id": task_id_to_get}
# 3. Construct the starlette.requests.Request object directly.
scope = {
"type": "http",
"http_version": "1.1",
"method": "GET",
"headers": [],
"query_string": b'',
"path_params": task_data,
}
async def empty_receive():
return {"type": "http.disconnect"}
get_request = Request(scope, empty_receive)
# 4. Call the agent's handler to get the task status.
task_status_response = await a2a_agent.on_get_task(request=get_request, context=None)
print(f"Successfully retrieved status for Task ID: {task_id_to_get}")
print("\nFull task status response:")
print(task_status_response)