史上最浅显易懂的RxJava入门教程
分类:编程应用

因为工作需要刚好在学习 RxJava + Retrofit2 + OkHttp3 网络请求框架,网上搜了一些 RxJava 的教程,并不是很好理解,所幸最后我找到了几篇有助于初学者了解 RxJava 的文章,于是结合自己的理解,重新整理成一篇发给大家,希望通过我的咀嚼,能够帮助大家更快的了解和上手 RxJava,尤其是文章的最后,你将理解是怎样一种优势,乃至于我们开始考虑用 RxJava 来替代传统方式。话不多说,以下正文。

RxAndroid 基础#

参考hi大头鬼hi的微博,写代码进行测试学习,以下记录共享,同时以便之后查阅。 由于不熟悉lambda,同时开始学习也不建议直接使用lambda,以下大部分代码均使用常规方法编写。

RxJava在github上的地址
RxAndroid在github上的地址

首先,工程中引入:

dependencies {
compile fileTree(dir: 'libs', include: ['*.jar'])
compile 'com.android.support:appcompat-v7:23.0.1'

//引入RxAndroid----begin
compile 'io.reactivex:rxandroid:1.1.0'
compile 'io.reactivex:rxjava:1.1.0'
//引入RxAndroid----end

// RxBinding
compile 'com.jakewharton.rxbinding:rxbinding:0.4.0'
compile 'com.jakewharton.rxbinding:rxbinding-appcompat-v7:0.4.0'
compile 'com.jakewharton.rxbinding:rxbinding-design:0.4.0'
}

观察者模式

RxJava 涉及到所谓的观察者模式。观察者模式,按我的理解,就是所谓的被观察者和订阅者之间的关系。其形式是,在作为被观察者的一个方法中,包含着一段订阅,该订阅的本质是抽象类或接口的引用对抽象方法的调用,而该订阅实际指向的是作为实现类的订阅者具体实现的方法。(这实际上就是用到了面向对象编程中“向上转型”的概念。向上转型的套路是:实现类实现了抽象类定义的抽象方法;将实现类的对象赋值给抽象类的引用;然后通过抽象类的引用来调用抽象方法,而该调用实际指向实现类具体实现的方法……好吧,我这里相当于把面向对象的基础又过了一遍,不懂向上转型概念的先滚去看 Mars 的 Java4Android 教程 - - )

//抽象类或接口public interface Printer{ void print;//定义抽象函数}//实现类实现抽象函数class CanonPrinter implement Printer{ @override public void print{ System.out.println; }}//在被观察者中,通过抽象类或接口的引用来调用抽象函数private static void printSth(Printer printer){ printer.print("ha ha ha");}//执行被观察者时,传入实现类的对象作为实参赋值给抽象类或接口的引用,//从而在被观察者中通过抽象类或接口的引用来调用print抽象方法,//而该抽象方法实际指向实现类实现的print方法public static void main(String[] args){ printSth(new CanonPrinter;}

所以若是真要理解成观察和被观察,按照这个逻辑也的确说的过去,也就是说,被观察者刚好触发了这里,然后直接跳转到订阅者实现的方法内部去具体执行被触发后该做出的反应。

被观察者

