Spring Boot 自定义 Flux 编程，流式返回结果
一、pom引入依赖配置。
·
一、pom引入依赖配置
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.23</version>
</dependency>
二、代码实例
/**
 * Demo controller that streams a timestamped string to the caller at a fixed rate.
 *
 * <p>Each request gets its own sink and its own single-threaded scheduler. The scheduler
 * pushes values into the sink, and the sink is exposed to the caller as a {@link Flux}.
 */
@RestController
@RequestMapping(value = "/demo")
@Api(tags = "测试demo")
public class DemoController {

    /**
     * Streams a {@code "Data @ <time>"} message roughly every two seconds, starting one
     * second after the stream is subscribed.
     *
     * <p>The periodic task is only scheduled once a subscriber arrives, and the scheduler
     * is shut down when the stream terminates for any reason (cancel, complete or error),
     * so a request that is never subscribed or that ends abnormally does not leave a
     * running thread behind.
     *
     * @return a stream of timestamped messages that keeps emitting until the subscriber
     *         cancels
     */
    @GetMapping(value = "/demo1")
    public Flux<String> demo1() {
        Sinks.Many<String> dataSink = Sinks.many().multicast().onBackpressureBuffer();
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

        // Periodic producer: builds one message and pushes it into the sink.
        Runnable task = () -> {
            // DateUtil is a project helper; presumably returns the current time as text.
            String t = "Data @ " + DateUtil.getTime();
            Sinks.EmitResult emitResult = dataSink.tryEmitNext(t);
            // Print the outcome of the emission so failures are visible on the console.
            System.out.println(emitResult.isFailure());
            System.out.println(emitResult.isSuccess());
            System.out.println("------------------------------");
        };

        return dataSink.asFlux()
                .doOnSubscribe(subscription -> {
                    System.out.println("订阅了");
                    // Start producing only when someone is listening:
                    // first run after 1 second, then every 2 seconds.
                    scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
                })
                .doOnCancel(() -> System.out.println("---取消订阅了"))
                .doFinally(signalType -> {
                    // Release server-side resources on every kind of termination,
                    // not only when the client disconnects.
                    dataSink.tryEmitComplete();
                    scheduledExecutorService.shutdownNow();
                });
    }
}
三、浏览器运行结果
四、使用okhttp调用
/**
 * Connects to the demo streaming endpoint as a server-sent-events client and prints
 * every event it receives.
 *
 * @param args unused
 */
public static void main(String[] args) {
    OkHttpClient client = new OkHttpClient();
    // Build the GET request for the streaming endpoint (no request body is sent).
    Request request = new Request.Builder()
            .url("http://127.0.0.1:10021/demo/demo1")
            .header("Authorization", "1")
            .get()
            .build();
    EventSourceListener listener = new EventSourceListener() {
        @Override
        public void onOpen(EventSource eventSource, Response response) {
            System.out.println("SSE connection opened");
        }

        @Override
        public void onClosed(EventSource eventSource) {
            System.out.println("SSE connection closed");
        }

        @Override
        public void onEvent(EventSource eventSource, String id, String type, String data) {
            System.out.println(type + "-------" + data);
        }

        @Override
        public void onFailure(EventSource eventSource, Throwable t, Response response) {
            // The throwable is null when the failure is an HTTP-level one (for example a
            // non-2xx status or an unexpected content type), so fall back to the response.
            String reason;
            if (t != null) {
                reason = t.getMessage();
            } else if (response != null) {
                reason = "HTTP " + response.code();
            } else {
                reason = "unknown";
            }
            System.err.println("Error occurred: " + reason);
        }
    };
    EventSource.Factory factory = EventSources.createFactory(client);
    factory.newEventSource(request, listener);
}
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.12.0</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp-sse</artifactId>
<version>4.12.0</version>
</dependency>
更多推荐
所有评论(0)