RxJava

RxJava使用方法

使用步骤

  1. 导入jcenter

    1
    2
    compile 'io.reactivex:rxjava:1.0.14'
    compile 'io.reactivex:rxandroid:1.0.1'
  1. 创建观察者Observer对象

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s){
    Log.d(tag,"Item: "+s);
    }
    @Override
    public void onCompleted(){
    Log.d(tag,"Completed!");
    }
    @Override
    public void onError(Throwablee){
    Log.d(tag,"Error!");
    }
    };
    • SubscriberObserver的扩展,比ObserveronStart()法,会在subscribe刚开始而事件未发送时调用,但不适用于显示progressbar,因为它总在subscribe发生的线程被调用,不能指定线程,若想指定线程,需要用 doOnSubscribe()

    • Subscription是实现了Subscriber的一个接口,其方法unsubscribe用于取消前订阅状态,调用后,Subsrciber不再接收事件,调用前最好用isUnsubscribed()断,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,因此尽快在合适的地方(例如 onPause() onStop() 等方法中)调用unsubscribe释放,避免内存泄漏。

  1. 创建被观察者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
    subscriber.onNext("Hello");
    subscriber.onNext("Hi");
    subscriber.onNext("Aloha");
    subscriber.onCompleted();
    }
    });
    //与上面的写法等价
    //或
    Observable observable = Observable.just("Hello", "Hi", "Aloha");
    //或
    String[] words = {"Hello", "Hi", "Aloha"};
    Observable observable = Observable.from(words);
  2. 订阅

    1
    2
    3
    observable.subscribe(observer);
    // 或者:
    observable.subscribe(subscriber);
    • 除了 subscribe(Observer)subscribe(Subscriber)subscribe() 还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber 。形式如下:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      Action1<String> onNextAction = new Action1<String>() {
      // onNext()
      @Override
      public void call(String s) {
      Log.d(tag, s);
      }
      };
      Action1<Throwable> onErrorAction = new Action1<Throwable>() {
      // onError()
      @Override
      public void call(Throwable throwable) {
      // Error handling
      }
      };
      Action0 onCompletedAction = new Action0() {
      // onCompleted()
      @Override
      public void call() {
      Log.d(tag, "completed");
      }
      };
      // 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
      observable.subscribe(onNextAction);
      // 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
      observable.subscribe(onNextAction, onErrorAction);
      // 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
      observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
  3. 线程控制——Scheduler

    • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
    • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
    • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io()newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
    • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
    • 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      Observable.just(1, 2, 3, 4)
      .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
      .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
      .subscribe(new Action1<Integer>() {
      @Override
      public void call(Integer number) {
      Log.d(tag, "number:" + number);
      }
      });

      加载图片示例:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      int drawableRes = ...;
      ImageView imageView = ...;
      Observable.create(new OnSubscribe<Drawable>() {
      @Override
      public void call(Subscriber<? super Drawable> subscriber) {
      Drawable drawable = getTheme().getDrawable(drawableRes));
      subscriber.onNext(drawable);
      subscriber.onCompleted();
      }
      })
      .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
      .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
      .subscribe(new Observer<Drawable>() {
      @Override
      public void onNext(Drawable drawable) {
      imageView.setImageDrawable(drawable);
      }
      @Override
      public void onCompleted() {
      }
      @Override
      public void onError(Throwable e) {
      Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
      }
      });
  4. 变换
    所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。

    • map()

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      Observable.just("images/logo.png") // 输入类型 String
      .map(new Func1<String, Bitmap>() {
      @Override
      public Bitmap call(String filePath) { // 参数类型 String
      return getBitmapFromPath(filePath); // 返回类型 Bitmap
      }
      })
      .subscribe(new Action1<Bitmap>() {
      @Override
      public void call(Bitmap bitmap) { // 参数类型 Bitmap
      showBitmap(bitmap);
      }
      });

      如上面这段代码,它的作用是将String事件转化为Bitmap事件。

    • flatmap()
      首先看一段代码

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      Student[] students = ...;
      Subscriber<Course> subscriber = new Subscriber<Course>() {
      @Override
      public void onNext(Course course) {
      Log.d(tag, course.getName());
      }
      ...
      };
      Observable.from(students)
      .flatMap(new Func1<Student, Observable<Course>>() {
      @Override
      public Observable<Course> call(Student student) {
      return Observable.from(student.getCourses());
      }
      })
      .subscribe(subscriber);
