Android响应式编程之RxJava应用分析



一、引言

RxJava 到底是什么?RxJava 是 Rx 在Java上的实现,Rx(Reactive Extensions)是一个库,用来处理事件和异步任务,在很多语言上都有实现。简单来说,RxJava就是处理异步的一个库,最基本是基于观察者模式来实现的。通过Obserable和Observer的机制,实现所谓响应式的编程体验。 Rxjava由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各 Android开发者的欢迎。

二、RxJava介绍

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

RxJava是一个实现反应性扩展框架的Java虚拟机:用于通过使用观察序列构成异步和基于事件的程序库。扩展了观察者模式,以支持数据/事件序列,并增加了操作符,他可以将将序列清晰的组合在一起的。这些序列组合可以是抽象出来的某些数据/事件,如低级别的线程,同步,线程安全和并发数据结构。

RxJava 有四个基本概念:Observable (可观察者,即被观察者)、Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

RxJava最核心的两个东西就是Observables(被观察者,也就是事件源)和Subscribers(观察者),由Observables发出一系列的事件,Subscribers进行订阅接收并进行处理,看起来就好像是设计模式中的观察者模式,但是跟观察者模式不同的地方就在于,如果没有观察者(即Subscribers),Observables是不会发出任何事件的。

与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError():

  • onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志
  • onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出
  • 在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个



RxJava的优势:简洁,而且当业务越繁琐越复杂时这一点就越显出优势——它能够保持简洁;它提供的各种功能强悍的操作符真的很强大。

Github:RxJava

三、RxJava应用

3.1 引入依赖

Rxjava有两个分支并行维护(估计还会存在一段时间)

compile 'io.reactivex:rxjava:1.x.y'    //1.3.8

compile 'io.reactivex.rxjava2:rxjava:x.y.z'    ////2.2.1

implementation "io.reactivex.rxjava2:rxjava:2.x.y"  //2.2.1

项目中用到:

compile 'io.reactivex.rxjava2:rxjava:2.1.0'

3.2 创建Subscriber(Observer)

//创建观察者或者订阅者
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    //Disposable是1.x的Subscription改名的,因为Reactive-Streams规范用这个名称,为了避免重复
    //这个回调方法是在2.0之后新添加的
    //可以使用d.dispose()方法来取消订阅
    }

    @Override
    public void onNext(String value) {
        Log.e("onNext", value);
    }

    @Override
    public void onError(Throwable e) {
        Log.e("onError", e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.e("onComplete", "complete");
    }
};

3.3 创建Observable

//创建被观察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
    @Override
    public void subscribe(ObservableEmitter e) throws Exception {
        e.onNext("Hello Amiga!");
    }
});

3.4 subscribe (订阅)

tempObservable.subscribe(tempSubsciber);

subscribe()主要做了一下工作:

  • 调用subscriber.onStart()
  • 调用 Observable 中的 OnSubscribe.call(Subscriber) 。处理事件发送的逻辑,在 RxJava 中, Observable并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候
  • 将传入的 Subscriber 作为 Subscription 返回。这是为了方便 unsubscribe()
//创建订阅者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onSubscribe(Subscription s) {
        //这一步是必须,我们通常可以在这里做一些初始化操作,调用request()方法表示初始化工作已经完成
        //调用request()方法,会立即触发onNext()方法
        //在onComplete()方法完成,才会再执行request()后边的代码
        s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(String value) {
        Log.e("onNext", value);
    }

    @Override
    public void onError(Throwable t) {
        Log.e("onError", t.getMessage());
    }

    @Override
    public void onComplete() {
        //由于Reactive-Streams的兼容性,方法onCompleted被重命名为onComplete
        Log.e("onComplete", "complete");
    }
};

Flowable.create(new FlowableOnSubscribe<String>() {
    @Override
    public void subscribe(FlowableEmitter<String> e) throws Exception {
        e.onNext("Hello,I am China!");
    }
}, BackpressureStrategy.BUFFER)
    .subscribe(subscriber);

在onSubscribe()回调中必须调用s.request()方法去请求资源,参数就是要请求的数量,一般如果不限制请求数量,可以写成Long.MAX_VALUE,之后会立即触发onNext()方法!所以在onSubscribe()/onStart()中做了一些初始化的工作,而这些工作是在request()后面时,会出现一些问题,在onNext()执行时,初始化工作的那部分代码还没有执行。为了避免这种情况,请确保调用request()时,已经把所有初始化工作做完了。

3.5 实例

目前最经典的就是RxJava 与 Retrofit 结合使用:

public interface PrintData {

    @GET("request")
    Observable<Translation> getCall();
     // 注解里传入 网络请求 的部分URL地址
    // Retrofit把网络请求的URL分成了两部分:一部分放在Retrofit对象里,另一部分放在网络请求接口里
    // 如果接口里的url是一个完整的网址,那么放在Retrofit对象里的URL可以忽略
    // 采用Observable<...>接口 
    // getCall()是接受网络请求数据的方法
}
    //通过Retrofit发送网络请求
    // 创建Retrofit对象
    Retrofit retrofit = new Retrofit.Builder()
        .baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
        .addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(记得加入依赖)
        .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava,1.X为RxJavaCallAdapterFactory
        .build();

    // 创建 网络请求接口 的实例
    PrintData request = retrofit.create(PrintData.class);

    // 采用Observable<...>形式 对 网络请求 进行封装
    Observable<Translation> observable = request.getCall();
    // 通过线程切换发送网络请求
    observable.subscribeOn(Schedulers.io())       // 切换到IO线程进行网络请求
        .observeOn(AndroidSchedulers.mainThread())  // 切换回到主线程 处理请求结果
        .subscribe(new Observer<Translation>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Translation result) {
                // e.接收服务器返回的数据
                result.show() ;
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "请求失败");
            }

            @Override
            public void onComplete() {

            }
    });


3.6 取消订阅

Observable.subscribe()的返回值是一个 Subscription 对象。Subscription 类只有两个方法,unsubscribe() 和 isUnsubscribed()。

四、RxJava 操作符



4.1 create操作符

用来创建一个Observable的:

Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            //TODO
        }
})

4.2 just操作符

输入什么,输出什么

Observable.just("Hello world").subscribe(s -> showLog(s));

4.3 fromArray操作符

将输入的数组都发射出来

Observable.fromArray("Hello", "World", "girl").subscribe(s -> showLog(s));

4.4 map操作符

将一个对象使用指定的方法转换为另一个对象再发射出去

        observableSigle = Observable.create(new ObservableOnSubscribe<Student>() {
            @Override
            public void subscribe(ObservableEmitter<Student> e) throws Exception {
                //学生
                Student student = new Student("CCHIP", 100);
                e.onNext(student);
                e.onComplete();
            }
        }).map(new Function<Student, Teacher>() {//使用map操作符进行转换
            @Override
            public Teacher apply(Student student) throws Exception {
                //将学生根据业务要求转换为老师并发射出去
                String name = student.getName() + "的老师";
                boolean isGood = student.getScore() >= 90;
                Teacher teacher = new Teacher(name, isGood);
                return teacher;
            }
        });

RxJava操作符功能非常强大,但大部份目前没有用过,所以就不作过多介绍。

五、总结

一开始我认为RxJava 写的代码理解起来很困难,可是它真的很牛逼,学习及使用之后,让我后受益匪浅。


参考资源: