处理高并发下奖励领取场景
本文针对高并发场景下的奖励发放系统提出了一套完整的解决方案。通过Resilience4j实现接口限流、熔断保护,使用RocketMQ进行异步削峰处理。核心创新点包括:1)基于Redis分布式锁+数据库唯一索引的双重幂等保障;2)结合MQ重试和定时补偿的双重容错机制;3)支持可重复领取的动态bizId生成策略。系统采用最终一致性设计,通过记录状态(PENDING/SUCCESS/FAILED)+异步
(爆肝一万两千字!下去好好沉淀!!)
业务场景分析:
- 高并发: 用户在完成任务后集中领取奖励
- 发奖不能失败和重复:奖励发放不能失败和重复发送,对数据要求十分精准,需要做好高容错处理。
- 可容忍延迟:发奖不需要立即到账,可允许稍微延迟,对实时性要求不高。
- 发奖存在稳定性问题: 在发奖时可能会调用第三方系统,存在不稳定
- 需要具有幂等性:发奖逻辑可以设计为幂等性(taskId + userId)
技术方案选型:
通过对业务的分析,目前就可以选用合适的技术栈进行实现。
- Resilience4j
- 这个工具类轻量易上手,主要用来,限流,熔断,超时保护,防止大量请求集中打到接口导致服务不可用。
- RocketMQ
- 这里使用mq来进行流量削峰,发奖逻辑异步操作作用,将瞬时大量请求转为时间窗口慢慢处理。不阻挡主流程。
代码具体落地:
InDto层
- 领取奖励入参请求体,携带重试次数
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RewardRequest implements Serializable {
private Long userId;
private Integer taskId;
private int retryCount = 0;
}
controller层
- RateLimiter(控制接口每秒最多允许多少个请求,防止刷接口)
- CircuitBreaker(当失败率过高自动熔断,暂停调用保护服务)
TimeLimiter(若内部处理超时(如 MQ 卡顿)自动降级 fallback)
@PostMapping("/receive")
@RateLimiter(name = "rewardRateLimiter")
@CircuitBreaker(name = "rewardCircuitBreaker", fallbackMethod = "fallback")
@TimeLimiter(name = "rewardTimeLimiter")
-
用户的“领取奖励”请求不会同步执行逻辑,而是发消息;
-
控制层立即响应,保证响应速度和用户体验
return CompletableFuture.supplyAsync(() -> {
rewardProducer.sendImmediate(request); // 发MQ消息
return "请求接收成功,奖励处理中";
});
MQ Producer
- sendImmediate: 当消息是首次发送时为立即发送
- sendWithRetry: 当消息处理失败时,发送延时消息
- getDelayLevel: 根据InDto设置的重试次数来决定延时多久。
public void send(RewardRequest request, int delayLevel) {
Message<RewardRequest> message = MessageBuilder.withPayload(request).build();
rocketMQTemplate.syncSend(TOPIC + ":award", message, 3000, delayLevel); // 超时 + 延迟
}
public void sendImmediate(RewardRequest request) {
send(request, 0); // 不延迟
}
public void sendWithRetry(RewardRequest request) {
int retryCount = request.getRetryCount();
if (retryCount >= 3) {
System.err.println("重试超过最大次数,放弃发奖 user=" + request.getUserId());
return;
}
request.setRetryCount(retryCount + 1);
int delayLevel = getDelayLevel(retryCount);
send(request, delayLevel);
}
private int getDelayLevel(int retryCount) {
// RocketMQ支持的延迟等级:1s、5s、10s、30s、1m、2m、3m、4m、5m...(等级1~18)
return switch (retryCount) {
case 1 -> 3; // 10s
case 2 -> 5; // 30s
case 3 -> 7; // 1m
default -> 1;
};
}
MQ Consumer
- processorService.process: 发奖处理逻辑;
public void onMessage(RewardRequest request) {
try {
processorService.process(request);
} catch (Exception e) {
System.err.println("发奖失败,进行重试 user=" + request.getUserId());
rewardProducer.sendWithRetry(request);
}
}
ProcessorService层
- 在这里处理真正的发送逻辑,可以是调用第三方系统,也可以是本地进行逻辑操作。
public void process(RewardRequest request) {
System.out.printf("开始发奖 user=%d, task=%d, 第%d次尝试\n",
request.getUserId(), request.getTaskId(), request.getRetryCount());
// 模拟随机失败
if (Math.random() < 0.3) {
throw new RuntimeException("模拟发奖失败");
}
System.out.println("发奖成功 ✅");
}
额外知识点补充:
幂等具体实现:
业务场景问题分析:
如果没有进行幂等操作,那么如果用户重复提交领取奖励的请求,那么就有可能会出现奖励重复发送。
幂等方案有很多,这里我们用数据库 + redis双保险
代码具体实现:
奖励发放记录表(user_task_reward)
- 这张表记录了用户领取奖励的记录表,其中task_id + user_id设置为唯一索引来保证唯一性。
CREATE TABLE user_task_reward (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id BIGINT NOT NULL,
task_id INT NOT NULL,
reward_amount INT,
create_time DATETIME,
UNIQUE KEY uk_user_task (user_id, task_id) -- 幂等唯一约束
);
ProcessorService层
- 首先存入redis中的key 肯定是userId + taskId。
- 使用redis自有的分布式锁来判断该用户是否已经领取奖励了。
- 尝试在db添加记录。
- 当在发放奖励逻辑过程中如果失败,捕获异常,删除redis分布式锁,这样重试的时候可以进入step2。但这样做并没有把情况考虑完全。
public void process(RewardRequest request) {
String key = String.format("reward:lock:%d:%d", request.getUserId(), request.getTaskId());
// STEP 1:用Redis加锁实现幂等防重
Boolean locked = redisTemplate.opsForValue().setIfAbsent(key, "1", Duration.ofMinutes(5));
if (Boolean.FALSE.equals(locked)) {
System.out.println("已经处理过,跳过发奖");
return;
}
try {
// STEP 2:尝试插入奖励记录(DB层幂等)
UserTaskReward reward = new UserTaskReward();
reward.setUserId(request.getUserId());
reward.setTaskId(request.getTaskId());
reward.setRewardAmount(100);
reward.setCreateTime(LocalDateTime.now());
rewardMapper.insertReward(reward); // 有唯一索引保证幂等
// STEP 3:调用实际发奖逻辑
System.out.printf("✅ 发奖成功 user=%d, task=%d\n",
request.getUserId(), request.getTaskId());
} catch (DuplicateKeyException e) {
System.out.println("数据库唯一索引幂等命中,忽略");
} catch (Exception e) {
System.out.println("发奖失败,释放锁");
redisTemplate.delete(key); // 失败要释放锁,避免永远跳过
throw e;
}
}
新的问题:如果第一次出问题是在step3后呢,记录已经插入,但是发奖逻辑失败,此时就回出现业务假成功(脏写)
解决方案
- 使用事务@Transactional
在方法上加上@Transactional注解,因为使用了try -catch所以在catch块中手动抛异常。看着解决了遇到的问题,但是仍然有问题,目前只能回滚本地数据库操作,如果你在处理发放逻辑中调用了其他服务,其他服务进行了对应的数据操作,那么也可能会造成脏数据。
但如果坚持使用事务来解决问题,就需要考虑到分布式事务(seata,tcc等)但是,不建议,不建议,不建议。重要的事情说三遍
为什么呢? 分布式事务,它本身就是为了协调多个系统之间,多个数据库之间的一个一致性问题。所以可想而知,分布式需要协调多个系统之间的锁和提交状态。这使得分布式事务效率十分低下,性能差。分布式事务架构复杂,维护成本高,拿tcc举例,得为每个服务写三套接口,并且还要考虑其中带来的问题(业务悬挂,其他失败场景)成本很高。分布式事务还容易造成系统锁死,可用性差,当某一个参与者挂掉,那其他参与者都得阻塞等他。当事务协调者也就是(seata或者tcc)挂掉,整个事务也就会卡住或者失败。
@Transactional(rollbackFor = Exception.class)
public void process(RewardRequest request) {
String repeatKey = generateRepeatKey(request); // 支持可重复任务
String key = String.format("reward:lock:%d:%d:%s", request.getUserId(), request.getTaskId(), repeatKey);
// Redis防重复
Boolean locked = redisTemplate.opsForValue().setIfAbsent(key, "1", Duration.ofMinutes(5));
if (Boolean.FALSE.equals(locked)) {
System.out.println("已处理过,跳过");
return;
}
try {
// STEP 1:插入领取记录
UserTaskReward reward = new UserTaskReward();
reward.setUserId(request.getUserId());
reward.setTaskId(request.getTaskId());
reward.setRepeatKey(repeatKey);
reward.setCreateTime(LocalDateTime.now());
rewardMapper.insertReward(reward); // 有唯一索引
// STEP 2:执行实际发奖逻辑(可能调用远程服务)
actualRewardService.doReward(request);
System.out.println("✅ 发奖成功");
} catch (DuplicateKeyException e) {
System.out.println("数据库幂等命中,跳过");
} catch (Exception e) {
// STEP 3:失败时释放 Redis 锁
redisTemplate.delete(key);
throw e; // 事务回滚
}
}
有没有其他解决方案呢?包的老弟,包的老弟。接下来就用另外一个也是非常推荐的方案使用
2.记录状态 + 异步补偿
补偿机制是实现最终一致性的主流办法。
代码具体实现
// ✅ 核心发奖重试 + 补偿 + 幂等防重模块
表
CREATE TABLE `user_task_reward` (
`id` BIGINT AUTO_INCREMENT COMMENT '主键ID',
`user_id` BIGINT NOT NULL COMMENT '用户ID',
`task_id` INT NOT NULL COMMENT '任务ID',
`reward_type` VARCHAR(32) NOT NULL COMMENT '奖励类型,如 POINTS/COUPON/CASH',
`amount` INT DEFAULT 0 COMMENT '奖励数额(如积分/优惠券面额等)',
`biz_id` VARCHAR(64) NOT NULL COMMENT '幂等ID,如 reward:{taskId}:{userId}:{repeatKey}',
`status` VARCHAR(16) NOT NULL DEFAULT 'PENDING' COMMENT '处理状态:PENDING、SUCCESS、FAILED、ABANDONED',
`retry_count` INT NOT NULL DEFAULT 0 COMMENT '重试次数',
`last_retry_time` DATETIME DEFAULT NULL COMMENT '最近重试时间',
`fail_reason` VARCHAR(255) DEFAULT NULL COMMENT '失败原因(最后一次)',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_biz_id` (`biz_id`), -- 幂等唯一键
KEY `idx_user_id` (`user_id`),
KEY `idx_task_id` (`task_id`),
KEY `idx_status_retry` (`status`, `retry_count`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户任务发奖记录表';
// === 1. 发奖请求 DTO ===
@Data
public class RewardRequest {
private Long userId;
private Integer taskId;
private Integer amount;
private String bizId; // 幂等标识:reward:taskId:userId
private Integer retryCount;
}
// === 2. 发奖记录实体 ===
@Data
public class UserTaskReward {
private Long id;
private Long userId;
private Integer taskId;
private Integer amount;
private String status; // PENDING, SUCCESS, FAILED
private String bizId;
private Integer retryCount;
private LocalDateTime lastRetryTime;
private LocalDateTime createTime;
}
// === 3. 发奖服务(幂等 + 状态落库) ===
@Service
@RequiredArgsConstructor
public class RewardProcessorService {
private final RewardRecordMapper rewardMapper;
private final ActualRewardService actualRewardService;
private final StringRedisTemplate redisTemplate;
public void process(RewardRequest request) {
String lockKey = String.format("reward:lock:%s", request.getBizId());
Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", Duration.ofMinutes(5));
if (Boolean.FALSE.equals(locked)) return;
try {
UserTaskReward reward = rewardMapper.findByBizId(request.getBizId());
if (reward != null && "SUCCESS".equals(reward.getStatus())) return;
if (reward == null) {
reward = new UserTaskReward();
reward.setUserId(request.getUserId());
reward.setTaskId(request.getTaskId());
reward.setRewardType(request.getRewardType());
reward.setAmount(request.getAmount());
reward.setStatus("PENDING");
reward.setBizId(request.getBizId());
reward.setCreateTime(LocalDateTime.now());
rewardMapper.insert(reward);
}
actualRewardService.sendReward(request); // ✅ 发奖必须幂等
rewardMapper.updateStatus(reward.getId(), "SUCCESS");
} catch (Exception e) {
rewardMapper.updateStatusWithRetry(request.getBizId(), "FAILED");
redisTemplate.delete(lockKey); // 释放锁,允许补偿重试
throw e;
}
}
}
// === 4. MQ 消费端 + 自定义重试逻辑 ===
@Component
@RequiredArgsConstructor
public class RewardMqListener {
private final RewardProcessorService processorService;
private final RewardProducer rewardProducer;
@RocketMQMessageListener(topic = "reward-topic", consumerGroup = "reward-cg")
public void onMessage(RewardRequest request) {
try {
processorService.process(request);
} catch (Exception e) {
log.error("发奖失败,进入自定义重试,bizId={}", request.getBizId());
rewardProducer.sendWithRetry(request);
}
}
}
// === 5. 定时补偿任务 ===
@Component
@RequiredArgsConstructor
public class RewardCompensator {
private final RewardRecordMapper rewardMapper;
private final RewardProcessorService processorService;
@Scheduled(fixedDelay = 5 * 60 * 1000)
public void retryFailedRewards() {
List<UserTaskReward> failedList = rewardMapper.queryByStatus("FAILED", 3);
for (UserTaskReward r : failedList) {
if ("SUCCESS".equals(r.getStatus())) continue;
try {
RewardRequest req = buildRequest(r);
processorService.process(req);
} catch (Exception e) {
rewardMapper.increaseRetryCount(r.getId());
log.warn("补偿发奖失败 bizId={}", r.getBizId());
}
}
}
private RewardRequest buildRequest(UserTaskReward r) {
RewardRequest req = new RewardRequest();
req.setUserId(r.getUserId());
req.setTaskId(r.getTaskId());
req.setAmount(r.getAmount());
req.setRewardType(r.getRewardType());
req.setBizId(r.getBizId());
return req;
}
}
总结:
总结一下幂等实现方案和包括前面的总体处理逻辑。
最开始,是用Resilience4j来对请求进行限流处理,熔断操作。保证不会一下出现大量请求打进接口。然后使用 rocketmq + CompletableFuture 异步处理,不影响主流程,并且将瞬时大流量转为时间窗口执行,削峰处理。第一次发送时是用立即发送方法sendImmediate。当消费消息处理逻辑时,核心逻辑,首先会根据bizId去redis是否上锁,如果没有则根据bizId去数据库查看是否有数据。如果没有,就表示第一次处理,statu设置PENDING。如果状态为success,表示已经处理成功。当成功发送,将状态设置为SUCCESS。如果失败,就需要设置为FAILD,并且释放redis锁。
这里将会有可能触发两种重试机制。
- sendWithRetry
- 自定义的重试机制
- 定时补偿机制
- 最终一致性兜底策略
为什么有一个重试方法sendWithRetry,还要写一个定时补偿机制。可能有疑问。这里说一下,
二者并不是冗余关系,而且相反是一个互补的关系
一个是当业务出现问题,马上进行重试,一个是定期查看库中符合条件的进行重试补偿。
一个是秒级,一个是分钟级。
简单来说sendWithRetry就是当有东西掉了,就赶紧去把他捡回来。但是也有可能这个过程会失败。定时补偿就是,重新找一个,不漏掉一个锅。所以二者并不冲突,并且是一个互补。
其中定时补偿可以自己定义,是选择继续重试,还是直接记录失败,让人工进行补偿。具体业务具体分析。
奖励可重复领取场景:
业务场景分析:
以上都在说一个用户只能领取一次奖励,但是!!!真实业务场景对于用户奖励怎么可能只会有一次的情况,完全会存在用户可以重复领取一个任务多次,比如回答问题等。完全有这种场景。那接下来就来考虑这个场景
存在问题:
之前的bizId是用来做幂等的,也是用来redis加分布式锁的。组成是 userId + taskId。这个的问题就是,一个用户只能领取一次奖励,当领取一次后,后面的领取请求就不好使了,就连redis锁这一关就过不了
具体实现:
在配置任务类型时往往会有一个配置选项,就是支持重复领取吗,如果不支持就是仅限一次,如果支持那么该任务就可以重复领取。
整体逻辑还是围绕bizId解决,之前为什么不支持重复领取,就是因为bizId是通过userId + taskId 这样就会因为幂等操作,拒绝重复领取。 如果bizId为 userId + taskId + activeId,activeId就是每一次请求独一无二的,比如每次答题都会有一个答题id,每次请求都是不同的,如果支持重复领取就没问题
bizId设计方案:
先从任务配置中获取该任务是否可重复领取,然后生成对应的bizId
/**
* 生成 bizId
*
* @param taskId 任务ID
* @param userId 用户ID
* @param allowRepeat 是否允许重复
* @param repeatKey 可重复任务的区分标识,如答题ID、订单号等。若是一次性任务则传 null
*/
public static String generateBizId(Long taskId, Long userId, boolean allowRepeat, String repeatKey) {
String rk;
if (!allowRepeat) {
rk = "once"; // 一次性任务的唯一标识
} else {
rk = StringUtils.hasText(repeatKey) ? repeatKey : UUID.randomUUID().toString(); // 保证唯一性
}
return String.format("reward:%d:%d:%s", taskId, userId, rk);
}
核心process代码实现
@Service
@RequiredArgsConstructor
public class RewardProcessorService {
private final TaskConfigService taskConfigService;
private final RewardRecordMapper rewardMapper;
private final ActualRewardService actualRewardService;
private final StringRedisTemplate redisTemplate;
/**
* 核心发奖流程,支持重复/不重复领取控制
*/
public void process(RewardRequest request) {
// 1. 查询任务配置(是否允许重复)
TaskConfig config = taskConfigService.getByTaskId(request.getTaskId());
if (config == null) {
throw new BusinessException("任务配置不存在,taskId=" + request.getTaskId());
}
// 2. 生成 bizId(用于幂等 & 唯一发奖标识)
String bizId = BizIdGenerator.generateBizId(request.getTaskId(), request.getUserId(), config, request.getRepeatKey());
request.setBizId(bizId);
// 3. Redis 加锁防重(5分钟过期,避免并发处理)
String lockKey = "reward:lock:" + bizId;
Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", Duration.ofMinutes(5));
if (Boolean.FALSE.equals(locked)) {
log.warn("发奖请求正在处理中,跳过重复请求。bizId={}", bizId);
return;
}
try {
// 4. 幂等校验(DB 层记录)
UserTaskReward reward = rewardMapper.findByBizId(bizId);
if (reward != null && "SUCCESS".equals(reward.getStatus())) {
log.info("发奖记录已存在且成功,跳过。bizId={}", bizId);
return;
}
// 5. 初始化发奖记录(首次处理)
if (reward == null) {
reward = new UserTaskReward();
reward.setUserId(request.getUserId());
reward.setTaskId(request.getTaskId());
reward.setRewardType(request.getRewardType());
reward.setAmount(request.getAmount());
reward.setStatus("PENDING");
reward.setBizId(bizId);
reward.setCreateTime(LocalDateTime.now());
reward.setRetryCount(0);
rewardMapper.insert(reward);
}
// 6. 调用实际发奖逻辑(需幂等实现)
actualRewardService.sendReward(request);
// 7. 更新发奖状态为成功
rewardMapper.updateStatus(reward.getId(), "SUCCESS");
log.info("✅ 发奖成功,bizId={}, userId={}, taskId={}", bizId, request.getUserId(), request.getTaskId());
} catch (Exception e) {
// 8. 记录失败状态 & 准备补偿
rewardMapper.updateStatusWithRetry(request.getBizId(), "FAILED");
redisTemplate.delete(lockKey); // 解锁让补偿任务可以重试
log.error("发奖失败,bizId={}, error={}", request.getBizId(), e.getMessage(), e);
throw e;
}
}
}
核心问题已解决,细节就不多说了,该怎么去存储,查询的时候用缓存吗。这些不说了。好了写了一篇论文了,下去沉淀吧!!!!!
更多推荐



所有评论(0)