Cette page vous explique comment développer et tester un agent Agent2Agent (A2A). Le protocole A2A est une norme ouverte conçue pour permettre une communication et une collaboration fluides entre les agents d'IA. Ce guide se concentre sur le workflow local, qui vous permet de définir et de valider les fonctionnalités de votre agent avant le déploiement.
Le workflow principal comprend les étapes suivantes :
Définir les composants de l'agent
Pour créer un agent A2A, vous devez définir les composants suivants : un AgentCard
, un AgentExecutor
et un LlmAgent
ADK.
AgentCard
contient un document de métadonnées qui décrit les capacités de votre agent.AgentCard
est une sorte de carte de visite que les autres agents peuvent utiliser pour découvrir ce que votre agent peut faire. Pour en savoir plus, consultez les spécifications de la carte d'agent.AgentExecutor
contient la logique de base de l'agent et définit la façon dont il gère les tâches. C'est là que vous implémentez le comportement de l'agent. Pour en savoir plus, consultez la spécification du protocole A2A.- (Facultatif)
LlmAgent
définit l'agent ADK, y compris ses instructions système, son modèle génératif et ses outils.
Définir un AgentCard
L'exemple de code suivant définit un AgentCard
pour un agent de taux de change :
from a2a.types import AgentCard, AgentSkill
from vertexai.preview.reasoning_engines.templates.a2a import create_agent_card
# Define the skill for the CurrencyAgent
currency_skill = AgentSkill(
id='get_exchange_rate',
name='Get Currency Exchange Rate',
description='Retrieves the exchange rate between two currencies on a specified date.',
tags=['Finance', 'Currency', 'Exchange Rate'],
examples=[
'What is the exchange rate from USD to EUR?',
'How many Japanese Yen is 1 US dollar worth today?',
],
)
# Create the agent card using the utility function
agent_card = create_agent_card(
agent_name='Currency Exchange Agent',
description='An agent that can provide currency exchange rates',
skills=[currency_skill]
)
Définir un AgentExecutor
L'exemple de code suivant définit un AgentExecutor
qui répond avec le taux de change. Il prend une instance CurrencyAgent
et initialise l'ADK Runner pour exécuter les requêtes.
import requests
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, UnsupportedOperationError, Part
from a2a.utils import new_agent_text_message
from a2a.utils.errors import ServerError
from google.adk import Runner
from google.adk.agents import LlmAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.sessions import InMemorySessionService
from google.genai import types
class CurrencyAgentExecutorWithRunner(AgentExecutor):
"""Executor that takes an LlmAgent instance and initializes the ADK Runner internally."""
def __init__(self, agent: LlmAgent):
self.agent = agent
self.runner = None
def _init_adk(self):
if not self.runner:
self.runner = Runner(
app_name=self.agent.name,
agent=self.agent,
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
)
async def cancel(self, context: RequestContext, event_queue: EventQueue):
raise ServerError(error=UnsupportedOperationError())
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
self._init_adk() # Initialize on first execute call
if not context.message:
return
user_id = context.message.metadata.get('user_id') if context.message and context.message.metadata else 'a2a_user'
updater = TaskUpdater(event_queue, context.task_id, context.context_id)
if not context.current_task:
await updater.submit()
await updater.start_work()
query = context.get_user_input()
content = types.Content(role='user', parts=[types.Part(text=query)])
try:
session = await self.runner.session_service.get_session(
app_name=self.runner.app_name,
user_id=user_id,
session_id=context.context_id,
) or await self.runner.session_service.create_session(
app_name=self.runner.app_name,
user_id=user_id,
session_id=context.context_id,
)
final_event = None
async for event in self.runner.run_async(
session_id=session.id,
user_id=user_id,
new_message=content
):
if event.is_final_response():
final_event = event
if final_event and final_event.content and final_event.content.parts:
response_text = "".join(
part.text for part in final_event.content.parts if hasattr(part, 'text') and part.text
)
if response_text:
await updater.add_artifact(
[TextPart(text=response_text)],
name='result',
)
await updater.complete()
return
await updater.update_status(
TaskState.failed,
message=new_agent_text_message('Failed to generate a final response with text content.'),
final=True
)
except Exception as e:
await updater.update_status(
TaskState.failed,
message=new_agent_text_message(f"An error occurred: {str(e)}"),
final=True,
)
Définir un LlmAgent
Commencez par définir un outil de change pour que LlmAgent
puisse l'utiliser :
def get_exchange_rate(
currency_from: str = "USD",
currency_to: str = "EUR",
currency_date: str = "latest",
):
"""Retrieves the exchange rate between two currencies on a specified date.
Uses the Frankfurter API (https://api.frankfurter.app/) to obtain
exchange rate data.
"""
try:
response = requests.get(
f"https://api.frankfurter.app/{currency_date}",
params={"from": currency_from, "to": currency_to},
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
return {"error": str(e)}
Ensuite, définissez un LlmAgent
ADK qui utilise l'outil.
my_llm_agent = LlmAgent(
model='gemini-2.0-flash',
name='currency_exchange_agent',
description='An agent that can provide currency exchange rates.',
instruction="""You are a helpful currency exchange assistant.
Use the get_exchange_rate tool to answer user questions.
If the tool returns an error, inform the user about the error.""",
tools=[get_exchange_rate],
)
Créer un agent local
Une fois que vous avez défini les composants de votre agent, créez une instance de la classe A2aAgent
qui utilise AgentCard
, AgentExecutor
et LlmAgent
pour commencer les tests locaux.
from vertexai.preview.reasoning_engines import A2aAgent
a2a_agent = A2aAgent(
agent_card=agent_card, # Assuming agent_card is defined
agent_executor_builder=lambda: CurrencyAgentExecutorWithRunner(
agent=my_llm_agent,
)
)
a2a_agent.set_up()
Le modèle d'agent A2A vous aide à créer un service conforme à A2A. Le service agit comme un wrapper, en faisant abstraction de la couche de conversion.
Tester l'agent local
L'agent de taux de change est compatible avec les trois méthodes suivantes :
handle_authenticated_agent_card
on_message_send
on_get_task
Test handle_authenticated_agent_card
Le code suivant récupère la fiche authentifiée de l'agent, qui décrit ses capacités.
# Test the `authenticated_agent_card` endpoint.
response_get_card = await a2a_agent.handle_authenticated_agent_card(request=None, context=None)
print(response_get_card)
Test on_message_send
Le code suivant simule un client envoyant un nouveau message à l'agent. A2aAgent
crée une tâche et renvoie son ID.
import json
from starlette.requests import Request
import asyncio
# 1. Define the message payload you want to send.
message_data = {
"message": {
"messageId": "local-test-message-id",
"content":[
{
"text": "What is the exchange rate from USD to EUR today?"
}
],
"role": "ROLE_USER",
},
}
# 2. Construct the request
scope = {
"type": "http",
"http_version": "1.1",
"method": "POST",
"headers": [(b"content-type", b"application/json")],
}
async def receive():
byte_data = json.dumps(message_data).encode("utf-8")
return {"type": "http.request", "body": byte_data, "more_body": False}
post_request = Request(scope, receive=receive)
# 3. Call the agent
send_message_response = await a2a_agent.on_message_send(request=post_request, context=None)
print(send_message_response)
Test on_get_task
Le code suivant récupère l'état et le résultat d'une tâche. La sortie indique que la tâche est terminée et inclut l'artefact de réponse "Hello World".
from starlette.requests import Request
import asyncio
# 1. Provide the task_id from the previous step.
# In a real application, you would store and retrieve this ID.
task_id_to_get = send_message_response['task']['id']
# 2. Define the path parameters for the request.
task_data = {"id": task_id_to_get}
# 3. Construct the starlette.requests.Request object directly.
scope = {
"type": "http",
"http_version": "1.1",
"method": "GET",
"headers": [],
"query_string": b'',
"path_params": task_data,
}
async def empty_receive():
return {"type": "http.disconnect"}
get_request = Request(scope, empty_receive)
# 4. Call the agent's handler to get the task status.
task_status_response = await a2a_agent.on_get_task(request=get_request, context=None)
print(f"Successfully retrieved status for Task ID: {task_id_to_get}")
print("\nFull task status response:")
print(task_status_response)
Étapes suivantes
- Dépôt d'exemples Agent2Agent
- Évaluez un agent.
- Déployez un agent.
- Résolvez les problèmes liés au développement d'un agent.
- Accédez à l'assistance.