目前在评估Coze这个开源版本作为一个音频教育产品的智能体实现,分析下来还是有很多需要优化的细节,发出来给大家分享

SingleAgent历史记录机制分析文档

1. 设备与Agent之间、设备与对话记录的关系

1.1 SingleAgent与用户关联机制

1.1.1 核心数据结构
type SingleAgent struct {
    AgentID     int64
    CreatorID   int64  // 创建者ID
    SpaceID     int64
    ConnectorID int64  // 连接器ID - 关键字段
    // ... 其他字段
}

type AgentIdentity struct {
    AgentID     int64
    Version     string
    IsDraft     bool
    ConnectorID int64  // 连接器标识
}

type ExecuteRequest struct {
    Identity *AgentIdentity
    UserID   string  // 用户ID - 字符串类型,支持任何身份标识
    // ... 其他字段
}
1.1.2 关联关系分析
  • SingleAgent不强制与用户关联:系统设计灵活,支持多种身份标识方式
  • 连接器机制:通过ConnectorID区分不同的接入方式(Web SDK、API、Coze等)
  • 用户标识灵活性UserID为字符串类型,可以是真实用户ID,也可以是设备标识

1.2 IoT设备关联方案

1.2.1 方案一:设备ID作为UserID
executeReq := &model.ExecuteRequest{
    Identity: &model.AgentIdentity{
        AgentID:     agentID,
        ConnectorID: IoTConnectorID,
        Version:     version,
        IsDraft:     false,
    },
    UserID: fmt.Sprintf("device_%s", deviceID), // 设备ID作为用户标识
}
1.2.2 方案二:创建专门的IoT连接器
  • 在连接器服务中添加IoT设备类型
  • 为IoT设备创建专门的ConnectorID
  • 在连接器配置中处理设备认证和授权
1.2.3 方案三:扩展身份标识结构
type AgentIdentity struct {
    AgentID     int64
    Version     string
    IsDraft     bool
    ConnectorID int64
    DeviceID    string  // 新增设备ID字段
    DeviceType  string  // 设备类型
}

2. 历史记录的存储过程及机制分析

2.1 存储架构

2.1.1 分层存储结构
┌─────────────────┐
│   Conversation  │ ← 会话层(一对一关系)
│   (会话信息表)    │
└─────────────────┘
         │
         ├── ID (主键)
         ├── ConnectorID (业务线ID)
         ├── AgentID (智能体ID)
         ├── CreatorID (创建者ID) ← 关键关联字段
         └── SectionID (最新段落ID)
         
┌─────────────────┐
│     Message     │ ← 消息层(一对多关系)
│    (消息表)      │
└─────────────────┘
         │
         ├── ID (主键)
         ├── RunID (执行ID)
         ├── ConversationID (会话ID) ← 关联会话
         ├── UserID (用户ID) ← 关键关联字段
         ├── AgentID (智能体ID)
         └── SectionID (段落ID)
2.1.2 关联关系
  • Conversation层面:通过CreatorID关联用户,用于权限控制和会话归属
  • Message层面:通过UserID关联用户,支持任何形式的身份标识
  • 查询机制:先通过用户ID获取会话,再通过会话ID查询消息

2.2 消息持久化机制

2.2.1 流式处理架构
用户输入 → 预创建消息 → 流式执行 → 实时持久化 → 完成确认
    ↓           ↓           ↓           ↓           ↓
  请求接收    PreCreate    事件处理    数据库写入    状态更新
2.2.2 预创建机制(伪缓冲)
func (dao *MessageDAO) PreCreate(ctx context.Context, msg *entity.Message) (*entity.Message, error) {
    poData, err := dao.messageDO2PO(ctx, msg)
    if err != nil {
        return nil, err
    }
    return dao.messagePO2DO(poData), nil  // 只转换,不写入数据库
}
2.2.3 实时持久化流程
func (c *runImpl) push(ctx context.Context, mainChan chan *entity.AgentRespEvent, sw *schema.StreamWriter[*entity.AgentRunResponse], rtDependence *runtimeDependence) {
    for {
        chunk, ok := <-mainChan
        switch chunk.EventType {
        case message.MessageTypeFunctionCall:
            // 函数调用消息立即持久化
        case message.MessageTypeToolResponse:
            // 工具响应消息立即持久化
        case message.MessageTypeAnswer:
            // 答案消息流式持久化
        }
    }
}

2.3 历史记录查询机制

2.3.1 每次对话的查询流程
func (c *runImpl) handlerHistory(ctx context.Context, rtDependence *runtimeDependence) ([]*msgEntity.Message, error) {
    // 1. 确定历史轮数(默认100轮)
    conversationTurns := entity.ConversationTurnsDefault // 100
    
    // 2. 查询RunRecord表
    runRecordList, err := c.RunRecordRepo.List(ctx, rtDependence.runMeta.ConversationID, rtDependence.runMeta.SectionID, conversationTurns)
    
    // 3. 根据RunID查询Message表
    runIDS := c.getRunID(runRecordList)
    history, err := crossmessage.DefaultSVC().GetByRunIDs(ctx, rtDependence.runMeta.ConversationID, runIDS)
    
    return history, nil
}
2.3.2 数据库查询操作
// RunRecord查询
func (dao *MessageDAO) List(ctx context.Context, conversationID int64, limit int, cursor int64, direction entity.ScrollPageDirection, messageType *message.MessageType) ([]*entity.Message, bool, error) {
    m := dao.query.Message
    do := m.WithContext(ctx).Debug().Where(m.ConversationID.Eq(conversationID))
    // 直接查询数据库,无缓存
    messageList, err := do.Find()
    return dao.batchMessagePO2DO(messageList), hasMore, nil
}

