基于Spring AI的分布式在线考试系统-事件处理架构
摘要:本文首先定义统一事件模型:1. 分布式事件基类(@Data),包含事件唯一ID(雪花算法生成)、事件类型、事件版本、触发时间、来源服务、业务数据和扩展属性;2. 考试提交事件(@Data),包含使用时间(秒)并设置payload;3. AI评分完成事件(@Data)。
·
基于Spring AI的分布式在线考试系统完整架构
一、完整系统架构图
二、核心时序图
2.1 考生提交试卷完整流程时序图
2.2 Spring Event + MQ 事件桥接时序图
三、核心代码实现
3.1 统一事件模型定义
// 1. 分布式事件基类
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public abstract class BaseEvent {
/**
* 事件唯一ID(雪花算法生成)
*/
private String eventId;
/**
* 事件类型
*/
private String eventType;
/**
* 事件版本
*/
private Integer version = 1;
/**
* 触发时间
*/
private LocalDateTime triggerTime;
/**
* 来源服务
*/
private String sourceService;
/**
* 业务数据
*/
private Map<String, Object> payload;
/**
* 扩展属性
*/
private Map<String, Object> extensions = new HashMap<>();
protected BaseEvent(String eventType, String sourceService, Map<String, Object> payload) {
this.eventId = IdUtil.getSnowflakeNextIdStr();
this.eventType = eventType;
this.sourceService = sourceService;
this.payload = payload;
this.triggerTime = LocalDateTime.now();
}
}
// 2. Exam submission event
/**
 * Distributed event emitted when an examinee submits a paper.
 *
 * <p>{@code @AllArgsConstructor} is intentionally absent: it would generate a constructor
 * with exactly the same parameter list as the hand-written one below, which Lombok reports
 * as a duplicate-constructor compile error.
 *
 * <p>Use the public constructor to obtain a fully initialised event (type
 * {@code "EXAM_SUBMIT"}, source {@code "exam-service"}, payload filled in). The builder
 * only sets the fields that are passed to it explicitly.
 */
@Data
@EqualsAndHashCode(callSuper = true)
@SuperBuilder
@NoArgsConstructor
public class ExamSubmitEvent extends BaseEvent {

    private Long examId;
    private Long userId;
    private Long paperId;
    private LocalDateTime submitTime;
    private Integer useTime; // time used (seconds)

    /**
     * Creates a fully initialised submission event.
     *
     * @param examId     exam identifier
     * @param userId     examinee identifier
     * @param paperId    paper identifier
     * @param submitTime moment the paper was submitted
     * @param useTime    time used, in seconds
     */
    public ExamSubmitEvent(Long examId, Long userId, Long paperId,
                           LocalDateTime submitTime, Integer useTime) {
        // The payload is built up front and handed to the base constructor, so no
        // overridable setter has to be called while the object is still being constructed.
        super("EXAM_SUBMIT", "exam-service",
                toPayload(examId, userId, paperId, submitTime, useTime));
        this.examId = examId;
        this.userId = userId;
        this.paperId = paperId;
        this.submitTime = submitTime;
        this.useTime = useTime;
    }

    /**
     * Builds the payload map. A {@link HashMap} is used instead of {@code Map.of} because
     * {@code Map.of} throws {@link NullPointerException} on any null value.
     */
    private static Map<String, Object> toPayload(Long examId, Long userId, Long paperId,
                                                 LocalDateTime submitTime, Integer useTime) {
        Map<String, Object> payload = new HashMap<>();
        payload.put("examId", examId);
        payload.put("userId", userId);
        payload.put("paperId", paperId);
        payload.put("submitTime", submitTime);
        payload.put("useTime", useTime);
        return payload;
    }
}
// 3. AI scoring completed event
/**
 * Distributed event emitted when scoring of a submitted paper has finished.
 *
 * <p>Use the public five-argument constructor to obtain a fully initialised event (type
 * {@code "SCORE_COMPLETE"}, source {@code "score-service"}, payload filled in,
 * {@code scoreTime} set to the current time). The builder only sets the fields that are
 * passed to it explicitly.
 */
