(爆肝一万两千字!下去好好沉淀!!)

业务场景分析:

  • 高并发: 用户在完成任务后集中领取奖励
  • 发奖不能失败和重复:奖励发放不能失败和重复发送,对数据要求十分精准,需要做好高容错处理。
  • 可容忍延迟:发奖不需要立即到账,可允许稍微延迟,对实时性要求不高。
  • 发奖存在稳定性问题: 在发奖时可能会调用第三方系统,存在不稳定
  • 需要具有幂等性:发奖逻辑可以设计为幂等性(taskId + userId)

技术方案选型:

通过对业务的分析,目前就可以选用合适的技术栈进行实现。

  • Resilience4j
    • 这个工具类轻量易上手,主要用来,限流,熔断,超时保护,防止大量请求集中打到接口导致服务不可用。
  • RocketMQ
    • 这里使用mq来进行流量削峰,发奖逻辑异步操作作用,将瞬时大量请求转为时间窗口慢慢处理。不阻挡主流程。

代码具体落地:

InDto层

  • 领取奖励入参请求体,携带重试次数
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RewardRequest implements Serializable {
    private Long userId;
    private Integer taskId;
    private int retryCount = 0;
}

controller层

  • RateLimiter(控制接口每秒最多允许多少个请求,防止刷接口)
  • CircuitBreaker(当失败率过高自动熔断,暂停调用保护服务)
  • TimeLimiter(若内部处理超时(如 MQ 卡顿)自动降级 fallback)
@PostMapping("/receive")
@RateLimiter(name = "rewardRateLimiter")
@CircuitBreaker(name = "rewardCircuitBreaker", fallbackMethod = "fallback")
@TimeLimiter(name = "rewardTimeLimiter")
  • 用户的“领取奖励”请求不会同步执行逻辑,而是发消息;

  • 控制层立即响应,保证响应速度和用户体验

return CompletableFuture.supplyAsync(() -> {
    rewardProducer.sendImmediate(request); // 发MQ消息
    return "请求接收成功,奖励处理中";
});

MQ Producer

  • sendImmediate: 当消息是首次发送时为立即发送
  • sendWithRetry:     当消息处理失败时,发送延时消息
  • getDelayLevel:      根据InDto设置的重试次数来决定延时多久。
    public void send(RewardRequest request, int delayLevel) {
        Message<RewardRequest> message = MessageBuilder.withPayload(request).build();
        rocketMQTemplate.syncSend(TOPIC + ":award", message, 3000, delayLevel); // 超时 + 延迟
    }

public void sendImmediate(RewardRequest request) {
        send(request, 0); // 不延迟
    }

    public void sendWithRetry(RewardRequest request) {
        int retryCount = request.getRetryCount();
        if (retryCount >= 3) {
            System.err.println("重试超过最大次数,放弃发奖 user=" + request.getUserId());
            return;
        }

        request.setRetryCount(retryCount + 1);
        int delayLevel = getDelayLevel(retryCount);
        send(request, delayLevel);
    }




    private int getDelayLevel(int retryCount) {
        // RocketMQ支持的延迟等级:1s、5s、10s、30s、1m、2m、3m、4m、5m...(等级1~18)
        return switch (retryCount) {
            case 1 -> 3; // 10s
            case 2 -> 5; // 30s
            case 3 -> 7; // 1m
            default -> 1;
        };
    }

MQ Consumer

  • processorService.process: 发奖处理逻辑;
public void onMessage(RewardRequest request) {
        try {
            processorService.process(request);
        } catch (Exception e) {
            System.err.println("发奖失败,进行重试 user=" + request.getUserId());
            rewardProducer.sendWithRetry(request);
        }
    }
ProcessorService层
  • 在这里处理真正的发送逻辑,可以是调用第三方系统,也可以是本地进行逻辑操作。
public void process(RewardRequest request) {
        System.out.printf("开始发奖 user=%d, task=%d, 第%d次尝试\n",
                request.getUserId(), request.getTaskId(), request.getRetryCount());

        // 模拟随机失败
        if (Math.random() < 0.3) {
            throw new RuntimeException("模拟发奖失败");
        }

        System.out.println("发奖成功 ✅");
    }

额外知识点补充:

