系列文章目录



前言

本文记录在 campusai 项目中,针对“后台通过 RabbitMQ 推送变更消息,前端按消息类型派发向量化处理”的设计与实现。学习工厂模式在项目中的落地思路,方便工程化复用与扩展。


一、问题与场景

  • 若依后台在对 notice/materials 等表做增删改时,把变更通过 RabbitMQ 发到 AI 应用。
  • 消息形如:{ ids: [“123”], operation: “ADD|UPDATE|DELETE”, type: “notice|materials|…” }。
  • 目标:按 type 动态派发不同向量化处理(add/update/delete),保证低耦合、运行时可扩展、最小化接收端改动。
  • 向量化目的:把文本/文档内容映射为语义向量(embeddings),能够做语义检索、相似度匹配和 RAG(检索增强生成),弥补关键词检索在语义理解上的不足。
  • 实际价值:用户提问时通过向量检索出最相关的知识片段,提供上下文给大模型,提升回答准确性与召回率。SpringAI读取向量数据及知识库的更新如图所示:
    在这里插入图片描述

公告栏逻辑说明:

  1. 管理员发公告:若依(后台)生成notice表给Mysql(包括controller,用一个id写入数据库),Mysql数据库表中的数据向量化为AI能识别的数据(知识库,用redis实现向量库)(引入rabbitMQ传递id进行知识库的写入)
  2. 修改后台生成的controller,使若依后台发给Mysql的同时也发给MessageQuene,MQ发给AI应用(frontend)id号,AI应用根据id号从Mysql中读取相应id对应的数据,最后由AI应用将读回的数据写入知识库(向量库vector)中。知识库返回向量库id号与会话id共同写入中间表中便于删除操作
  3. 管理员删除公告:若依后台删除Mysql表中的相关数据,将id号利用MQ传给AI应用,由frontend用id查找Mysql中的document_ids表(source_id & document_id:notice表的id号 & 向量库中的id号),获取到向量库中的对应id号,frontend再删除向量库中的对应数据,

二、设计要点

  1. 抽象接口:定义 IVectorService,统一 add/update/delete 与 type()。
  2. 工厂注册:VectorServiceFactory 在 Spring 启动时收集所有 IVectorService,实现 type->实现 映射(并检测重复)。
  3. 接收端职责单一:CampusaiMessageReceiver 只负责解析消息并委派给工厂返回的实现执行操作。
  4. 扩展零改动:新增类型仅需新增实现并注册为 Bean(返回不同 type)。

三、关键代码

接口:IVectorService

// ...existing code...
public interface IVectorService {
    void add(String sourceId);
    void update(String sourceId);
    void delete(String sourceId);
    /**
     * 返回本处理器负责的 type 标识,例如 "notice"
     */
    String type();
}
// ...existing code...

工厂:VectorServiceFactory(启动时注册)

// ...existing code...
@Component
public class VectorServiceFactory {
    private final List<IVectorService> services;
    private final Map<String, IVectorService> registry = new HashMap<>();

    @Autowired
    public VectorServiceFactory(List<IVectorService> services){
        this.services = services;
    }

    @PostConstruct
    public void init(){
        for (IVectorService s : services) {
            String t = s.type();
            if (registry.containsKey(t)) {
                throw new IllegalStateException("duplicate IVectorService type: " + t);
            }
            registry.put(t, s);
        }
    }

    public IVectorService get(String type){
        IVectorService s = registry.get(type);
        if (s == null) throw new IllegalArgumentException("unknown vector service type: " + type);
        return s;
    }
}
// ...existing code...

消息 DTO(轻量)

// ...existing code...
public class MessageDto {
    private List<String> ids;
    private String operation;
    private String type;
    // getters/setters省略
}
// ...existing code...

接收器:CampusaiMessageReceiver(解析并委派)

