【Spring AI实战:按消息类型派发的 VectorServiceFactory 设计与实现】
本文介绍了基于Spring AI的多格式文档向量化微服务架构设计,通过工厂模式+模板模式实现动态派发处理。系统通过RabbitMQ接收后台变更消息(如公告/资料增删改),根据消息类型自动调用对应的向量化服务。设计包含IVectorService接口抽象、VectorServiceFactory工厂类、消息接收器CampusaiMessageReceiver等核心组件,实现低耦合、可扩展的架构。当新
·
系列文章目录
前言
本文记录在 campusai 项目中,针对“后台通过 RabbitMQ 推送变更消息,前端按消息类型派发向量化处理”的设计与实现。学习工厂模式在项目中的落地思路,方便工程化复用与扩展。
一、问题与场景
- 若依后台在对 notice/materials 等表做增删改时,把变更通过 RabbitMQ 发到 AI 应用。
- 消息形如:{ ids: [“123”], operation: “ADD|UPDATE|DELETE”, type: “notice|materials|…” }。
- 目标:按 type 动态派发不同向量化处理(add/update/delete),保证低耦合、运行时可扩展、最小化接收端改动。
- 向量化目的:把文本/文档内容映射为语义向量(embeddings),能够做语义检索、相似度匹配和 RAG(检索增强生成),弥补关键词检索在语义理解上的不足。
- 实际价值:用户提问时通过向量检索出最相关的知识片段,提供上下文给大模型,提升回答准确性与召回率。SpringAI读取向量数据及知识库的更新如图所示:

