Redis学习之Stream
Redis Stream 是一种高性能、持久化的消息队列数据结构,具有仅追加日志特性。它通过消费者组机制实现负载均衡和可靠性,每个消息由唯一ID标识并被分配给组内消费者处理。关键机制包括待处理消息列表(PEL)用于跟踪未确认消息,以及XACK确认和XCLAIM故障恢复功能。核心命令如XADD添加消息、XREADGROUP消费消息、XPENDING监控未处理消息等。文章以快递分拣中心为比喻,形象解释
您好!很高兴为您详细介绍 Redis Stream。
Redis Stream 是 Redis 在 5.0 版本引入的强大、持久化的新型数据结构。它旨在提供一个高性能、低延迟的**消息队列(Message Queue)和事件流(Event Stream)**解决方案,常用于构建异步通信、事件驱动架构、以及您正在开发的聊天应用场景。
🚀 一、基本概念和特性
Redis Stream 本质上是一个**仅追加(Append-Only)**的日志结构,类似于 Kafka 或 RabbitMQ 等专业消息中间件,但它更加轻量级,并且与 Redis 的原子操作、持久化等特性紧密结合。
1. 消息和 ID
- 消息内容: Stream 中的每个消息都是由一个或多个 键值对(Field-Value) 组成的集合。
- 消息 ID(Message ID): 每条消息被写入 Stream 时,Redis 都会为其自动生成一个 独一无二的、单调递增的 ID。
- ID 格式为:
<毫秒时间戳>-<序列号>(例如:1701830400000-0)。 - 这种设计保证了消息在整个 Stream 中的全局有序性。
- ID 格式为:
2. 核心数据结构
| 概念 | 描述 | 作用 |
|---|---|---|
| Stream Key | 整个消息队列的名称(例如:chat_history_stream)。 |
存储所有消息的容器。 |
| Entry | Stream 中存储的单个消息。 | 包含唯一的 ID 和消息内容(键值对)。 |
| Consumer Group | 消费者组。由一个或多个消费者组成的逻辑组。 | 允许多个消费者共同消费一个 Stream,实现负载均衡。 |
| Consumer | 组内的一个独立进程/客户端(例如:您代码中的 chat-consumer-1)。 |
负责从组内读取消息并进行处理。 |
🔄 二、消费者组机制(Consumer Groups)
消费者组是 Redis Stream 最强大的特性,它提供了消息队列中必不可少的消息持久化、负载均衡和故障恢复能力。
1. 负载均衡与唯一性
- 在同一消费者组内,消息会被分发给组内的不同消费者。
- 每个消息只会被组内的一个消费者消费一次(即消息的唯一性),实现了队列的负载均衡。
- 如果有多个不同的消费者组,同一个 Stream 中的消息可以被所有组独立地消费一遍。
2. 待处理消息列表(Pending Entries List, PEL)
PEL 是 Stream 消费者组机制的核心,用于实现可靠性:
- 当一个消费者使用
XREADGROUP命令从 Stream 中读取消息时,这些消息并不会立即从 Stream 中删除。 - 它们会被移动到该消费者的 PEL 中。
- PEL 记录了:
消息ID、该消息分配给哪个消费者、消息读取时间、尝试交付次数。 - 确认处理(ACK): 只有当消费者成功处理消息后,通过
XACK命令显式确认,该消息才会从 PEL 中移除。
3. 故障恢复(Claim)
如果一个消费者宕机(例如您代码中消费者发生 panic),其 PEL 中的消息就永远不会被 ACK。
- 故障发现: 其他消费者或监控系统可以通过
XPENDING命令检查某个消费者的 PEL 列表。 - 消息转移: 监控系统可以通过
XCLAIM命令,将超过一定空闲时间(Idle Time)的 PEL 消息强制转移给另一个健康的消费者处理。
4. 抽象理解(Claim)
好的,我来用一个形象的事务,比如一家 快递分拣中心 🚛,来解释 Redis Stream 中的**消费者组(Consumer Group)**机制。
📦 形象比喻:快递分拣中心
想象您的 Redis Stream(例如 chat_history_stream)就是一条永远不会停止、不断有新包裹(消息)流入的主传输带(Main Conveyor Belt)。
1. Stream (主传输带)
- Redis Stream (
chat_history_stream): 主传输带。所有聊天消息(包裹)都按时间顺序不断地流经这里。一旦包裹放入,就永远不会消失。 - 消息 ID (
170183...-0): 包裹上的唯一条形码。它保证了包裹的顺序和身份。
2. Consumer Group (分拣班组)
-
消费者组 (
chat_consumer_group): 这是一个分拣班组,由多名工人组成,他们的目标是共同处理主传输带上的所有包裹。- 职责: 确保主传输带上的每个包裹只被这个班组处理一次。
3. Consumers (分拣工人)
- 消费者 (
chat-consumer-1,chat-consumer-2等): 班组中的分拣工人。- 负载均衡: 当主传输带上有新包裹时,这些包裹会被分发给班组内的不同工人。如果 C1 拿了一个包裹,C2 就不会再拿同一个。这实现了消息的并行处理和负载均衡。
🛡️ 核心机制:可靠性和故障恢复
消费者组之所以强大,在于它能处理“包裹被拿走但未处理完成”的情况。
4. Pending Entries List (PEL) (工人手推车 / 待办清单)
- 当工人 C1 从传输带上拿走 10 个包裹时,这些包裹并不会立即消失。它们会被记录在 C1 的**个人手推车(PEL)**上,表示:“我已经拿了这些包裹,正在处理中。”
- PEL 记录了:
包裹 ID、哪个工人拿走的、拿走多久了。
5. XACK (贴上“已完成”标签)
- 只有当工人 C1 成功处理完包裹(例如,广播到聊天室,或存入数据库)后,他必须使用
XACK命令,给这些包裹贴上一个 “已完成”标签。 - 贴上标签后,这些包裹才会从 C1 的手推车(PEL)上移除,班组记录也标记为最终完成。
6. 故障恢复 (小组长救场)
- 如果工人 C1 突然生病回家了(消费者进程
panic或宕机)。他手推车(PEL)上的包裹就没有被贴上“已完成”标签。 - **小组长(其他消费者或监控系统)**会定期检查 PEL。如果发现 C1 手推车上的包裹停留时间过长(例如超过 5 分钟):
- 小组长会使用
XCLAIM命令,将这些包裹从 C1 的手推车上拿走,转移给健康的工人 C2 或 C3。 - C2/C3 拿到这些包裹后,重新开始处理,确保任何消息都不会因为单点故障而丢失。
- 小组长会使用
💻 三、核心命令详解
| 命令 | 作用 | 示例 | 说明 |
|---|---|---|---|
XADD |
向 Stream 中添加一条新消息。 | XADD mystream * sensor:temp 25 |
* 表示由 Redis 自动生成新的 ID。 |
XGROUP CREATE |
为指定的 Stream 创建一个消费者组。 | XGROUP CREATE mystream mygroup 0$ |
0$ 表示从 Stream 的开头开始消费。 |
XREAD |
独立地读取 Stream 中的消息(不使用消费者组)。 | XREAD BLOCK 1000 STREAMS mystream $ |
从 Stream 最新 ID 处开始阻塞读取。 |
XREADGROUP |
从消费者组中读取新消息或待处理消息。 | XREADGROUP GROUP mygroup consumer1 STREAMS mystream > |
consumer1 从 mygroup 中读取未被处理的新消息 (>)。 |
XACK |
确认指定 ID 的消息已被处理,并将其从 PEL 中移除。 | XACK mystream mygroup 170183... |
必须在 XREADGROUP 之后执行,确保可靠性。 |
XPENDING |
查看消费者组或指定消费者当前的待处理消息(PEL)。 | XPENDING mystream mygroup |
用于监控和故障排查。 |
场景应用(您的聊天服务器)
在您的 Go 聊天服务器中,Redis Stream 的应用模型如下:
- 消息发送 (Producer): 服务器收到用户消息后,通过
XADD chat_history_stream将消息写入 Stream(包括sender、context、type等字段)。 - 异步消费 (Consumer): 您的三个 Go 协程 (
chat-consumer-1、-2、-3) 组成了消费者组chat_consumer_group。它们循环执行XREADGROUP从 Stream 中拉取消息。 - 广播处理: 收到消息的消费者(例如
chat-consumer-1)负责调用服务器的ChatTaskHandler,将消息转发到broadcastChan,最终广播给所有在线用户。 - 确认机制: 广播成功后,消费者必须执行
XACK,将该消息从自己的 PEL 中移除。
本文主要是用文字理解Stream中的如何实现和运行
更多推荐


所有评论(0)