这是一个完整的Agent-to-Agent (A2A)协议实现示例,展示了如何创建A2A服务器和三种不同类型的客户端来进行智能体间通信。

项目结构分析

1. 项目配置 (pyproject.toml)

  • Python版本要求:>=3.12
  • 核心依赖
    • strands-agents[a2a]>=1.3.0 - Strands框架的A2A支持
    • strands-agents-tools[a2a]>=0.2.0 - A2A客户端工具
    • a2a-sdk[sql]>=0.3.0 - A2A SDK核心库

2. 服务器实现 (server.py)

from strands import Agent
from strands.multiagent.a2a import A2AServer
from strands_tools.calculator import calculator

# Create a Strands agent with calculator tool
strands_agent = Agent(
    name="Calculator Agent",
    description="A calculator agent that can perform basic arithmetic operations.",
    tools=[calculator],
    callback_handler=None,
)

# Create A2A server
a2a_server = A2AServer(agent=strands_agent)

# Start the server
a2a_server.serve()
# 核心架构:
- 创建带计算器工具的Strands智能体
- 使用A2AServer包装智能体
- 在localhost:9000上提供服务

关键特点

  • 简洁的实现(仅18行代码)
  • 自动暴露智能体的计算器功能
  • 支持A2A协议的标准通信

3. 三种客户端实现模式

A. 智能体客户端 (client_agent.py)
import asyncio

from strands import Agent
from strands_tools.a2a_client import A2AClientToolProvider

# initialize collection of A2A tools for the agent
provider = A2AClientToolProvider(known_agent_urls=["http://localhost:9000"])

# initialize agent with tools
agent = Agent(tools=provider.tools)
# you can also invoke the agent in a non-async context
# print(agent("pick an agent and make a sample call to test its functionality"))


# run the agent in an async context
async def main():
    await agent.invoke_async(
        "pick an agent and make a sample call to test its functionality"
    )


# run
asyncio.run(main())
# 高级抽象模式:
- 使用A2AClientToolProvider自动发现智能体
- 创建具有A2A工具的Strands智能体
- 支持自然语言交互

优势

  • 最简单的使用方式
  • 自动智能体发现
  • 自然语言接口
B. 流式客户端 (client_streaming.py)
import asyncio
import logging

from uuid import uuid4

import httpx
from a2a.client import A2ACardResolver, ClientConfig, ClientFactory
from a2a.types import Message, Part, Role, TextPart

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

DEFAULT_TIMEOUT = 300 # set request timeout to 5 minutes

def create_message(*, role: Role = Role.user, text: str) -> Message:
    return Message(
        kind="message",
        role=role,
        parts=[Part(TextPart(kind="text", text=text))],
        message_id=uuid4().hex,
    )


async def send_streaming_message(message: str, base_url: str = "http://localhost:9000"):
    async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as httpx_client:
        # Get agent card
        resolver = A2ACardResolver(httpx_client=httpx_client, base_url=base_url)
        agent_card = await resolver.get_agent_card()

        # Create client using factory
        config = ClientConfig(
            httpx_client=httpx_client,
            streaming=True,  # Use streaming mode
        )
        factory = ClientFactory(config)
        client = factory.create(agent_card)

        # Create and send message
        msg = create_message(text=message)

        async for event in client.send_message(msg):
            if isinstance(event, Message):
                logger.info(event.model_dump_json(exclude_none=True, indent=2))
            elif isinstance(event, tuple) and len(event) == 2:
                # (Task, UpdateEvent) tuple
                task, update_event = event
                logger.info(f"Task: {task.model_dump_json(exclude_none=True, indent=2)}")
                if update_event:
                    logger.info(f"Update: {update_event.model_dump_json(exclude_none=True, indent=2)}")
            else:
                # Fallback for other response types
                logger.info(f"Response: {str(event)}")


asyncio.run(send_streaming_message("what is 123 * 12"))
# 实时通信模式:
- 直接使用A2A协议进行流式通信
- 实时处理响应事件
- 适合长时间计算任务

特点

  • 实时响应处理
  • 支持任务状态更新
  • 适合复杂计算场景
C. 同步客户端 (client_sync.py)
import asyncio
import logging

from uuid import uuid4

import httpx
from a2a.client import A2ACardResolver, ClientConfig, ClientFactory
from a2a.types import Message, Part, Role, TextPart

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

DEFAULT_TIMEOUT = 300 # set request timeout to 5 minutes

def create_message(*, role: Role = Role.user, text: str) -> Message:
    return Message(
        kind="message",
        role=role,
        parts=[Part(TextPart(kind="text", text=text))],
        message_id=uuid4().hex,
    )


async def send_sync_message(message: str, base_url: str = "http://localhost:9000"):
    async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as httpx_client:
        # Get agent card
        resolver = A2ACardResolver(httpx_client=httpx_client, base_url=base_url)
        agent_card = await resolver.get_agent_card()

        # Create client using factory
        config = ClientConfig(
            httpx_client=httpx_client,
            streaming=False,  # Use non-streaming mode for sync response
        )
        factory = ClientFactory(config)
        client = factory.create(agent_card)

        # Create and send message
        msg = create_message(text=message)

        # With streaming=False, this will yield exactly one result
        async for event in client.send_message(msg):
            if isinstance(event, Message):
                logger.info(event.model_dump_json(exclude_none=True, indent=2))
                return event
            elif isinstance(event, tuple) and len(event) == 2:
                # (Task, UpdateEvent) tuple
                task, update_event = event
                logger.info(f"Task: {task.model_dump_json(exclude_none=True, indent=2)}")
                if update_event:
                    logger.info(f"Update: {update_event.model_dump_json(exclude_none=True, indent=2)}")
                return task
            else:
                # Fallback for other response types
                logger.info(f"Response: {str(event)}")
                return event