Observable<String> myObservable = Observable.create( new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> sub) { sub.onNext; sub.onNext; sub.onCompleted; 

订阅者

Subscriber<String> mySubscriber = new Subscriber<String>() { @Override public void onNext { System.out.println; } @Override public void onCompleted() { } @Override public void onError(Throwable e) { } }; 

关联起来

myObservable.subscribe(mySubscriber);

有没有觉得和btn.setOnClickListener(onClickListener)很相似,是的,就是这样。这一步就是将实现类的对象赋值给抽象类或接口的引用。

不过,观察者和订阅者的创建也有简化版。例如当观察者只触发一个事件时,可以直接使用 just。

观察者

Observable<String> myObservable = Observable.just("Hello, world!"); 

订阅者当不关心 OnComplete 和 OnError,只需要在 onNext 时做一些处理,可以用 Action1 类

Action1<String> onNextAction = new Action1<String>() { @Override public void call { System.out.println; } }; 

subscribe 方法有一个重载版本,接受1~3个 Action1 类型的参数,分别对应 OnNext,OnError,OnComplete然后我们现在只需要 onNext,就只需要传入一个参数

myObservable.subscribe(onNextAction);

这样,上述的代码最终可以写成

Observable.just .subscribe(new Action1<String>() { @Override public void call { System.out.println; 

注意,泛型类不能直接写在类中作为对象的实例变量,当它没有被指定类型时,因为泛型是编译时,不是运行时。上述代码因为 Observable 不指定具体<T>,因此,你应写在方法中,使 Observable 作为局部变量。

而使用 lambda,可以使代码变得更简洁

Observable.just .subscribe(s -> System.out.println; 

那如果我想做一些手脚怎么办,特别是,观察者不能被改动,订阅者也不能被改动的时候这时我们可以通过操作符 map 来完成改动(操作符还有很多,负责中间过程改动的,是map)

例如我给输入的文字加个后缀

Observable.just .map(new Func1<String, String>() { @Override public String call { return s + ".jpg"; } }) .subscribe(s -> System.out.println;

lambda:

Observable.just .map(s -> s + ".jpg") .subscribe(s -> System.out.println;

或者在中间过程获得类型不同的数据

Observable.just .map(s -> s.hashCode .map(i -> Integer.toString .subscribe(s -> System.out.println;

那么到这里为止,我们知道了,RxJava 本质上和我们之前用的“回调”性质是一样的,就是作为观察者的一方调用抽象函数,作为订阅者的另一方实现抽象函数,并且二者通过 subscribe 方法关联,也就是,在观察者的 subscribe 方法中实现订阅者的匿名内部类,来具体实现被触发的方法。

但是故事说到这里,好像还是没有异步什么事啊,它分明就是“同线程中的身首异处”嘛。

下面来看看操作符

一、基本使用

操作符

上述我们已经提到了操作符 map。然后此处我们有个这样的需求:输入一个关键字,返回相关结果的 url 列表。相当于搜索引擎。

一般我们会这么做

Observable<List<String>> query(String text);query("Hello, world!") .subscribe(urls -> { for (String url : urls) { System.out.println; } }); 

有for循环,这样看起来有点繁杂我们可以通过操作符 from,对数组中的数据进行逐个处理

Observable.from .subscribe(url -> System.out.println;

也就是

query("Hello, world!") .subscribe(urls -> { Observable.from .subscribe(url -> System.out.println; 

但这样破坏了 RxJava 的结构,因为订阅者最好是保持本身单一的功能,而数据的改变最好在中间过程中通过操作符来完成。此时我们可以通过操作符 flatMap 来处理这个问题。

Observable.flatMap() 接收一个 Observable 的输出作为输入,同时输出另外一个Observable

query("Hello, world!") .flatMap(new Func1<List<String>, Observable<String>>() { @Override public Observable<String> call(List<String> urls) { return Observable.from; } }) .subscribe(url -> System.out.println;

lambda:

query("Hello, world!") .flatMap(urls -> Observable.from .subscribe(url -> System.out.println; 

flatMap 输出的新的 Observable 正是我们在 Subscriber 想要接收的。现在 Subscriber 不再收到 List<String>,而是收到一些列单个的字符串,就像 Observable.from() 的输出一样

接着前面的例子,现在我不想打印 url 了,而是要打印收到的每个网站的标题。我当然不能直接将 getTitle 方法放进订阅者, 因为一再强调了,订阅者实现的功能要单一,不要轻易改动。那我将考虑在中间过程中来完成这个改动,这样的话,我的 getTitle 方法每次只能传入一个 url,并且返回值不是一个 String,而是一个输出 String 的 Observable 对象。flatMap 不正是这么运作的吗(通过输入的 Observable 对象得到里面的数据,再将数据输出,以 Observable 对象的方式传递),所以还是考虑使用 flatMap。

query("Hello, world!") .flatMap(urls -> Observable.from .flatMap(new Func1<String, Observable<String>>() { @Override public Observable<String> call(String url) { return getTitle; } }) .subscribe(title -> System.out.println;

lambda:

query("Hello, world!") .flatMap(urls -> Observable.from .flatMap(url -> getTitle .subscribe(title -> System.out.println; 

那么如果我想过滤掉一些情况,例如返回 url 为 null 的,我不要它们的标题了,过滤掉,可以用操作符 filter。

query("Hello, world!") .flatMap(urls -> Observable.from .flatMap(url -> getTitle .filter(title -> title != null) .subscribe(title -> System.out.println;

如果想指定输出元素的数量,可以用 take。以下限制输出 5 个。

query("Hello, world!") .flatMap(urls -> Observable.from .flatMap(url -> getTitle .filter(title -> title != null) .take .subscribe(title -> System.out.println; 

如果想在每次触及订阅之前,完成一些事情,可以用 doOnNext。例如以下在输出之前,将标题保存到某处。

query("Hello, world!") .flatMap(urls -> Observable.from .flatMap(url -> getTitle .filter(title -> title != null) .take .doOnNext(title -> saveTitle .subscribe(title -> System.out.println; 

其他的操作符还可以来这篇文章了解

介绍完操作符,现在开始讲重点。为什么用 RxJava,它和传统的回调以及异步处理相比存在什么优势?

常规语法

Observable<String> myObservable = Observable.create(
            new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> sub) {
                    sub.onNext("Observable Hello, world!");
                    sub.onCompleted();
                }
            }
    );

    Subscriber<String> mySubscriber = new Subscriber<String>() {
        @Override
        public void onNext(String s) {
            L.d("onNext s: " + s);
           }

        @Override
        public void onCompleted() { }

        @Override
        public void onError(Throwable e) { }
    };

    myObservable.subscribe(mySubscriber);

错误处理

Observable.just("Hello, world!") .map(s -> potentialException .map(s -> anotherPotentialException .subscribe(new Subscriber<String>() { @Override public void onNext { System.out.println; } @Override public void onCompleted() { System.out.println("Completed!"); } @Override public void onError(Throwable e) { System.out.println; } });

代码中的 potentialException() 和 anotherPotentialException() 有可能会抛出异常。每一个 Observerable 对象在终结的时候都会调用 onCompleted() 或者 onError() 方法,所以 Demo 中会打印“Completed!”或者“Ouch!”

这种模式有以下几个优点:

1.只要有异常发生 onError() 一定会被调用这极大的简化了错误处理。只需要在一个地方处理错误即可以。

2.操作符不需要处理异常将异常处理交给订阅者来做,Observerable 的操作符调用链中一旦有一个抛出了异常,就会直接执行 onError() 方法。

3.你能够知道什么时候订阅者已经接收了全部的数据知道什么时候任务结束能够帮助简化代码的流程。

这种错误处理方式比传统的错误处理更简单。传统的错误处理中,通常是在每个回调中处理错误。这不仅导致了重复的代码,并且意味着每个回调都必须知道如何处理错误,你的回调代码将和调用者紧耦合在一起。

使用 RxJava,Observable 对象根本不需要知道如何处理错误!操作符也不需要处理错误状态:一旦发生错误,就会跳过当前和后续的操作符。所有的错误处理都交给订阅者来做。(这里的意思是说,传统的方式,就算也存在中间过程,那错误也需要在中间过程的每一步中分别处理。而 RxJava 不需要在中间过程处理错误,有错误直接跳到最后,统一由订阅者处理)

简化语法

Observable.just(1)
            .subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    L.d("onNext integer: " + integer);
                }
            });

调度器

使用 RxJava,你可以使用 subscribeOn() 指定观察者代码运行的线程,使用 observerOn() 指定订阅者运行的线程:(这个过程简单的说就是,订阅者在主线程观察被观察者,所以这个动作被成为 observeOn。而被观察者在异线程添加了订阅者的订阅,所以这个动作叫 subscribeOn,其实说 be subscribe on 更合适)

myObservableServices.retrieveImage .subscribeOn(Schedulers.io .observeOn(AndroidSchedulers.mainThread .subscribe(bitmap -> myImageView.setImageBitmap;

subscribeOn() 和 observerOn() 可以被添加到任何 Observable 对象上,这两个也是操作符。我不需要关心 Observable 对象以及它上面有哪些操作符。仅仅运用这两个操作符就可以实现在不同的线程中调度

如果使用 AsyncTask 或者其他类似的,我将不得不仔细设计我的代码,找出需要并发执行的部分。使用 RxJava,我可以保持代码不变,仅仅在需要并发的时候调用这两个操作符就可以

map操作符变换

Observable.just("Hello, world!")
            .map(new Func1<String, String>() {
                @Override
                public String call(String s) {
                    return s + " -Map";
                }
            })
            .subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    L.d("onNext s: " + s);
                }
            });

订阅

当调用 Observable.subscribe(),会返回一个 Subscription 对象。这个对象代表了被观察者和订阅者之间的联系

Subscription subscription = Observable.just("Hello, World!") .subscribe(s -> System.out.println;subscription.unsubscribe();

RxJava 的另外一个好处就是它处理 unsubscribing 的时候,会停止整个调用链。如果你使用了一串很复杂的操作符,调用 unsubscribe 将会在他当前执行的地方终止。不需要做任何额外的工作!

最后,再次声明一遍,这篇文章的学习,其主线是参考自以下这篇博客,是一篇对国外大神的译文。

后面的文章,链接在这里,目前先没有这个打算继续看下去……

以及有人推荐这篇,不知道讲的什么鬼~

二、操作符

Observable.from()

Observable.from()方法,它接收一个集合作为输入,然后每次输出一个元素给subscriber:

 List<String> urls = new ArrayList<>();
    urls.add("url1");
    urls.add("url2");
    urls.add("url3");
    Observable.from(urls)
            .subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    L.d("onNext s: " + s);
                }
            });

输出如下:
nNext s: url1
onNext s: url2
onNext s: url3

   Observable<List<String>> myObservableUrls = Observable.create(
            new Observable.OnSubscribe<List<String>>() {
                @Override
                public void call(Subscriber<? super List<String>> subscriber) {
                    List<String> urls = new ArrayList<>();
                    urls.add("url1");
                    urls.add("url2");
                    urls.add("url3");
                    subscriber.onNext(urls);
                }
            }
    );

    myObservableUrls.subscribe(new Action1<List<String>>() {
        @Override
        public void call(List<String> strings) {
            Observable.from(strings)
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            L.d("myObservableUrls onNext s: " + s);
                        }
                    });
        }
    });

输出如下:
myObservableUrls onNext s: url1
myObservableUrls onNext s: url2
myObservableUrls onNext s: url3
在前一个Observable输出是list时,嵌套太多,看起来比较乱。

Observable.flatMap()

Observable.flatMap()接收一个Observable的输出作为输入,同时输出另外一个Observable。

 myObservableUrls.flatMap(new Func1<List<String>, Observable<String>>() {
        @Override
        public Observable<String> call(List<String> urls) {
            return Observable.from(urls);
        }
    }).subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            L.d("myObservableUrls.flatMap onNext s: " + s);
        }
    }); 

避免了上面直接使用Observable.from时需要内如再次subscribe导致的内部嵌套。理解flatMap的关键点在于,flatMap输出的新的Observable正是我们在Subscriber想要接收的。现在Subscriber不再收到List<String>,而是收到一些列单个的字符串,就像Observable.from()的输出一样。

flatMap(),它可以返回任何它想返回的Observable对象。
接着前面的例子,现在我不想打印URL了,而是要打印收到的每个网站的标题。

现在的方法每次只能传入一个URL,并且返回值不是一个String,而是一个输出String的Observabl对象:

  private Observable<String> getTitle(final String url) {
    return Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("title: " + url);
        }
    });
}

实现:

 myObservableUrls.flatMap(new Func1<List<String>, Observable<String>>() {
        @Override
        public Observable<String> call(List<String> urls) {
            return Observable.from(urls);
        }
    }).flatMap(new Func1<String, Observable<?>>() {
        @Override
        public Observable<?> call(String s) {
            return getTitle(s);
        }
    }).subscribe(new Action1<Object>() {
        @Override
        public void call(Object o) {
            L.d("myObservableUrls.flatMap onNext o: " + o);
        }
    });

filter()

filter()输出和输入相同的元素,并且会过滤掉那些不满足检查条件的

 myObservableUrls.flatMap(new Func1<List<String>, Observable<String>>() {
        @Override
        public Observable<String> call(List<String> urls) {
            return Observable.from(urls);
        }
    }).flatMap(new Func1<String, Observable<?>>() {
        @Override
        public Observable<?> call(String s) {
            return getTitle(s);
        }
    }).filter(new Func1<Object, Boolean>() {
        @Override
        public Boolean call(Object o) {
            return !o.equals("title: url2");
        }
    }).subscribe(new Action1<Object>() {
        @Override
        public void call(Object o) {
            L.d("myObservableUrls.flatMap onNext o: " + o);
        }
    });

take()

take()输出最多指定数量的结果。

doOnNext()

doOnNext()允许我们在每次输出一个元素之前做一些额外的事情,比如这里的保存标题。

直接上网上的示例:

query("Hello, world!")  
.flatMap(urls -> Observable.from(urls))  
.flatMap(url -> getTitle(url))  
.filter(title -> title != null)  
.take(5)  
.doOnNext(title -> saveTitle(title))  
.subscribe(title -> System.out.println(title));  

三、响应式的好处

错误处理

每一个Observerable对象在终结的时候都会调用onCompleted()或者onError()方法,

这种模式有以下几个优点:

1.只要有异常发生onError()一定会被调用

这极大的简化了错误处理。只需要在一个地方处理错误即可以。

2.操作符不需要处理异常

将异常处理交给订阅者来做,Observerable的操作符调用链中一旦有一个抛出了异常,就会直接执行onError()方法。

3.你能够知道什么时候订阅者已经接收了全部的数据。

知道什么时候任务结束能够帮助简化代码的流程。(虽然有可能Observable对象永远不会结束)

使用RXJAVA,OBSERVABLE对象根本不需要知道如何处理错误!操作符也不需要处理错误状态-一旦发生错误,就会跳过当前和后续的操作符。所有的错误处理都交给订阅者来做。

调度器

使用RxJava,你可以使用subscribeOn()指定观察者代码运行的线程,使用observerOn()指定订阅者运行的线程:

myObservableServices.retrieveImage(url)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

订阅(Subscriptions)

调用Observable.subscribe(),会返回一个Subscription对象。这个对象代表了被观察者和订阅者之间的联系。

ubscription subscription = Observable.just("Hello, World!")
.subscribe(s -> System.out.println(s));

你可以在后面使用这个Subscription对象来操作被观察者和订阅者之间的联系.

subscription.unsubscribe();
System.out.println("Unsubscribed=" + subscription.isUnsubscribed());

RxJava的另外一个好处就是它处理unsubscribing的时候,会停止整个调用链。如果你使用了一串很复杂的操作符,调用unsubscribe将会在他当前执行的地方终止。不需要做任何额外的工作!

当然也不是所有的代码都使用响应式的方式,仅仅当代码复杂到我想将它分解成简单的逻辑的时候,我才使用响应式代码。

四、在Android中使用响应式编程

这部分主要是摘录,有些方法现在已过时,后续在实际应用中有用到再进行补充更新,

RxAndroid

首先,AndroidSchedulers提供了针对Android的线程系统的调度器。需要在UI线程中运行某些代码?很简单,只需要使用AndroidSchedulers.mainThread():

retrofitService.getImage(url)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

throttleFirst(): 在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤,例如按钮的点击监听器:

RxView.clickEvents(button) // RxBinding 代码,后面的文章有解释
.throttleFirst(500, TimeUnit.MILLISECONDS) // 设置防抖间隔为 500ms
.subscribe(subscriber);

遗留代码,运行极慢的代码

绝大多数时候Observable.just() 和 Observable.from() 能够帮助你从遗留代码中创建 Observable 对象:

private Object oldMethod() { ... }

public Observable<Object> newMethod() {
    return Observable.just(oldMethod());
}

上面的例子中如果oldMethod()足够快是没有什么问题的,但是如果很慢呢?调用oldMethod()将会阻塞住他所在的线程。
为了解决这个问题,可以参考我一直使用的方法–使用defer()来包装缓慢的代码:

private Object slowBlockingMethod() { ... }

public Observable<Object> newMethod() {
    return Observable.defer(() -> Observable.just(slowBlockingMethod()));
}

现在,newMethod()的调用不会阻塞了,除非你订阅返回的observable对象。

生命周期

如何处理Activity的生命周期?主要就是两个问题:
1.在configuration改变(比如转屏)之后继续之前的Subscription。
比如你使用Retrofit发出了一个REST请求,接着想在listview中展示结果。如果在网络请求的时候用户旋转了屏幕怎么办?你当然想继续刚才的请求,但是怎么搞?

2.Observable持有Context导致的内存泄露
这个问题是因为创建subscription的时候,以某种方式持有了context的引用,尤其是当你和view交互的时候,这太容易发生!如果Observable没有及时结束,内存占用就会越来越大。

第一个问题的解决方案就是使用RxJava内置的缓存机制,这样你就可以对同一个Observable对象执行unsubscribe/resubscribe,却不用重复运行得到Observable的代码。cache() (或者 replay())会继续执行网络请求(甚至你调用了unsubscribe也不会停止)。这就是说你可以在Activity重新创建的时候从cache()的返回值中创建一个新的Observable对象。

Observable<Photo> request = service.getUserPhoto(id).cache();
Subscription sub = request.subscribe(photo -> handleUserPhoto(photo));

// ...When the Activity is being recreated...
sub.unsubscribe();

// ...Once the Activity is recreated...
request.subscribe(photo -> handleUserPhoto(photo));

注意,两次sub是使用的同一个缓存的请求。当然在哪里去存储请求的结果还是要你自己来做,和所有其他的生命周期相关的解决方案一延虎,必须在生命周期外的某个地方存储。(retained fragment或者单例等等)。

第二个问题的解决方案就是在生命周期的某个时刻取消订阅。一个很常见的模式就是使用CompositeSubscription来持有所有的Subscriptions,然后在onDestroy()或者onDestroyView()里取消所有的订阅。

private CompositeSubscription mCompositeSubscription
= new CompositeSubscription();

private void doSomething() {
    mCompositeSubscription.add(
        AndroidObservable.bindActivity(this, Observable.just("Hello, World!"))
        .subscribe(s -> System.out.println(s)));
}

@Override
protected void onDestroy() {
    super.onDestroy();

    mCompositeSubscription.unsubscribe();
}

你可以在Activity/Fragment的基类里创建一个CompositeSubscription对象,在子类中使用它。

注意! 一旦你调用了 CompositeSubscription.unsubscribe(),这个CompositeSubscription对象就不可用了, 如果你还想使用CompositeSubscription,就必须在创建一个新的对象了。
最后,附上测试时使用的完整工程,代码上面均有贴出来了。
RxAndroid基本使用测试代码
http://download.csdn.net/detail/yaodong379/9486905

本文由正版必中一肖图发布于编程应用,转载请注明出处:史上最浅显易懂的RxJava入门教程

上一篇:推荐介绍配置 下一篇:没有了
猜你喜欢
热门排行
精彩图文