@Data
@EqualsAndHashCode(callSuper = true)
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
public class ScoreCompleteEvent extends BaseEvent {

    private Long examId;
    private Long userId;
    private BigDecimal score;
    private String aiComment;
    private LocalDateTime scoreTime;
    private String scoreType; // AI/AUTO/MANUAL

    /**
     * Creates a fully initialised scoring-completed event.
     *
     * @param examId    exam identifier
     * @param userId    examinee identifier
     * @param score     awarded score
     * @param aiComment comment text accompanying the score; may be null
     * @param scoreType one of AI/AUTO/MANUAL
     */
    public ScoreCompleteEvent(Long examId, Long userId, BigDecimal score,
                              String aiComment, String scoreType) {
        // The payload is built up front and handed to the base constructor, so no
        // overridable setter has to be called while the object is still being constructed.
        super("SCORE_COMPLETE", "score-service",
                toPayload(examId, userId, score, aiComment, scoreType));
        this.examId = examId;
        this.userId = userId;
        this.score = score;
        this.aiComment = aiComment;
        this.scoreType = scoreType;
        this.scoreTime = LocalDateTime.now();
    }

    /**
     * Builds the payload map. A {@link HashMap} is used instead of {@code Map.of} because
     * {@code Map.of} throws {@link NullPointerException} on any null value, e.g. a missing
     * {@code aiComment}.
     */
    private static Map<String, Object> toPayload(Long examId, Long userId, BigDecimal score,
                                                 String aiComment, String scoreType) {
        Map<String, Object> payload = new HashMap<>();
        payload.put("examId", examId);
        payload.put("userId", userId);
        payload.put("score", score);
        payload.put("aiComment", aiComment);
        payload.put("scoreType", scoreType);
        return payload;
    }
}
3.2 事件桥接器实现
// 1. Event bridge interface
/**
 * Contract for moving events between the in-process Spring event bus and the message queue.
 */
public interface EventBridge {

    /**
     * Publishes a local (in-process) event.
     *
     * @param event the Spring application event to publish
     */
    void publishLocalEvent(ApplicationEvent event);

    /**
     * Sends an event to the message queue.
     *
     * @param event the distributed event to send
     * @param <T>   concrete distributed event type
     */
    <T extends BaseEvent> void sendMqEvent(T event);

    /**
     * Publishes locally and sends to the message queue (dual write).
     *
     * @param event      the distributed event to send
     * @param localEvent the local event to publish
     * @param <T>        concrete distributed event type
     */
    <T extends BaseEvent> void publishAndSend(T event, ApplicationEvent localEvent);

