目录

  1. 概述
  2. Memory 模块
  3. Retrieval 模块
  4. Tool/Action 模块
  5. 端到端实战
  6. 最佳实践
  7. 常见问题与解决方案

概述

AI Agent 系统由三个核心模块组成:

  • Memory(记忆模块):存储和管理 Agent 的历史对话、上下文信息
  • Retrieval(检索模块):从记忆库中快速、准确地检索相关信息
  • Tool/Action(工具/动作模块):执行具体操作,如调用 API、操作数据库等

这三个模块协同工作,使 Agent 能够:

  • 记住历史对话
  • 基于上下文做出决策
  • 执行实际任务

Memory 模块

1. 什么是 Memory?

Memory 是 AI Agent 的"大脑记忆",负责存储和管理:

  • 对话历史:用户与 Agent 的交互记录
  • 上下文信息:当前会话的状态、词槽填充情况等
  • 缓存数据:频繁访问的数据(如 UAV 列表、任务列表等)

2. Memory 的核心原理

2.1 记忆的层次结构
┌─────────────────────────────────────┐
│   短期记忆 (Short-term Memory)      │
│   - 当前会话状态                     │
│   - 临时缓存数据                     │
│   - 会话级别的上下文                 │
└─────────────────────────────────────┘
              ↓
┌─────────────────────────────────────┐
│   长期记忆 (Long-term Memory)        │
│   - 持久化的对话历史                 │
│   - 用户偏好设置                     │
│   - 历史任务记录                     │
└─────────────────────────────────────┘
2.2 记忆的存储方式
  1. 内存缓存:快速访问,但易丢失
  2. 向量数据库:语义搜索,支持相似度匹配
  3. 关系数据库:结构化存储,持久化保存

3. Memory 的实现

3.1 上下文数据缓存 (ContextDataCache)

作用:智能缓存频繁访问的数据,减少 API 调用,提升性能。

核心特性

  • TTL(Time To Live)过期机制
  • LRU(Least Recently Used)淘汰策略
  • 自动缓存预热
  • 降级方案(缓存失效时使用过期数据)

实现示例

from dataclasses import dataclass
from typing import Dict, List, Optional, Any
import time
from threading import Lock

@dataclass
class CacheEntry:
    """缓存条目"""
    data: Any
    timestamp: float
    ttl: int  # 过期时间(秒)
    access_count: int = 0
    last_access: float = field(default_factory=time.time)
    
    def is_expired(self) -> bool:
        """检查是否过期"""
        return time.time() - self.timestamp > self.ttl
    
    def touch(self):
        """更新访问时间和计数"""
        self.access_count += 1
        self.last_access = time.time()

class ContextDataCache:
    """上下文数据缓存服务"""
    
    def __init__(self, config: CacheConfig):
        self.config = config
        self.cache: Dict[str, CacheEntry] = {}
        self.lock = Lock()
        
        # TTL映射:不同类型数据有不同的过期时间
        self.ttl_mapping = {
            "uav_list": 300,      # 5分钟
            "task_list": 600,     # 10分钟
            "media_list": 60,     # 1分钟
        }
    
    async def get_cached_data(
        self, 
        key: str, 
        fetch_func, 
        force_refresh: bool = False,
        ttl: Optional[int] = None
    ) -> List[Dict]:
        """
        通用的缓存数据获取方法
        
        Args:
            key: 缓存键
            fetch_func: 数据获取函数(异步)
            force_refresh: 是否强制刷新
            ttl: 过期时间(秒)
        """
        # 1. 检查缓存是否启用
        if not self.config.enabled:
            return await fetch_func()
        
        # 2. 获取TTL
        if ttl is None:
            ttl = self.ttl_mapping.get(key.split(':')[0], self.config.default_ttl)
        
        # 3. 检查缓存(加锁保证线程安全)
        with self.lock:
            if not force_refresh and key in self.cache:
                entry = self.cache[key]
                if not entry.is_expired():
                    entry.touch()  # 更新访问时间
                    logger.debug(f"缓存命中: {key}")
                    return entry.data
                else:
                    logger.info(f"缓存过期: {key}")
                    del self.cache[key]
        
        # 4. 缓存未命中,调用数据源
        logger.info(f"缓存未命中,调用数据源: {key}")
        try:
            raw_data = await fetch_func()
            
            # 5. 更新缓存
            with self.lock:
                self.cache[key] = CacheEntry(
                    data=raw_data,
                    timestamp=time.time(),
                    ttl=ttl
                )
                # 缓存大小控制
                self._evict_if_needed()
            
            return raw_data
            
        except Exception as e:
            logger.error(f"数据获取失败: {key}, 错误: {str(e)}")
            # 降级方案:返回过期缓存
            with self.lock:
                if key in self.cache:
                    logger.warning(f"使用过期缓存作为降级方案: {key}")
                    return self.cache[key].data
            return []
    
    def _evict_if_needed(self):
        """LRU淘汰策略:当缓存超过最大大小时,移除最久未访问的项"""
        if len(self.cache) <= self.config.max_size:
            return
        
        # 按最后访问时间排序
        items = list(self.cache.items())
        items.sort(key=lambda x: x[1].last_access)
        
        # 移除最老的项目
        to_remove = len(items) - self.config.max_size + 1
        for i in range(to_remove):
            key_to_remove = items[i][0]
            del self.cache[key_to_remove]
            logger.debug(f"缓存淘汰: {key_to_remove}")

使用示例

# 初始化缓存服务
cache_service = ContextDataCache(config=CacheConfig(
    enabled=True,
    default_ttl=300,
    max_size=1000
))

