Sviluppare un agente Agent2Agent

Questa pagina mostra come sviluppare e testare un agente Agent2Agent (A2A). Il protocollo A2A è uno standard aperto progettato per consentire una comunicazione e una collaborazione senza interruzioni tra gli agenti AI. Questa guida si concentra sul flusso di lavoro locale, che ti consente di definire e verificare la funzionalità dell'agente prima del deployment.

Il flusso di lavoro principale prevede i seguenti passaggi:

  1. Definisci i componenti chiave
  2. Crea agente locale
  3. Testare l'agente locale

Definisci i componenti dell'agente

Per creare un agente A2A, devi definire i seguenti componenti: un AgentCard, un AgentExecutor e un ADK LlmAgent.

  • AgentCard contiene un documento di metadati che descrive le funzionalità dell'agente. AgentCard è come un biglietto da visita che altri agenti possono utilizzare per scoprire cosa può fare il tuo agente. Per maggiori dettagli, consulta le specifiche della scheda dell'agente.
  • AgentExecutor contiene la logica principale dell'agente e definisce il modo in cui gestisce le attività. È qui che implementi il comportamento dell'agente. Per saperne di più, consulta la specifica del protocollo A2A.
  • (Facoltativo) LlmAgent definisce l'agente ADK, incluse le istruzioni di sistema, il modello generativo e gli strumenti.

Definisci un AgentCard

Il seguente esempio di codice definisce un AgentCard per un agente di cambio valuta:

from a2a.types import AgentCard, AgentSkill
from vertexai.preview.reasoning_engines.templates.a2a import create_agent_card

# Define the skill for the CurrencyAgent
currency_skill = AgentSkill(
    id='get_exchange_rate',
    name='Get Currency Exchange Rate',
    description='Retrieves the exchange rate between two currencies on a specified date.',
    tags=['Finance', 'Currency', 'Exchange Rate'],
    examples=[
        'What is the exchange rate from USD to EUR?',
        'How many Japanese Yen is 1 US dollar worth today?',
    ],
)

# Create the agent card using the utility function
agent_card = create_agent_card(
    agent_name='Currency Exchange Agent',
    description='An agent that can provide currency exchange rates',
    skills=[currency_skill]
)

Definisci un AgentExecutor

Il seguente esempio di codice definisce un AgentExecutor che risponde con il tasso di cambio. Prende un'istanza CurrencyAgent e inizializza ADK Runner per eseguire le richieste.

import requests
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, UnsupportedOperationError, Part
from a2a.utils import new_agent_text_message
from a2a.utils.errors import ServerError
from google.adk import Runner
from google.adk.agents import LlmAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.sessions import InMemorySessionService
from google.genai import types

class CurrencyAgentExecutorWithRunner(AgentExecutor):
    """Executor that takes an LlmAgent instance and initializes the ADK Runner internally."""

    def __init__(self, agent: LlmAgent):
        self.agent = agent
        self.runner = None

    def _init_adk(self):
        if not self.runner:
            self.runner = Runner(
                app_name=self.agent.name,
                agent=self.agent,
                artifact_service=InMemoryArtifactService(),
                session_service=InMemorySessionService(),
                memory_service=InMemoryMemoryService(),
            )

    async def cancel(self, context: RequestContext, event_queue: EventQueue):
        raise ServerError(error=UnsupportedOperationError())

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        self._init_adk() # Initialize on first execute call

        if not context.message:
            return

        user_id = context.message.metadata.get('user_id') if context.message and context.message.metadata else 'a2a_user'

        updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        if not context.current_task:
            await updater.submit()
        await updater.start_work()

        query = context.get_user_input()
        content = types.Content(role='user', parts=[types.Part(text=query)])

        try:
            session = await self.runner.session_service.get_session(
                app_name=self.runner.app_name,
                user_id=user_id,
                session_id=context.context_id,
            ) or await self.runner.session_service.create_session(
                app_name=self.runner.app_name,
                user_id=user_id,
                session_id=context.context_id,
            )

            final_event = None
            async for event in self.runner.run_async(
                session_id=session.id,
                user_id=user_id,
                new_message=content
            ):
                if event.is_final_response():
                    final_event = event

            if final_event and final_event.content and final_event.content.parts:
                response_text = "".join(
                    part.text for part in final_event.content.parts if hasattr(part, 'text') and part.text
                )
                if response_text:
                    await updater.add_artifact(
                        [TextPart(text=response_text)],
                        name='result',
                    )
                    await updater.complete()
                    return

            await updater.update_status(
                TaskState.failed,
                message=new_agent_text_message('Failed to generate a final response with text content.'),
                final=True
            )

        except Exception as e:
            await updater.update_status(
                TaskState.failed,
                message=new_agent_text_message(f"An error occurred: {str(e)}"),
                final=True,
            )

