Redis 键空间通知

实时监控Redis键和值的变化

注意:Redis Pub/Sub 是即发即弃的;也就是说,如果 Pub/Sub 客户端断开连接,然后稍后重新连接,则客户端断开连接期间传递的所有事件都将丢失。

事件驱动

Redis 定义了两种事件类型,一种是文件事件,一种是时间事件。不同了事件类型通过不同的处理去处理。

文件事件

可以理解为是IO操作:建立连接,读请求,写响应。

时间事件

定时执行:过期key,AOF等。

打开方式

redis 默认是不发送任意事件通知的。

动态配置(临时开,重启失效)

CONFIG SET notify-keyspace-events AKE

静态配置(重启redis-server生效)

redis.conf中找到 notify-keyspace-events ""修改为 notify-keyspace-events "AKE"则表示开启所有事件通知。

事件类型表

字母 所属维度 含义说明 典型命令/场景
K 通道维度 Keyspace 事件,频道名格式 __keyspace@<db>__:<key> 监听“某个键发生过什么”
E 通道维度 Keyevent 事件,频道名格式 __keyevent@<db>__:<cmd> 监听“某个命令影响了哪些键”
g 命令维度 通用命令(与类型无关) DEL、EXPIRE、RENAME、MOVE、SWAPDB 等
$ 命令维度 字符串专属命令 SET、MSET、APPEND、INCR、DECR …
l 命令维度 列表专属命令 LPUSH、RPOP、LTRIM …
s 命令维度 集合专属命令 SADD、SPOP、SINTER …
h 命令维度 哈希专属命令 HSET、HDEL、HINCRBY …
z 命令维度 有序集合专属命令 ZADD、ZREM、ZINCRBY …
t 命令维度 Stream 专属命令 XADD、XDEL、XTRIM …
d 命令维度 模块键类型事件(Module key) 自定义模块产生的键
x 生命周期 过期事件(expired) 键因 TTL 到达被删除
e 生命周期 淘汰事件(evicted) 键因 maxmemory 策略被逐出
m 生命周期 未命中事件(miss) 访问了一个不存在的键
n 生命周期 新建事件(new) 第一次向一个键写入值
A 快捷别名 相当于 g$lshztxed 的集合(不含 m、n) 想“一键全开”时常用

事件是怎么样发送的?

本质:Redis 发布/订阅

执行命令/键修改时 -> 触发对应回调-> 调用 publish 向固定的频道(__keyspace@<db>__:<key>||__keyevent@<db>__:<event>)发送消息事件。

口诀:“双下划线,keyspace / keyevent 二选一;@库号,冒号后跟事件名;过期找 expired,淘汰找 evicted。”

维度 频道格式 示例(库 0) 说明
keyspace __keyspace@<db>__:<key> __keyspace@0__:mylock 告诉你“哪个键发生了事”
keyevent __keyevent@<db>__:<event> __keyevent@0__:expired 告诉你“什么事发生了

不同命令产生的事件:官方文档 https://redis.io/docs/latest/develop/pubsub/keyspace-notifications/#events-generated-by-different-commands

**重要提示:**所有命令仅在目标键真正被修改时才会生成事件。例如, DELETE 删除一个不存在的元素并不会真正改变该键的值,因此不会生成任何事件。

监听过期key的时候只需要关注:expired

补充:每次根据**maxmemory**策略从数据集中逐出一个键以释放内存时(内存不够,驱逐未使用过的key): <font style="color:rgb(17, 24, 39);">evicted

实测

监听:**psubscribe __key*__:***

127.0.0.1:6379> psubscribe __key*__:*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"     # 通过模式频道监听
2) "__key*__:*"     # 监听的模式平道
3) (integer) 1      # 当前客户端监听的频道

另外启动一个客户端

127.0.0.1:6379> set hello world;  # 创建hello这个键
OK
127.0.0.1:6379> expire hello 10   # 给hello这个键设置一个过期时间
(integer) 1
127.0.0.1:6379>

