AI Agent技术完整指南 第五部分:Memory、Retrieval 与 Tool/Action
本文介绍了AI Agent系统的三大核心模块:Memory(记忆模块)、Retrieval(检索模块)和Tool/Action(工具/动作模块)。其中重点阐述了Memory模块的实现,包括其层次结构(短期记忆和长期记忆)、存储方式(内存缓存、向量数据库等)以及具体的ContextDataCache实现方案。该缓存服务支持TTL过期机制、LRU淘汰策略等特性,能有效提升系统性能。文章还提供了Pyth
目录
概述
AI Agent 系统由三个核心模块组成:
- Memory(记忆模块):存储和管理 Agent 的历史对话、上下文信息
- Retrieval(检索模块):从记忆库中快速、准确地检索相关信息
- Tool/Action(工具/动作模块):执行具体操作,如调用 API、操作数据库等
这三个模块协同工作,使 Agent 能够:
- 记住历史对话
- 基于上下文做出决策
- 执行实际任务
Memory 模块
1. 什么是 Memory?
Memory 是 AI Agent 的"大脑记忆",负责存储和管理:
- 对话历史:用户与 Agent 的交互记录
- 上下文信息:当前会话的状态、词槽填充情况等
- 缓存数据:频繁访问的数据(如 UAV 列表、任务列表等)
2. Memory 的核心原理
2.1 记忆的层次结构
┌─────────────────────────────────────┐
│ 短期记忆 (Short-term Memory) │
│ - 当前会话状态 │
│ - 临时缓存数据 │
│ - 会话级别的上下文 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 长期记忆 (Long-term Memory) │
│ - 持久化的对话历史 │
│ - 用户偏好设置 │
│ - 历史任务记录 │
└─────────────────────────────────────┘
2.2 记忆的存储方式
- 内存缓存:快速访问,但易丢失
- 向量数据库:语义搜索,支持相似度匹配
- 关系数据库:结构化存储,持久化保存
3. Memory 的实现
3.1 上下文数据缓存 (ContextDataCache)
作用:智能缓存频繁访问的数据,减少 API 调用,提升性能。
核心特性:
- TTL(Time To Live)过期机制
- LRU(Least Recently Used)淘汰策略
- 自动缓存预热
- 降级方案(缓存失效时使用过期数据)
实现示例:
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
import time
from threading import Lock
@dataclass
class CacheEntry:
"""缓存条目"""
data: Any
timestamp: float
ttl: int # 过期时间(秒)
access_count: int = 0
last_access: float = field(default_factory=time.time)
def is_expired(self) -> bool:
"""检查是否过期"""
return time.time() - self.timestamp > self.ttl
def touch(self):
"""更新访问时间和计数"""
self.access_count += 1
self.last_access = time.time()
class ContextDataCache:
"""上下文数据缓存服务"""
def __init__(self, config: CacheConfig):
self.config = config
self.cache: Dict[str, CacheEntry] = {}
self.lock = Lock()
# TTL映射:不同类型数据有不同的过期时间
self.ttl_mapping = {
"uav_list": 300, # 5分钟
"task_list": 600, # 10分钟
"media_list": 60, # 1分钟
}
async def get_cached_data(
self,
key: str,
fetch_func,
force_refresh: bool = False,
ttl: Optional[int] = None
) -> List[Dict]:
"""
通用的缓存数据获取方法
Args:
key: 缓存键
fetch_func: 数据获取函数(异步)
force_refresh: 是否强制刷新
ttl: 过期时间(秒)
"""
# 1. 检查缓存是否启用
if not self.config.enabled:
return await fetch_func()
# 2. 获取TTL
if ttl is None:
ttl = self.ttl_mapping.get(key.split(':')[0], self.config.default_ttl)
# 3. 检查缓存(加锁保证线程安全)
with self.lock:
if not force_refresh and key in self.cache:
entry = self.cache[key]
if not entry.is_expired():
entry.touch() # 更新访问时间
logger.debug(f"缓存命中: {key}")
return entry.data
else:
logger.info(f"缓存过期: {key}")
del self.cache[key]
# 4. 缓存未命中,调用数据源
logger.info(f"缓存未命中,调用数据源: {key}")
try:
raw_data = await fetch_func()
# 5. 更新缓存
with self.lock:
self.cache[key] = CacheEntry(
data=raw_data,
timestamp=time.time(),
ttl=ttl
)
# 缓存大小控制
self._evict_if_needed()
return raw_data
except Exception as e:
logger.error(f"数据获取失败: {key}, 错误: {str(e)}")
# 降级方案:返回过期缓存
with self.lock:
if key in self.cache:
logger.warning(f"使用过期缓存作为降级方案: {key}")
return self.cache[key].data
return []
def _evict_if_needed(self):
"""LRU淘汰策略:当缓存超过最大大小时,移除最久未访问的项"""
if len(self.cache) <= self.config.max_size:
return
# 按最后访问时间排序
items = list(self.cache.items())
items.sort(key=lambda x: x[1].last_access)
# 移除最老的项目
to_remove = len(items) - self.config.max_size + 1
for i in range(to_remove):
key_to_remove = items[i][0]
del self.cache[key_to_remove]
logger.debug(f"缓存淘汰: {key_to_remove}")
使用示例:
# 初始化缓存服务
cache_service = ContextDataCache(config=CacheConfig(
enabled=True,
default_ttl=300,
max_size=1000
))
# 获取 UAV 列表(带缓存)
async def get_uav_list(mcp_manager, user_info):
return await cache_service.get_cached_data(
key=f"uav_list:{user_info.user_id}",
fetch_func=lambda: mcp_manager.call_function(
namespace="backend",
function_name="getUavList",
parameters=user_info.to_dict()
),
force_refresh=False,
ttl=300 # 5分钟过期
)
3.2 对话持久化 (ConversationPersistenceHelper)
作用:将对话历史保存到数据库,实现长期记忆。
核心功能:
- 保存用户消息
- 保存 AI 响应
- 保存执行日志
- 保存工作流状态
实现示例:
class ConversationPersistenceHelper:
"""对话持久化助手类"""
def __init__(
self,
session_id: str,
task_info_id: Optional[int],
user_id: str,
user_raw_input: str
):
self.session_id = session_id
self.task_info_id = task_info_id
self.user_id = user_id
self.user_raw_input = user_raw_input
self.task_record: Optional[Task] = None
def initialize(self) -> bool:
"""初始化任务记录"""
try:
# 获取或创建任务记录
self.task_record, created = Task.get_or_create(
session_id=self.session_id,
task_info_id=self.task_info_id,
defaults={
"user_id": self.user_id,
"title": self.user_raw_input,
"create_time": datetime.datetime.now()
}
)
return True
except Exception as e:
logger.error(f"无法初始化对话记录: {e}")
return False
def save_user_message(self, user_raw_input: str) -> bool:
"""保存用户消息"""
if not self.task_record:
return False
try:
conv_logs = list(self.task_record.conversation_logs or [])
conv_logs.append({
"role": "user",
"content": user_raw_input,
"time": datetime.datetime.now().isoformat()
})
self.task_record.conversation_logs = conv_logs
self.task_record.save()
return True
except Exception as e:
logger.error(f"保存用户输入失败: {e}")
return False
def save_ai_message(self, content: str, autel_messages: List[Dict]) -> bool:
"""保存 AI 响应"""
if not self.task_record:
return False
try:
conv_logs = list(self.task_record.conversation_logs or [])
conv_logs.append({
"role": "ai",
"content": content,
"autel_messages": autel_messages,
"time": datetime.datetime.now().isoformat()
})
self.task_record.conversation_logs = conv_logs
self.task_record.save()
return True
except Exception as e:
logger.error(f"保存AI响应失败: {e}")
return False
数据库模型:
class Task(BaseModel):
"""对话任务存储表"""
session_id = CharField(unique=True, index=True)
task_info_id = ForeignKeyField(AgentTaskInfo, null=True)
title = CharField(index=True)
user_id = CharField(index=True)
conversation_logs = BinaryJSONField(default=[]) # 对话历史
exec_logs = BinaryJSONField(default=[]) # 执行日志
state = BinaryJSONField(null=True) # 当前状态
workflow_json = TextField(null=True) # 工作流JSON
create_time = DateTimeField(default=datetime.datetime.now)
class Meta:
table_name = 'task'
3.3 会话状态管理 (AgenticState)
作用:管理当前会话的状态信息,包括消息历史、意图识别结果、工具调用结果等。
状态结构:
class AgenticState(TypedDict, total=False):
"""Agentic Agent 状态定义"""
# 基础信息
messages: List[Dict] # 消息历史
user_input: str # 用户当前输入
user_raw_input: str # 原始用户输入
enhanced_input: Optional[str] # 增强输入(包含上下文)
user_info: UserInfo # 用户信息
# 意图识别
intent_type: Optional[str] # 意图类型
intent_confidence: float # 意图置信度
detected_scene: Optional[str] # 检测到的场景
# MCP调用相关
tool_calls: Optional[List[Dict]] # 工具调用列表
execution_results: Optional[List[Dict]] # 执行结果
# 场景处理相关
scene_context: Optional[SceneContext] # 场景上下文
waiting_for_user: bool # 是否等待用户输入
使用示例:
# 初始化状态
state: AgenticState = {
"messages": [],
"user_input": "",
"user_info": user_info,
"intent_type": None,
"intent_confidence": 0.0
}
# 更新状态
state["user_input"] = "查询所有无人机"
state["intent_type"] = "mcp"
state["tool_calls"] = [{"name": "getUavList", "parameters": {}}]
4. Memory 模块的最佳实践
- 合理设置 TTL:根据数据更新频率设置过期时间
- 使用 LRU 淘汰:防止内存溢出
- 实现降级方案:缓存失效时使用过期数据
- 异步操作:避免阻塞主线程
- 线程安全:使用锁保护共享资源
Retrieval 模块
1. 什么是 Retrieval?
Retrieval(检索)是从 Memory 中快速、准确地找到相关信息的过程。类似于人类回忆时的"搜索记忆"。
2. Retrieval 的核心原理
2.1 检索的两种方式
-
精确匹配:基于键值对的直接查找
- 优点:速度快、准确
- 缺点:需要精确的键
-
语义相似度搜索:基于向量相似度的模糊匹配
- 优点:支持自然语言查询、容错性强
- 缺点:需要向量化、计算开销较大
2.2 向量检索流程
用户查询: "巡检一号无人机"
↓
文本向量化 (Embedding)
↓
[0.23, -0.45, 0.67, ...] (384维向量)
↓
向量数据库相似度搜索
↓
找到最相似的项: "巡检1号" (相似度: 0.92)
3. Retrieval 的实现
3.1 向量存储 (ChromaVectorStore)
作用:使用 ChromaDB 实现语义相似度搜索。
核心功能:
- 数据同步:将数据转换为向量并存储
- 语义搜索:基于查询文本找到最相似的项
- 多集合管理:支持不同类型的集合(UAV、Task、Mark等)
实现示例:
import chromadb
from chromadb.config import Settings
from chromadb.utils import embedding_functions
from typing import List, Tuple, NamedTuple
class NamedItem(NamedTuple):
"""通用的命名项"""
id: str
name: str
class ChromaVectorStore:
"""ChromaDB 向量存储服务"""
def __init__(self, persist_directory: Optional[str] = None):
# 初始化 ChromaDB 客户端
if persist_directory:
self.client = chromadb.PersistentClient(
path=persist_directory,
settings=Settings(anonymized_telemetry=False)
)
else:
self.client = chromadb.Client(
settings=Settings(anonymized_telemetry=False)
)
# 初始化 embedding 函数(多语言模型)
self.embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
model_name="paraphrase-multilingual-MiniLM-L12-v2"
)
# 缓存 collection 引用
self._collections: Dict[str, Any] = {}
def sync_collection(
self,
collection_name: str,
items: List[NamedItem],
metadata_list: Optional[List[Dict]] = None
) -> int:
"""
同步数据到 collection
流程:
1. 获取或创建 collection
2. 清空旧数据
3. 将数据转换为向量并插入
"""
if not items:
return 0
try:
# 获取或创建 collection
collection = self._get_or_create_collection(collection_name)
# 清空旧数据
existing = collection.get()
if existing['ids']:
collection.delete(ids=existing['ids'])
# 准备数据
ids = []
documents = [] # 用于生成向量的文本
metadatas = []
for i, item in enumerate(items):
ids.append(str(item.id))
documents.append(item.name) # 使用名称作为文本
metadata = metadata_list[i] if metadata_list and i < len(metadata_list) else None
metadatas.append(metadata or {"name": item.name, "id": item.id})
# 批量插入(ChromaDB 会自动生成向量)
collection.add(
ids=ids,
documents=documents,
metadatas=metadatas
)
logger.info(f"同步 {len(ids)} 条数据到 {collection_name}")
return len(ids)
except Exception as e:
logger.error(f"同步数据失败: {e}")
return 0
def search(
self,
query: str,
collection_name: str,
top_k: int = 5,
threshold: float = 0.5
) -> List[Tuple[NamedItem, float]]:
"""
语义相似度搜索
Args:
query: 查询字符串
collection_name: collection 名称
top_k: 返回前k个结果
threshold: 相似度阈值(0-1)
Returns:
List[Tuple[NamedItem, float]]: (item, similarity) 列表
"""
try:
collection = self._get_or_create_collection(collection_name)
# 检查 collection 是否为空
if collection.count() == 0:
return []
# 执行查询(ChromaDB 会自动将 query 向量化)
results = collection.query(
query_texts=[query],
n_results=min(top_k, collection.count()),
include=["documents", "metadatas", "distances"]
)
# 解析结果
matched_items = []
if results['ids'] and results['ids'][0]:
ids = results['ids'][0]
documents = results['documents'][0]
distances = results['distances'][0] # 余弦距离
metadatas = results['metadatas'][0]
for i, doc_id in enumerate(ids):
# 余弦距离转换为相似度
# 余弦距离 = 1 - 余弦相似度
distance = distances[i]
similarity = 1.0 - distance
# 过滤低于阈值的结果
if similarity < threshold:
continue
# 构建 NamedItem
name = documents[i] if i < len(documents) else ""
if not name and i < len(metadatas):
name = metadatas[i].get("name", "")
item = NamedItem(id=doc_id, name=name)
matched_items.append((item, similarity))
# 按相似度降序排序
matched_items.sort(key=lambda x: x[1], reverse=True)
return matched_items
except Exception as e:
logger.error(f"搜索失败: {e}")
return []
def _get_or_create_collection(self, name: str):
"""获取或创建 collection"""
if name not in self._collections:
self._collections[name] = self.client.get_or_create_collection(
name=name,
embedding_function=self.embedding_function,
metadata={"hnsw:space": "cosine"} # 使用余弦相似度
)
return self._collections[name]
使用示例:
# 初始化向量存储
vector_store = ChromaVectorStore(persist_directory="./chroma_db")
# 同步 UAV 列表
uav_items = [
NamedItem(id="1", name="巡检1号"),
NamedItem(id="2", name="巡检2号"),
NamedItem(id="3", name="应急无人机")
]
vector_store.sync_collection("uav", uav_items)
# 语义搜索
results = vector_store.search(
query="巡检一号", # 用户输入(可能是模糊的)
collection_name="uav",
top_k=3,
threshold=0.5
)
# 结果: [(NamedItem(id="1", name="巡检1号"), 0.92), ...]
for item, similarity in results:
print(f"{item.name}: {similarity:.2f}")
3.2 缓存检索集成
将向量检索与缓存结合:
class ContextDataCache:
"""上下文数据缓存服务(集成向量检索)"""
async def get_uav_list(self, mcp_manager, user_info, query: Optional[str] = None):
"""
获取 UAV 列表(支持语义搜索)
Args:
query: 可选的查询字符串,用于语义搜索
"""
# 1. 从缓存获取完整列表
uav_data = await self._get_cached_data(
key=f"uav_list:{user_info.user_id}",
fetch_func=lambda: mcp_manager.call_function(...),
force_refresh=False
)
# 2. 如果有查询字符串,使用向量检索过滤
if query and CHROMA_AVAILABLE:
try:
# 同步到向量存储
items = [
NamedItem(
id=str(uav.get("deviceId", "")),
name=uav.get("deviceName", "")
)
for uav in uav_data
]
vector_store.sync_collection("uav", items)
# 语义搜索
matched_items = vector_store.search(
query=query,
collection_name="uav",
top_k=5,
threshold=0.5
)
# 根据匹配结果过滤原始数据
matched_ids = {item.id for item, _ in matched_items}
uav_data = [
uav for uav in uav_data
if str(uav.get("deviceId", "")) in matched_ids
]
except Exception as e:
logger.warning(f"向量检索失败: {e}")
return uav_data
4. Retrieval 模块的最佳实践
- 选择合适的 Embedding 模型:多语言场景使用多语言模型
- 设置合理的相似度阈值:平衡准确率和召回率
- 定期更新向量库:数据变化时同步更新
- 结合精确匹配:先用精确匹配,再用语义搜索
- 缓存检索结果:避免重复计算
Tool/Action 模块
1. 什么是 Tool/Action?
Tool(工具)和 Action(动作)是 Agent 执行具体操作的能力,例如:
- 调用 API 获取数据
- 操作数据库
- 发送消息
- 控制硬件设备
2. Tool/Action 的核心原理
2.1 ReAct 模式
ReAct(Reasoning + Acting)是 Agent 的核心执行模式:
思考 (Think) → 行动 (Act) → 观察 (Observe) → 思考 (Think) → ...
示例:
用户: "查询所有无人机的状态"
Agent 思考: "我需要调用 getUavList 工具来获取无人机列表"
↓
Agent 行动: 调用 getUavList()
↓
Agent 观察: 返回了3架无人机的信息
↓
Agent 思考: "我已经获取到数据,现在可以生成响应了"
↓
Agent 响应: "当前有3架无人机:巡检1号(在线)、巡检2号(离线)..."
2.2 工具调用的生命周期
1. 工具发现 (Tool Discovery)
↓
2. 意图识别 (Intent Recognition)
↓
3. 参数提取 (Parameter Extraction)
↓
4. 工具执行 (Tool Execution)
↓
5. 结果处理 (Result Processing)
↓
6. 响应生成 (Response Generation)
3. Tool/Action 的实现
3.1 工具执行服务 (ToolExecutionService)
作用:统一管理工具调用的执行策略和结果处理。
核心功能:
- 顺序执行:工具之间有依赖关系
- 并行执行:工具之间独立
- 错误处理:重试机制、降级方案
- 结果验证:检查执行结果的有效性
实现示例:
class ToolExecutionService:
"""工具执行服务"""
def __init__(self, retry_config: RetryConfig):
self.retry_config = retry_config
async def execute_tools(
self,
tool_calls: List[Dict],
execution_strategy: str,
available_tools: Dict,
mcp_manager
) -> List[Dict]:
"""
执行工具调用列表
Args:
tool_calls: 工具调用列表
execution_strategy: 执行策略 ("sequential" 或 "parallel")
available_tools: 可用工具字典
mcp_manager: MCP管理器
"""
if execution_strategy == "parallel":
return await self._execute_tools_parallel(
tool_calls, available_tools, mcp_manager
)
else:
return await self._execute_tools_sequential(
tool_calls, available_tools, mcp_manager
)
async def _execute_tools_sequential(
self,
tool_calls: List[Dict],
available_tools: Dict,
mcp_manager
) -> List[Dict]:
"""顺序执行工具调用"""
execution_results = []
for i, tool_call in enumerate(tool_calls):
logger.info(f"[{i+1}/{len(tool_calls)}] 顺序执行: {tool_call.get('name')}")
try:
result = await self._execute_single_tool(
tool_call, available_tools, mcp_manager
)
execution_results.append(result)
except Exception as e:
logger.error(f"工具执行失败: {e}")
execution_results.append({
"tool_call": tool_call,
"success": False,
"error": str(e),
"result": None
})
return execution_results
async def _execute_tools_parallel(
self,
tool_calls: List[Dict],
available_tools: Dict,
mcp_manager
) -> List[Dict]:
"""并行执行工具调用"""
tasks = [
self._execute_single_tool(tool_call, available_tools, mcp_manager)
for tool_call in tool_calls
]
results = await asyncio.gather(*tasks, return_exceptions=True)
execution_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
execution_results.append({
"tool_call": tool_calls[i],
"success": False,
"error": str(result),
"result": None
})
else:
execution_results.append(result)
return execution_results
async def _execute_single_tool(
self,
tool_call: Dict,
available_tools: Dict,
mcp_manager
) -> Dict:
"""执行单个工具调用"""
tool_key = tool_call.get("name", "")
parameters = tool_call.get("parameters", {})
# 解析工具信息
tool_info = self._resolve_tool_info(tool_key, available_tools)
if not tool_info:
raise ValueError(f"工具不存在: {tool_key}")
namespace = tool_info["namespace"]
function_name = tool_info["function_name"]
try:
# 调用工具
execution_result = await mcp_manager.call_function(
namespace=namespace,
function_name=function_name,
parameters=parameters
)
# 处理结果
processed_result = self._process_mcp_result(execution_result)
is_success, status_message = self._validate_mcp_result(
execution_result, function_name
)
return {
"tool_call": tool_call,
"success": is_success,
"message": status_message,
"result": processed_result,
"namespace": namespace,
"function_name": function_name
}
except Exception as e:
logger.error(f"工具调用失败 {tool_key}: {str(e)}")
return {
"tool_call": tool_call,
"success": False,
"error": str(e),
"result": None,
"namespace": namespace,
"function_name": function_name
}
def _process_mcp_result(self, result) -> Union[str, Dict, List]:
"""处理MCP结果"""
try:
# 基础类型直接返回
if isinstance(result, (str, dict, list, int, float, bool)) or result is None:
return result
# 处理MCP CallToolResult对象
if hasattr(result, 'content') and isinstance(result.content, list):
first_item = result.content[0]
if hasattr(first_item, 'text'):
text = first_item.text.strip()
# 尝试解析JSON
if text.startswith('"') and text.endswith('"'):
text = text[1:-1]
try:
return json.loads(text)
except:
return text
return str(result)
except Exception as e:
logger.error(f"处理MCP结果时出错: {str(e)}")
return {"error": True, "message": f"结果处理失败: {str(e)}"}
使用示例:
# 初始化工具执行服务
execution_service = ToolExecutionService(retry_config=RetryConfig())
# 定义工具调用
tool_calls = [
{
"name": "getUavList",
"parameters": {"userId": "123"}
},
{
"name": "getTaskList",
"parameters": {"userId": "123"}
}
]
# 并行执行(两个工具独立,可以并行)
results = await execution_service.execute_tools(
tool_calls=tool_calls,
execution_strategy="parallel", # 或 "sequential"
available_tools=available_tools,
mcp_manager=mcp_manager
)
# 处理结果
for result in results:
if result["success"]:
print(f"工具 {result['function_name']} 执行成功")
print(f"结果: {result['result']}")
else:
print(f"工具 {result['function_name']} 执行失败: {result['error']}")
3.2 ReAct Agent
作用:基于 ReAct 模式实现动态推理执行 Agent。
核心流程:
class ReactAgent:
"""ReAct Agent - 动态推理执行Agent"""
def __init__(self, llm, available_tools):
self.llm = llm
self.available_tools = available_tools
async def execute_stream(
self,
goal: str,
user_info: UserInfo,
hint: Optional[str] = None,
mcp_manager=None
) -> AsyncGenerator[Dict, None]:
"""
流式执行任务
Args:
goal: 目标描述
user_info: 用户信息
hint: 可选的提示信息
mcp_manager: MCP管理器
"""
# 初始化状态
thoughts = []
observations = []
actions = []
max_iterations = 10
iteration = 0
while iteration < max_iterations:
iteration += 1
# 1. 思考:分析当前情况,决定下一步行动
thought = await self._think(
goal=goal,
thoughts=thoughts,
observations=observations,
actions=actions,
hint=hint
)
thoughts.append(thought)
yield {
"type": "thought",
"content": thought
}
# 2. 解析思考结果,提取行动
parsed = self._parse_llm_response(thought)
if parsed["action"] == "FINISH":
# 任务完成
yield {
"type": "final_answer",
"content": parsed.get("final_answer", "")
}
break
if parsed["action"] == "TOOL_CALL":
# 3. 执行工具调用
tool_name = parsed["tool_name"]
tool_params = parsed["tool_params"]
yield {
"type": "action",
"tool_name": tool_name,
"parameters": tool_params
}
observation, error = await self._execute_mcp_tool(
tool_name=tool_name,
tool_params=tool_params,
user_info=user_info,
mcp_manager=mcp_manager
)
if error:
observations.append(f"错误: {error}")
yield {
"type": "error",
"content": error
}
else:
observations.append(observation)
yield {
"type": "observation",
"content": observation
}
# 检查是否达到目标
if self._is_goal_achieved(goal, thoughts, observations):
yield {
"type": "final_answer",
"content": self._generate_final_answer(thoughts, observations)
}
break
async def _think(
self,
goal: str,
thoughts: List[str],
observations: List[str],
actions: List[str],
hint: Optional[str] = None
) -> str:
"""思考下一步行动"""
prompt = f"""
目标: {goal}
{hint if hint else ""}
之前的思考:
{chr(10).join(f"- {t}" for t in thoughts[-3:])}
之前的观察:
{chr(10).join(f"- {o}" for o in observations[-3:])}
可用的工具:
{self._format_available_tools()}
请分析当前情况,决定下一步行动。格式:
Thought: [你的思考]
Action: [TOOL_CALL:tool_name:{{"param": "value"}}] 或 [FINISH]
"""
response = await self.llm.ainvoke(prompt)
return response.content
def _parse_llm_response(self, content: str) -> Dict[str, Any]:
"""解析LLM响应,提取行动"""
# 简化示例,实际实现需要更robust的解析
if "Action: FINISH" in content:
return {"action": "FINISH", "final_answer": content}
if "Action: TOOL_CALL" in content:
# 提取工具名称和参数
# 实际实现需要更复杂的解析逻辑
match = re.search(r'TOOL_CALL:(\w+):({.*?})', content)
if match:
tool_name = match.group(1)
tool_params = json.loads(match.group(2))
return {
"action": "TOOL_CALL",
"tool_name": tool_name,
"tool_params": tool_params
}
return {"action": "UNKNOWN"}
async def _execute_mcp_tool(
self,
tool_name: str,
tool_params: Dict[str, Any],
user_info: UserInfo,
mcp_manager
) -> tuple[str, Optional[str]]:
"""执行 MCP 工具调用"""
try:
# 解析工具名称
parts = tool_name.split(":")
if len(parts) == 2:
namespace, function_name = parts
else:
namespace = "backend"
function_name = tool_name
# 注入用户信息
if user_info and "userInfoDTO" not in tool_params:
tool_params.update(user_info.to_dict())
# 执行调用
result = await mcp_manager.call_function(
namespace,
function_name,
tool_params
)
# 转换为字符串观察结果
if isinstance(result, dict):
observation = json.dumps(result, ensure_ascii=False, indent=2)
else:
observation = str(result)
return observation, None
except Exception as e:
error_msg = f"MCP 调用失败: {str(e)}"
logger.error(f"[ReactAgent] {error_msg}")
return "", error_msg
使用示例:
# 初始化 ReAct Agent
react_agent = ReactAgent(
llm=llm,
available_tools=available_tools
)
# 流式执行任务
async for event in react_agent.execute_stream(
goal="查询所有无人机的直播流信息",
user_info=user_info,
hint="先获取飞机列表,再逐个查询直播流",
mcp_manager=mcp_manager
):
if event["type"] == "thought":
print(f"思考: {event['content']}")
elif event["type"] == "action":
print(f"执行: {event['tool_name']}")
elif event["type"] == "observation":
print(f"观察: {event['content']}")
elif event["type"] == "final_answer":
print(f"最终答案: {event['content']}")
3.3 MCP 工具管理器
作用:管理 MCP(Model Context Protocol)工具的注册、发现和调用。
核心功能:
- 工具注册:注册可用的工具
- 工具发现:列出所有可用工具
- 工具调用:执行工具调用
- 连接管理:管理 MCP 服务器连接
实现示例:
class ImprovedMCPToolManager:
"""改进的MCP工具管理器"""
def __init__(self):
self.clients: Dict[str, EnhancedMCPClient] = {}
self.tools: Dict[str, Dict] = {}
async def register_server(
self,
server_name: str,
server_url: str
) -> bool:
"""注册MCP服务器"""
try:
client = EnhancedMCPClient()
if await client.connect(server_url):
self.clients[server_name] = client
# 获取工具列表
tools = await client.list_tools()
for tool in tools:
tool_key = f"{server_name}:{tool.name}"
self.tools[tool_key] = {
"namespace": server_name,
"function_name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema
}
logger.info(f"注册服务器 {server_name}: {len(tools)} 个工具")
return True
else:
return False
except Exception as e:
logger.error(f"注册服务器失败: {e}")
return False
async def call_function(
self,
namespace: str,
function_name: str,
parameters: Dict[str, Any]
) -> Any:
"""调用MCP函数"""
if namespace not in self.clients:
raise ValueError(f"服务器 {namespace} 未注册")
client = self.clients[namespace]
result = await client.call_tool(function_name, parameters)
return result
def get_available_tools(self) -> Dict[str, Dict]:
"""获取所有可用工具"""
return self.tools.copy()
4. Tool/Action 模块的最佳实践
-
工具设计原则:
- 单一职责:每个工具只做一件事
- 幂等性:多次调用结果一致
- 错误处理:明确的错误信息
-
执行策略选择:
- 有依赖关系 → 顺序执行
- 独立操作 → 并行执行
-
错误处理:
- 重试机制:临时错误自动重试
- 降级方案:工具失败时的备选方案
- 错误传播:清晰的错误信息
-
性能优化:
- 批量操作:合并多个调用
- 缓存结果:避免重复调用
- 异步执行:不阻塞主流程
端到端实战
1. 完整示例:智能无人机管理系统
让我们构建一个完整的示例,展示三个模块如何协同工作。
1.1 系统架构
┌─────────────────────────────────────────┐
│ AgenticAgent (主Agent) │
├─────────────────────────────────────────┤
│ Memory Module │
│ ├─ ContextDataCache (缓存) │
│ ├─ ConversationPersistenceHelper │
│ └─ AgenticState (状态管理) │
├─────────────────────────────────────────┤
│ Retrieval Module │
│ └─ ChromaVectorStore (向量检索) │
├─────────────────────────────────────────┤
│ Tool/Action Module │
│ ├─ ToolExecutionService │
│ ├─ ReactAgent │
│ └─ ImprovedMCPToolManager │
└─────────────────────────────────────────┘
1.2 完整代码实现
import asyncio
from typing import Dict, List, Optional, AsyncGenerator
from app.agents.agentic.agentic_agent import AgenticAgent
from app.schemas.intent_parser import UserInfo
class SmartUAVAgent:
"""智能无人机管理Agent"""
def __init__(self):
self.agent = None
async def initialize(self):
"""初始化Agent"""
self.agent = await AgenticAgent.create()
logger.info("智能无人机管理Agent初始化完成")
async def process_user_query(
self,
user_input: str,
user_info: UserInfo,
session_id: str
) -> AsyncGenerator[Dict, None]:
"""
处理用户查询(端到端流程)
流程:
1. Memory: 加载历史对话
2. Retrieval: 检索相关信息
3. Tool/Action: 执行工具调用
4. Memory: 保存对话历史
"""
# 1. Memory: 加载历史对话
history = await self._load_conversation_history(session_id)
# 2. Retrieval: 如果查询涉及实体(如无人机名称),进行语义检索
enhanced_input = await self._enhance_input_with_retrieval(
user_input, user_info
)
# 3. 流式处理用户输入
async for event in self.agent.process_input_stream_agui(
user_input=user_input,
user_raw_input=user_input,
user_info=user_info,
session_id=session_id
):
yield event
# 4. Memory: 保存对话历史(在Agent内部自动完成)
async def _load_conversation_history(self, session_id: str) -> List[Dict]:
"""加载对话历史"""
try:
task = Task.get(Task.session_id == session_id)
return task.conversation_logs or []
except Task.DoesNotExist:
return []
async def _enhance_input_with_retrieval(
self,
user_input: str,
user_info: UserInfo
) -> str:
"""使用检索增强输入"""
# 检测是否包含实体查询(如"巡检一号")
# 这里简化处理,实际可以使用NER模型
# 如果有实体查询,使用向量检索
if any(keyword in user_input for keyword in ["无人机", "飞机", "任务"]):
# 从缓存获取数据(Memory)
cache_service = self.agent.cache_service
uav_list = await cache_service.get_uav_list(
self.agent.mcp_manager,
user_info
)
# 使用向量检索匹配(Retrieval)
vector_store = ChromaVectorStore.get_instance()
# 这里可以提取实体并进行检索
# 简化示例:直接返回原始输入
return user_input
return user_input
# 使用示例
async def main():
# 初始化Agent
smart_agent = SmartUAVAgent()
await smart_agent.initialize()
# 用户信息
user_info = UserInfo(
user_id="123",
tenantId="tenant_1",
siteId="site_1"
)
# 处理用户查询
async for event in smart_agent.process_user_query(
user_input="查询巡检一号无人机的状态",
user_info=user_info,
session_id="session_123"
):
print(f"事件: {event['type']}")
if event['type'] == 'content':
print(f"内容: {event['data']}")
# 运行
if __name__ == "__main__":
asyncio.run(main())
1.3 执行流程详解
场景:用户查询"巡检一号无人机的状态"
1. Memory 模块
├─ 加载历史对话(如果有)
└─ 初始化会话状态
2. Retrieval 模块
├─ 检测到实体查询:"巡检一号"
├─ 向量检索:在UAV集合中搜索"巡检一号"
└─ 匹配结果:找到 deviceId="1", deviceName="巡检1号"
3. Tool/Action 模块
├─ 意图识别:识别为MCP工具调用
├─ 参数提取:deviceId="1"
├─ 工具执行:调用 getUavStatus(deviceId="1")
└─ 结果处理:返回状态信息
4. Memory 模块
├─ 保存用户消息:"查询巡检一号无人机的状态"
├─ 保存AI响应:"巡检1号当前状态:在线"
└─ 更新会话状态
5. 响应生成
└─ 生成最终响应给用户
2. 进阶示例:多轮对话场景
async def multi_turn_conversation_example():
"""多轮对话示例"""
agent = await AgenticAgent.create()
user_info = UserInfo(user_id="123", tenantId="t1", siteId="s1")
session_id = "session_multi_turn"
# 第一轮:用户查询无人机列表
print("=== 第一轮对话 ===")
async for event in agent.process_input_stream_agui(
user_input="有哪些无人机?",
user_raw_input="有哪些无人机?",
user_info=user_info,
session_id=session_id
):
if event.get("type") == "final_data":
print(f"响应: {event['data']}")
# 第二轮:基于历史对话,用户继续提问
print("\n=== 第二轮对话 ===")
async for event in agent.process_input_stream_agui(
user_input="巡检1号的状态是什么?", # 引用了上一轮的"巡检1号"
user_raw_input="巡检1号的状态是什么?",
user_info=user_info,
session_id=session_id # 相同的session_id,会加载历史
):
if event.get("type") == "final_data":
print(f"响应: {event['data']}")
# Memory模块会自动:
# 1. 加载第一轮的对话历史
# 2. 理解"巡检1号"的上下文
# 3. 保存第二轮对话
最佳实践
1. Memory 模块最佳实践
1.1 缓存策略
# ✅ 好的实践:根据数据特性设置TTL
cache_config = CacheConfig(
uav_list_ttl=300, # UAV列表5分钟(变化频繁)
task_list_ttl=600, # 任务列表10分钟(变化较少)
media_list_ttl=60, # 媒体列表1分钟(实时性要求高)
default_ttl=300
)
# ❌ 不好的实践:所有数据使用相同TTL
cache_config = CacheConfig(
default_ttl=300 # 所有数据都是5分钟,不够灵活
)
1.2 对话历史管理
# ✅ 好的实践:限制历史长度,避免上下文过长
def load_conversation_history(session_id: str, max_turns: int = 10):
"""加载最近的N轮对话"""
task = Task.get(Task.session_id == session_id)
history = task.conversation_logs or []
return history[-max_turns:] # 只返回最近10轮
# ❌ 不好的实践:加载所有历史
def load_conversation_history(session_id: str):
task = Task.get(Task.session_id == session_id)
return task.conversation_logs or [] # 可能很长,影响性能
2. Retrieval 模块最佳实践
2.1 相似度阈值调优
# ✅ 好的实践:根据场景调整阈值
def search_uav(query: str, strict: bool = False):
threshold = 0.7 if strict else 0.5 # 严格模式用更高阈值
return vector_store.search(
query=query,
collection_name="uav",
threshold=threshold
)
# ❌ 不好的实践:固定阈值
def search_uav(query: str):
return vector_store.search(
query=query,
collection_name="uav",
threshold=0.5 # 固定值,不够灵活
)
2.2 混合检索策略
# ✅ 好的实践:精确匹配 + 语义检索
def search_with_fallback(query: str, items: List[NamedItem]):
# 1. 先尝试精确匹配
exact_match = [item for item in items if query.lower() == item.name.lower()]
if exact_match:
return exact_match
# 2. 精确匹配失败,使用语义检索
return vector_store.search(query, items)
3. Tool/Action 模块最佳实践
3.1 工具设计
# ✅ 好的实践:工具职责单一,参数明确
@tool
def get_uav_status(device_id: str) -> Dict:
"""获取无人机状态
Args:
device_id: 无人机设备ID
Returns:
包含状态信息的字典
"""
# 实现...
# ❌ 不好的实践:工具职责不清,参数混乱
@tool
def uav_operation(operation: str, *args, **kwargs) -> Any:
"""执行无人机操作""" # 太宽泛
# 实现...
3.2 错误处理
# ✅ 好的实践:明确的错误处理和重试
async def execute_with_retry(tool_call: Dict, max_retries: int = 3):
for attempt in range(max_retries):
try:
return await execute_tool(tool_call)
except TemporaryError as e:
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt) # 指数退避
continue
raise
except PermanentError as e:
# 永久错误,不重试
raise
# ❌ 不好的实践:所有错误都重试
async def execute_with_retry(tool_call: Dict, max_retries: int = 3):
for attempt in range(max_retries):
try:
return await execute_tool(tool_call)
except Exception as e: # 捕获所有异常
if attempt < max_retries - 1:
continue
raise
常见问题与解决方案
Q1: 缓存数据不一致怎么办?
问题:缓存中的数据可能与实际数据不一致。
解决方案:
- 设置合理的TTL
- 关键操作后主动失效缓存
- 实现缓存版本号机制
# 解决方案示例
async def update_uav_status(device_id: str, status: str):
# 1. 更新数据库
await db.update_uav_status(device_id, status)
# 2. 失效相关缓存
cache_service.invalidate_cache(pattern=f"uav_list:*")
cache_service.invalidate_cache(pattern=f"uav_detail:{device_id}")
Q2: 向量检索不准确怎么办?
问题:语义搜索返回的结果不相关。
解决方案:
- 调整相似度阈值
- 使用更好的Embedding模型
- 结合精确匹配
# 解决方案示例
def hybrid_search(query: str):
# 1. 精确匹配
exact_results = exact_match(query)
if exact_results:
return exact_results
# 2. 语义检索(提高阈值)
semantic_results = vector_store.search(
query=query,
threshold=0.7 # 提高阈值,更严格
)
# 3. 如果语义检索结果太少,降低阈值
if len(semantic_results) < 3:
semantic_results = vector_store.search(
query=query,
threshold=0.5
)
return semantic_results
Q3: 工具调用超时怎么办?
问题:工具调用时间过长,影响用户体验。
解决方案:
- 设置超时时间
- 实现异步执行
- 提供进度反馈
# 解决方案示例
async def execute_with_timeout(tool_call: Dict, timeout: float = 30.0):
try:
return await asyncio.wait_for(
execute_tool(tool_call),
timeout=timeout
)
except asyncio.TimeoutError:
logger.warning(f"工具调用超时: {tool_call['name']}")
return {
"success": False,
"error": "工具调用超时,请稍后重试"
}
Q4: 如何优化检索性能?
问题:向量检索速度慢。
解决方案:
- 使用索引(如HNSW)
- 限制检索范围
- 缓存检索结果
# 解决方案示例
class OptimizedVectorStore:
def __init__(self):
self.search_cache: Dict[str, List] = {}
def search(self, query: str, collection_name: str, top_k: int = 5):
# 1. 检查缓存
cache_key = f"{query}:{collection_name}:{top_k}"
if cache_key in self.search_cache:
return self.search_cache[cache_key]
# 2. 执行检索
results = self._do_search(query, collection_name, top_k)
# 3. 缓存结果(TTL较短)
self.search_cache[cache_key] = results
asyncio.create_task(self._expire_cache(cache_key, ttl=60))
return results
总结
本文详细介绍了 AI Agent 的三个核心模块:
-
Memory 模块:负责存储和管理对话历史、上下文信息
- ContextDataCache:智能缓存
- ConversationPersistenceHelper:对话持久化
- AgenticState:状态管理
-
Retrieval 模块:负责从记忆库中检索相关信息
- ChromaVectorStore:向量存储和语义搜索
- 混合检索策略:精确匹配 + 语义检索
-
Tool/Action 模块:负责执行具体操作
- ToolExecutionService:工具执行服务
- ReactAgent:动态推理执行
- ImprovedMCPToolManager:工具管理
这三个模块协同工作,构成了一个完整的 AI Agent 系统。通过合理的设计和实现,可以构建出高效、可靠的智能Agent。
更多推荐



所有评论(0)