一、在 pom.xml 中引入依赖

<!-- Reactor Core: supplies the Flux and Sinks types used by the controller example. -->
<dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.4.23</version>
        </dependency>

二、代码实例

/**
 * Demo controller that pushes a timestamp string to the caller on a fixed schedule.
 */
@RestController
@RequestMapping(value = "/demo")
@Api(tags = "测试demo")
public class DemoController {

    /**
     * Streams {@code "Data @ <time>"} strings to the subscriber: the first one after a
     * 1 second delay, then one every 2 seconds.
     *
     * <p>The sink and the single-threaded scheduler are created per subscription and are
     * released when the stream ends for any reason (cancel, error or completion).
     *
     * @return a stream of timestamp strings that keeps emitting until the subscriber
     *         goes away or the producing task fails
     */
    @GetMapping(value = "/demo1")
    public Flux<String> demo1() {
        // Defer resource creation to subscription time: building the executor eagerly would
        // leak its thread whenever the returned Flux is never subscribed.
        return Flux.defer(() -> {
            Sinks.Many<String> dataSink = Sinks.many().multicast().onBackpressureBuffer();
            ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

            // Periodic producer: builds the payload and pushes it into the sink.
            Runnable task = () -> {
                try {
                    // DateUtil is a project helper; presumably returns the current time as text.
                    String t = "Data @ " + DateUtil.getTime();
                    Sinks.EmitResult emitResult = dataSink.tryEmitNext(t);
                    System.out.println(emitResult.isFailure());
                    System.out.println(emitResult.isSuccess());
                    System.out.println("------------------------------");
                } catch (RuntimeException e) {
                    // An exception escaping the task would make scheduleAtFixedRate silently
                    // drop every later run and leave the client on a stalled stream, so
                    // surface it to the subscriber as an error signal instead.
                    dataSink.tryEmitError(e);
                }
            };

            // First run after 1 second, then every 2 seconds.
            scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);

            return dataSink.asFlux()
                    .doOnSubscribe(subscription -> System.out.println("订阅了"))
                    .doOnCancel(() -> System.out.println("---取消订阅了"))
                    // Release resources on every terminal signal, not only on cancel, so an
                    // errored or completed stream cannot leave the scheduler thread running.
                    .doFinally(signalType -> {
                        dataSink.tryEmitComplete();
                        scheduledExecutorService.shutdownNow();
                    });
        });
    }

}

三、浏览器运行结果

四、使用 okhttp-sse 客户端调用(所需依赖见代码之后的 pom 配置)

/**
 * Connects to the demo SSE endpoint with okhttp-sse and prints every lifecycle
 * callback (open, event, closed, failure) to the console.
 *
 * @param args unused
 */
public static void main(String[] args) {
        OkHttpClient client = new OkHttpClient();

        // Build a plain GET request (no request body) against the streaming endpoint.
        Request request = new Request.Builder().url("http://127.0.0.1:10021/demo/demo1").header("Authorization", "1").get().build();

        EventSourceListener listener = new EventSourceListener() {
            @Override
            public void onOpen(EventSource eventSource, Response response) {
                System.out.println("SSE connection opened");
            }

            @Override
            public void onClosed(EventSource eventSource) {
                System.out.println("SSE connection closed");
            }

            @Override
            public void onEvent(EventSource eventSource, String id, String type, String data) {
                System.out.println(type + "-------" + data);
            }

            @Override
            public void onFailure(EventSource eventSource, Throwable t, Response response) {
                // okhttp-sse reports HTTP-level failures (non-2xx status, unexpected content
                // type) with a null Throwable, and transport failures with a null Response,
                // so neither argument may be dereferenced unconditionally.
                if (t != null) {
                    System.err.println("Error occurred: " + t.getMessage());
                } else if (response != null) {
                    System.err.println("Error occurred: HTTP " + response.code() + " " + response.message());
                } else {
                    System.err.println("Error occurred: unknown failure");
                }
            }
        };

        // Creating the event source starts the call asynchronously on the client's dispatcher.
        EventSource.Factory factory = EventSources.createFactory(client);
        factory.newEventSource(request, listener);
    }

 

<!-- OkHttp core client plus its SSE extension (EventSource, EventSourceListener, EventSources) used by the client example. -->
<dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>4.12.0</version>
        </dependency>
        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp-sse</artifactId>
            <version>4.12.0</version>
        </dependency>

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