Definisci un LlmAgent

Innanzitutto, definisci uno strumento di conversione della valuta da utilizzare per LlmAgent:

def get_exchange_rate(
    currency_from: str = "USD",
    currency_to: str = "EUR",
    currency_date: str = "latest",
):
    """Retrieves the exchange rate between two currencies on a specified date.
    Uses the Frankfurter API (https://api.frankfurter.app/) to obtain
    exchange rate data.
    """
    try:
        response = requests.get(
            f"https://api.frankfurter.app/{currency_date}",
            params={"from": currency_from, "to": currency_to},
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        return {"error": str(e)}

Poi, definisci un ADK LlmAgent che utilizza lo strumento.

my_llm_agent = LlmAgent(
    model='gemini-2.0-flash',
    name='currency_exchange_agent',
    description='An agent that can provide currency exchange rates.',
    instruction="""You are a helpful currency exchange assistant.
                   Use the get_exchange_rate tool to answer user questions.
                   If the tool returns an error, inform the user about the error.""",
    tools=[get_exchange_rate],
)

Crea un agente locale

Dopo aver definito i componenti dell'agente, crea un'istanza della classe A2aAgent che utilizza AgentCard, AgentExecutor e LlmAgent per iniziare il test locale.

from vertexai.preview.reasoning_engines import A2aAgent

a2a_agent = A2aAgent(
    agent_card=agent_card, # Assuming agent_card is defined
    agent_executor_builder=lambda: CurrencyAgentExecutorWithRunner(
        agent=my_llm_agent,
    )
)
a2a_agent.set_up()

Il modello di agente A2A ti aiuta a creare un servizio conforme ad A2A. Il servizio funge da wrapper, astraendo il livello di conversione.

Testare l'agente locale

L'agente del tasso di cambio supporta i seguenti tre metodi:

  • handle_authenticated_agent_card
  • on_message_send
  • on_get_task

Test handle_authenticated_agent_card

Il seguente codice recupera la scheda autenticata dell'agente, che descrive le sue funzionalità.

# Test the `authenticated_agent_card` endpoint.
response_get_card = await a2a_agent.handle_authenticated_agent_card(request=None, context=None)
print(response_get_card)

Test on_message_send

Il seguente codice simula un client che invia un nuovo messaggio all'agente. A2aAgent crea una nuova attività e restituisce il relativo ID.

import json
from starlette.requests import Request
import asyncio

# 1. Define the message payload you want to send.
message_data = {
    "message": {
        "messageId": "local-test-message-id",
        "content":[
              {
                  "text": "What is the exchange rate from USD to EUR today?"
              }
          ],
        "role": "ROLE_USER",
    },
}

# 2. Construct the request
scope = {
    "type": "http",
    "http_version": "1.1",
    "method": "POST",
    "headers": [(b"content-type", b"application/json")],
}

async def receive():
    byte_data = json.dumps(message_data).encode("utf-8")
    return {"type": "http.request", "body": byte_data, "more_body": False}

post_request = Request(scope, receive=receive)

# 3. Call the agent
send_message_response = await a2a_agent.on_message_send(request=post_request, context=None)

print(send_message_response)

Test on_get_task

Il seguente codice recupera lo stato e il risultato di un'attività. L'output mostra che l'attività è stata completata e include l'artefatto di risposta "Hello World".


from starlette.requests import Request
import asyncio

# 1. Provide the task_id from the previous step.
# In a real application, you would store and retrieve this ID.
task_id_to_get = send_message_response['task']['id']

# 2. Define the path parameters for the request.
task_data = {"id": task_id_to_get}

# 3. Construct the starlette.requests.Request object directly.
scope = {
    "type": "http",
    "http_version": "1.1",
    "method": "GET",
    "headers": [],
    "query_string": b'',
    "path_params": task_data,
}

async def empty_receive():
    return {"type": "http.disconnect"}

get_request = Request(scope, empty_receive)

# 4. Call the agent's handler to get the task status.
task_status_response = await a2a_agent.on_get_task(request=get_request, context=None)

print(f"Successfully retrieved status for Task ID: {task_id_to_get}")
print("\nFull task status response:")
print(task_status_response)

Passaggi successivi