对于A2A框架 AWS strands agents SDK的一些理解
这是一个完整的Agent-to-Agent (A2A)协议实现示例,展示了如何创建A2A服务器和三种不同类型的客户端来进行智能体间通信。
·
这是一个完整的Agent-to-Agent (A2A)协议实现示例,展示了如何创建A2A服务器和三种不同类型的客户端来进行智能体间通信。
项目结构分析
1. 项目配置 (pyproject.toml
)
- Python版本要求:>=3.12
- 核心依赖:
strands-agents[a2a]>=1.3.0
- Strands框架的A2A支持strands-agents-tools[a2a]>=0.2.0
- A2A客户端工具a2a-sdk[sql]>=0.3.0
- A2A SDK核心库
2. 服务器实现 (server.py
)
from strands import Agent
from strands.multiagent.a2a import A2AServer
from strands_tools.calculator import calculator
# Create a Strands agent with calculator tool
strands_agent = Agent(
name="Calculator Agent",
description="A calculator agent that can perform basic arithmetic operations.",
tools=[calculator],
callback_handler=None,
)
# Create A2A server
a2a_server = A2AServer(agent=strands_agent)
# Start the server
a2a_server.serve()
# 核心架构:
- 创建带计算器工具的Strands智能体
- 使用A2AServer包装智能体
- 在localhost:9000上提供服务
关键特点:
- 简洁的实现(仅18行代码)
- 自动暴露智能体的计算器功能
- 支持A2A协议的标准通信
3. 三种客户端实现模式
A. 智能体客户端 (client_agent.py
)
import asyncio
from strands import Agent
from strands_tools.a2a_client import A2AClientToolProvider
# initialize collection of A2A tools for the agent
provider = A2AClientToolProvider(known_agent_urls=["http://localhost:9000"])
# initialize agent with tools
agent = Agent(tools=provider.tools)
# you can also invoke the agent in a non-async context
# print(agent("pick an agent and make a sample call to test its functionality"))
# run the agent in an async context
async def main():
await agent.invoke_async(
"pick an agent and make a sample call to test its functionality"
)
# run
asyncio.run(main())
# 高级抽象模式:
- 使用A2AClientToolProvider自动发现智能体
- 创建具有A2A工具的Strands智能体
- 支持自然语言交互
优势:
- 最简单的使用方式
- 自动智能体发现
- 自然语言接口
B. 流式客户端 (client_streaming.py
)
import asyncio
import logging
from uuid import uuid4
import httpx
from a2a.client import A2ACardResolver, ClientConfig, ClientFactory
from a2a.types import Message, Part, Role, TextPart
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
DEFAULT_TIMEOUT = 300 # set request timeout to 5 minutes
def create_message(*, role: Role = Role.user, text: str) -> Message:
return Message(
kind="message",
role=role,
parts=[Part(TextPart(kind="text", text=text))],
message_id=uuid4().hex,
)
async def send_streaming_message(message: str, base_url: str = "http://localhost:9000"):
async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as httpx_client:
# Get agent card
resolver = A2ACardResolver(httpx_client=httpx_client, base_url=base_url)
agent_card = await resolver.get_agent_card()
# Create client using factory
config = ClientConfig(
httpx_client=httpx_client,
streaming=True, # Use streaming mode
)
factory = ClientFactory(config)
client = factory.create(agent_card)
# Create and send message
msg = create_message(text=message)
async for event in client.send_message(msg):
if isinstance(event, Message):
logger.info(event.model_dump_json(exclude_none=True, indent=2))
elif isinstance(event, tuple) and len(event) == 2:
# (Task, UpdateEvent) tuple
task, update_event = event
logger.info(f"Task: {task.model_dump_json(exclude_none=True, indent=2)}")
if update_event:
logger.info(f"Update: {update_event.model_dump_json(exclude_none=True, indent=2)}")
else:
# Fallback for other response types
logger.info(f"Response: {str(event)}")
asyncio.run(send_streaming_message("what is 123 * 12"))
# 实时通信模式:
- 直接使用A2A协议进行流式通信
- 实时处理响应事件
- 适合长时间计算任务
特点:
- 实时响应处理
- 支持任务状态更新
- 适合复杂计算场景
C. 同步客户端 (client_sync.py
)
import asyncio
import logging
from uuid import uuid4
import httpx
from a2a.client import A2ACardResolver, ClientConfig, ClientFactory
from a2a.types import Message, Part, Role, TextPart
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
DEFAULT_TIMEOUT = 300 # set request timeout to 5 minutes
def create_message(*, role: Role = Role.user, text: str) -> Message:
return Message(
kind="message",
role=role,
parts=[Part(TextPart(kind="text", text=text))],
message_id=uuid4().hex,
)
async def send_sync_message(message: str, base_url: str = "http://localhost:9000"):
async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as httpx_client:
# Get agent card
resolver = A2ACardResolver(httpx_client=httpx_client, base_url=base_url)
agent_card = await resolver.get_agent_card()
# Create client using factory
config = ClientConfig(
httpx_client=httpx_client,
streaming=False, # Use non-streaming mode for sync response
)
factory = ClientFactory(config)
client = factory.create(agent_card)
# Create and send message
msg = create_message(text=message)
# With streaming=False, this will yield exactly one result
async for event in client.send_message(msg):
if isinstance(event, Message):
logger.info(event.model_dump_json(exclude_none=True, indent=2))
return event
elif isinstance(event, tuple) and len(event) == 2:
# (Task, UpdateEvent) tuple
task, update_event = event
logger.info(f"Task: {task.model_dump_json(exclude_none=True, indent=2)}")
if update_event:
logger.info(f"Update: {update_event.model_dump_json(exclude_none=True, indent=2)}")
return task
else:
# Fallback for other response types
logger.info(f"Response: {str(event)}")
return event
asyncio.run(send_sync_message("what is 3 to the power fo 7"))
# 标准请求-响应模式:
- 同步通信模式
- 等待完整响应后返回
- 适合简单查询任务
特点:
- 简单的请求-响应模式
- 适合快速计算
- 易于集成到现有系统
技术架构亮点
1. 协议标准化
- 使用标准A2A协议进行通信
- 支持跨平台智能体互操作
- 统一的消息格式和通信模式
2. 多种通信模式
- 高级抽象:通过Strands智能体工具
- 流式通信:实时数据流处理
- 同步通信:传统请求-响应
3. 消息结构
Message(
kind="message",
role=Role.user,
parts=[Part(TextPart(kind="text", text=text))],
message_id=uuid4().hex,
)
4. 客户端配置
ClientConfig(
httpx_client=httpx_client,
streaming=True/False, # 控制通信模式
)
实际应用价值
在你的PPT项目中,这个示例可以启发以下应用:
1. 模块化智能体架构
- 内容分析智能体:专门处理PDF/文档分析
- 设计生成智能体:专门处理PPT设计和布局
- 格式转换智能体:专门处理文件格式转换
2. 分布式处理
- 不同智能体可以部署在不同服务器上
- 通过A2A协议进行协调和通信
- 提高系统的可扩展性和容错性
3. 工作流编排
- 使用智能体客户端模式进行工作流编排
- 自动发现和调用相关智能体
- 支持复杂的多步骤PPT生成流程
这个示例为构建复杂的多智能体系统提供了完整的参考实现,特别适合需要多个AI组件协作的应用场景。
我会有如下几个问题
1、他如何选择智能体,他又如何知道智能体能干什么
2、他的对话是调用对话智能体还是本身
3、他如果是本身的话他又如何能够对话,因为他好像没有模型调用
4、他们之间的记忆是如何共享的
1. 智能体选择和能力发现机制
智能体发现过程
provider = A2AClientToolProvider(known_agent_urls=["http://localhost:9000"])
A2AClientToolProvider的工作原理:
- 连接到已知的智能体URL列表
- 通过A2A协议的
get_agent_card()
获取每个智能体的"名片" - 智能体名片包含:名称、描述、可用技能/工具列表
- 自动将这些信息转换为可调用的工具
智能体能力识别
智能体会通过以下方式了解其他智能体的能力:
- 智能体名片:包含描述和技能列表
- 工具描述:每个智能体暴露的工具都有详细描述
- 动态发现:可以实时查询智能体的当前状态和能力
2. 对话机制分析
对话的实际执行者
从代码来看:
agent = Agent(tools=provider.tools)
agent("pick an agent and make a sample call")
这里的对话是由中控智能体本身处理的,不是调用其他对话智能体。流程是:
- 用户与中控智能体对话
- 中控智能体理解用户意图
- 中控智能体选择合适的工具(其他智能体)
- 调用目标智能体执行具体任务
- 中控智能体整合结果返回给用户
3. 中控智能体的模型调用
隐含的模型支持
虽然代码中没有显式指定模型,但Agent
类内部必然包含:
- 语言模型:用于理解用户输入和生成响应
- 工具调用能力:决定何时调用哪个工具
- 上下文管理:维护对话历史
这可能通过以下方式实现:
# 实际的Agent初始化可能包含默认模型
agent = Agent(
model="gpt-4", # 或其他默认模型
tools=provider.tools,
# 其他配置...
)
4. 记忆共享机制
当前架构的记忆限制
从示例来看,智能体之间默认是无状态的:
- 每次A2A调用都是独立的请求
- 没有共享的记忆存储
- 上下文需要在每次调用时传递
可能的记忆共享方案
A. 中控智能体维护全局上下文
# 中控智能体负责记忆管理
class MasterAgent:
def __init__(self):
self.conversation_history = []
self.task_context = {}
def call_sub_agent(self, agent_name, task, context):
# 将相关上下文传递给子智能体
return sub_agent.process(task, context)
B. 共享存储系统
# 使用外部存储共享记忆
class SharedMemory:
def __init__(self):
self.redis_client = Redis()
def store_context(self, session_id, context):
self.redis_client.set(f"session:{session_id}", context)
C. A2A协议扩展
# 在A2A消息中携带上下文
message = Message(
content="分析这个文档",
context={
"session_id": "xxx",
"previous_results": {...},
"user_preferences": {...}
}
)
实际应用建议
对于你的PPT项目,我建议:
1. 分层记忆架构
- 中控层:维护用户会话和全局上下文
- 任务层:每个任务的临时上下文
- 智能体层:专业智能体的领域知识
2. 上下文传递策略
- 关键信息通过A2A消息传递
- 大型数据通过共享存储引用
- 用户偏好和历史通过中控智能体管理
3. 智能体发现增强
- 动态注册机制
- 能力标签系统
- 负载均衡和故障转移
更多推荐
所有评论(0)