# 获取 UAV 列表(带缓存)
async def get_uav_list(mcp_manager, user_info):
    return await cache_service.get_cached_data(
        key=f"uav_list:{user_info.user_id}",
        fetch_func=lambda: mcp_manager.call_function(
            namespace="backend",
            function_name="getUavList",
            parameters=user_info.to_dict()
        ),
        force_refresh=False,
        ttl=300  # 5分钟过期
    )
3.2 对话持久化 (ConversationPersistenceHelper)

作用:将对话历史保存到数据库,实现长期记忆。

核心功能

  • 保存用户消息
  • 保存 AI 响应
  • 保存执行日志
  • 保存工作流状态

实现示例

class ConversationPersistenceHelper:
    """对话持久化助手类"""
    
    def __init__(
        self,
        session_id: str,
        task_info_id: Optional[int],
        user_id: str,
        user_raw_input: str
    ):
        self.session_id = session_id
        self.task_info_id = task_info_id
        self.user_id = user_id
        self.user_raw_input = user_raw_input
        self.task_record: Optional[Task] = None
    
    def initialize(self) -> bool:
        """初始化任务记录"""
        try:
            # 获取或创建任务记录
            self.task_record, created = Task.get_or_create(
                session_id=self.session_id,
                task_info_id=self.task_info_id,
                defaults={
                    "user_id": self.user_id,
                    "title": self.user_raw_input,
                    "create_time": datetime.datetime.now()
                }
            )
            return True
        except Exception as e:
            logger.error(f"无法初始化对话记录: {e}")
            return False
    
    def save_user_message(self, user_raw_input: str) -> bool:
        """保存用户消息"""
        if not self.task_record:
            return False
        
        try:
            conv_logs = list(self.task_record.conversation_logs or [])
            conv_logs.append({
                "role": "user",
                "content": user_raw_input,
                "time": datetime.datetime.now().isoformat()
            })
            self.task_record.conversation_logs = conv_logs
            self.task_record.save()
            return True
        except Exception as e:
            logger.error(f"保存用户输入失败: {e}")
            return False
    
    def save_ai_message(self, content: str, autel_messages: List[Dict]) -> bool:
        """保存 AI 响应"""
        if not self.task_record:
            return False
        
        try:
            conv_logs = list(self.task_record.conversation_logs or [])
            conv_logs.append({
                "role": "ai",
                "content": content,
                "autel_messages": autel_messages,
                "time": datetime.datetime.now().isoformat()
            })
            self.task_record.conversation_logs = conv_logs
            self.task_record.save()
            return True
        except Exception as e:
            logger.error(f"保存AI响应失败: {e}")
            return False

数据库模型

class Task(BaseModel):
    """对话任务存储表"""
    session_id = CharField(unique=True, index=True)
    task_info_id = ForeignKeyField(AgentTaskInfo, null=True)
    title = CharField(index=True)
    user_id = CharField(index=True)
    conversation_logs = BinaryJSONField(default=[])  # 对话历史
    exec_logs = BinaryJSONField(default=[])          # 执行日志
    state = BinaryJSONField(null=True)               # 当前状态
    workflow_json = TextField(null=True)            # 工作流JSON
    create_time = DateTimeField(default=datetime.datetime.now)
    
    class Meta:
        table_name = 'task'
3.3 会话状态管理 (AgenticState)

作用:管理当前会话的状态信息,包括消息历史、意图识别结果、工具调用结果等。

状态结构

class AgenticState(TypedDict, total=False):
    """Agentic Agent 状态定义"""
    
    # 基础信息
    messages: List[Dict]              # 消息历史
    user_input: str                   # 用户当前输入
    user_raw_input: str               # 原始用户输入
    enhanced_input: Optional[str]     # 增强输入(包含上下文)
    user_info: UserInfo              # 用户信息
    
    # 意图识别
    intent_type: Optional[str]        # 意图类型
    intent_confidence: float          # 意图置信度
    detected_scene: Optional[str]     # 检测到的场景
    
    # MCP调用相关
    tool_calls: Optional[List[Dict]]  # 工具调用列表
    execution_results: Optional[List[Dict]]  # 执行结果
    
    # 场景处理相关
    scene_context: Optional[SceneContext]  # 场景上下文
    waiting_for_user: bool           # 是否等待用户输入

使用示例

# 初始化状态
state: AgenticState = {
    "messages": [],
    "user_input": "",
    "user_info": user_info,
    "intent_type": None,
    "intent_confidence": 0.0
}

# 更新状态
state["user_input"] = "查询所有无人机"
state["intent_type"] = "mcp"
state["tool_calls"] = [{"name": "getUavList", "parameters": {}}]

4. Memory 模块的最佳实践

  1. 合理设置 TTL:根据数据更新频率设置过期时间
  2. 使用 LRU 淘汰:防止内存溢出
  3. 实现降级方案:缓存失效时使用过期数据
  4. 异步操作:避免阻塞主线程
  5. 线程安全:使用锁保护共享资源

Retrieval 模块

1. 什么是 Retrieval?

Retrieval(检索)是从 Memory 中快速、准确地找到相关信息的过程。类似于人类回忆时的"搜索记忆"。

2. Retrieval 的核心原理

2.1 检索的两种方式
  1. 精确匹配:基于键值对的直接查找

    • 优点:速度快、准确
    • 缺点:需要精确的键
  2. 语义相似度搜索:基于向量相似度的模糊匹配

    • 优点:支持自然语言查询、容错性强
    • 缺点:需要向量化、计算开销较大
2.2 向量检索流程
用户查询: "巡检一号无人机"
    ↓
文本向量化 (Embedding)
    ↓
[0.23, -0.45, 0.67, ...]  (384维向量)
    ↓
向量数据库相似度搜索
    ↓
找到最相似的项: "巡检1号" (相似度: 0.92)

3. Retrieval 的实现

3.1 向量存储 (ChromaVectorStore)

作用:使用 ChromaDB 实现语义相似度搜索。