公告栏逻辑说明:
- 管理员发公告:若依(后台)生成notice表给Mysql(包括controller,用一个id写入数据库),Mysql数据库表中的数据向量化为AI能识别的数据(知识库,用redis实现向量库)(引入rabbitMQ传递id进行知识库的写入)
- 修改后台生成的controller,使若依后台发给Mysql的同时也发给MessageQuene,MQ发给AI应用(frontend)id号,AI应用根据id号从Mysql中读取相应id对应的数据,最后由AI应用将读回的数据写入知识库(向量库vector)中。知识库返回向量库id号与会话id共同写入中间表中便于删除操作
- 管理员删除公告:若依后台删除Mysql表中的相关数据,将id号利用MQ传给AI应用,由frontend用id查找Mysql中的document_ids表(source_id & document_id:notice表的id号 & 向量库中的id号),获取到向量库中的对应id号,frontend再删除向量库中的对应数据,
二、设计要点
- 抽象接口:定义 IVectorService,统一 add/update/delete 与 type()。
- 工厂注册:VectorServiceFactory 在 Spring 启动时收集所有 IVectorService,实现 type->实现 映射(并检测重复)。
- 接收端职责单一:CampusaiMessageReceiver 只负责解析消息并委派给工厂返回的实现执行操作。
- 扩展零改动:新增类型仅需新增实现并注册为 Bean(返回不同 type)。
三、关键代码
接口:IVectorService
// ...existing code...
public interface IVectorService {
void add(String sourceId);
void update(String sourceId);
void delete(String sourceId);
/**
* 返回本处理器负责的 type 标识,例如 "notice"
*/
String type();
}
// ...existing code...
工厂:VectorServiceFactory(启动时注册)
// ...existing code...
@Component
public class VectorServiceFactory {
private final List<IVectorService> services;
private final Map<String, IVectorService> registry = new HashMap<>();
@Autowired
public VectorServiceFactory(List<IVectorService> services){
this.services = services;
}
@PostConstruct
public void init(){
for (IVectorService s : services) {
String t = s.type();
if (registry.containsKey(t)) {
throw new IllegalStateException("duplicate IVectorService type: " + t);
}
registry.put(t, s);
}
}
public IVectorService get(String type){
IVectorService s = registry.get(type);
if (s == null) throw new IllegalArgumentException("unknown vector service type: " + type);
return s;
}
}
// ...existing code...
消息 DTO(轻量)
// ...existing code...
public class MessageDto {
private List<String> ids;
private String operation;
private String type;
// getters/setters省略
}
// ...existing code...
接收器:CampusaiMessageReceiver(解析并委派)
// ...existing code...
@Component
public class CampusaiMessageReceiver {
@Autowired private VectorServiceFactory factory;
private final ObjectMapper mapper = new ObjectMapper();
public void receiveMessage(String message) {
try {
MessageDto dto = mapper.readValue(message, MessageDto.class);
if (dto.getIds() == null || dto.getIds().isEmpty()) return;
IVectorService svc = factory.get(dto.getType());
String op = dto.getOperation() == null ? "" : dto.getOperation().toUpperCase();
for (String id : dto.getIds()) {
switch (op) {
case "ADD": svc.add(id); break;
case "UPDATE": svc.update(id); break;
case "DELETE": svc.delete(id); break;
default: throw new IllegalArgumentException("unknown operation: " + dto.getOperation());
}
}
} catch (Exception e) {
// 生产环境:日志、告警、重试/死信
e.printStackTrace();
}
}
}
// ...existing code...
实现示例(NoticeVectorServiceImpl)
// ...existing code...
@Service
public class NoticeVectorServiceImpl implements IVectorService {
@Autowired
private NoticeMapper noticeMapper;
@Autowired
private DocumentIdsMapper documentIdsMapper;
@Autowired
private VectorStoreClient vectorStore;
@Override
public void add(String sourceId) {
Notice notice = noticeMapper.selectById(Long.valueOf(sourceId));
if (notice == null) return;
List<Document> docs = DocumentSplitter.splitText(notice.getContent());
List<String> documentIds = vectorStore.addDocuments(docs);
documentIds.forEach(docId -> documentIdsMapper.insert(sourceId, docId));
}
@Override
public void update(String sourceId) {
delete(sourceId);
add(sourceId);
}
@Override
public void delete(String sourceId) {
List<String> docIds = documentIdsMapper.findBySourceId(sourceId);
if (docIds == null || docIds.isEmpty()) return;
docIds.forEach(vectorStore::deleteDocument);
documentIdsMapper.deleteBySourceId(sourceId);
}
@Override
public String type() { return "notice"; }
}
// ...existing code...
四、流程描述
- 若依后台写入 notice -> Controller 同步写 DB 并调用 RabbitSendService 发送 MQ 消息(ids, operation, type)。
- Frontend 的 CampusaiMessageReceiver 消费消息,解析 MessageDto。
- Receiver 调用 VectorServiceFactory.get(type) 获得实现,并按 operation 调用 add/update/delete。
- 实现里读 DB -> 拆分为 Document -> 写入 VectorStore -> 将 sourceId<->documentId 写入 document_ids 中间表。
五、测试
-
在若依后台新增一条 notice:观察到后台调用 RabbitSendService 发送 MQ。


-
暂时不运行前台,在RabbitMQ 管理界面查看到消息。因为这个信息还未被消费,所以一直存在。

-
运行前台,在 frontend 日志确认 Receiver 收到并执行。

-
检查 Redis-stack(向量库)有新增条目。

-
检查 document_ids 表写入映射。


六、遇见的坑
- VectorStore 无 update:必须 delete 后再 add,需注意短时间内查询不到向量的问题(可做版本号/时间戳校验)。
- 批量与并发:针对大量 ids 做批处理或并发限流,避免压垮向量库。
- 工厂冲突:若两个实现返回相同 type,init 时应抛错或用配置显式指定主实现(@Primary 也可)。
总结
工厂模式把“按 type 派发”的复杂度隔离到注册/查找层,接收端只关心消息解析与委派,业务实现专注向量化细节。项目中把责任清晰拆分后,新增类型几乎零改动接收链路,便于长期维护与扩展。
更多推荐

所有评论(0)