Redis 消息队列(Stream)详解:构建高可靠、可追溯的异步通信系统

在现代分布式系统中,消息队列 是实现服务解耦、异步处理、流量削峰的核心组件。Redis 从 5.0 版本开始引入 Stream 数据结构,提供了一个持久化、高性能、支持消费者组的轻量级消息队列,成为 Kafka、RabbitMQ 的有力补充,尤其适合中小规模、低延迟的场景。

本文将深入讲解 Redis Stream 的核心概念、命令使用、消费者组机制、实战场景与最佳实践。


一、为什么选择 Redis Stream?

传统方案痛点 Redis Stream 解决方案
❌ List 不持久 ✅ 消息持久化,重启不丢失
❌ 无消费者组 ✅ 支持消费者组(Consumer Group)
❌ 无法追溯历史消息 ✅ 按 ID 查询任意消息
❌ 无确认机制 ✅ 支持消息确认(ACK)
❌ 性能一般 ✅ 内存操作,微秒级延迟

一句话总结
Redis Stream = 持久化日志 + 消费者组 + 消息确认 + 高性能,是 Redis 内置的“轻量级 Kafka”。


二、Stream 的核心概念

概念 说明
📜 Stream 一个只增不减的消息流,类似日志文件
📨 消息(Message) 由 ID 和多个 field-value 组成
🔢 消息 ID 自动生成(时间戳-序号),全局唯一、有序
🧑 消费者(Consumer) 读取消息的应用实例
👥 消费者组(Consumer Group) 一组消费者共同消费一个 Stream,实现消息分发
Pending Entries List(PEL) 记录已读但未确认的消息,防止丢失
🔄 消息确认(XACK) 消费者处理完成后确认,从 PEL 中移除

三、核心命令详解

1. 添加消息:XADD

XADD mystream * name Alice action login
  • mystream:流名称
  • *:自动生成 ID(推荐)
  • name Alice action login:field-value 对

📌 输出示例:

1712345678901-0  # 时间戳-序号

2. 读取消息:XREAD / XREADGROUP

XREAD:单消费者模式
# 从 ID 0 开始读取最多 5 条
XREAD COUNT 5 STREAMS mystream 0

# 阻塞读取(超时 5 秒)
XREAD BLOCK 5000 STREAMS mystream $
  • $:只接收新消息(类似 Kafka 的 latest)

XREADGROUP:消费者组模式(推荐)
# 先创建消费者组
XGROUP CREATE mystream mygroup $ MKSTREAM

# 消费者 consumer1 读取未确认的消息(>)
XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >

# consumer2 也可以读取(广播?不!是分发)
XREADGROUP GROUP mygroup consumer2 COUNT 1 STREAMS mystream >

消费者组优势
多个消费者 分摊消费,提高吞吐量,实现负载均衡。


3. 确认消息:XACK

XACK mystream mygroup 1712345678901-0
  • 只有确认后,消息才会从 PEL 中移除

4. 查看待处理消息:XPENDING

XPENDING mystream mygroup

📌 输出:

1) 1712345678901-0
   consumer1
   30000  # 已读取但未确认的时间(ms)

✅ 用于监控“卡住”的消息,防止消息丢失。


5. 手动交付:XCLAIM

# 将长时间未确认的消息交给另一个消费者
XCLAIM mystream mygroup consumer2 MINIDLE 30000 1712345678901-0

✅ 实现故障转移。


6. 裁剪流长度:XTRIM

# 保留最近 1000 条消息
XADD mystream MAXLEN ~ 1000 * name Alice action login

或独立裁剪:

XTRIM mystream MAXLEN ~ 1000

✅ 防止 Stream 无限增长。


四、消费者组工作流程

生产者
   ↓
+------------------+
|   Stream         |
| 1712...-0: msg1  |
| 1712...-1: msg2  |
+------------------+
         ↓
   消费者组 mygroup
   /              \
consumer1       consumer2
  ↓                  ↓
处理 msg1          处理 msg2
  ↓                  ↓
XACK 确认          XACK 确认

消费者组特点:

  • 消息分发:每条消息只被组内一个消费者消费(广播需多个组)
  • 独立偏移:每个消费者有自己的读取位置
  • 容错机制:通过 PEL 和 XCLAIM 实现故障转移

五、典型应用场景

1. 异步任务处理

# 生产者:下单后发送消息
XADD tasks * type send_email user_id 1001 order_id 2001