核心功能

  • 数据同步:将数据转换为向量并存储
  • 语义搜索:基于查询文本找到最相似的项
  • 多集合管理:支持不同类型的集合(UAV、Task、Mark等)

实现示例

import chromadb
from chromadb.config import Settings
from chromadb.utils import embedding_functions
from typing import List, Tuple, NamedTuple

class NamedItem(NamedTuple):
    """通用的命名项"""
    id: str
    name: str

class ChromaVectorStore:
    """ChromaDB 向量存储服务"""
    
    def __init__(self, persist_directory: Optional[str] = None):
        # 初始化 ChromaDB 客户端
        if persist_directory:
            self.client = chromadb.PersistentClient(
                path=persist_directory,
                settings=Settings(anonymized_telemetry=False)
            )
        else:
            self.client = chromadb.Client(
                settings=Settings(anonymized_telemetry=False)
            )
        
        # 初始化 embedding 函数(多语言模型)
        self.embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name="paraphrase-multilingual-MiniLM-L12-v2"
        )
        
        # 缓存 collection 引用
        self._collections: Dict[str, Any] = {}
    
    def sync_collection(
        self,
        collection_name: str,
        items: List[NamedItem],
        metadata_list: Optional[List[Dict]] = None
    ) -> int:
        """
        同步数据到 collection
        
        流程:
        1. 获取或创建 collection
        2. 清空旧数据
        3. 将数据转换为向量并插入
        """
        if not items:
            return 0
        
        try:
            # 获取或创建 collection
            collection = self._get_or_create_collection(collection_name)
            
            # 清空旧数据
            existing = collection.get()
            if existing['ids']:
                collection.delete(ids=existing['ids'])
            
            # 准备数据
            ids = []
            documents = []  # 用于生成向量的文本
            metadatas = []
            
            for i, item in enumerate(items):
                ids.append(str(item.id))
                documents.append(item.name)  # 使用名称作为文本
                metadata = metadata_list[i] if metadata_list and i < len(metadata_list) else None
                metadatas.append(metadata or {"name": item.name, "id": item.id})
            
            # 批量插入(ChromaDB 会自动生成向量)
            collection.add(
                ids=ids,
                documents=documents,
                metadatas=metadatas
            )
            
            logger.info(f"同步 {len(ids)} 条数据到 {collection_name}")
            return len(ids)
            
        except Exception as e:
            logger.error(f"同步数据失败: {e}")
            return 0
    
    def search(
        self,
        query: str,
        collection_name: str,
        top_k: int = 5,
        threshold: float = 0.5
    ) -> List[Tuple[NamedItem, float]]:
        """
        语义相似度搜索
        
        Args:
            query: 查询字符串
            collection_name: collection 名称
            top_k: 返回前k个结果
            threshold: 相似度阈值(0-1)
            
        Returns:
            List[Tuple[NamedItem, float]]: (item, similarity) 列表
        """
        try:
            collection = self._get_or_create_collection(collection_name)
            
            # 检查 collection 是否为空
            if collection.count() == 0:
                return []
            
            # 执行查询(ChromaDB 会自动将 query 向量化)
            results = collection.query(
                query_texts=[query],
                n_results=min(top_k, collection.count()),
                include=["documents", "metadatas", "distances"]
            )
            
            # 解析结果
            matched_items = []
            if results['ids'] and results['ids'][0]:
                ids = results['ids'][0]
                documents = results['documents'][0]
                distances = results['distances'][0]  # 余弦距离
                metadatas = results['metadatas'][0]
                
                for i, doc_id in enumerate(ids):
                    # 余弦距离转换为相似度
                    # 余弦距离 = 1 - 余弦相似度
                    distance = distances[i]
                    similarity = 1.0 - distance
                    
                    # 过滤低于阈值的结果
                    if similarity < threshold:
                        continue
                    
                    # 构建 NamedItem
                    name = documents[i] if i < len(documents) else ""
                    if not name and i < len(metadatas):
                        name = metadatas[i].get("name", "")
                    
                    item = NamedItem(id=doc_id, name=name)
                    matched_items.append((item, similarity))
            
            # 按相似度降序排序
            matched_items.sort(key=lambda x: x[1], reverse=True)
            
            return matched_items
            
        except Exception as e:
            logger.error(f"搜索失败: {e}")
            return []
    
    def _get_or_create_collection(self, name: str):
        """获取或创建 collection"""
        if name not in self._collections:
            self._collections[name] = self.client.get_or_create_collection(
                name=name,
                embedding_function=self.embedding_function,
                metadata={"hnsw:space": "cosine"}  # 使用余弦相似度
            )
        return self._collections[name]

使用示例

# 初始化向量存储
vector_store = ChromaVectorStore(persist_directory="./chroma_db")

# 同步 UAV 列表
uav_items = [
    NamedItem(id="1", name="巡检1号"),
    NamedItem(id="2", name="巡检2号"),
    NamedItem(id="3", name="应急无人机")
]
vector_store.sync_collection("uav", uav_items)

# 语义搜索
results = vector_store.search(
    query="巡检一号",  # 用户输入(可能是模糊的)
    collection_name="uav",
    top_k=3,
    threshold=0.5
)

# 结果: [(NamedItem(id="1", name="巡检1号"), 0.92), ...]
for item, similarity in results:
    print(f"{item.name}: {similarity:.2f}")
3.2 缓存检索集成

将向量检索与缓存结合