    /**
     * Receives a message-queue event and converts it into a local event.
     *
     * @param mqEvent         the event received from the message queue
     * @param localEventClass type of the local event to create and publish
     * @param <T>             concrete distributed event type
     */
    <T extends BaseEvent> void receiveAndPublish(
            T mqEvent,
            Class<? extends ApplicationEvent> localEventClass);
}
// 2. 事件桥接器实现
@Component
@Slf4j
public class EventBridgeImpl implements EventBridge, ApplicationContextAware {
@Autowired
private ApplicationEventPublisher eventPublisher;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private RedisTemplate<String, String> redisTemplate;
private ApplicationContext applicationContext;
// 事件类型到Topic的映射
private static final Map<String, String> EVENT_TOPIC_MAPPING = Map.of(
"EXAM_SUBMIT", "EXAM_EVENT_TOPIC",
"SCORE_COMPLETE", "SCORE_EVENT_TOPIC",
"NOTIFY_SEND", "NOTIFY_EVENT_TOPIC"
);
// 事件类型到Tag的映射
private static final Map<String, String> EVENT_TAG_MAPPING = Map.of(
"EXAM_SUBMIT", "exam_submit",
"SCORE_COMPLETE", "score_complete"
);
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Override
public void publishLocalEvent(ApplicationEvent event) {
log.info("发布本地事件: {}", event.getClass().getSimpleName());
eventPublisher.publishEvent(event);
}
@Override
public <T extends BaseEvent> void sendMqEvent(T event) {
String topic = EVENT_TOPIC_MAPPING.get(event.getEventType());
String tag = EVENT_TAG_MAPPING.get(event.getEventType());
if (topic == null || tag == null) {
log.warn("未找到事件类型映射: {}", event.getEventType());
return;
}
try {
// 幂等检查
if (isEventProcessed(event.getEventId())) {
log.info("事件已处理,跳过发送: {}", event.getEventId());
return;
}
// 发送MQ消息
Message<T> message = MessageBuilder.withPayload(event)
.setHeader(MessageConst.PROPERTY_TAGS, tag)
.build();
rocketMQTemplate.syncSend(topic + ":" + tag, message);
// 记录事件已发送
markEventAsSent(event.getEventId());
log.info("发送MQ事件成功: topic={}, tag={}, eventId={}",
topic, tag, event.getEventId());
} catch (Exception e) {
log.error("发送MQ事件失败: {}", event.getEventType(), e);
throw new EventSendException("发送MQ事件失败", e);
}
}
@Override
public <T extends BaseEvent> void publishAndSend(T event, ApplicationEvent localEvent) {
// 先发布本地事件
publishLocalEvent(localEvent);
// 再发送MQ事件
sendMqEvent(event);
}
@Override
public <T extends BaseEvent> void receiveAndPublish(T mqEvent,
Class<? extends ApplicationEvent> localEventClass) {
// 幂等检查
if (isEventProcessed(mqEvent.getEventId())) {
log.info("事件已处理,跳过: {}", mqEvent.getEventId());
return;
}
try {
// 通过反射创建本地事件实例
Constructor<? extends ApplicationEvent> constructor =
localEventClass.getConstructor(Object.class, BaseEvent.class);
ApplicationEvent localEvent = constructor.newInstance(this, mqEvent);
// 发布本地事件
publishLocalEvent(localEvent);
// 标记事件已处理
markEventAsProcessed(mqEvent.getEventId());
log.info("MQ事件转换为本地事件成功: {}", mqEvent.getEventType());
} catch (Exception e) {
log.error("转换MQ事件为本地事件失败", e);
throw new EventConvertException("事件转换失败", e);
}
}
private boolean isEventProcessed(String eventId) {
String key = String.format("event:processed:%s", eventId);
return Boolean.TRUE.equals(redisTemplate.hasKey(key));
}
private void markEventAsSent(String eventId) {
String key = String.format("event:sent:%s", eventId);
redisTemplate.opsForValue().set(key, "1", Duration.ofHours(24));
}
private void markEventAsProcessed(String eventId) {
String key = String.format("event:processed:%s", eventId);
redisTemplate.opsForValue().set(key, "1", Duration.ofHours(24));
}
}
3.3 考试服务-本地事件+MQ发送
// 1. Exam service implementation
/**
 * Handles paper submission: validates the exam, stores answers, updates the exam record,
 * then publishes a local Spring event and sends a distributed MQ event.
 *
 * <p>NOTE(review): the class is {@code @Transactional} and both events are emitted inside
 * {@link #submitExam}, i.e. before the surrounding transaction commits. Confirm that
 * listeners/consumers tolerate this, or move the publication to an after-commit hook.
 */
@Service
@Slf4j
@Transactional(rollbackFor = Exception.class)
public class ExamServiceImpl implements ExamService {

    @Autowired
    private ExamMapper examMapper;

    @Autowired
    private ExamRecordMapper examRecordMapper;

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @Autowired
    private EventBridge eventBridge;