从上面的代码可以看出,`flatMap()` 和 `map()` 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 `map()` 不同的是, `flatMap()` 中返回的是个 `Observable` 对象,并且这个 `Observable` 对象并不是被直接发送到了 `Subscriber` 的回调方法中。 `flatMap()` 的原理是这样的:
a. 使用传入的事件对象创建一个 `Observable` 对象;
b. 并不发送这个 `Observable`, 而是将它激活,于是它开始发送事件;
c. 每一个创建出来的 `Observable` 发送的事件,都被汇入同一个 `Observable` ,而这个 `Observable` 负责将这些事件统一交给 `Subscriber` 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 `Observable` 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 `flatMap()` 所谓的 flat。

扩展:由于可以在嵌套的 `Observable` 中添加异步代码, `flatMap()` 也常用于嵌套的异步操作,例如嵌套的网络请求。示例代码(Retrofit + RxJava):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
networkClient.token() // 返回 Observable<String>,在订阅时请求 token,并在响应后发送 token
.flatMap(new Func1<String, Observable<Messages>>() {
@Override
public Observable<Messages> call(String token) {
// 返回 Observable<Messages>,在订阅时请求消息列表,并在响应后发送请求到的消息列表
return networkClient.messages();
}
})
.subscribe(new Action1<Messages>() {
@Override
public void call(Messages messages) {
// 处理显示消息列表
showMessages(messages);
}
});
传统的嵌套请求需要使用嵌套的 `Callback` 来实现。而通过 `flatMap()` ,可以把嵌套的请求写在一条链中,从而保持程序逻辑的清晰。 `throttleFirst()`: 在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤,例如按钮的点击监听器:
1
2
3
RxView.clickEvents(button) // RxBinding 代码,后面的文章有释
.throttleFirst(500, TimeUnit.MILLISECONDS) // 设置防抖间隔为500ms
.subscribe(subscriber); 妈妈再也不怕我的用户手抖点开两个重复的界面啦。
  • compose()
    用于包裹多次变换

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
    @Override
    public Observable<String> call(Observable<Integer> observable) {
    return observable
    .lift1()
    .lift2()
    .lift3()
    .lift4();
    }
    }
    ...
    Transformer liftAll = new LiftAllTransformer();
    observable1.compose(liftAll).subscribe(subscriber1);
    observable2.compose(liftAll).subscribe(subscriber2);
    observable3.compose(liftAll).subscribe(subscriber3);
    observable4.compose(liftAll).subscribe(subscriber4);
  1. 多次线程切换

    1
    2
    3
    4
    5
    6
    7
    8
    Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新线程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 线程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread)
    .subscribe(subscriber); // Android 主线程,由 observeOn() 指定
![subsrcibeOn](/images/subsrcibeOn.jpg)


![observerOn](/images/observerOn.jpg)


![xiancheng](/images/xiancheng.jpg)


图中共有 5 处含有对事件的操作。由图中可以看出,①和②两处受第一个 `subscribeOn()` 影响,运行在红色线程;③和④处受第一个 `observeOn()` 的影响,运行在绿色线程;⑤处受第二个 `onserveOn()` 影响,运行在紫色线程;而第二个 `subscribeOn()` ,由于在通知过程中线程就被第一个 `subscribeOn()` 截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个 `subscribeOn()` 的时候,只有第一个 `subscribeOn()` 起作用。

然而,虽然超过一个的 `subscribeOn()` 对事件处理的流程没有影响,但在流程之前却是可以利用的。在前面讲 `Subscriber` 的时候,提到过 `Subscriber` 的 `onStart()` 可以用作流程开始前的初始化。然而`onStart()` 由于在 `subscribe()` 发生时就被调用了,因此不能指定线程,而是只能执行在 `subscribe()` 被调用时的线程。这就导致如果 `onStart()` 中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测 `subscribe()` 将会在什么线程执行。

而与 `Subscriber.onStart()` 相对应的,有一个方法 `Observable.doOnSubscribe()` 。它和 `Subscriber.onStart()` 同样是在 `subscribe()` 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, `doOnSubscribe()` 执行在 `subscribe()` 发生的线程;而如果在 `doOnSubscribe()` 之后有 `subscribeOn()` 的话,它将执行在离它最近的 `subscribeOn()` 所指定的线程。

1
2
3
4
5
6
7
8
9
10
11
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
如上,在 doOnSubscribe()的后面跟一个 subscribeOn() ,就能指定准备工作的线程了。
分享到 评论