友情链接:
demo地址:1. 概述:
用官网的一句话:"a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。也就是咱们常说的链式编程
2. Rxjava的好处
异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 Android 创造的 AsyncTask 和Handler ,其实都是为了让异步代码更加简洁。RxJava的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁(本段内容摘自)。
3. 观察者和被观察者
- Observable(被观察者)/Observer(观察者) Obsesrver用来连接这两个
- Flowable(被观察者)/Subscriber(观察者) (2.0出现的支持背压) subscribe用来连接这两个
4. 简单使用
发射源有多少个onNext就会发射多少次,onComplete 和 onError是冲突的两个方法,有你没我,有我没你 如果在onComplete或者onError调用OnNext方法不会再起作用
//创建被观察者 Observableobservable = Observable.create(new ObservableOnSubscribe () { @Override public void subscribe(ObservableEmitter e) throws Exception { e.onNext(1); e.onNext(2); e.onComplete(); e.onNext(3); //e.onError(new Throwable()); } }); //创建观察者 Observer observer = new Observer () { @Override public void onSubscribe(Disposable d) { Log.i(TAG,"onSubscribe订阅了"); } @Override public void onNext(Integer integer) { Log.i(TAG,"onNext"+integer); } @Override public void onError(Throwable e) { Log.i(TAG,"onError"); } @Override public void onComplete() { Log.i(TAG,"onComplete"); } }; // 开始订阅 observable.subscribe(observer);复制代码
打印出来的Log为
onSubscribe订阅了onNext1onNext2onComplete复制代码
4.1 Observer(观察者)中的方法
- onSubscribe(Disposable d) 当订阅到被观察者的时候调用 , Disposable 用来解除订阅的,防止内存泄漏
- onNext(T t) 被观察者发送OnNext方法的时候调用
- onComplete() 当被观察者调用onComplete方法时执行
- onError(Throwable e) 当被观察者调用onError方法时执行
subscribe()方法内可以传递的东西
- Consumer<? super T> onNext 表示被观察者只关注onNext
- Consumer<? super T> onNext, Consumer<? super Throwable> onError) 表示被观察者只关注onNext 和 onError
- Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) 表示被观察者只关注onNext 和 onError 和 onComplete
- Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) 表示被观察者四个方法都关注
5. 大致的操作符分为以下几点:
当然了下面的也不是所有的操作符
创建型 | Create | Just | fromIterable | Timer | Interval | Repeat |
转化型 | Map | FlatMap | Buffer | Scan | Window | GroupBy |
过滤型 | Filter | Distince | Skip | Take | Last | Debounce |
组合型 | Zip | Join | And | Switch | Merge | StartWith |
错误处理性 | Retry | Catch | ||||
辅助型 | SubscribeOn | ObserveOn | Timer | Interval | DoOnNext | Delay |
条件和布尔 | All | SkipUntil | TakeUntil | Contains | Amb | |
算数和聚合型 | Conact | Count | Max | Min | Sum | |
连接型 | Connect | Publish | Replay | RefCount | ||
异步操作 | Start | ToAsync | StartFuture | FromAction | FromCallable | RunAsync |
阻塞操作 | ForEach | Firsh | Last | MostRecent | Next | Single |
字符串操作 | Split | Decode | Encode | Join | Form | ByLine |
6. 创建型(Creating): 也就是创建 Observable (被观察者)
6.1 Create (表示只发送OnNext方法)
Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); } }).subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { Log.i(TAG,integer+""); } });复制代码
打印出来
12-12 05:17:00.307 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 112-12 05:17:00.307 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 212-12 05:17:00.307 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 3复制代码
6.2 just (将传入的参数依次发送出来)
Observable.just(1,2,3).subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i(TAG,integer+""); } });复制代码
打印出来
12-12 05:17:32.349 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 112-12 05:17:32.349 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 212-12 05:17:32.349 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 3复制代码
6.3 fromIterable (将Iterable中的对象依次发送出去)
同样 fromArray 是将 数组 中的数据依次发送出去
ArrayListarrayList = new ArrayList<>(); for(int i = 0;i<3;i++) { arrayList.add(""+i+i); } Observable.fromIterable(arrayList).subscribe(new Consumer () { @Override public void accept(String s) throws Exception { Log.i(TAG,s+""); } });复制代码
打印出来
12-12 05:17:46.458 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 0012-12 05:17:46.458 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 1112-12 05:17:46.458 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 22复制代码
6.4 Timer (它在一个给定的延迟后发射一个特殊的值,等同于Android中Handler的postDelay( )方法)
// DAYS,HOURS,MICROSECONDS,MILLISECONDS,MINUTES,NANOSECONDS,SECONDS;// 天 小时 微秒 毫秒 分钟 纳秒 秒 final long start = System.currentTimeMillis(); Observable.timer(1000, TimeUnit.MILLISECONDS).subscribe(new Consumer() { @Override public void accept(Long aLong) throws Exception { long end = System.currentTimeMillis(); Log.i(TAG,"时间差:"+(end-start)+"ms"); } });复制代码
打印出来
12-12 05:20:22.483 20471-20629/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 时间差:1008ms复制代码
6.5 Interval (创建一个按固定时间间隔发射整数序列)
可以用来当做计时器,或者间隔性请求网络数据
Observable.interval(1000, TimeUnit.MILLISECONDS).subscribe(new Consumer() { @Override public void accept(Long aLong) throws Exception { Log.i(TAG,""+aLong); } });复制代码
打印出来
12-12 05:25:35.379 21915-21971/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 012-12 05:25:36.379 21915-21971/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 112-12 05:25:37.379 21915-21971/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 212-12 05:25:38.379 21915-21971/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 312-12 05:25:39.379 21915-21971/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 412-12 05:25:40.379 21915-21971/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 5.........复制代码
6.6 repeat (创建一个重复发射特定数据的Observable)
可以用来当做计时器,或者间隔性请求网络数据
Observable.just(1).repeat(2).subscribe(new Consumer() { @Override public void accept(Integer i) throws Exception { mCount++; Log.i(TAG,"第:"+mCount+"次"+"数据为:"+i); } });复制代码
打印出来
12-12 05:25:56.585 21915-21915/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 第:1次数据为:112-12 05:25:56.585 21915-21915/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 第:2次数据为:1复制代码
7. 既然讲了创建被观察者(Observable)和观察者(Observer), 那么先讲一下Schedulers线程调度器
如果Observable默认的是在主线程中,Observer默认跟随Observable的线程
-
Schedulers.computation()
计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。 -
Schedulers.newThread()
开启一个新的线程
Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext(1); Log.i(TAG, "发布的线程是:" + Thread.currentThread().getName()); } }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { Log.i(TAG, "订阅的线程是:" + Thread.currentThread().getName()); } });复制代码
打印出来
12-13 06:59:25.085 11730-12398/niezhiyang.cn.rxjava2_android_samples I/SchedulersActivity: 发布的线程是:RxCachedThreadScheduler-212-13 06:59:25.086 11730-12573/niezhiyang.cn.rxjava2_android_samples I/SchedulersActivity: 订阅的线程是:RxNewThreadScheduler-1复制代码
- Schedulers.io() 主要用于一些耗时操作,比如读写文件,数据库存取,网络交互等。 这个调度器根据需要,增加或者减少线程池中的线程数量。需要注意的是Schedulers.i0()中的线程池数量是无限制大的,大量的I/0操作将创建许多线程,我们需要在性能和线程数量中做出取舍。
Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext(1); Log.i(TAG, "发布的线程是:" + Thread.currentThread().getName()); } }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { Log.i(TAG, "订阅的线程是:" + Thread.currentThread().getName()); } });复制代码
打印出来
12-13 06:58:22.448 11730-12398/niezhiyang.cn.rxjava2_android_samples I/SchedulersActivity: 发布的线程是:RxCachedThreadScheduler-212-13 06:58:22.448 11730-12399/niezhiyang.cn.rxjava2_android_samples I/SchedulersActivity: 订阅的线程是:RxCachedThreadScheduler-3复制代码
- AndroidSchedulers.mainThread() Android中专用的,指定的操作在Android的主线程(UI线程中)运行
Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext(1); Log.i(TAG, "发布的线程是:" + Thread.currentThread().getName()); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { Log.i(TAG, "订阅的线程是:" + Thread.currentThread().getName()); } });复制代码
打印出来
12-13 06:54:57.969 11730-11782/niezhiyang.cn.rxjava2_android_samples I/SchedulersActivity: 发布的线程是:RxCachedThreadScheduler-112-13 06:54:57.970 11730-11730/niezhiyang.cn.rxjava2_android_samples I/SchedulersActivity: 订阅的线程是:main复制代码