观察监听的哪个客户端

1) "pmessage"         				# 发送消息
2) "__key*__:*"								# 发送消息的模式频道
3) "__keyspace@0__:hello"			# 发送消息的实际频道
4) "set"											# 发送消息的内容   这里表示对hello这个key进行了set操作
1) "pmessage"
2) "__key*__:*"
3) "__keyevent@0__:set"
4) "hello"										# 这里表示通过set命名操作了hello这个key
1) "pmessage"
2) "__key*__:*"
3) "__keyspace@0__:hello"
4) "expire"
1) "pmessage"
2) "__key*__:*"
3) "__keyevent@0__:expire"		# 这里表示通过expire对hello设置了一个过期时间
4) "hello"
1) "pmessage"									
2) "__key*__:*"
3) "__keyspace@0__:hello"			
4) "expired"
1) "pmessage"
2) "__key*__:*"
3) "__keyevent@0__:expired"		# hello 这个key过期后会触发发送事件 
4) "hello"

了解以上内容后我们便可以监听事件来实现业务中的一些功能了。但是注意redis 发布/订阅是不可靠的。他不能保证一定会你发消息。如果要用一定要添加补偿机制。

使用场景

  1. 订单超市未支付-----订单的有效期为ttl,ttl到期后处理监听事件,来做关闭订单,解锁库存
  2. 配置刷新-----读取配置后给配置一个ttl监听该key,如果ttl则重新拉取配置
  3. 限流窗口----重新计数–可以做一个简单的限流-> 当某一个第一次接受到请求的时候往redis添加一个key并将value给1,并设置ttl为60,然后再接受同样请求的时候对这个key的value+1,如果该key的value值大于某个界限的时候可以在程序中直接拦截掉->如果该key过期了,可以通过监听重新设置该key的value为1。如此循环。

缺点

  1. 消息丢失:redis pub/sub 是无持久化。
  2. 消息重复消费:多节点下多个会同时收到过期key的消息
  3. 大key同时过期:节点压力会剧增
  4. 只发一次:过期后立刻就发送消息,当时没收到以后也不会收到。

在SpringBoot应用

Java:21
SpringBoot:3.5.5
Redis-client:Spring-data-redis 3.5.5+Lettuce-6.x

依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

KeyspaceEventMessageListener

这是spring-data-redis 内置的一个 抽象类KeyspaceEventMessageListener。只需要继承该类,并重写他的doHandleMessage方法即可监听__keyevent@*

public abstract class KeyspaceEventMessageListener implements MessageListener, InitializingBean, DisposableBean {
    // 监听的渠道
	private static final Topic TOPIC_ALL_KEYEVENTS = new PatternTopic("__keyevent@*");
        
    public void init() {

		RedisConnectionFactory connectionFactory = listenerContainer.getConnectionFactory();
        
		if (StringUtils.hasText(keyspaceNotificationsConfigParameter) && connectionFactory != null) {

			try (RedisConnection connection = connectionFactory.getConnection()) {

				RedisServerCommands commands = connection.serverCommands();
				Properties config = commands.getConfig("notify-keyspace-events");
                // 这个是配置是否包含notify-keyspace-events 用于为连接动态打开键空间通知
				if (!StringUtils.hasText(config.getProperty("notify-keyspace-events"))) {
					commands.setConfig("notify-keyspace-events", keyspaceNotificationsConfigParameter);
				}
			}
		}
        // 1.注册
		doRegister(listenerContainer);
	}

	/**
	 * Register instance within the container.
	 *
	 * @param container never {@literal null}.
	 */
	protected void doRegister(RedisMessageListenerContainer container) {
        // 2.向listener容器中添加该渠道表示监听渠道
		listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS);
	}

    @Override
	public void onMessage(Message message, @Nullable byte[] pattern) {
        
		if (ObjectUtils.isEmpty(message.getChannel()) || ObjectUtils.isEmpty(message.getBody())) {
			return;
		}
        
		doHandleMessage(message);
	}

	/**
	 * Handle the actual message
	 * 实际消费消息的地方
	 * @param message never {@literal null}.
	 */
	protected abstract void doHandleMessage(Message message);
}
样例
// 注意要讲该对象添加为Bean对象
@Slf4j
@Component 
public class ConsumeKeyspaceEventMessageListenerImpl extends KeyspaceEventMessageListener {

