相关资源

RxJava文档:https://github.com/ReactiveX/RxJava/wiki
RxJava中文文档:https://mcxiaoke.gitbooks.io/rxdocs/content/
RxJava经典资料:https://github.com/lzyzsd/Awesome-RxJava

ReactiveX:https://reactivex.io/

响应式编程概述

什么是响应式编程?它是一种基于异步数据流概念的编程模式。

关键概念:事件

适用场景:UI(通用)

RxJava

  • 异步数据处理库
  • 扩展的观察者模式

在这里插入图片描述
在这里插入图片描述

RxJava与观察者模式

观察者模式的四大要素:

  • Observable:被观察者
  • Observer:观察者
  • Subscribe:订阅
  • Event:事件

在这里插入图片描述
在这里插入图片描述

入门示例

导入RxJava:

// Coordinates for the newer RxJava 3.x and 2.x lines, kept for reference but commented out.
/*
// https://mvnrepository.com/artifact/io.reactivex.rxjava3/rxjava
implementation group: 'io.reactivex.rxjava3', name: 'rxjava', version: '3.0.11'

// https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava
implementation group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.2.21'
*/

// Use the older 1.x release that the course is based on:
// https://mvnrepository.com/artifact/io.reactivex/rxjava
implementation group: 'io.reactivex', name: 'rxjava', version: '1.3.8'

HelloWorld示例:

package top.onefine.rxjava;

import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Subscriber;

@Slf4j
public class HelloWorld {

    /**
     * Minimal RxJava 1.x walkthrough: build an observer, build an observable, connect them.
     *
     * <p>The source emits a single item and then throws instead of completing. In the
     * recorded run the subscriber receives {@code onNext} followed by {@code onError};
     * {@code onCompleted} is never logged.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Observer side: logs each of the three callbacks.
        Subscriber<String> printer = new Subscriber<String>() {

            @Override
            public void onNext(String item) {
                log.info("onNext: {}", item);
            }

            @Override
            public void onError(Throwable cause) {
                log.info("onError...");
            }

            @Override
            public void onCompleted() {
                log.info("onCompleted...");
            }
        };

        // Observable side: one item, then a deliberate failure in place of onCompleted().
        Observable<String> source = Observable.create(emitter -> {
            emitter.onNext("Hello world.");
            throw new NullPointerException("Throw a Exception...");
        });

        // Wire the subscriber to the source.
        source.subscribe(printer);
    }
}

输出:

11:27:08.971 [main] INFO top.onefine.rxjava.HelloWorld - onNext: Hello world.
11:27:08.974 [main] INFO top.onefine.rxjava.HelloWorld - onError...

操作符分类

在这里插入图片描述

Creating Observables(创建Observable)

在这里插入图片描述
示例:

package top.onefine.rxjava;

import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Subscriber;

@Slf4j
public class HelloWorld {

    /**
     * Read lazily by {@link #defer_demo()}; it is deliberately assigned only after the
     * deferred Observable has been built.
     */
    private static String value;

    /**
     * Runs one "creating" operator demo at a time; uncomment the call to try another.
     *
     * @param args unused
     */
    public static void main(String[] args) {
//        create_demo();
//        just_demo();
//        from_demo();
//        defer_demo();
//        range_demo();
        repeat_demo();
    }

    /**
     * Builds a subscriber that logs every callback it receives.
     *
     * <p>All demos in this class observe their source the same way, so the subscriber is
     * defined once here instead of being repeated as an anonymous class in each demo.
     *
     * @param <T> type of the items being observed
     * @return a fresh subscriber that logs {@code onNext}, {@code onError} and {@code onCompleted}
     */
    private static <T> Subscriber<T> loggingSubscriber() {
        return new Subscriber<T>() {
            @Override
            public void onCompleted() {
                log.info("onCompleted...");
            }

            @Override
            public void onError(Throwable e) {
                log.info("onError...");
            }

            @Override
            public void onNext(T item) {
                log.info("onNext: {}", item);
            }
        };
    }