asyncio.run(send_sync_message("what is 3 to the power fo 7"))
# 标准请求-响应模式:
- 同步通信模式
- 等待完整响应后返回
- 适合简单查询任务

特点

  • 简单的请求-响应模式
  • 适合快速计算
  • 易于集成到现有系统

技术架构亮点

1. 协议标准化

  • 使用标准A2A协议进行通信
  • 支持跨平台智能体互操作
  • 统一的消息格式和通信模式

2. 多种通信模式

  • 高级抽象:通过Strands智能体工具
  • 流式通信:实时数据流处理
  • 同步通信:传统请求-响应

3. 消息结构

Message(
    kind="message",
    role=Role.user,
    parts=[Part(TextPart(kind="text", text=text))],
    message_id=uuid4().hex,
)

4. 客户端配置

ClientConfig(
    httpx_client=httpx_client,
    streaming=True/False,  # 控制通信模式
)

实际应用价值

在你的PPT项目中,这个示例可以启发以下应用:

1. 模块化智能体架构

  • 内容分析智能体:专门处理PDF/文档分析
  • 设计生成智能体:专门处理PPT设计和布局
  • 格式转换智能体:专门处理文件格式转换

2. 分布式处理

  • 不同智能体可以部署在不同服务器上
  • 通过A2A协议进行协调和通信
  • 提高系统的可扩展性和容错性

3. 工作流编排

  • 使用智能体客户端模式进行工作流编排
  • 自动发现和调用相关智能体
  • 支持复杂的多步骤PPT生成流程

这个示例为构建复杂的多智能体系统提供了完整的参考实现,特别适合需要多个AI组件协作的应用场景。


我会有如下几个问题

1、他如何选择智能体,他又如何知道智能体能干什么

2、他的对话是调用对话智能体还是本身

3、他如果是本身的话他又如何能够对话,因为他好像没有模型调用

4、他们之间的记忆是如何共享的


1. 智能体选择和能力发现机制

智能体发现过程

provider = A2AClientToolProvider(known_agent_urls=["http://localhost:9000"])

A2AClientToolProvider的工作原理

  • 连接到已知的智能体URL列表
  • 通过A2A协议的get_agent_card()获取每个智能体的"名片"
  • 智能体名片包含:名称、描述、可用技能/工具列表
  • 自动将这些信息转换为可调用的工具

智能体能力识别

智能体会通过以下方式了解其他智能体的能力:

  • 智能体名片:包含描述和技能列表
  • 工具描述:每个智能体暴露的工具都有详细描述
  • 动态发现:可以实时查询智能体的当前状态和能力

2. 对话机制分析

对话的实际执行者

从代码来看:

agent = Agent(tools=provider.tools)
agent("pick an agent and make a sample call")

这里的对话是由中控智能体本身处理的,不是调用其他对话智能体。流程是:

  1. 用户与中控智能体对话
  2. 中控智能体理解用户意图
  3. 中控智能体选择合适的工具(其他智能体)
  4. 调用目标智能体执行具体任务
  5. 中控智能体整合结果返回给用户

3. 中控智能体的模型调用

隐含的模型支持

虽然代码中没有显式指定模型,但Agent类内部必然包含:

  • 语言模型:用于理解用户输入和生成响应
  • 工具调用能力:决定何时调用哪个工具
  • 上下文管理:维护对话历史

这可能通过以下方式实现:

# 实际的Agent初始化可能包含默认模型
agent = Agent(
    model="gpt-4",  # 或其他默认模型
    tools=provider.tools,
    # 其他配置...
)

4. 记忆共享机制

当前架构的记忆限制

从示例来看,智能体之间默认是无状态的

  • 每次A2A调用都是独立的请求
  • 没有共享的记忆存储
  • 上下文需要在每次调用时传递

可能的记忆共享方案

A. 中控智能体维护全局上下文
# 中控智能体负责记忆管理
class MasterAgent:
    def __init__(self):
        self.conversation_history = []
        self.task_context = {}
    
    def call_sub_agent(self, agent_name, task, context):
        # 将相关上下文传递给子智能体
        return sub_agent.process(task, context)
B. 共享存储系统
# 使用外部存储共享记忆
class SharedMemory:
    def __init__(self):
        self.redis_client = Redis()
    
    def store_context(self, session_id, context):
        self.redis_client.set(f"session:{session_id}", context)
C. A2A协议扩展
# 在A2A消息中携带上下文
message = Message(
    content="分析这个文档",
    context={
        "session_id": "xxx",
        "previous_results": {...},
        "user_preferences": {...}
    }
)

实际应用建议

对于你的PPT项目,我建议:

1. 分层记忆架构

  • 中控层:维护用户会话和全局上下文
  • 任务层:每个任务的临时上下文
  • 智能体层:专业智能体的领域知识

2. 上下文传递策略

  • 关键信息通过A2A消息传递
  • 大型数据通过共享存储引用
  • 用户偏好和历史通过中控智能体管理

3. 智能体发现增强

  • 动态注册机制
  • 能力标签系统
  • 负载均衡和故障转移

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