您好!很高兴为您详细介绍 Redis Stream

Redis Stream 是 Redis 在 5.0 版本引入的强大、持久化的新型数据结构。它旨在提供一个高性能、低延迟的**消息队列(Message Queue)事件流(Event Stream)**解决方案,常用于构建异步通信、事件驱动架构、以及您正在开发的聊天应用场景。


🚀 一、基本概念和特性

Redis Stream 本质上是一个**仅追加(Append-Only)**的日志结构,类似于 KafkaRabbitMQ 等专业消息中间件,但它更加轻量级,并且与 Redis 的原子操作、持久化等特性紧密结合。

1. 消息和 ID

  • 消息内容: Stream 中的每个消息都是由一个或多个 键值对(Field-Value) 组成的集合。
  • 消息 ID(Message ID): 每条消息被写入 Stream 时,Redis 都会为其自动生成一个 独一无二的、单调递增的 ID
    • ID 格式为:<毫秒时间戳>-<序列号>(例如:1701830400000-0)。
    • 这种设计保证了消息在整个 Stream 中的全局有序性

2. 核心数据结构

概念 描述 作用
Stream Key 整个消息队列的名称(例如:chat_history_stream)。 存储所有消息的容器。
Entry Stream 中存储的单个消息。 包含唯一的 ID 和消息内容(键值对)。
Consumer Group 消费者组。由一个或多个消费者组成的逻辑组。 允许多个消费者共同消费一个 Stream,实现负载均衡
Consumer 组内的一个独立进程/客户端(例如:您代码中的 chat-consumer-1)。 负责从组内读取消息并进行处理。

🔄 二、消费者组机制(Consumer Groups)

消费者组是 Redis Stream 最强大的特性,它提供了消息队列中必不可少的消息持久化、负载均衡和故障恢复能力。

1. 负载均衡与唯一性

  • 在同一消费者组内,消息会被分发给组内的不同消费者。
  • 每个消息只会被组内的一个消费者消费一次(即消息的唯一性),实现了队列的负载均衡。
  • 如果有多个不同的消费者组,同一个 Stream 中的消息可以被所有组独立地消费一遍。

2. 待处理消息列表(Pending Entries List, PEL)

PEL 是 Stream 消费者组机制的核心,用于实现可靠性

  • 当一个消费者使用 XREADGROUP 命令从 Stream 中读取消息时,这些消息并不会立即从 Stream 中删除。
  • 它们会被移动到该消费者的 PEL 中。
  • PEL 记录了:消息ID该消息分配给哪个消费者消息读取时间尝试交付次数
  • 确认处理(ACK): 只有当消费者成功处理消息后,通过 XACK 命令显式确认,该消息才会从 PEL 中移除。

3. 故障恢复(Claim)

如果一个消费者宕机(例如您代码中消费者发生 panic),其 PEL 中的消息就永远不会被 ACK。

  • 故障发现: 其他消费者或监控系统可以通过 XPENDING 命令检查某个消费者的 PEL 列表。
  • 消息转移: 监控系统可以通过 XCLAIM 命令,将超过一定空闲时间(Idle Time)的 PEL 消息强制转移给另一个健康的消费者处理。

4. 抽象理解(Claim)

好的,我来用一个形象的事务,比如一家 快递分拣中心 🚛,来解释 Redis Stream 中的**消费者组(Consumer Group)**机制。

📦 形象比喻:快递分拣中心

想象您的 Redis Stream(例如 chat_history_stream)就是一条永远不会停止、不断有新包裹(消息)流入的主传输带(Main Conveyor Belt)

1. Stream (主传输带)
  • Redis Stream (chat_history_stream): 主传输带。所有聊天消息(包裹)都按时间顺序不断地流经这里。一旦包裹放入,就永远不会消失。
  • 消息 ID (170183...-0): 包裹上的唯一条形码。它保证了包裹的顺序和身份。