    /**
     * {@code repeat(2)} applied to {@code range(1, 3)}: the sequence 1, 2, 3 is logged twice
     * before completion.
     *
     * <pre>
     * 14:16:59.385 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1
     * 14:16:59.387 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2
     * 14:16:59.387 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
     * 14:16:59.390 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1
     * 14:16:59.390 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2
     * 14:16:59.390 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
     * 14:16:59.390 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void repeat_demo() {
        Observable.range(1, 3).repeat(2)
                .subscribe(loggingSubscriber());
    }

    /**
     * {@code range(1, 5)}: logs the integers 1 through 5, then completes.
     *
     * <pre>
     * 14:15:30.089 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1
     * 14:15:30.092 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2
     * 14:15:30.092 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
     * 14:15:30.092 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 4
     * 14:15:30.092 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 5
     * 14:15:30.092 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void range_demo() {
        Observable.range(1, 5)
                .subscribe(loggingSubscriber());
    }

    /**
     * {@code defer}: the Observable is built while {@link #value} is still unset, yet the
     * subscriber sees the value assigned afterwards.
     *
     * <pre>
     * 14:11:17.146 [main] INFO top.onefine.rxjava.HelloWorld - onNext: Hello World.
     * 14:11:17.149 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void defer_demo() {
        Observable<String> observable = Observable.defer(() -> Observable.just(value));

        // Assigned after defer(...) on purpose; see the recorded output above.
        value = "Hello World.";

        observable.subscribe(loggingSubscriber());
    }

    /**
     * {@code from}: emits each element of an array in order, then completes.
     * Operator documentation: https://reactivex.io/documentation/operators/from.html
     *
     * <pre>
     * 11:49:42.936 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1
     * 11:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2
     * 11:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
     * 11:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 4
     * 11:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 5
     * 11:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 6
     * 11:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void from_demo() {
        Observable.from(new Integer[]{1, 2, 3, 4, 5, 6})
                .subscribe(loggingSubscriber());
    }

    /**
     * {@code just}: emits the single given item, then completes.
     *
     * <pre>
     * 11:43:49.006 [main] INFO top.onefine.rxjava.HelloWorld - onNext: RxJava学习...
     * 11:43:49.008 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void just_demo() {
        Observable.just("RxJava学习...")
                .subscribe(loggingSubscriber());
    }

    /**
     * {@code create}: the hand-written source only calls {@code onNext}, so no
     * {@code onCompleted} line appears in the output.
     *
     * <pre>
     * 11:44:26.187 [main] INFO top.onefine.rxjava.HelloWorld - onNext: RxJava学习...
     * </pre>
     */
    private static void create_demo() {
        Observable.create((Observable.OnSubscribe<String>) subscriber -> subscriber.onNext("RxJava学习..."))
                .subscribe(loggingSubscriber());
    }
}

Transforming Observables(转换Observable)

在这里插入图片描述
示例:

package top.onefine.rxjava;

import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.observables.GroupedObservable;

import java.util.List;

@Slf4j
public class HelloWorld {

    /**
     * Runs one "transforming" operator demo at a time; uncomment the call to try another.
     *
     * @param args unused
     */
    public static void main(String[] args) {
//        map_demo();
//        flatMap_demo();
//        groupBy_demo();
//        buffer_demo();
        scan_demo();
    }

    /**
     * Builds a subscriber that logs every callback it receives.
     *
     * <p>Shared by all demos whose observer only needs to log, replacing the anonymous
     * class that was previously repeated in each of them.
     *
     * @param <T> type of the items being observed
     * @return a fresh subscriber that logs {@code onNext}, {@code onError} and {@code onCompleted}
     */
    private static <T> Subscriber<T> loggingSubscriber() {
        return new Subscriber<T>() {
            @Override
            public void onCompleted() {
                log.info("onCompleted...");
            }

            @Override
            public void onError(Throwable e) {
                log.info("onError...");
            }

            @Override
            public void onNext(T item) {
                log.info("onNext: {}", item);
            }
        };
    }

    /**
     * Builds a subscriber for one group of {@link #groupBy_demo()} that appends the group key
     * to every log line.
     *
     * @param key key of the group being observed
     * @return a fresh subscriber whose log lines end with {@code key}
     */
    private static Subscriber<Integer> keyedSubscriber(String key) {
        return new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                log.info("onCompleted...{}", key);
            }

            @Override
            public void onError(Throwable e) {
                log.info("onError...{}", key);
            }