    /**
     * Submits an exam paper.
     *
     * @param submitDTO submission data (exam ID, user ID, answers, time used)
     * @return result carrying the exam ID, user ID, submit time and a confirmation message
     * @throws BusinessException if the exam does not exist or is not in progress
     */
    @Override
    public ExamSubmitResult submitExam(ExamSubmitDTO submitDTO) {
        Long examId = submitDTO.getExamId();
        Long userId = submitDTO.getUserId();

        // 1. Check the exam status
        Exam exam = examMapper.selectById(examId);
        if (exam == null) {
            throw new BusinessException("考试不存在");
        }
        if (!ExamStatus.IN_PROGRESS.equals(exam.getStatus())) {
            throw new BusinessException("考试不在进行中");
        }

        // 2. Save the examinee's answers
        saveExamAnswers(submitDTO);

        // 3. Update the exam record status
        ExamRecord record = updateExamRecord(submitDTO);

        // One timestamp for both the MQ event and the returned result, so that the two
        // never disagree about when the paper was submitted.
        LocalDateTime submitTime = LocalDateTime.now();

        // 4. Publish the local Spring event
        publishLocalSubmitEvent(exam, record, submitDTO);

        // 5. Send the distributed MQ event
        sendMqSubmitEvent(exam, record, submitDTO, submitTime);

        return ExamSubmitResult.builder()
                .examId(examId)
                .userId(userId)
                .submitTime(submitTime)
                .message("试卷提交成功,请等待评分结果")
                .build();
    }

    /** Publishes the in-process exam submission event. */
    private void publishLocalSubmitEvent(Exam exam, ExamRecord record, ExamSubmitDTO submitDTO) {
        ExamSubmitLocalEvent localEvent = new ExamSubmitLocalEvent(
                this,
                exam.getId(),
                record.getUserId(),
                record.getPaperId(),
                submitDTO.getAnswers()
        );
        eventPublisher.publishEvent(localEvent);
        log.info("发布本地考试提交事件: examId={}, userId={}",
                exam.getId(), record.getUserId());
    }

    /**
     * Builds the distributed submission event and sends it through the event bridge.
     *
     * <p>The event is created with its public constructor rather than the builder: only
     * the constructor assigns the event ID, the {@code "EXAM_SUBMIT"} event type, the
     * source service and the payload, and the bridge routes by event type.
     */
    private void sendMqSubmitEvent(Exam exam, ExamRecord record, ExamSubmitDTO submitDTO,
                                   LocalDateTime submitTime) {
        ExamSubmitEvent mqEvent = new ExamSubmitEvent(
                exam.getId(),
                record.getUserId(),
                record.getPaperId(),
                submitTime,
                submitDTO.getUseTime()
        );
        // Send to MQ through the event bridge
        eventBridge.sendMqEvent(mqEvent);
        log.info("发送MQ考试提交事件: examId={}, userId={}",
                exam.getId(), record.getUserId());
    }

    // Local event listener
    /**
     * Asynchronous listener for the local exam submission event.
     *
     * <p>Declared {@code static}: Spring's component scanning only registers top-level and
     * static nested classes, so a non-static inner {@code @Component} is never picked up.
     *
     * <p>NOTE(review): {@code logExamSubmitOperation} is not defined in the visible part of
     * this file. Confirm it is a member of this listener, because a static nested class
     * cannot call instance methods of {@code ExamServiceImpl}.
     */
    @Component
    @Slf4j
    public static class ExamLocalEventListener {

        /**
         * Runs the local follow-up work for a submission. Failures are logged and not
         * propagated.
         */
        @EventListener
        @Async
        public void handleExamSubmit(ExamSubmitLocalEvent event) {
            log.info("处理本地考试提交事件: examId={}, userId={}",
                    event.getExamId(), event.getUserId());
            // Handle the local logic asynchronously
            try {
                // 1. Clear the cache
                clearExamCache(event.getExamId(), event.getUserId());
                // 2. Release the distributed lock
                releaseExamLock(event.getExamId(), event.getUserId());
                // 3. Record the operation log
                logExamSubmitOperation(event);
                log.info("本地考试提交事件处理完成");
            } catch (Exception e) {
                log.error("处理本地考试提交事件失败", e);
            }
        }