class ContextDataCache:
    """上下文数据缓存服务(集成向量检索)"""
    
    async def get_uav_list(self, mcp_manager, user_info, query: Optional[str] = None):
        """
        获取 UAV 列表(支持语义搜索)
        
        Args:
            query: 可选的查询字符串,用于语义搜索
        """
        # 1. 从缓存获取完整列表
        uav_data = await self._get_cached_data(
            key=f"uav_list:{user_info.user_id}",
            fetch_func=lambda: mcp_manager.call_function(...),
            force_refresh=False
        )
        
        # 2. 如果有查询字符串,使用向量检索过滤
        if query and CHROMA_AVAILABLE:
            try:
                # 同步到向量存储
                items = [
                    NamedItem(
                        id=str(uav.get("deviceId", "")),
                        name=uav.get("deviceName", "")
                    )
                    for uav in uav_data
                ]
                vector_store.sync_collection("uav", items)
                
                # 语义搜索
                matched_items = vector_store.search(
                    query=query,
                    collection_name="uav",
                    top_k=5,
                    threshold=0.5
                )
                
                # 根据匹配结果过滤原始数据
                matched_ids = {item.id for item, _ in matched_items}
                uav_data = [
                    uav for uav in uav_data
                    if str(uav.get("deviceId", "")) in matched_ids
                ]
                
            except Exception as e:
                logger.warning(f"向量检索失败: {e}")
        
        return uav_data

4. Retrieval 模块的最佳实践

  1. 选择合适的 Embedding 模型:多语言场景使用多语言模型
  2. 设置合理的相似度阈值:平衡准确率和召回率
  3. 定期更新向量库:数据变化时同步更新
  4. 结合精确匹配:先用精确匹配,再用语义搜索
  5. 缓存检索结果:避免重复计算

Tool/Action 模块

1. 什么是 Tool/Action?

Tool(工具)和 Action(动作)是 Agent 执行具体操作的能力,例如:

  • 调用 API 获取数据
  • 操作数据库
  • 发送消息
  • 控制硬件设备

2. Tool/Action 的核心原理

2.1 ReAct 模式

ReAct(Reasoning + Acting)是 Agent 的核心执行模式:

思考 (Think) → 行动 (Act) → 观察 (Observe) → 思考 (Think) → ...

示例

用户: "查询所有无人机的状态"

Agent 思考: "我需要调用 getUavList 工具来获取无人机列表"
    ↓
Agent 行动: 调用 getUavList()
    ↓
Agent 观察: 返回了3架无人机的信息
    ↓
Agent 思考: "我已经获取到数据,现在可以生成响应了"
    ↓
Agent 响应: "当前有3架无人机:巡检1号(在线)、巡检2号(离线)..."
2.2 工具调用的生命周期
1. 工具发现 (Tool Discovery)
   ↓
2. 意图识别 (Intent Recognition)
   ↓
3. 参数提取 (Parameter Extraction)
   ↓
4. 工具执行 (Tool Execution)
   ↓
5. 结果处理 (Result Processing)
   ↓
6. 响应生成 (Response Generation)

3. Tool/Action 的实现

3.1 工具执行服务 (ToolExecutionService)

作用:统一管理工具调用的执行策略和结果处理。

核心功能

  • 顺序执行:工具之间有依赖关系
  • 并行执行:工具之间独立
  • 错误处理:重试机制、降级方案
  • 结果验证:检查执行结果的有效性

实现示例