    /**
     * Creates new {@link KeyspaceEventMessageListener}.
     *
     * @param listenerContainer must not be {@literal null}.
     */
    public ConsumeKeyspaceEventMessageListenerImpl(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    protected void doHandleMessage(Message message) {
        log.info("channel:{},body:{}", new String(message.getChannel()), new String(message.getBody()));
    }
}

KeyExpirationEventMessageListener

键过期的专用消息监听器

它继承了KeyspaceEventMessageListener只关注__keyevent@*__:expired

它默认给我们实现的操作是:发送一个event(RedisKeyExpiredEvent)给Spring容器。让Spring的事件机制再去处理。

public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener
		implements ApplicationEventPublisherAware {
    // 只关注该频道
	private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");

    private @Nullable ApplicationEventPublisher publisher;

    @Override
	protected void doRegister(RedisMessageListenerContainer listenerContainer) {
        // 1 注册
		listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);
	}

    // 默认实现
	@Override
	protected void doHandleMessage(Message message) {
        // // 2.  将内容包装RedisKeyExpiredEvent为发送给Spring容器
		publishEvent(new RedisKeyExpiredEvent(message.getBody()));
	}

	/**
	 * Publish the event in case an {@link ApplicationEventPublisher} is set.
	 *
	 * @param event can be {@literal null}.
	 */
	protected void publishEvent(RedisKeyExpiredEvent event) {

		if (publisher != null) {
			this.publisher.publishEvent(event);
		}
	}
    
}

如果我们自己想对键过期做专门的处理,有两种方式

  1. 直接通过@EventListener处理RedisKeyExpiredEvent该事件即可。
  2. 继续KeyExpirationEventMessageListener 重写doHandleMessage即可。
样例
@EventListener

要保证容器中存在KeyExpirationEventMessageListener的组件才会在key过期的时候发送RedisKeyExpiredEvent

@Bean
public KeyExpirationEventMessageListener keyExpirationEventMessageListener(RedisMessageListenerContainer redisMessageListenerContainer){
    return new KeyExpirationEventMessageListener(redisMessageListenerContainer);
}

@Bean
public RedisKeyExpiredEventListener redisKeyExpiredEventListener() {
    return new RedisKeyExpiredEventListener();
}


@Slf4j
public class RedisKeyExpiredEventListener {

    /**
     * 通过Spring的EventListener处理过期key
     *
     * @param event
     */
    @EventListener
    public void onMessage(RedisKeyExpiredEvent<byte[]> event) {
        log.info("expired key: {}", new String(event.getId()));
        log.info("timestamp: {}", event.getTimestamp());
    }
}

继承并重写

需要重写doHandleMessage来实现自定义逻辑

@Bean
public ConsumeKeyExpirationEventMessageListener consumeKeyExpirationEventMessageListener(RedisMessageListenerContainer redisMessageListenerContainer) {
    return new ConsumeKeyExpirationEventMessageListener(redisMessageListenerContainer);
}
@Slf4j
public class ConsumeKeyExpirationEventMessageListener extends KeyExpirationEventMessageListener {
    /**
     * Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.
     *
     * @param listenerContainer must not be {@literal null}.
     */
    public ConsumeKeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    /**
     * 通过继承并重写doHandleMessage来达到监听某个key过期
     * @param message never {@literal null}.
     */
    @Override
    protected void doHandleMessage(Message message) {
        // 保留发送RedisKeyExpiredEvent事件,以便其他地方使用
        super.doHandleMessage(message);

        log.info("expired key: {}", new String(message.getBody()));
        log.info("channel: {}", new String(message.getChannel()));
    }
}
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