        private void clearExamCache(Long examId, Long userId) {
            String cacheKey = String.format("exam:%d:user:%d", examId, userId);
            // Clear the Redis cache (placeholder: nothing is done with cacheKey yet)
        }

        private void releaseExamLock(Long examId, Long userId) {
            String lockKey = String.format("exam:lock:%d:user:%d", examId, userId);
            // Release the distributed lock (placeholder: nothing is done with lockKey yet)
        }
    }
}
3.4 AI评分服务-MQ消费+Spring AI集成
// 1. MQ consumer
/**
 * RocketMQ listener for exam submission events on {@code EXAM_EVENT_TOPIC} with the
 * {@code exam_submit} selector. Each message is re-published as a local
 * {@code AIScoreStartEvent} and then handed to the AI scoring service.
 */
@Component
@Slf4j
@RocketMQMessageListener(
        topic = "EXAM_EVENT_TOPIC",
        selectorExpression = "exam_submit",
        consumerGroup = "score-service-exam-submit-group",
        consumeMode = ConsumeMode.CONCURRENTLY, // concurrent consumption
        messageModel = MessageModel.CLUSTERING  // clustering mode
)
public class ExamSubmitMQConsumer implements RocketMQListener<ExamSubmitEvent> {

    @Autowired
    private EventBridge eventBridge;

    @Autowired
    private AIScoreService aiScoreService;

    /**
     * Handles one exam submission message.
     *
     * @param event the received submission event
     * @throws RuntimeException wrapping any failure raised while bridging or starting the
     *                          scoring (presumably so the broker redelivers - confirm)
     */
    @Override
    public void onMessage(ExamSubmitEvent event) {
        final String eventId = event.getEventId();
        final Long examId = event.getExamId();
        final Long userId = event.getUserId();
        log.info("收到考试提交MQ事件: eventId={}, examId={}, userId={}", eventId, examId, userId);

        try {
            bridgeAndScore(event);
        } catch (Exception e) {
            log.error("处理考试提交MQ事件失败", e);
            throw new RuntimeException("处理考试提交事件失败", e);
        }
    }

    /** Converts the MQ event into a local event, then triggers AI scoring. */
    private void bridgeAndScore(ExamSubmitEvent event) {
        eventBridge.receiveAndPublish(event, AIScoreStartEvent.class);
        aiScoreService.startAIScoring(event);
    }
}
// 2. AI scoring service
/**
 * Scores a submitted paper with the AI chat client and announces the result through the
 * event bridge.
 */
@Service
@Slf4j
public class AIScoreServiceImpl implements AIScoreService {

    @Autowired
    private ChatClient chatClient; // injected by Spring AI

    @Autowired
    private ExamServiceClient examServiceClient; // Feign client

    @Autowired
    private EventBridge eventBridge;

    @Autowired
    private ScoreRecordMapper scoreRecordMapper;

    /**
     * Runs the scoring workflow asynchronously: load exam and answers, call the AI, save
     * the result and send the completion event. Any failure is logged and reported through
     * {@code sendScoreErrorEvent}; nothing is propagated to the caller.
     *
     * @param event the exam submission that triggers scoring
     */
    @Override
    @Async
    public void startAIScoring(ExamSubmitEvent event) {
        Long examId = event.getExamId();
        Long userId = event.getUserId();
        log.info("开始AI评分: examId={}, userId={}", examId, userId);
        try {
            // 1. Fetch the exam data
            ExamDetailDTO examDetail = examServiceClient.getExamDetail(examId);
            List<QuestionDTO> questions = examDetail.getQuestions();

            // 2. Fetch the examinee's answers
            UserAnswerDTO userAnswer = examServiceClient.getUserAnswer(examId, userId);

            // 3. Call Spring AI for scoring
            ScoreResultDTO aiScoreResult = callAIScoring(questions, userAnswer);

            // 4. Save the scoring result
            saveScoreResult(examId, userId, aiScoreResult);

            // 5. Send the scoring-completed event
            sendScoreCompleteEvent(examId, userId, aiScoreResult);

            // Logs the same accessor that feeds the event, so log and event cannot diverge.
            log.info("AI评分完成: examId={}, userId={}, score={}",
                    examId, userId, aiScoreResult.getTotalScore());
        } catch (Exception e) {
            log.error("AI评分失败", e);
            // Send the scoring-failed event
            sendScoreErrorEvent(examId, userId, e.getMessage());
        }
    }