            @Override
            public void onNext(Integer i) {
                log.info("onNext: {} ...{}", i, key);
            }
        };
    }

    /**
     * {@code scan(Integer::sum)} over 1..5: logs the running totals 1, 3, 6, 10, 15.
     *
     * <pre>
     * 14:54:20.924 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1
     * 14:54:20.927 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
     * 14:54:20.927 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 6
     * 14:54:20.927 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 10
     * 14:54:20.927 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 15
     * 14:54:20.927 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void scan_demo() {
        Observable.range(1, 5).scan(Integer::sum)
                .subscribe(loggingSubscriber());
    }

    /**
     * {@code buffer(2)} over 1..5: items arrive as lists of at most two elements.
     *
     * <pre>
     * 14:50:27.586 [main] INFO top.onefine.rxjava.HelloWorld - onNext: [1, 2]
     * 14:50:27.588 [main] INFO top.onefine.rxjava.HelloWorld - onNext: [3, 4]
     * 14:50:27.588 [main] INFO top.onefine.rxjava.HelloWorld - onNext: [5]
     * 14:50:27.588 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void buffer_demo() {
        // Named local so the element type change (Integer -> List<Integer>) stays visible.
        Observable<List<Integer>> batches = Observable.range(1, 5).buffer(2);
        batches.subscribe(loggingSubscriber());
    }

    /**
     * {@code groupBy} on {@code arg % 2}: each emitted group is subscribed to as soon as it
     * arrives, and its items are logged together with the group key ("0" or "1").
     *
     * <pre>
     * 14:40:24.261 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1 ...1
     * 14:40:24.263 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2 ...0
     * 14:40:24.263 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3 ...1
     * 14:40:24.263 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 4 ...0
     * 14:40:24.263 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 5 ...1
     * 14:40:24.264 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...0
     * 14:40:24.264 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...1
     * 14:40:24.264 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void groupBy_demo() {
        Observable.just(1, 2, 3, 4, 5).groupBy(arg -> String.valueOf(arg % 2))
                .subscribe(new Observer<GroupedObservable<String, Integer>>() {
                    @Override
                    public void onCompleted() {
                        log.info("onCompleted...");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log.info("onError...");
                    }

                    @Override
                    public void onNext(GroupedObservable<String, Integer> result) {
                        // Subscribe to the group right here, inside the outer onNext.
                        result.subscribe(keyedSubscriber(result.getKey()));
                    }
                });
    }

    /**
     * {@code flatMap}: every integer is mapped to an Observable of one formatted string, and
     * the strings are logged as a single stream.
     *
     * <pre>
     * 14:30:30.782 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 1 fine.
     * 14:30:30.784 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 2 fine.
     * 14:30:30.784 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 3 fine.
     * 14:30:30.784 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 4 fine.
     * 14:30:30.784 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 5 fine.
     * 14:30:30.784 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void flatMap_demo() {
        Observable.just(1, 2, 3, 4, 5).flatMap(arg -> Observable.just("one " + arg + " fine."))
                .subscribe(loggingSubscriber());
    }

    /**
     * {@code map}: converts the single integer into a formatted string.
     *
     * <pre>
     * 14:24:51.902 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 123 fine
     * 14:24:51.904 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void map_demo() {
        Observable.just(123).map(arg -> "one " + arg + " fine")
                .subscribe(loggingSubscriber());
    }

}

Filtering Observables(过滤Observable)

在这里插入图片描述
示例:

package top.onefine.rxjava;

import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Subscriber;

import java.util.concurrent.TimeUnit;

@Slf4j
public class HelloWorld {

    /**
     * Runs one "filtering" operator demo at a time; uncomment the call to try another.
     *
     * @param args unused
     */
    public static void main(String[] args) {
//        debounce_demo();
//        distinct_demo();
//        elementAt_demo();
//        filter_demo();
//        first_demo();
//        ignoreElements_demo();
//        last_demo();
//        sample_demo();
//        skip_demo();
//        skipLast_demo();
//        take_demo();
        takeLast_demo();
    }

    /**
     * Builds a subscriber that logs every callback it receives.
     *
     * <p>All demos in this class observe their source the same way, so the subscriber is
     * defined once here instead of being repeated as an anonymous class in each demo.
     *
     * @param <T> type of the items being observed
     * @return a fresh subscriber that logs {@code onNext}, {@code onError} and {@code onCompleted}
     */
    private static <T> Subscriber<T> loggingSubscriber() {
        return new Subscriber<T>() {
            @Override
            public void onCompleted() {
                log.info("onCompleted...");
            }

            @Override
            public void onError(Throwable e) {
                log.info("onError...");
            }

            @Override
            public void onNext(T item) {
                log.info("onNext: {}", item);
            }
        };
    }

    /**
     * Source shared by the time-based demos: emits 0..9, sleeping one second before each
     * item, then completes.
     *
     * <p>If the sleeping thread is interrupted, the interrupt flag is restored and the
     * exception is forwarded through {@code onError}, so the subscriber always receives a
     * terminal event. Previously {@code debounce_demo} only printed the stack trace and
     * left the subscriber without one.
     *
     * @return an Observable producing the ten integers described above
     */
    private static Observable<Integer> slowCounter() {
        return Observable.create((Observable.OnSubscribe<Integer>) arg -> {
            try {
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(1000);
                    arg.onNext(i);
                }
                arg.onCompleted();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
                log.warn("Emitter interrupted", e);
                arg.onError(e);
            }
        });
    }

    /**
     * {@code takeLast(2)}: only the final two items are logged.
     *
     * <pre>
     * 15:32:23.691 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
     * 15:32:23.695 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 4
     * 15:32:23.695 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void takeLast_demo() {
        Observable.just(1, 2, 3, 4).takeLast(2)
                .subscribe(loggingSubscriber());
    }

    /**
     * {@code take(2)}: only the first two items are logged.
     *
     * <pre>
     * 15:31:39.259 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1
     * 15:31:39.261 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2
     * 15:31:39.262 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void take_demo() {
        Observable.just(1, 2, 3, 4).take(2)
                .subscribe(loggingSubscriber());
    }

    /**
     * {@code skipLast(2)}: the final two items are dropped.
     *
     * <pre>
     * 15:30:16.202 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1
     * 15:30:16.205 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2
     * 15:30:16.205 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void skipLast_demo() {
        Observable.just(1, 2, 3, 4).skipLast(2)
                .subscribe(loggingSubscriber());
    }

    /**
     * {@code skip(2)}: the first two items are dropped.
     *
     * <pre>
     * 15:29:14.802 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
     * 15:29:14.805 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 4
     * 15:29:14.805 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void skip_demo() {
        Observable.just(1, 2, 3, 4).skip(2)
                .subscribe(loggingSubscriber());
    }

    /**
     * {@code sample(4, SECONDS)} over the one-item-per-second source. In the recorded run,
     * 2 and 6 were delivered on a computation scheduler thread and the final item 9 on the
     * main thread just before completion.
     *
     * <pre>
     * 15:26:59.758 [RxComputationScheduler-1] INFO top.onefine.rxjava.HelloWorld - onNext: 2
     * 15:27:03.757 [RxComputationScheduler-1] INFO top.onefine.rxjava.HelloWorld - onNext: 6
     * 15:27:05.808 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 9
     * 15:27:05.808 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void sample_demo() {
        slowCounter().sample(4, TimeUnit.SECONDS)
                .subscribe(loggingSubscriber());
    }

    /**
     * {@code last()}: only the final item of the sequence is logged.
     *
     * <pre>
     * 15:24:49.731 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
     * 15:24:49.734 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void last_demo() {
        Observable.just(1, 2, 3, 2, 3).last()
                .subscribe(loggingSubscriber());
    }

    /**
     * {@code ignoreElements()}: no item is logged, only the completion.
     *
     * <pre>
     * 15:23:42.135 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void ignoreElements_demo() {
        Observable.just(123).ignoreElements()
                .subscribe(loggingSubscriber());
    }

    /**
     * {@code distinct().first()}: only the first item is logged.
     *
     * <pre>
     * 15:18:04.901 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1
     * 15:18:04.904 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void first_demo() {
        Observable.just(1, 2, 3, 2, 3).distinct().first()
                .subscribe(loggingSubscriber());
    }

    /**
     * {@code distinct().filter(i -> i > 2)}: after de-duplication only 3 passes the predicate.
     *
     * <pre>
     * 15:16:14.458 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
     * 15:16:14.461 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void filter_demo() {
        Observable.just(1, 2, 3, 2, 3).distinct().filter(i -> i > 2)
                .subscribe(loggingSubscriber());
    }

    /**
     * {@code elementAt(3)}: logs the item at zero-based index 3, which is 2 for this input.
     *
     * <pre>
     * 15:14:13.951 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2
     * 15:14:13.954 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void elementAt_demo() {
        Observable.just(1, 2, 3, 2, 3, 4, 5, 1, 6).elementAt(3)
                .subscribe(loggingSubscriber());
    }

    /**
     * {@code distinct()}: repeated values are dropped, leaving 1..6 in first-seen order.
     *
     * <pre>
     * 15:12:27.291 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1
     * 15:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2
     * 15:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
     * 15:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 4
     * 15:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 5
     * 15:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 6
     * 15:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void distinct_demo() {
        Observable.just(1, 2, 3, 2, 3, 4, 5, 1, 6).distinct()
                .subscribe(loggingSubscriber());
    }

    /**
     * {@code debounce(1, SECONDS)} over the one-item-per-second source. In the recorded run
     * only the final item, 9, was delivered.
     *
     * <pre>
     * 15:06:56.929 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 9
     * 15:06:56.932 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void debounce_demo() {
        slowCounter().debounce(1, TimeUnit.SECONDS)
                .subscribe(loggingSubscriber());
    }
}

Combining Observables(组合Observable)

在这里插入图片描述
示例:

package top.onefine.rxjava;

import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Subscriber;


@Slf4j
public class HelloWorld {

    /**
     * Runs one "combining" operator demo at a time; uncomment the call to try another.
     *
     * @param args unused
     */
    public static void main(String[] args) {
//        zip_demo();
//        merge_demo();
//        startWith_demo();
        combineLatest_demo();
    }

    /**
     * Builds a subscriber that logs every callback it receives.
     *
     * <p>All demos in this class observe their source the same way, so the subscriber is
     * defined once here instead of being repeated as an anonymous class in each demo.
     *
     * @param <T> type of the items being observed
     * @return a fresh subscriber that logs {@code onNext}, {@code onError} and {@code onCompleted}
     */
    private static <T> Subscriber<T> loggingSubscriber() {
        return new Subscriber<T>() {
            @Override
            public void onCompleted() {
                log.info("onCompleted...");
            }

            @Override
            public void onError(Throwable e) {
                log.info("onError...");
            }

            @Override
            public void onNext(T item) {
                log.info("onNext: {}", item);
            }
        };
    }

    /**
     * {@code combineLatest}: each emitted sum combines the latest item of both sources. In
     * the recorded run every pair uses 3 from the first source together with 4, 5, 6 and 7
     * from the second.
     *
     * <pre>
     * 16:05:38.919 [main] INFO top.onefine.rxjava.HelloWorld - i1:3, i2:4
     * 16:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 7
     * 16:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - i1:3, i2:5
     * 16:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 8
     * 16:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - i1:3, i2:6
     * 16:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 9
     * 16:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - i1:3, i2:7
     * 16:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 10
     * 16:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void combineLatest_demo() {
        Observable<Integer> observable1 = Observable.just(1, 2, 3);
        Observable<Integer> observable2 = Observable.just(4, 5, 6, 7);
        Observable.combineLatest(observable1, observable2, (i1, i2) -> {
            log.info("i1:{}, i2:{}", i1, i2);
            return i1 + i2;
        }).subscribe(loggingSubscriber());
    }

    /**
     * {@code startWith}: the items of the argument Observable (4..7) are logged before the
     * items of the receiver (1..3).
     *
     * <pre>
     * 16:00:48.097 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 4
     * 16:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 5
     * 16:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 6
     * 16:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 7
     * 16:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1
     * 16:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2
     * 16:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
     * 16:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void startWith_demo() {
        Observable<Integer> observable1 = Observable.just(1, 2, 3);
        Observable<Integer> observable2 = Observable.just(4, 5, 6, 7);
        observable1.startWith(observable2)
                .subscribe(loggingSubscriber());
    }

    /**
     * {@code merge}: the items of both sources are logged as one stream. In the recorded run
     * all items of the first source appear before those of the second.
     *
     * <pre>
     * 15:58:26.334 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 10
     * 15:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 20
     * 15:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 30
     * 15:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 4
     * 15:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 8
     * 15:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 12
     * 15:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 16
     * 15:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void merge_demo() {
        Observable<Integer> observable1 = Observable.just(10, 20, 30);
        Observable<Integer> observable2 = Observable.just(4, 8, 12, 16);
        Observable.merge(observable1, observable2)
                .subscribe(loggingSubscriber());
    }

    /**
     * {@code zip}: pairs the items of both sources by position and logs their sums. The
     * fourth item of the longer source (16) has no partner and is never used.
     *
     * <pre>
     * 15:49:52.520 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 14
     * 15:49:52.523 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 28
     * 15:49:52.523 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 42
     * 15:49:52.523 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
     * </pre>
     */
    private static void zip_demo() {
        // Combines the items emitted by two Observables, using the Func2 function to
        // produce each new value that is emitted.
        // Once either Observable finishes or fails, the other one stops emitting as well.
        Observable<Integer> observable1 = Observable.just(10, 20, 30);
        Observable<Integer> observable2 = Observable.just(4, 8, 12, 16);
        Observable.zip(observable1, observable2, Integer::sum)
                .subscribe(loggingSubscriber());
    }

}

Error Handling Operators(处理错误)

在这里插入图片描述

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