Spring整合MongoDB(八)----变更流Change Streams
从MongoDB 3.6开始,Change Streams可以让应用程序获得更改通知,而无需跟踪(tail)操作日志(oplog)。更改流支持仅适用于副本集(replica sets)或分片(sharded)集群。Change Streams可以与命令式和反应式(reactive)MongoDB Java driver一起使用。强烈建议使用反应式变体,因为它的资源密集度较低。
【Spring连载】使用Spring Data访问 MongoDB(八)----变更流Change Streams
从MongoDB 3.6开始,Change Streams可以让应用程序获得更改通知,而无需跟踪(tail)操作日志(oplog)。
更改流支持仅适用于副本集(replica sets)或分片(sharded)集群。
Change Streams可以与命令式和反应式(reactive)MongoDB Java driver一起使用。强烈建议使用反应式变体,因为它的资源密集度较低。然而,如果你不能使用反应式API,你仍然可以通过使用Spring生态系统中已经流行的消息(messaging)传递概念来获取更改事件。
可以在集合和数据库级别上进行监视,而数据库级别的变体发布数据库中所有集合的更改。订阅数据库变更流时,请确保为事件类型使用合适的类型,因为转换可能无法正确应用于不同的实体类型。如有疑问,请使用Document。
一、使用MessageListener的Change Streams
通过使用Driver侦听变更流会创建一个长时间运行的阻塞任务,该任务需要委托给一个单独的组件。在这种情况下,我们需要首先创建一个MessageListenerContainer,它将是运行特定SubscriptionRequest任务的主要入口点。Spring Data MongoDB已经附带了一个默认实现,该实现在MongoTemplate上运行,能够为ChangeStreamRequest创建和运行Task实例。
以下示例展示如何将Change Streams与MessageListener实例一起使用:
例1:使用MessageListener实例的变更流
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start(); --------1
MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println; --------2
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("db", "user", ChangeStreamOptions.empty()); --------3
Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class); --------4
// ...
container.stop(); --------5
1. 启动容器将初始化资源,并启动已注册SubscriptionRequest实例的Task实例。启动后添加的Requests会立即运行。
2. 定义接收消息时调用的监听器。Message#getBody()被转换为请求的域类型。使用“Document”接收未经转换的原始结果。
3. 设置要监听的集合,并通过ChangeStreamOptions提供其他选项。
4. 注册请求。返回的订阅可用于检查当前任务状态并取消以释放资源。
5. 一旦确定不再需要容器,请不要忘记停止它。这样做会停止容器中所有正在运行的Task实例。
处理时的错误会传递到org.springframework.util.ErrorHandler。如果未另行说明,则默认情况下会应用追加日志的ErrorHandler。
请使用register(request, body, errorHandler)提供其他功能。
二、反应式(Reactive) Change Streams
使用Reactive API订阅Change Streams是处理流的一种更自然的方法。尽管如此,基本的构建块(如ChangeStreamOptions)仍然保持不变。以下示例展示了如何使用发出ChangeStreamEvents的Change Streams:
例2:发出ChangeStreamEvent的变更流
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(User.class) --------1
.watchCollection("people")
.filter(where("age").gte(38)) --------2
.listen(); --------3
1. 基础document应转换为的事件目标类型。忽略此项可接收未转换的原始结果。
2. 使用聚合管道(aggregation pipeline)或仅使用查询条件来筛选事件。
3. 获取变更流事件的Flux。ChangeStreamEvent#getBody()被转换为(2)中请求的域类型。
三、恢复Change Streams
更改流可以恢复,并在你离开的位置继续发送事件。要恢复流,你需要提供恢复令牌或上次已知的服务器时间(UTC)。使用ChangeStreamOptions相应地设置值。
以下示例展示了如何使用服务器时间设置恢复偏移量:
例3:恢复变更流
Flux<ChangeStreamEvent<User>> resumed = template.changeStream(User.class)
.watchCollection("people")
.resumeAt(Instant.now().minusSeconds(1)) --------1
.listen();
1. 你可以通过getTimestamp方法获得ChangeStreamEvent的服务器时间,也可以使用通过getResumeToken暴露的resumeToken。
在某些情况下,当恢复变更流时,“Instant”可能不是一个足够精确的度量。为此,请使用MongoDB native BsonTimestamp。
更多推荐
所有评论(0)