// Message查询
func (dao *MessageDAO) GetByRunIDs(ctx context.Context, runIDs []int64, orderBy string) ([]*entity.Message, error) {
    m := dao.query.Message
    do := m.WithContext(ctx).Debug().Where(m.RunID.In(runIDs...))
    // 直接查询数据库,无缓存
    poList, err := do.Find()
    return dao.batchMessagePO2DO(poList), nil
}

2.4 前端缓冲机制

2.4.1 流式消息缓冲
export class StreamBufferHelper {
  // 流式消息缓存
  streamMessageBuffer: Message<ContentType>[] = [];
  streamChunkBuffer: ChunkRaw[] = [];

  concatContentAndUpdateMessage(message: Message<ContentType>) {
    // 在内存中合并和更新消息内容
    const previousIndex = this.streamMessageBuffer.findIndex(
      item => item.message_id === message.message_id,
    );
    // 新增或更新消息
  }
}
2.4.2 本地消息管理
export class PreSendLocalMessageEventsManager {
  private preSendLocalMessageEventsMap: Map<string, PreSendLocalMessage<ContentType>> = new Map();
  
  // 缓存本地预发送消息
  add(message: Message<ContentType>) {
    this.preSendLocalMessageEventsMap.set(message.extra_info.local_message_id, message);
  }
}

3. 存在的缺陷

3.1 性能缺陷

3.1.1 每次对话都要查询数据库

问题描述

  • 每次用户输入都会触发完整的历史记录查询流程
  • 需要执行2次数据库查询:RunRecord表 + Message表
  • 默认查询100轮历史记录

性能影响

对话轮次    查询数据量    延迟影响
第1次      0轮历史      ~15-25ms
第10次     9轮历史      ~25-40ms
第50次     49轮历史     ~40-70ms
第100次    99轮历史     ~60-100ms
3.1.2 重复查询相同数据

问题描述

  • 相同的历史数据在每次对话中被重复查询
  • 没有缓存机制,浪费数据库资源和网络带宽
  • 高并发场景下数据库成为瓶颈
3.1.3 内存资源浪费

问题描述

  • 历史消息在每次对话中重新加载到内存
  • 大量重复的内存分配和释放
  • 特别是多模态内容(图片、音频、视频)占用大量内存

3.2 架构缺陷

3.2.1 缺乏缓存机制

问题描述

  • 后端没有实现传统的缓存机制
  • 只有前端实现了流式缓冲,但仅限于单次对话
  • 缺乏多级缓存设计(内存缓存 + Redis缓存)
3.2.2 直接数据库读写

问题描述

func (dao *MessageDAO) Create(ctx context.Context, msg *entity.Message) (*entity.Message, error) {
    // 直接写入数据库,无缓冲机制
    do := dao.query.Message.WithContext(ctx).Debug()
    cErr := do.Create(poData)
    return dao.messagePO2DO(poData), nil
}
3.2.3 扩展性问题

问题描述

  • 随着用户数量和对话轮数增加,性能线性下降
  • 数据库连接池可能成为瓶颈
  • 缺乏水平扩展能力

3.3 设计缺陷

3.3.1 历史轮数限制不合理

问题描述

  • 默认100轮历史记录过多,大部分场景用不到
  • 没有根据智能体类型或场景动态调整
  • 缺乏智能的历史记录裁剪机制
3.3.2 上下文管理不当

问题描述

  • 每次都加载完整历史,无法按需加载
  • 缺乏上下文压缩和摘要机制
  • 没有考虑Token限制对历史记录的影响

3.4 具体数据流分析

3.4.1 单次对话的数据库访问
用户输入 → AgentRun → handlerHistory → 2次DB查询 → 构建历史 → AI推理
3.4.2 多轮对话的累积影响
第1轮:2次DB查询(0条历史记录)
第2轮:2次DB查询(1轮历史记录)
第3轮:2次DB查询(2轮历史记录)
...
第N轮:2次DB查询(N-1轮历史记录)

总计:2N次数据库查询,查询数据量呈线性增长

4. 优化建议

4.1 引入多级缓存机制

type ConversationCache struct {
    // L1: 内存缓存
    memoryCache map[string][]*Message
    
    // L2: Redis缓存
    redisCache *redis.Client
    
    // L3: 数据库
    database MessageDAO
}

4.2 实现智能预加载

  • 在用户开始对话前异步预加载历史记录
  • 根据用户行为模式预测需要的历史数据
  • 实现增量更新机制

4.3 优化历史记录管理

  • 动态调整历史轮数限制
  • 实现上下文压缩和摘要
  • 支持按需加载历史记录

4.4 改进数据库设计

  • 添加合适的索引优化查询性能
  • 考虑分库分表支持水平扩展
  • 实现读写分离减轻数据库压力

5. 总结

SingleAgent的历史记录机制在功能上是完整的,支持灵活的设备关联和实时的消息持久化。但在性能和架构设计上存在明显缺陷,特别是每次对话都要从数据库查询历史记录的设计,会导致严重的性能问题。

关键问题

  1. 缺乏缓存机制,每次对话都要查询数据库
  2. 重复查询相同数据,浪费资源
  3. 随着对话轮数增加,性能线性下降
  4. 高并发场景下数据库成为瓶颈

建议优先级

  1. 高优先级:引入Redis缓存机制
  2. 中优先级:实现智能历史记录管理
  3. 低优先级:优化数据库设计和架构
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