    /**
     * Builds the prompt, calls the chat client and parses its reply.
     *
     * <p>NOTE(review): confirm that the ChatClient version in use offers a
     * {@code call(...)} overload that accepts a {@code UserMessage} and returns
     * {@code ChatResponse}.
     */
    private ScoreResultDTO callAIScoring(List<QuestionDTO> questions,
                                         UserAnswerDTO userAnswer) {
        // Build the scoring prompt
        String prompt = buildScoringPrompt(questions, userAnswer);

        // Call Spring AI
        ChatResponse response = chatClient.call(
                new UserMessage(prompt)
        );

        // Parse the AI reply
        String aiResponse = response.getResult().getOutput().getContent();
        return parseAIScoreResult(aiResponse);
    }

    /**
     * Renders every question with its reference answer, the examinee's answer, type and
     * points, followed by the scoring rules and the expected JSON reply format.
     *
     * <p>NOTE(review): answers are looked up with the question's loop index; confirm that
     * {@code userAnswer.getAnswers()} is aligned with {@code questions}.
     */
    private String buildScoringPrompt(List<QuestionDTO> questions,
                                      UserAnswerDTO userAnswer) {
        StringBuilder prompt = new StringBuilder();
        prompt.append("你是在线考试系统的AI评分老师,请对以下考生答卷进行评分:\n\n");
        for (int i = 0; i < questions.size(); i++) {
            QuestionDTO question = questions.get(i);
            String userAnswerText = userAnswer.getAnswers().get(i);
            prompt.append(String.format("""
                    第%d题:
                    题目:%s
                    标准答案:%s
                    考生答案:%s
                    题目类型:%s
                    分值:%d分
                    """,
                    i + 1,
                    question.getContent(),
                    question.getCorrectAnswer(),
                    userAnswerText,
                    question.getType(),
                    question.getScore()
            ));
        }
        prompt.append("""
                评分要求:
                1. 客观题(选择题、判断题)严格按标准答案评分
                2. 主观题(简答题、论述题)按要点评分,考虑答案的完整性、准确性
                3. 总分100分
                4. 返回JSON格式:
                {
                "totalScore": 85,
                "scoreDetails": [
                {"questionId": 1, "score": 10, "comment": "答案正确"},
                {"questionId": 2, "score": 8, "comment": "答案部分正确"}
                ],
                "aiComment": "整体表现良好,基础知识掌握扎实"
                }
                """);
        return prompt.toString();
    }

    /**
     * Sends the scoring-completed event.
     *
     * <p>The event is created with its public constructor rather than the builder: only
     * the constructor assigns the event ID, the {@code "SCORE_COMPLETE"} event type, the
     * source service, the score time and the payload, and the bridge routes by event type.
     */
    private void sendScoreCompleteEvent(Long examId, Long userId,
                                        ScoreResultDTO scoreResult) {
        ScoreCompleteEvent event = new ScoreCompleteEvent(
                examId,
                userId,
                scoreResult.getTotalScore(),
                scoreResult.getAiComment(),
                "AI"
        );
        eventBridge.sendMqEvent(event);
    }
}
// 3. Local event listener for AI scoring
/**
 * Asynchronous listener for the local "AI scoring started" event. It currently only logs
 * the exam and user the scoring was started for.
 */
@Component
@Slf4j
public class AIScoreEventListener {

