Java响应式(反应式)编程——RxJava
响应式编程是一种基于异步数据流概念的编程模式
·
相关资源
RxJava文档:https://github.com/ReactiveX/RxJava/wiki
RxJava中文文档:https://mcxiaoke.gitbooks.io/rxdocs/content/
RxJava经典资料:https://github.com/lzyzsd/Awesome-RxJava
ReactiveX:https://reactivex.io/
响应式编程概述
响应式编程?是一种基于异步数据流概念的编程模式
关键概念:事件
适用场景:UI(通用)
RxJava
- 异步数据处理库
- 扩展的观察者模式
RxJava与观察者模式
观察者模式的四大要素:
- Observable:被观察者
- Observer:观察者
- Subscribe:订阅
- Event:事件
入门示例
导入RxJava:
/*
// https://mvnrepository.com/artifact/io.reactivex.rxjava3/rxjava
implementation group: 'io.reactivex.rxjava3', name: 'rxjava', version: '3.0.11'
// https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava
implementation group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.2.21'
*/
// 使用课程中的老版本:
// https://mvnrepository.com/artifact/io.reactivex/rxjava
implementation group: 'io.reactivex', name: 'rxjava', version: '1.3.8'
Hellowrld示例:
package top.onefine.rxjava;
import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Subscriber;
@Slf4j
public class HelloWorld {
public static void main(String[] args) {
// 1. 创建被观察者
Observable<String> observable = Observable.create(subscriber -> {
subscriber.onNext("Hello world.");
throw new NullPointerException("Throw a Exception...");
// subscriber.onCompleted();
});
// 2. 创建观察者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(String s) {
log.info("onNext: {}", s);
}
};
// 3. 订阅事件
observable.subscribe(subscriber);
}
}
输出:
11:27:08.971 [main] INFO top.onefine.rxjava.HelloWorld - onNext: Hello world.
11:27:08.974 [main] INFO top.onefine.rxjava.HelloWorld - onError...
操作符分类
Creating Observables(创建Observable)
示例:
package top.onefine.rxjava;
import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Subscriber;
@Slf4j
public class HelloWorld {
public static void main(String[] args) {
// create_demo();
// just_demo();
// from_demo();
// defer_demo();
// range_demo();
repeat_demo();
}
/*
14:16:59.385 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1
14:16:59.387 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2
14:16:59.387 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
14:16:59.390 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1
14:16:59.390 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2
14:16:59.390 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
14:16:59.390 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void repeat_demo() {
Observable.range(1, 3).repeat(2)
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(Integer arg) {
log.info("onNext: {}", arg);
}
});
}
/*
14:15:30.089 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1
14:15:30.092 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2
14:15:30.092 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
14:15:30.092 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 4
14:15:30.092 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 5
14:15:30.092 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void range_demo() {
Observable.range(1, 5)
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(Integer arg) {
log.info("onNext: {}", arg);
}
});
}
/*
14:11:17.146 [main] INFO top.onefine.rxjava.HelloWorld - onNext: Hello World.
14:11:17.149 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static String value;
private static void defer_demo() {
Observable<String> observable = Observable.defer(() -> Observable.just(value));
value = "Hello World.";
observable.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(String s) {
log.info("onNext: {}", s);
}
});
}
/*
11:49:42.936 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1
11:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2
11:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
11:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 4
11:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 5
11:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 6
11:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void from_demo() {
// 文档:https://reactivex.io/documentation/operators/from.html
Observable.from(new Integer[]{1, 2, 3, 4, 5, 6})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(Integer arg) {
log.info("onNext: {}", arg);
}
});
}
/*
11:43:49.006 [main] INFO top.onefine.rxjava.HelloWorld - onNext: RxJava学习...
11:43:49.008 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void just_demo() {
Observable.just("RxJava学习...")
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(String s) {
log.info("onNext: {}", s);
}
});
}
/*
11:44:26.187 [main] INFO top.onefine.rxjava.HelloWorld - onNext: RxJava学习...
*/
private static void create_demo() {
Observable.create((Observable.OnSubscribe<String>) subscriber -> subscriber.onNext("RxJava学习..."))
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(String s) {
log.info("onNext: {}", s);
}
});
}
}
Transforming Observables(转换Obervable)
示例:
package top.onefine.rxjava;
import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.observables.GroupedObservable;
import java.util.List;
@Slf4j
public class HelloWorld {
public static void main(String[] args) {
// map_demo();
// flatMap_demo();
// groupBy_demo();
// buffer_demo();
scan_demo();
}
/*
14:54:20.924 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1
14:54:20.927 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
14:54:20.927 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 6
14:54:20.927 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 10
14:54:20.927 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 15
14:54:20.927 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void scan_demo() {
Observable.range(1, 5).scan(Integer::sum)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(Integer i) {
log.info("onNext: {}", i);
}
});
}
/*
14:50:27.586 [main] INFO top.onefine.rxjava.HelloWorld - onNext: [1, 2]
14:50:27.588 [main] INFO top.onefine.rxjava.HelloWorld - onNext: [3, 4]
14:50:27.588 [main] INFO top.onefine.rxjava.HelloWorld - onNext: [5]
14:50:27.588 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void buffer_demo() {
Observable.range(1, 5).buffer(2)
.subscribe(new Observer<List<Integer>>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(List<Integer> i) {
log.info("onNext: {}", i);
}
});
}
/*
14:40:24.261 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1 ...1
14:40:24.263 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2 ...0
14:40:24.263 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3 ...1
14:40:24.263 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 4 ...0
14:40:24.263 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 5 ...1
14:40:24.264 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...0
14:40:24.264 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...1
14:40:24.264 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void groupBy_demo() {
Observable.just(1, 2, 3, 4, 5).groupBy(arg -> String.valueOf(arg % 2))
.subscribe(new Observer<GroupedObservable<String, Integer>>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(GroupedObservable<String, Integer> result) {
result.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...{}", result.getKey());
}
@Override
public void onError(Throwable e) {
log.info("onError...{}", result.getKey());
}
@Override
public void onNext(Integer i) {
log.info("onNext: {} ...{}", i, result.getKey());
}
});
}
});
}
/*
14:30:30.782 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 1 fine.
14:30:30.784 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 2 fine.
14:30:30.784 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 3 fine.
14:30:30.784 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 4 fine.
14:30:30.784 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 5 fine.
14:30:30.784 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void flatMap_demo() {
Observable.just(1, 2, 3, 4, 5).flatMap(arg -> Observable.just("one " + arg + " fine."))
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(String s) {
log.info("onNext: {}", s);
}
});
}
/*
14:24:51.902 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 123 fine
14:24:51.904 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void map_demo() {
Observable.just(123).map(arg -> "one " + arg + " fine")
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(String s) {
log.info("onNext: {}", s);
}
});
}
}
Filtering Observables(过滤Observable)
示例:
package top.onefine.rxjava;
import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Subscriber;
import java.util.concurrent.TimeUnit;
@Slf4j
public class HelloWorld {
public static void main(String[] args) {
// debounce_demo();
// distinct_demo();
// elementAt_demo();
// filter_demo();
// first_demo();
// ignoreElements_demo();
// last_demo();
// sample_demo();
// skip_demo();
// skipLast_demo();
// take_demo();
takeLast_demo();
}
/*
15:32:23.691 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
15:32:23.695 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 4
15:32:23.695 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void takeLast_demo() {
Observable.just(1, 2, 3, 4).takeLast(2)
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(Integer i) {
log.info("onNext: {}", i);
}
});
}
/*
15:31:39.259 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1
15:31:39.261 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2
15:31:39.262 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void take_demo() {
Observable.just(1, 2, 3, 4).take(2)
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(Integer i) {
log.info("onNext: {}", i);
}
});
}
/*
15:30:16.202 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1
15:30:16.205 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2
15:30:16.205 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void skipLast_demo() {
Observable.just(1, 2, 3, 4).skipLast(2)
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(Integer i) {
log.info("onNext: {}", i);
}
});
}
/*
15:29:14.802 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
15:29:14.805 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 4
15:29:14.805 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void skip_demo() {
Observable.just(1, 2, 3, 4).skip(2)
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(Integer i) {
log.info("onNext: {}", i);
}
});
}
/*
15:26:59.758 [RxComputationScheduler-1] INFO top.onefine.rxjava.HelloWorld - onNext: 2
15:27:03.757 [RxComputationScheduler-1] INFO top.onefine.rxjava.HelloWorld - onNext: 6
15:27:05.808 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 9
15:27:05.808 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void sample_demo() {
Observable.create((Observable.OnSubscribe<Integer>) arg -> {
try {
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
arg.onNext(i);
}
arg.onCompleted();
} catch (InterruptedException e) {
e.printStackTrace();
arg.onError(e);
}
}).sample(4, TimeUnit.SECONDS).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(Integer i) {
log.info("onNext: {}", i);
}
});
}
/*
15:24:49.731 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
15:24:49.734 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void last_demo() {
Observable.just(1, 2, 3, 2, 3).last()
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(Integer i) {
log.info("onNext: {}", i);
}
});
}
/*
15:23:42.135 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void ignoreElements_demo() {
Observable.just(123).ignoreElements()
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(Integer i) {
log.info("onNext: {}", i);
}
});
}
/*
15:18:04.901 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1
15:18:04.904 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void first_demo() {
Observable.just(1, 2, 3, 2, 3).distinct().first()
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(Integer i) {
log.info("onNext: {}", i);
}
});
}
/*
15:16:14.458 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
15:16:14.461 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void filter_demo() {
Observable.just(1, 2, 3, 2, 3).distinct().filter(i -> i > 2)
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(Integer i) {
log.info("onNext: {}", i);
}
});
}
/*
15:14:13.951 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2
15:14:13.954 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void elementAt_demo() {
Observable.just(1, 2, 3, 2, 3, 4, 5, 1, 6).elementAt(3)
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(Integer i) {
log.info("onNext: {}", i);
}
});
}
/*
15:12:27.291 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1
15:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2
15:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
15:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 4
15:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 5
15:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 6
15:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void distinct_demo() {
Observable.just(1, 2, 3, 2, 3, 4, 5, 1, 6).distinct()
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(Integer i) {
log.info("onNext: {}", i);
}
});
}
/*
15:06:56.929 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 9
15:06:56.932 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void debounce_demo() {
Observable.create((Observable.OnSubscribe<Integer>) arg -> {
try {
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
arg.onNext(i);
}
arg.onCompleted();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).debounce(1, TimeUnit.SECONDS).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(Integer i) {
log.info("onNext: {}", i);
}
});
}
}
Combining Observables(组合Observable)
示例:
package top.onefine.rxjava;
import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Subscriber;
@Slf4j
public class HelloWorld {
public static void main(String[] args) {
// zip_demo();
// merge_demo();
// startWith_demo();
combineLatest_demo();
}
/*
16:05:38.919 [main] INFO top.onefine.rxjava.HelloWorld - i1:3, i2:4
16:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 7
16:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - i1:3, i2:5
16:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 8
16:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - i1:3, i2:6
16:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 9
16:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - i1:3, i2:7
16:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 10
16:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void combineLatest_demo() {
Observable<Integer> observable1 = Observable.just(1, 2, 3);
Observable<Integer> observable2 = Observable.just(4, 5, 6, 7);
Observable.combineLatest(observable1, observable2, (i1, i2) -> {
log.info("i1:{}, i2:{}", i1, i2);
return i1 + i2;
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(Integer i) {
log.info("onNext: {}", i);
}
});
}
/*
16:00:48.097 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 4
16:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 5
16:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 6
16:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 7
16:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1
16:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2
16:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3
16:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void startWith_demo() {
Observable<Integer> observable1 = Observable.just(1, 2, 3);
Observable<Integer> observable2 = Observable.just(4, 5, 6, 7);
observable1.startWith(observable2)
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(Integer i) {
log.info("onNext: {}", i);
}
});
}
/*
15:58:26.334 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 10
15:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 20
15:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 30
15:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 4
15:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 8
15:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 12
15:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 16
15:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void merge_demo() {
Observable<Integer> observable1 = Observable.just(10, 20, 30);
Observable<Integer> observable2 = Observable.just(4, 8, 12, 16);
Observable.merge(observable1, observable2)
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(Integer i) {
log.info("onNext: {}", i);
}
});
}
/*
15:49:52.520 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 14
15:49:52.523 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 28
15:49:52.523 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 42
15:49:52.523 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...
*/
private static void zip_demo() {
// 用来合并两个Observable发射的数据源,根据Func2函数生成一个新的值并发射出去。
// 当其中一个Observable发送数据结束或者出现异常后,另一个Observable也将停止发射数据。
Observable<Integer> observable1 = Observable.just(10, 20, 30);
Observable<Integer> observable2 = Observable.just(4, 8, 12, 16);
Observable.zip(observable1, observable2, Integer::sum)
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log.info("onCompleted...");
}
@Override
public void onError(Throwable e) {
log.info("onError...");
}
@Override
public void onNext(Integer i) {
log.info("onNext: {}", i);
}
});
}
}
Error Handling Operators(处理错误)
更多推荐
所有评论(0)