幂等具体实现:

业务场景问题分析:

如果没有进行幂等操作,那么如果用户重复提交领取奖励的请求,那么就有可能会出现奖励重复发送。

幂等方案有很多,这里我们用数据库 + redis双保险

代码具体实现:

奖励发放记录表(user_task_reward)
  • 这张表记录了用户领取奖励的记录表,其中task_id + user_id设置为唯一索引来保证唯一性。
CREATE TABLE user_task_reward (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  user_id BIGINT NOT NULL,
  task_id INT NOT NULL,
  reward_amount INT,
  create_time DATETIME,
  UNIQUE KEY uk_user_task (user_id, task_id)  -- 幂等唯一约束
);
ProcessorService层
  • 首先存入redis中的key 肯定是userId + taskId。
    • 使用redis自有的分布式锁来判断该用户是否已经领取奖励了。
    • 尝试在db添加记录。
  • 当在发放奖励逻辑过程中如果失败,捕获异常,删除redis分布式锁,这样重试的时候可以进入step2。但这样做并没有把情况考虑完全。
public void process(RewardRequest request) {
        String key = String.format("reward:lock:%d:%d", request.getUserId(), request.getTaskId());

        // STEP 1:用Redis加锁实现幂等防重
        Boolean locked = redisTemplate.opsForValue().setIfAbsent(key, "1", Duration.ofMinutes(5));
        if (Boolean.FALSE.equals(locked)) {
            System.out.println("已经处理过,跳过发奖");
            return;
        }

        try {
            // STEP 2:尝试插入奖励记录(DB层幂等)
            UserTaskReward reward = new UserTaskReward();
            reward.setUserId(request.getUserId());
            reward.setTaskId(request.getTaskId());
            reward.setRewardAmount(100);
            reward.setCreateTime(LocalDateTime.now());

            rewardMapper.insertReward(reward); // 有唯一索引保证幂等

            // STEP 3:调用实际发奖逻辑
            System.out.printf("✅ 发奖成功 user=%d, task=%d\n",
                    request.getUserId(), request.getTaskId());

        } catch (DuplicateKeyException e) {
            System.out.println("数据库唯一索引幂等命中,忽略");
        } catch (Exception e) {
            System.out.println("发奖失败,释放锁");
            redisTemplate.delete(key); // 失败要释放锁,避免永远跳过
            throw e;
        }
    }

新的问题:如果第一次出问题是在step3后呢,记录已经插入,但是发奖逻辑失败,此时就回出现业务假成功(脏写)

解决方案

  1. 使用事务@Transactional

在方法上加上@Transactional注解,因为使用了try -catch所以在catch块中手动抛异常。看着解决了遇到的问题,但是仍然有问题,目前只能回滚本地数据库操作,如果你在处理发放逻辑中调用了其他服务,其他服务进行了对应的数据操作,那么也可能会造成脏数据。

但如果坚持使用事务来解决问题,就需要考虑到分布式事务(seata,tcc等)但是,不建议,不建议,不建议。重要的事情说三遍

为什么呢? 分布式事务,它本身就是为了协调多个系统之间,多个数据库之间的一个一致性问题。所以可想而知,分布式需要协调多个系统之间的锁和提交状态。这使得分布式事务效率十分低下,性能差。分布式事务架构复杂,维护成本高,拿tcc举例,得为每个服务写三套接口,并且还要考虑其中带来的问题(业务悬挂,其他失败场景)成本很高。分布式事务还容易造成系统锁死,可用性差,当某一个参与者挂掉,那其他参与者都得阻塞等他。当事务协调者也就是(seata或者tcc)挂掉,整个事务也就会卡住或者失败。