2. Consumer Group (分拣班组)
  • 消费者组 (chat_consumer_group): 这是一个分拣班组,由多名工人组成,他们的目标是共同处理主传输带上的所有包裹。

    • 职责: 确保主传输带上的每个包裹只被这个班组处理一次
3. Consumers (分拣工人)
  • 消费者 (chat-consumer-1, chat-consumer-2 等): 班组中的分拣工人
    • 负载均衡: 当主传输带上有新包裹时,这些包裹会被分发给班组内的不同工人。如果 C1 拿了一个包裹,C2 就不会再拿同一个。这实现了消息的并行处理负载均衡

🛡️ 核心机制:可靠性和故障恢复

消费者组之所以强大,在于它能处理“包裹被拿走但未处理完成”的情况。

4. Pending Entries List (PEL) (工人手推车 / 待办清单)
  • 当工人 C1 从传输带上拿走 10 个包裹时,这些包裹并不会立即消失。它们会被记录在 C1 的**个人手推车(PEL)**上,表示:“我已经拿了这些包裹,正在处理中。”
  • PEL 记录了:包裹 ID哪个工人拿走的拿走多久了
5. XACK (贴上“已完成”标签)
  • 只有当工人 C1 成功处理完包裹(例如,广播到聊天室,或存入数据库)后,他必须使用 XACK 命令,给这些包裹贴上一个 “已完成”标签
  • 贴上标签后,这些包裹才会从 C1 的手推车(PEL)上移除,班组记录也标记为最终完成。
6. 故障恢复 (小组长救场)
  • 如果工人 C1 突然生病回家了(消费者进程 panic 或宕机)。他手推车(PEL)上的包裹就没有被贴上“已完成”标签。
  • **小组长(其他消费者或监控系统)**会定期检查 PEL。如果发现 C1 手推车上的包裹停留时间过长(例如超过 5 分钟):
    • 小组长会使用 XCLAIM 命令,将这些包裹从 C1 的手推车上拿走,转移给健康的工人 C2 或 C3。
    • C2/C3 拿到这些包裹后,重新开始处理,确保任何消息都不会因为单点故障而丢失。

💻 三、核心命令详解

命令 作用 示例 说明
XADD 向 Stream 中添加一条新消息。 XADD mystream * sensor:temp 25 * 表示由 Redis 自动生成新的 ID。
XGROUP CREATE 为指定的 Stream 创建一个消费者组。 XGROUP CREATE mystream mygroup 0$ 0$ 表示从 Stream 的开头开始消费。
XREAD 独立地读取 Stream 中的消息(不使用消费者组)。 XREAD BLOCK 1000 STREAMS mystream $ 从 Stream 最新 ID 处开始阻塞读取。
XREADGROUP 从消费者组中读取新消息或待处理消息。 XREADGROUP GROUP mygroup consumer1 STREAMS mystream > consumer1mygroup 中读取未被处理的新消息 (>)。
XACK 确认指定 ID 的消息已被处理,并将其从 PEL 中移除。 XACK mystream mygroup 170183... 必须在 XREADGROUP 之后执行,确保可靠性。
XPENDING 查看消费者组或指定消费者当前的待处理消息(PEL)。 XPENDING mystream mygroup 用于监控和故障排查。

场景应用(您的聊天服务器)

在您的 Go 聊天服务器中,Redis Stream 的应用模型如下:

  1. 消息发送 (Producer): 服务器收到用户消息后,通过 XADD chat_history_stream 将消息写入 Stream(包括 sendercontexttype 等字段)。
  2. 异步消费 (Consumer): 您的三个 Go 协程 (chat-consumer-1-2-3) 组成了消费者组 chat_consumer_group。它们循环执行 XREADGROUP 从 Stream 中拉取消息。
  3. 广播处理: 收到消息的消费者(例如 chat-consumer-1)负责调用服务器的 ChatTaskHandler,将消息转发到 broadcastChan,最终广播给所有在线用户。
  4. 确认机制: 广播成功后,消费者必须执行 XACK,将该消息从自己的 PEL 中移除。
    本文主要是用文字理解Stream中的如何实现和运行
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