# 消费者:邮件服务监听并发送
XREADGROUP GROUP email_worker worker1 STREAMS tasks >

2. 事件驱动架构(Event-Driven)

# 用户注册事件
XADD events * event_type user_registered user_id 1001 timestamp 1712345678

# 多个服务监听:
- 发送欢迎邮件
- 增加积分
- 更新推荐模型

✅ 实现业务解耦。


3. 日志收集与审计

# 微服务写入操作日志
XADD audit_log * service order action create_order user 1001

# 审计服务消费并存储到数据库

4. 延迟任务(结合 Sorted Set)

# 使用 ZSet 存储延迟任务,到期后写入 Stream
ZADD delay_queue 1712346000 "task:email:123"

# 定时任务轮询:
ZRANGEBYSCORE delay_queue 0 $(date +%s)
# 执行后 XADD 到 Stream

5. 微服务间通信

Order Service → Stream → Notification Service
Payment Service → Stream → Accounting Service

替代直接 RPC 调用,提高系统弹性。


六、与 List 实现队列的对比

特性 List + BLPOP Stream
💾 持久化 ❌ 依赖 RDB/AOF ✅ 天然持久
👥 消费者组 ❌ 不支持 ✅ 支持
🔍 消息追溯 ❌ 只能从头/尾操作 ✅ 按 ID 查询
✅ 消息确认 ❌ 无机制 ✅ XACK
🔄 故障转移 ❌ 难实现 ✅ XPENDING + XCLAIM
📈 性能 高(略低,但功能更强)
🧩 适用场景 简单队列 生产级消息系统

结论
对于新项目,优先使用 Stream;简单场景可用 List。


七、最佳实践建议

实践 说明
✅ 使用 消费者组 实现负载均衡与容错
✅ 设置 MAXLEN 防止内存无限增长
✅ 监控 XPENDING 及时发现未确认消息
✅ 使用 XCLAIM 处理消费者宕机
✅ 合理设置 阻塞超时 XREAD BLOCK 5000
✅ 避免 大消息 单条消息不宜 > 1KB
✅ 结合 TTL 临时 Stream 可设置过期

八、Java 实战示例(Spring Data Redis)

1. 添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2. 生产者

@Autowired
private RedisTemplate<String, Object> redisTemplate;

public void sendMessage(String stream, String type, String userId) {
    Map<String, Object> message = new HashMap<>();
    message.put("type", type);
    message.put("user_id", userId);
    message.put("timestamp", System.currentTimeMillis());

    redisTemplate.opsForStream().add(stream, StreamArguments.entries(message));
}

3. 消费者(监听容器)

@Component
public class StreamListener {

    @Bean
    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> container(
            RedisConnectionFactory connectionFactory) {
        StreamMessageListenerContainerOptions options = StreamMessageListenerContainerOptions
            .builder()
            .pollTimeout(Duration.ofSeconds(1))
            .build();

        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
            StreamMessageListenerContainer.create(connectionFactory, options);

        container.receive(
            Consumer.from("mygroup", "worker1"),
            StreamOffset.create("mystream", ReadOffset.lastConsumed()),
            (record) -> {
                System.out.println("Received: " + record.getValue());
                // 处理业务逻辑
            }
        );

        container.start();
        return container;
    }
}

九、监控与运维

命令 用途
XINFO STREAM mystream 查看流信息(长度、第一/最后 ID)
XINFO GROUPS mystream 查看消费者组
XINFO CONSUMERS mystream mygroup 查看组内消费者
XPENDING mystream mygroup 查看待确认消息
INFO stream 查看 Stream 全局统计

十、总结:Redis Stream 的核心价值

优势 说明
持久化 消息不丢失,适合关键业务
消费者组 实现消息分发与负载均衡
消息确认 保障至少一次投递
可追溯 按 ID 查询历史消息
高性能 内存操作,低延迟
轻量集成 无需引入 Kafka/RabbitMQ

结语
Redis Stream 是 Redis 生态中 最强大的消息中间件能力。它让 Redis 不再只是一个“缓存”,而是成为一个 多功能的实时数据平台

💡 推荐使用场景

  • 微服务间异步通信
  • 事件驱动架构
  • 任务队列(非超大规模)
  • 日志收集与审计

如果你正在寻找一个 简单、可靠、高性能 的消息队列方案,Redis Stream 是一个不容忽视的选择

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