@Transactional(rollbackFor = Exception.class)
    public void process(RewardRequest request) {
        String repeatKey = generateRepeatKey(request); // 支持可重复任务
        String key = String.format("reward:lock:%d:%d:%s", request.getUserId(), request.getTaskId(), repeatKey);

        // Redis防重复
        Boolean locked = redisTemplate.opsForValue().setIfAbsent(key, "1", Duration.ofMinutes(5));
        if (Boolean.FALSE.equals(locked)) {
            System.out.println("已处理过,跳过");
            return;
        }

        try {
            // STEP 1:插入领取记录
            UserTaskReward reward = new UserTaskReward();
            reward.setUserId(request.getUserId());
            reward.setTaskId(request.getTaskId());
            reward.setRepeatKey(repeatKey);
            reward.setCreateTime(LocalDateTime.now());

            rewardMapper.insertReward(reward); // 有唯一索引

            // STEP 2:执行实际发奖逻辑(可能调用远程服务)
            actualRewardService.doReward(request);

            System.out.println("✅ 发奖成功");

        } catch (DuplicateKeyException e) {
            System.out.println("数据库幂等命中,跳过");

        } catch (Exception e) {
            // STEP 3:失败时释放 Redis 锁
            redisTemplate.delete(key);
            throw e; // 事务回滚
        }
    }

有没有其他解决方案呢?包的老弟,包的老弟。接下来就用另外一个也是非常推荐的方案使用

        2.记录状态 + 异步补偿

补偿机制是实现最终一致性的主流办法。

        代码具体实现

// ✅ 核心发奖重试 + 补偿 + 幂等防重模块