class ToolExecutionService:
    """工具执行服务"""
    
    def __init__(self, retry_config: RetryConfig):
        self.retry_config = retry_config
    
    async def execute_tools(
        self,
        tool_calls: List[Dict],
        execution_strategy: str,
        available_tools: Dict,
        mcp_manager
    ) -> List[Dict]:
        """
        执行工具调用列表
        
        Args:
            tool_calls: 工具调用列表
            execution_strategy: 执行策略 ("sequential" 或 "parallel")
            available_tools: 可用工具字典
            mcp_manager: MCP管理器
        """
        if execution_strategy == "parallel":
            return await self._execute_tools_parallel(
                tool_calls, available_tools, mcp_manager
            )
        else:
            return await self._execute_tools_sequential(
                tool_calls, available_tools, mcp_manager
            )
    
    async def _execute_tools_sequential(
        self,
        tool_calls: List[Dict],
        available_tools: Dict,
        mcp_manager
    ) -> List[Dict]:
        """顺序执行工具调用"""
        execution_results = []
        
        for i, tool_call in enumerate(tool_calls):
            logger.info(f"[{i+1}/{len(tool_calls)}] 顺序执行: {tool_call.get('name')}")
            try:
                result = await self._execute_single_tool(
                    tool_call, available_tools, mcp_manager
                )
                execution_results.append(result)
            except Exception as e:
                logger.error(f"工具执行失败: {e}")
                execution_results.append({
                    "tool_call": tool_call,
                    "success": False,
                    "error": str(e),
                    "result": None
                })
        
        return execution_results
    
    async def _execute_tools_parallel(
        self,
        tool_calls: List[Dict],
        available_tools: Dict,
        mcp_manager
    ) -> List[Dict]:
        """并行执行工具调用"""
        tasks = [
            self._execute_single_tool(tool_call, available_tools, mcp_manager)
            for tool_call in tool_calls
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        execution_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                execution_results.append({
                    "tool_call": tool_calls[i],
                    "success": False,
                    "error": str(result),
                    "result": None
                })
            else:
                execution_results.append(result)
        
        return execution_results
    
    async def _execute_single_tool(
        self,
        tool_call: Dict,
        available_tools: Dict,
        mcp_manager
    ) -> Dict:
        """执行单个工具调用"""
        tool_key = tool_call.get("name", "")
        parameters = tool_call.get("parameters", {})
        
        # 解析工具信息
        tool_info = self._resolve_tool_info(tool_key, available_tools)
        if not tool_info:
            raise ValueError(f"工具不存在: {tool_key}")
        
        namespace = tool_info["namespace"]
        function_name = tool_info["function_name"]
        
        try:
            # 调用工具
            execution_result = await mcp_manager.call_function(
                namespace=namespace,
                function_name=function_name,
                parameters=parameters
            )
            
            # 处理结果
            processed_result = self._process_mcp_result(execution_result)
            is_success, status_message = self._validate_mcp_result(
                execution_result, function_name
            )
            
            return {
                "tool_call": tool_call,
                "success": is_success,
                "message": status_message,
                "result": processed_result,
                "namespace": namespace,
                "function_name": function_name
            }
            
        except Exception as e:
            logger.error(f"工具调用失败 {tool_key}: {str(e)}")
            return {
                "tool_call": tool_call,
                "success": False,
                "error": str(e),
                "result": None,
                "namespace": namespace,
                "function_name": function_name
            }
    
    def _process_mcp_result(self, result) -> Union[str, Dict, List]:
        """处理MCP结果"""
        try:
            # 基础类型直接返回
            if isinstance(result, (str, dict, list, int, float, bool)) or result is None:
                return result
            
            # 处理MCP CallToolResult对象
            if hasattr(result, 'content') and isinstance(result.content, list):
                first_item = result.content[0]
                if hasattr(first_item, 'text'):
                    text = first_item.text.strip()
                    # 尝试解析JSON
                    if text.startswith('"') and text.endswith('"'):
                        text = text[1:-1]
                    try:
                        return json.loads(text)
                    except:
                        return text
            
            return str(result)
            
        except Exception as e:
            logger.error(f"处理MCP结果时出错: {str(e)}")
            return {"error": True, "message": f"结果处理失败: {str(e)}"}

使用示例

# 初始化工具执行服务
execution_service = ToolExecutionService(retry_config=RetryConfig())

# 定义工具调用
tool_calls = [
    {
        "name": "getUavList",
        "parameters": {"userId": "123"}
    },
    {
        "name": "getTaskList",
        "parameters": {"userId": "123"}
    }
]

# 并行执行(两个工具独立,可以并行)
results = await execution_service.execute_tools(
    tool_calls=tool_calls,
    execution_strategy="parallel",  # 或 "sequential"
    available_tools=available_tools,
    mcp_manager=mcp_manager
)

# 处理结果
for result in results:
    if result["success"]:
        print(f"工具 {result['function_name']} 执行成功")
        print(f"结果: {result['result']}")
    else:
        print(f"工具 {result['function_name']} 执行失败: {result['error']}")
3.2 ReAct Agent

作用:基于 ReAct 模式实现动态推理执行 Agent。

核心流程

class ReactAgent:
    """ReAct Agent - 动态推理执行Agent"""
    
    def __init__(self, llm, available_tools):
        self.llm = llm
        self.available_tools = available_tools
    
    async def execute_stream(
        self,
        goal: str,
        user_info: UserInfo,
        hint: Optional[str] = None,
        mcp_manager=None
    ) -> AsyncGenerator[Dict, None]:
        """
        流式执行任务
        
        Args:
            goal: 目标描述
            user_info: 用户信息
            hint: 可选的提示信息
            mcp_manager: MCP管理器
        """
        # 初始化状态
        thoughts = []
        observations = []
        actions = []
        
        max_iterations = 10
        iteration = 0
        
        while iteration < max_iterations:
            iteration += 1
            
            # 1. 思考:分析当前情况,决定下一步行动
            thought = await self._think(
                goal=goal,
                thoughts=thoughts,
                observations=observations,
                actions=actions,
                hint=hint
            )
            thoughts.append(thought)
            
            yield {
                "type": "thought",
                "content": thought
            }
            
            # 2. 解析思考结果,提取行动
            parsed = self._parse_llm_response(thought)
            
            if parsed["action"] == "FINISH":
                # 任务完成
                yield {
                    "type": "final_answer",
                    "content": parsed.get("final_answer", "")
                }
                break
            
            if parsed["action"] == "TOOL_CALL":
                # 3. 执行工具调用
                tool_name = parsed["tool_name"]
                tool_params = parsed["tool_params"]
                
                yield {
                    "type": "action",
                    "tool_name": tool_name,
                    "parameters": tool_params
                }
                
                observation, error = await self._execute_mcp_tool(
                    tool_name=tool_name,
                    tool_params=tool_params,
                    user_info=user_info,
                    mcp_manager=mcp_manager
                )
                
                if error:
                    observations.append(f"错误: {error}")
                    yield {
                        "type": "error",
                        "content": error
                    }
                else:
                    observations.append(observation)
                    yield {
                        "type": "observation",
                        "content": observation
                    }
            
            # 检查是否达到目标
            if self._is_goal_achieved(goal, thoughts, observations):
                yield {
                    "type": "final_answer",
                    "content": self._generate_final_answer(thoughts, observations)
                }
                break
    
    async def _think(
        self,
        goal: str,
        thoughts: List[str],
        observations: List[str],
        actions: List[str],
        hint: Optional[str] = None
    ) -> str:
        """思考下一步行动"""
        prompt = f"""
目标: {goal}

{hint if hint else ""}

之前的思考:
{chr(10).join(f"- {t}" for t in thoughts[-3:])}

之前的观察:
{chr(10).join(f"- {o}" for o in observations[-3:])}

可用的工具:
{self._format_available_tools()}

请分析当前情况,决定下一步行动。格式:
Thought: [你的思考]
Action: [TOOL_CALL:tool_name:{{"param": "value"}}] 或 [FINISH]
"""
        
        response = await self.llm.ainvoke(prompt)
        return response.content
    
    def _parse_llm_response(self, content: str) -> Dict[str, Any]:
        """解析LLM响应,提取行动"""
        # 简化示例,实际实现需要更robust的解析
        if "Action: FINISH" in content:
            return {"action": "FINISH", "final_answer": content}
        
        if "Action: TOOL_CALL" in content:
            # 提取工具名称和参数
            # 实际实现需要更复杂的解析逻辑
            match = re.search(r'TOOL_CALL:(\w+):({.*?})', content)
            if match:
                tool_name = match.group(1)
                tool_params = json.loads(match.group(2))
                return {
                    "action": "TOOL_CALL",
                    "tool_name": tool_name,
                    "tool_params": tool_params
                }
        
        return {"action": "UNKNOWN"}
    
    async def _execute_mcp_tool(
        self,
        tool_name: str,
        tool_params: Dict[str, Any],
        user_info: UserInfo,
        mcp_manager
    ) -> tuple[str, Optional[str]]:
        """执行 MCP 工具调用"""
        try:
            # 解析工具名称
            parts = tool_name.split(":")
            if len(parts) == 2:
                namespace, function_name = parts
            else:
                namespace = "backend"
                function_name = tool_name
            
            # 注入用户信息
            if user_info and "userInfoDTO" not in tool_params:
                tool_params.update(user_info.to_dict())
            
            # 执行调用
            result = await mcp_manager.call_function(
                namespace,
                function_name,
                tool_params
            )
            
            # 转换为字符串观察结果
            if isinstance(result, dict):
                observation = json.dumps(result, ensure_ascii=False, indent=2)
            else:
                observation = str(result)
            
            return observation, None
            
        except Exception as e:
            error_msg = f"MCP 调用失败: {str(e)}"
            logger.error(f"[ReactAgent] {error_msg}")
            return "", error_msg

使用示例

# 初始化 ReAct Agent
react_agent = ReactAgent(
    llm=llm,
    available_tools=available_tools
)

# 流式执行任务
async for event in react_agent.execute_stream(
    goal="查询所有无人机的直播流信息",
    user_info=user_info,
    hint="先获取飞机列表,再逐个查询直播流",
    mcp_manager=mcp_manager
):
    if event["type"] == "thought":
        print(f"思考: {event['content']}")
    elif event["type"] == "action":
        print(f"执行: {event['tool_name']}")
    elif event["type"] == "observation":
        print(f"观察: {event['content']}")
    elif event["type"] == "final_answer":
        print(f"最终答案: {event['content']}")
3.3 MCP 工具管理器

作用:管理 MCP(Model Context Protocol)工具的注册、发现和调用。

核心功能

  • 工具注册:注册可用的工具
  • 工具发现:列出所有可用工具
  • 工具调用:执行工具调用
  • 连接管理:管理 MCP 服务器连接

实现示例

class ImprovedMCPToolManager:
    """改进的MCP工具管理器"""
    
    def __init__(self):
        self.clients: Dict[str, EnhancedMCPClient] = {}
        self.tools: Dict[str, Dict] = {}
    
    async def register_server(
        self,
        server_name: str,
        server_url: str
    ) -> bool:
        """注册MCP服务器"""
        try:
            client = EnhancedMCPClient()
            if await client.connect(server_url):
                self.clients[server_name] = client
                
                # 获取工具列表
                tools = await client.list_tools()
                for tool in tools:
                    tool_key = f"{server_name}:{tool.name}"
                    self.tools[tool_key] = {
                        "namespace": server_name,
                        "function_name": tool.name,
                        "description": tool.description,
                        "parameters": tool.inputSchema
                    }
                
                logger.info(f"注册服务器 {server_name}: {len(tools)} 个工具")
                return True
            else:
                return False
        except Exception as e:
            logger.error(f"注册服务器失败: {e}")
            return False
    
    async def call_function(
        self,
        namespace: str,
        function_name: str,
        parameters: Dict[str, Any]
    ) -> Any:
        """调用MCP函数"""
        if namespace not in self.clients:
            raise ValueError(f"服务器 {namespace} 未注册")
        
        client = self.clients[namespace]
        result = await client.call_tool(function_name, parameters)
        return result
    
    def get_available_tools(self) -> Dict[str, Dict]:
        """获取所有可用工具"""
        return self.tools.copy()

4. Tool/Action 模块的最佳实践

  1. 工具设计原则

    • 单一职责:每个工具只做一件事
    • 幂等性:多次调用结果一致
    • 错误处理:明确的错误信息
  2. 执行策略选择

    • 有依赖关系 → 顺序执行
    • 独立操作 → 并行执行
  3. 错误处理

    • 重试机制:临时错误自动重试
    • 降级方案:工具失败时的备选方案
    • 错误传播:清晰的错误信息
  4. 性能优化

    • 批量操作:合并多个调用
    • 缓存结果:避免重复调用
    • 异步执行:不阻塞主流程

端到端实战

1. 完整示例:智能无人机管理系统

让我们构建一个完整的示例,展示三个模块如何协同工作。

1.1 系统架构
┌─────────────────────────────────────────┐
│          AgenticAgent (主Agent)          │
├─────────────────────────────────────────┤
│  Memory Module                          │
│  ├─ ContextDataCache (缓存)             │
│  ├─ ConversationPersistenceHelper       │
│  └─ AgenticState (状态管理)             │
├─────────────────────────────────────────┤
│  Retrieval Module                       │
│  └─ ChromaVectorStore (向量检索)        │
├─────────────────────────────────────────┤
│  Tool/Action Module                     │
│  ├─ ToolExecutionService                │
│  ├─ ReactAgent                          │
│  └─ ImprovedMCPToolManager              │
└─────────────────────────────────────────┘
1.2 完整代码实现
import asyncio
from typing import Dict, List, Optional, AsyncGenerator
from app.agents.agentic.agentic_agent import AgenticAgent
from app.schemas.intent_parser import UserInfo

class SmartUAVAgent:
    """智能无人机管理Agent"""
    
    def __init__(self):
        self.agent = None
    
    async def initialize(self):
        """初始化Agent"""
        self.agent = await AgenticAgent.create()
        logger.info("智能无人机管理Agent初始化完成")
    
    async def process_user_query(
        self,
        user_input: str,
        user_info: UserInfo,
        session_id: str
    ) -> AsyncGenerator[Dict, None]:
        """
        处理用户查询(端到端流程)
        
        流程:
        1. Memory: 加载历史对话
        2. Retrieval: 检索相关信息
        3. Tool/Action: 执行工具调用
        4. Memory: 保存对话历史
        """
        # 1. Memory: 加载历史对话
        history = await self._load_conversation_history(session_id)
        
        # 2. Retrieval: 如果查询涉及实体(如无人机名称),进行语义检索
        enhanced_input = await self._enhance_input_with_retrieval(
            user_input, user_info
        )
        
        # 3. 流式处理用户输入
        async for event in self.agent.process_input_stream_agui(
            user_input=user_input,
            user_raw_input=user_input,
            user_info=user_info,
            session_id=session_id
        ):
            yield event
        
        # 4. Memory: 保存对话历史(在Agent内部自动完成)
    
    async def _load_conversation_history(self, session_id: str) -> List[Dict]:
        """加载对话历史"""
        try:
            task = Task.get(Task.session_id == session_id)
            return task.conversation_logs or []
        except Task.DoesNotExist:
            return []
    
    async def _enhance_input_with_retrieval(
        self,
        user_input: str,
        user_info: UserInfo
    ) -> str:
        """使用检索增强输入"""
        # 检测是否包含实体查询(如"巡检一号")
        # 这里简化处理,实际可以使用NER模型
        
        # 如果有实体查询,使用向量检索
        if any(keyword in user_input for keyword in ["无人机", "飞机", "任务"]):
            # 从缓存获取数据(Memory)
            cache_service = self.agent.cache_service
            uav_list = await cache_service.get_uav_list(
                self.agent.mcp_manager,
                user_info
            )
            
            # 使用向量检索匹配(Retrieval)
            vector_store = ChromaVectorStore.get_instance()
            # 这里可以提取实体并进行检索
            # 简化示例:直接返回原始输入
            return user_input
        
        return user_input

# 使用示例
async def main():
    # 初始化Agent
    smart_agent = SmartUAVAgent()
    await smart_agent.initialize()
    
    # 用户信息
    user_info = UserInfo(
        user_id="123",
        tenantId="tenant_1",
        siteId="site_1"
    )
    
    # 处理用户查询
    async for event in smart_agent.process_user_query(
        user_input="查询巡检一号无人机的状态",
        user_info=user_info,
        session_id="session_123"
    ):
        print(f"事件: {event['type']}")
        if event['type'] == 'content':
            print(f"内容: {event['data']}")

# 运行
if __name__ == "__main__":
    asyncio.run(main())
1.3 执行流程详解

场景:用户查询"巡检一号无人机的状态"

1. Memory 模块
   ├─ 加载历史对话(如果有)
   └─ 初始化会话状态

2. Retrieval 模块
   ├─ 检测到实体查询:"巡检一号"
   ├─ 向量检索:在UAV集合中搜索"巡检一号"
   └─ 匹配结果:找到 deviceId="1", deviceName="巡检1号"

3. Tool/Action 模块
   ├─ 意图识别:识别为MCP工具调用
   ├─ 参数提取:deviceId="1"
   ├─ 工具执行:调用 getUavStatus(deviceId="1")
   └─ 结果处理:返回状态信息

4. Memory 模块
   ├─ 保存用户消息:"查询巡检一号无人机的状态"
   ├─ 保存AI响应:"巡检1号当前状态:在线"
   └─ 更新会话状态

5. 响应生成
   └─ 生成最终响应给用户

2. 进阶示例:多轮对话场景

async def multi_turn_conversation_example():
    """多轮对话示例"""
    agent = await AgenticAgent.create()
    user_info = UserInfo(user_id="123", tenantId="t1", siteId="s1")
    session_id = "session_multi_turn"
    
    # 第一轮:用户查询无人机列表
    print("=== 第一轮对话 ===")
    async for event in agent.process_input_stream_agui(
        user_input="有哪些无人机?",
        user_raw_input="有哪些无人机?",
        user_info=user_info,
        session_id=session_id
    ):
        if event.get("type") == "final_data":
            print(f"响应: {event['data']}")
    
    # 第二轮:基于历史对话,用户继续提问
    print("\n=== 第二轮对话 ===")
    async for event in agent.process_input_stream_agui(
        user_input="巡检1号的状态是什么?",  # 引用了上一轮的"巡检1号"
        user_raw_input="巡检1号的状态是什么?",
        user_info=user_info,
        session_id=session_id  # 相同的session_id,会加载历史
    ):
        if event.get("type") == "final_data":
            print(f"响应: {event['data']}")
    
    # Memory模块会自动:
    # 1. 加载第一轮的对话历史
    # 2. 理解"巡检1号"的上下文
    # 3. 保存第二轮对话

最佳实践

1. Memory 模块最佳实践

1.1 缓存策略
# ✅ 好的实践:根据数据特性设置TTL
cache_config = CacheConfig(
    uav_list_ttl=300,      # UAV列表5分钟(变化频繁)
    task_list_ttl=600,     # 任务列表10分钟(变化较少)
    media_list_ttl=60,     # 媒体列表1分钟(实时性要求高)
    default_ttl=300
)

# ❌ 不好的实践:所有数据使用相同TTL
cache_config = CacheConfig(
    default_ttl=300  # 所有数据都是5分钟,不够灵活
)
1.2 对话历史管理
# ✅ 好的实践:限制历史长度,避免上下文过长
def load_conversation_history(session_id: str, max_turns: int = 10):
    """加载最近的N轮对话"""
    task = Task.get(Task.session_id == session_id)
    history = task.conversation_logs or []
    return history[-max_turns:]  # 只返回最近10轮

# ❌ 不好的实践:加载所有历史
def load_conversation_history(session_id: str):
    task = Task.get(Task.session_id == session_id)
    return task.conversation_logs or []  # 可能很长,影响性能

2. Retrieval 模块最佳实践

2.1 相似度阈值调优
# ✅ 好的实践:根据场景调整阈值
def search_uav(query: str, strict: bool = False):
    threshold = 0.7 if strict else 0.5  # 严格模式用更高阈值
    return vector_store.search(
        query=query,
        collection_name="uav",
        threshold=threshold
    )

# ❌ 不好的实践:固定阈值
def search_uav(query: str):
    return vector_store.search(
        query=query,
        collection_name="uav",
        threshold=0.5  # 固定值,不够灵活
    )
2.2 混合检索策略
# ✅ 好的实践:精确匹配 + 语义检索
def search_with_fallback(query: str, items: List[NamedItem]):
    # 1. 先尝试精确匹配
    exact_match = [item for item in items if query.lower() == item.name.lower()]
    if exact_match:
        return exact_match
    
    # 2. 精确匹配失败,使用语义检索
    return vector_store.search(query, items)

3. Tool/Action 模块最佳实践

3.1 工具设计
# ✅ 好的实践:工具职责单一,参数明确
@tool
def get_uav_status(device_id: str) -> Dict:
    """获取无人机状态
    
    Args:
        device_id: 无人机设备ID
        
    Returns:
        包含状态信息的字典
    """
    # 实现...

# ❌ 不好的实践:工具职责不清,参数混乱
@tool
def uav_operation(operation: str, *args, **kwargs) -> Any:
    """执行无人机操作"""  # 太宽泛
    # 实现...
3.2 错误处理
# ✅ 好的实践:明确的错误处理和重试
async def execute_with_retry(tool_call: Dict, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            return await execute_tool(tool_call)
        except TemporaryError as e:
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)  # 指数退避
                continue
            raise
        except PermanentError as e:
            # 永久错误,不重试
            raise

# ❌ 不好的实践:所有错误都重试
async def execute_with_retry(tool_call: Dict, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            return await execute_tool(tool_call)
        except Exception as e:  # 捕获所有异常
            if attempt < max_retries - 1:
                continue
            raise

常见问题与解决方案

Q1: 缓存数据不一致怎么办?

问题:缓存中的数据可能与实际数据不一致。

解决方案

  1. 设置合理的TTL
  2. 关键操作后主动失效缓存
  3. 实现缓存版本号机制
# 解决方案示例
async def update_uav_status(device_id: str, status: str):
    # 1. 更新数据库
    await db.update_uav_status(device_id, status)
    
    # 2. 失效相关缓存
    cache_service.invalidate_cache(pattern=f"uav_list:*")
    cache_service.invalidate_cache(pattern=f"uav_detail:{device_id}")

Q2: 向量检索不准确怎么办?

问题:语义搜索返回的结果不相关。

解决方案

  1. 调整相似度阈值
  2. 使用更好的Embedding模型
  3. 结合精确匹配
# 解决方案示例
def hybrid_search(query: str):
    # 1. 精确匹配
    exact_results = exact_match(query)
    if exact_results:
        return exact_results
    
    # 2. 语义检索(提高阈值)
    semantic_results = vector_store.search(
        query=query,
        threshold=0.7  # 提高阈值,更严格
    )
    
    # 3. 如果语义检索结果太少,降低阈值
    if len(semantic_results) < 3:
        semantic_results = vector_store.search(
            query=query,
            threshold=0.5
        )
    
    return semantic_results

Q3: 工具调用超时怎么办?

问题:工具调用时间过长,影响用户体验。

解决方案

  1. 设置超时时间
  2. 实现异步执行
  3. 提供进度反馈
# 解决方案示例
async def execute_with_timeout(tool_call: Dict, timeout: float = 30.0):
    try:
        return await asyncio.wait_for(
            execute_tool(tool_call),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        logger.warning(f"工具调用超时: {tool_call['name']}")
        return {
            "success": False,
            "error": "工具调用超时,请稍后重试"
        }

Q4: 如何优化检索性能?

问题:向量检索速度慢。

解决方案

  1. 使用索引(如HNSW)
  2. 限制检索范围
  3. 缓存检索结果
# 解决方案示例
class OptimizedVectorStore:
    def __init__(self):
        self.search_cache: Dict[str, List] = {}
    
    def search(self, query: str, collection_name: str, top_k: int = 5):
        # 1. 检查缓存
        cache_key = f"{query}:{collection_name}:{top_k}"
        if cache_key in self.search_cache:
            return self.search_cache[cache_key]
        
        # 2. 执行检索
        results = self._do_search(query, collection_name, top_k)
        
        # 3. 缓存结果(TTL较短)
        self.search_cache[cache_key] = results
        asyncio.create_task(self._expire_cache(cache_key, ttl=60))
        
        return results

总结

本文详细介绍了 AI Agent 的三个核心模块:

  1. Memory 模块:负责存储和管理对话历史、上下文信息

    • ContextDataCache:智能缓存
    • ConversationPersistenceHelper:对话持久化
    • AgenticState:状态管理
  2. Retrieval 模块:负责从记忆库中检索相关信息

    • ChromaVectorStore:向量存储和语义搜索
    • 混合检索策略:精确匹配 + 语义检索
  3. Tool/Action 模块:负责执行具体操作

    • ToolExecutionService:工具执行服务
    • ReactAgent:动态推理执行
    • ImprovedMCPToolManager:工具管理

这三个模块协同工作,构成了一个完整的 AI Agent 系统。通过合理的设计和实现,可以构建出高效、可靠的智能Agent。


Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