// ...existing code...
@Component
public class CampusaiMessageReceiver {
    @Autowired private VectorServiceFactory factory;
    private final ObjectMapper mapper = new ObjectMapper();

    public void receiveMessage(String message) {
        try {
            MessageDto dto = mapper.readValue(message, MessageDto.class);
            if (dto.getIds() == null || dto.getIds().isEmpty()) return;
            IVectorService svc = factory.get(dto.getType());
            String op = dto.getOperation() == null ? "" : dto.getOperation().toUpperCase();
            for (String id : dto.getIds()) {
                switch (op) {
                    case "ADD": svc.add(id); break;
                    case "UPDATE": svc.update(id); break;
                    case "DELETE": svc.delete(id); break;
                    default: throw new IllegalArgumentException("unknown operation: " + dto.getOperation());
                }
            }
        } catch (Exception e) {
            // 生产环境:日志、告警、重试/死信
            e.printStackTrace();
        }
    }
}
// ...existing code...

实现示例(NoticeVectorServiceImpl)

// ...existing code...
@Service
public class NoticeVectorServiceImpl implements IVectorService {

    @Autowired
    private NoticeMapper noticeMapper;

    @Autowired
    private DocumentIdsMapper documentIdsMapper;

    @Autowired
    private VectorStoreClient vectorStore;

    @Override
    public void add(String sourceId) {
        Notice notice = noticeMapper.selectById(Long.valueOf(sourceId));
        if (notice == null) return;
        List<Document> docs = DocumentSplitter.splitText(notice.getContent());
        List<String> documentIds = vectorStore.addDocuments(docs);
        documentIds.forEach(docId -> documentIdsMapper.insert(sourceId, docId));
    }

    @Override
    public void update(String sourceId) {
        delete(sourceId);
        add(sourceId);
    }

    @Override
    public void delete(String sourceId) {
        List<String> docIds = documentIdsMapper.findBySourceId(sourceId);
        if (docIds == null || docIds.isEmpty()) return;
        docIds.forEach(vectorStore::deleteDocument);
        documentIdsMapper.deleteBySourceId(sourceId);
    }

    @Override
    public String type() { return "notice"; }
}
// ...existing code...

四、流程描述

  1. 若依后台写入 notice -> Controller 同步写 DB 并调用 RabbitSendService 发送 MQ 消息(ids, operation, type)。
  2. Frontend 的 CampusaiMessageReceiver 消费消息,解析 MessageDto。
  3. Receiver 调用 VectorServiceFactory.get(type) 获得实现,并按 operation 调用 add/update/delete。
  4. 实现里读 DB -> 拆分为 Document -> 写入 VectorStore -> 将 sourceId<->documentId 写入 document_ids 中间表。

五、测试

  • 在若依后台新增一条 notice:观察到后台调用 RabbitSendService 发送 MQ。
    在这里插入图片描述
    在这里插入图片描述

  • 暂时不运行前台,在RabbitMQ 管理界面查看到消息。因为这个信息还未被消费,所以一直存在。
    在这里插入图片描述

  • 运行前台,在 frontend 日志确认 Receiver 收到并执行。
    在这里插入图片描述

  • 检查 Redis-stack(向量库)有新增条目。
    在这里插入图片描述

  • 检查 document_ids 表写入映射。
    在这里插入图片描述
    在这里插入图片描述


六、遇见的坑

  • VectorStore 无 update:必须 delete 后再 add,需注意短时间内查询不到向量的问题(可做版本号/时间戳校验)。
  • 批量与并发:针对大量 ids 做批处理或并发限流,避免压垮向量库。
  • 工厂冲突:若两个实现返回相同 type,init 时应抛错或用配置显式指定主实现(@Primary 也可)。

总结

工厂模式把“按 type 派发”的复杂度隔离到注册/查找层,接收端只关心消息解析与委派,业务实现专注向量化细节。项目中把责任清晰拆分后,新增类型几乎零改动接收链路,便于长期维护与扩展。


Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