表
CREATE TABLE `user_task_reward` (
  `id` BIGINT AUTO_INCREMENT COMMENT '主键ID',
  
  `user_id` BIGINT NOT NULL COMMENT '用户ID',
  `task_id` INT NOT NULL COMMENT '任务ID',
  `reward_type` VARCHAR(32) NOT NULL COMMENT '奖励类型,如 POINTS/COUPON/CASH',
  `amount` INT DEFAULT 0 COMMENT '奖励数额(如积分/优惠券面额等)',

  `biz_id` VARCHAR(64) NOT NULL COMMENT '幂等ID,如 reward:{taskId}:{userId}:{repeatKey}',
  
  `status` VARCHAR(16) NOT NULL DEFAULT 'PENDING' COMMENT '处理状态:PENDING、SUCCESS、FAILED、ABANDONED',
  `retry_count` INT NOT NULL DEFAULT 0 COMMENT '重试次数',
  `last_retry_time` DATETIME DEFAULT NULL COMMENT '最近重试时间',
  `fail_reason` VARCHAR(255) DEFAULT NULL COMMENT '失败原因(最后一次)',

  `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',

  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_biz_id` (`biz_id`), -- 幂等唯一键
  KEY `idx_user_id` (`user_id`),
  KEY `idx_task_id` (`task_id`),
  KEY `idx_status_retry` (`status`, `retry_count`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户任务发奖记录表';


// === 1. 发奖请求 DTO ===
@Data
public class RewardRequest {
    private Long userId;
    private Integer taskId;
    private Integer amount;
    private String bizId;       // 幂等标识:reward:taskId:userId
    private Integer retryCount;
}

// === 2. 发奖记录实体 ===
@Data
public class UserTaskReward {
    private Long id;
    private Long userId;
    private Integer taskId;
    private Integer amount;
    private String status;      // PENDING, SUCCESS, FAILED
    private String bizId;
    private Integer retryCount;
    private LocalDateTime lastRetryTime;
    private LocalDateTime createTime;
}

// === 3. 发奖服务(幂等 + 状态落库) ===
@Service
@RequiredArgsConstructor
public class RewardProcessorService {

    private final RewardRecordMapper rewardMapper;
    private final ActualRewardService actualRewardService;
    private final StringRedisTemplate redisTemplate;

    public void process(RewardRequest request) {
        String lockKey = String.format("reward:lock:%s", request.getBizId());
        Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", Duration.ofMinutes(5));
        if (Boolean.FALSE.equals(locked)) return;

        try {
            UserTaskReward reward = rewardMapper.findByBizId(request.getBizId());
            if (reward != null && "SUCCESS".equals(reward.getStatus())) return;

            if (reward == null) {
                reward = new UserTaskReward();
                reward.setUserId(request.getUserId());
                reward.setTaskId(request.getTaskId());
                reward.setRewardType(request.getRewardType());
                reward.setAmount(request.getAmount());
                reward.setStatus("PENDING");
                reward.setBizId(request.getBizId());
                reward.setCreateTime(LocalDateTime.now());
                rewardMapper.insert(reward);
            }

            actualRewardService.sendReward(request); // ✅ 发奖必须幂等

            rewardMapper.updateStatus(reward.getId(), "SUCCESS");

        } catch (Exception e) {
            rewardMapper.updateStatusWithRetry(request.getBizId(), "FAILED");
            redisTemplate.delete(lockKey); // 释放锁,允许补偿重试
            throw e;
        }
    }
}

// === 4. MQ 消费端 + 自定义重试逻辑 ===
@Component
@RequiredArgsConstructor
public class RewardMqListener {

    private final RewardProcessorService processorService;
    private final RewardProducer rewardProducer;

    @RocketMQMessageListener(topic = "reward-topic", consumerGroup = "reward-cg")
    public void onMessage(RewardRequest request) {
        try {
            processorService.process(request);
        } catch (Exception e) {
            log.error("发奖失败,进入自定义重试,bizId={}", request.getBizId());
            rewardProducer.sendWithRetry(request);
        }
    }
}

// === 5. 定时补偿任务 ===
@Component
@RequiredArgsConstructor
public class RewardCompensator {

    private final RewardRecordMapper rewardMapper;
    private final RewardProcessorService processorService;

    @Scheduled(fixedDelay = 5 * 60 * 1000)
    public void retryFailedRewards() {
        List<UserTaskReward> failedList = rewardMapper.queryByStatus("FAILED", 3);
        for (UserTaskReward r : failedList) {
            if ("SUCCESS".equals(r.getStatus())) continue;

            try {
                RewardRequest req = buildRequest(r);
                processorService.process(req);
            } catch (Exception e) {
                rewardMapper.increaseRetryCount(r.getId());
                log.warn("补偿发奖失败 bizId={}", r.getBizId());
            }
        }
    }

    private RewardRequest buildRequest(UserTaskReward r) {
        RewardRequest req = new RewardRequest();
        req.setUserId(r.getUserId());
        req.setTaskId(r.getTaskId());
        req.setAmount(r.getAmount());
        req.setRewardType(r.getRewardType());
        req.setBizId(r.getBizId());
        return req;
    }
}
总结:

总结一下幂等实现方案和包括前面的总体处理逻辑。

最开始,是用Resilience4j来对请求进行限流处理,熔断操作。保证不会一下出现大量请求打进接口。然后使用  rocketmq + CompletableFuture 异步处理,不影响主流程,并且将瞬时大流量转为时间窗口执行,削峰处理。第一次发送时是用立即发送方法sendImmediate。当消费消息处理逻辑时,核心逻辑,首先会根据bizId去redis是否上锁,如果没有则根据bizId去数据库查看是否有数据。如果没有,就表示第一次处理,statu设置PENDING。如果状态为success,表示已经处理成功。当成功发送,将状态设置为SUCCESS。如果失败,就需要设置为FAILD,并且释放redis锁。

这里将会有可能触发两种重试机制。

  • sendWithRetry
    • 自定义的重试机制
  • 定时补偿机制
    • 最终一致性兜底策略

为什么有一个重试方法sendWithRetry,还要写一个定时补偿机制。可能有疑问。这里说一下,

二者并不是冗余关系,而且相反是一个互补的关系

一个是当业务出现问题,马上进行重试,一个是定期查看库中符合条件的进行重试补偿。

一个是秒级,一个是分钟级。

简单来说sendWithRetry就是当有东西掉了,就赶紧去把他捡回来。但是也有可能这个过程会失败。定时补偿就是,重新找一个,不漏掉一个锅。所以二者并不冲突,并且是一个互补。

其中定时补偿可以自己定义,是选择继续重试,还是直接记录失败,让人工进行补偿。具体业务具体分析。

奖励可重复领取场景:

业务场景分析:

以上都在说一个用户只能领取一次奖励,但是!!!真实业务场景对于用户奖励怎么可能只会有一次的情况,完全会存在用户可以重复领取一个任务多次,比如回答问题等。完全有这种场景。那接下来就来考虑这个场景

存在问题:

之前的bizId是用来做幂等的,也是用来redis加分布式锁的。组成是 userId + taskId。这个的问题就是,一个用户只能领取一次奖励,当领取一次后,后面的领取请求就不好使了,就连redis锁这一关就过不了

具体实现:

在配置任务类型时往往会有一个配置选项,就是支持重复领取吗,如果不支持就是仅限一次,如果支持那么该任务就可以重复领取。

整体逻辑还是围绕bizId解决,之前为什么不支持重复领取,就是因为bizId是通过userId + taskId 这样就会因为幂等操作,拒绝重复领取。 如果bizId为 userId + taskId + activeId,activeId就是每一次请求独一无二的,比如每次答题都会有一个答题id,每次请求都是不同的,如果支持重复领取就没问题

bizId设计方案:

先从任务配置中获取该任务是否可重复领取,然后生成对应的bizId

 /**
     * 生成 bizId
     *
     * @param taskId 任务ID
     * @param userId 用户ID
     * @param allowRepeat 是否允许重复
     * @param repeatKey 可重复任务的区分标识,如答题ID、订单号等。若是一次性任务则传 null
     */
    public static String generateBizId(Long taskId, Long userId, boolean allowRepeat, String repeatKey) {
        String rk;
        if (!allowRepeat) {
            rk = "once"; // 一次性任务的唯一标识
        } else {
            rk = StringUtils.hasText(repeatKey) ? repeatKey : UUID.randomUUID().toString(); // 保证唯一性
        }
        return String.format("reward:%d:%d:%s", taskId, userId, rk);
    }

核心process代码实现

@Service
@RequiredArgsConstructor
public class RewardProcessorService {

    private final TaskConfigService taskConfigService;
    private final RewardRecordMapper rewardMapper;
    private final ActualRewardService actualRewardService;
    private final StringRedisTemplate redisTemplate;

    /**
     * 核心发奖流程,支持重复/不重复领取控制
     */
    public void process(RewardRequest request) {
        // 1. 查询任务配置(是否允许重复)
        TaskConfig config = taskConfigService.getByTaskId(request.getTaskId());
        if (config == null) {
            throw new BusinessException("任务配置不存在,taskId=" + request.getTaskId());
        }

        // 2. 生成 bizId(用于幂等 & 唯一发奖标识)
        String bizId = BizIdGenerator.generateBizId(request.getTaskId(), request.getUserId(), config, request.getRepeatKey());
        request.setBizId(bizId);

        // 3. Redis 加锁防重(5分钟过期,避免并发处理)
        String lockKey = "reward:lock:" + bizId;
        Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", Duration.ofMinutes(5));
        if (Boolean.FALSE.equals(locked)) {
            log.warn("发奖请求正在处理中,跳过重复请求。bizId={}", bizId);
            return;
        }

        try {
            // 4. 幂等校验(DB 层记录)
            UserTaskReward reward = rewardMapper.findByBizId(bizId);
            if (reward != null && "SUCCESS".equals(reward.getStatus())) {
                log.info("发奖记录已存在且成功,跳过。bizId={}", bizId);
                return;
            }

            // 5. 初始化发奖记录(首次处理)
            if (reward == null) {
                reward = new UserTaskReward();
                reward.setUserId(request.getUserId());
                reward.setTaskId(request.getTaskId());
                reward.setRewardType(request.getRewardType());
                reward.setAmount(request.getAmount());
                reward.setStatus("PENDING");
                reward.setBizId(bizId);
                reward.setCreateTime(LocalDateTime.now());
                reward.setRetryCount(0);
                rewardMapper.insert(reward);
            }

            // 6. 调用实际发奖逻辑(需幂等实现)
            actualRewardService.sendReward(request);

            // 7. 更新发奖状态为成功
            rewardMapper.updateStatus(reward.getId(), "SUCCESS");

            log.info("✅ 发奖成功,bizId={}, userId={}, taskId={}", bizId, request.getUserId(), request.getTaskId());

        } catch (Exception e) {
            // 8. 记录失败状态 & 准备补偿
            rewardMapper.updateStatusWithRetry(request.getBizId(), "FAILED");
            redisTemplate.delete(lockKey); // 解锁让补偿任务可以重试
            log.error("发奖失败,bizId={}, error={}", request.getBizId(), e.getMessage(), e);
            throw e;
        }
    }
}

核心问题已解决,细节就不多说了,该怎么去存储,查询的时候用缓存吗。这些不说了。好了写了一篇论文了,下去沉淀吧!!!!!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