    /**
     * Logs the start of an AI scoring run.
     *
     * @param event local event wrapping the exam submission
     */
    @EventListener
    @Async
    public void handleAIScoreStart(AIScoreStartEvent event) {
        Object examId = event.getExamSubmitEvent().getExamId();
        Object userId = event.getExamSubmitEvent().getUserId();
        log.info("收到AI评分开始事件: examId={}, userId={}", examId, userId);
        // Pre-processing logic can be added here,
        // e.g. recording the start time or initialising the scoring status.
    }
}
四、配置说明
4.1 Spring AI配置
# application.yml
# Nesting restored: with every key at column 0 the document repeats top-level keys
# (api-key, options, model, group, tags) and cannot be loaded as intended.
spring:
  ai:
    # Zhipu AI configuration
    # NOTE(review): confirm the property prefix expected by the Spring AI starter in use.
    zhipu:
      api-key: ${ZHIPU_API_KEY}
      chat:
        options:
          model: glm-4
          temperature: 0.7
          max-tokens: 2000
    # Vector store configuration
    vectorstore:
      qdrant:
        url: ${QDRANT_URL}
        collection-name: exam_questions
        api-key: ${QDRANT_API_KEY}
    # Embedding configuration
    embedding:
      zhipu:
        api-key: ${ZHIPU_API_KEY}
        options:
          model: text_embedding
  # RocketMQ configuration (Spring Cloud Stream binder)
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: ${ROCKETMQ_NAME_SERVER:localhost:9876}
        bindings:
          exam-event-input:
            consumer:
              group: score-service-exam-submit-group
              subscription: EXAM_EVENT_TOPIC
              tags: exam_submit
          score-event-output:
            producer:
              group: score-service-group
              topic: SCORE_EVENT_TOPIC
              tags: score_complete

# RocketMQ Spring Boot starter configuration. The Java code uses RocketMQTemplate and
# @RocketMQMessageListener, which are configured through the "rocketmq.*" properties
# rather than through the Spring Cloud Stream binder section above.
rocketmq:
  name-server: ${ROCKETMQ_NAME_SERVER:localhost:9876}
  producer:
    group: score-service-group
4.2 事件存储表设计
-- Event store table: one row per event, looked up by event_id, event_type or status.
CREATE TABLE event_store (
    id             BIGINT       NOT NULL AUTO_INCREMENT,
    event_id       VARCHAR(64)  NOT NULL COMMENT '事件ID',
    event_type     VARCHAR(50)  NOT NULL COMMENT '事件类型',
    event_data     JSON         NOT NULL COMMENT '事件数据',
    source_service VARCHAR(50)  NOT NULL COMMENT '来源服务',
    trigger_time   DATETIME     NOT NULL COMMENT '触发时间',
    status         VARCHAR(20)  DEFAULT 'PENDING' COMMENT '状态:PENDING,PROCESSED,FAILED',
    retry_count    INT          DEFAULT 0 COMMENT '重试次数',
    created_time   DATETIME     DEFAULT CURRENT_TIMESTAMP,
    updated_time   DATETIME     DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    -- event_id is unique
    UNIQUE INDEX uk_event_id (event_id),
    INDEX idx_event_type (event_type),
    INDEX idx_status (status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='事件存储表';
-- Idempotency control table: at most one row per (business_key, event_type) pair.
CREATE TABLE idempotent_control (
    id             BIGINT       NOT NULL AUTO_INCREMENT,
    business_key   VARCHAR(200) NOT NULL COMMENT '业务唯一键',
    event_type     VARCHAR(50)  NOT NULL COMMENT '事件类型',
    processed_time DATETIME     NOT NULL COMMENT '处理时间',
    created_time   DATETIME     DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    UNIQUE INDEX uk_business_key (business_key, event_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='幂等控制表';
这个架构设计完整展现了基于Spring AI的分布式在线考试系统的事件处理方案:通过Spring Event + RocketMQ的事件机制实现服务内和服务间的解耦,并结合Spring AI提供智能评分、智能出题等AI能力。
更多推荐


所有评论(0)