RxJava的错误处理及使用问题

赵小温

最近在研究RxJava在Android项目中的应用,在RxJava的错误处理方面,查阅很多网上资料,大都会出现类似“Rxjava中,在错误出现的时候就会调用Subscriber的onError方法将错误分发出去,由Subscriber自己来处理错误”的说法,第一感觉太棒了,但经过demo试验事实并非如此,所以决定对RxJava的错误处理机制做一个整体的研究。

首先来看一下对于错误处理的几种方式。大致分为四类

1.Subscriber的onError()
2.onErrorReturn()类方法
3.retry()类方法
4.xxxDelayError()类方法
下面依次讲解分析每一种处理方式的使用及所遇到的一些问题。

一.onError()方法

Rxjava中,在错误出现的时候就会调用Subscriber的onError方法将错误分发出去,由Subscriber自己来处理错误。
我们先来看一个例子:

从本地读取N张图片显示到界面上

String[] bitmapPaths = new String[]{
            "mnt/sdcard/MagazineUnlock/demo.png",
            "mnt/sdcard/MagazineUnlock/demo2.png",
            "mnt/sdcard/MagazineUnlock/demo3.png",
            "mnt/sdcard/MagazineUnlock/demo4.png",
            null};
Observable.from(bitmapPaths).map(new Func1() {
            @Override
            public Bitmap call(String s) {
                File file = new File(s);
                return BitmapUtils.getBitmap(file);
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
                setErrorBitmap();
            }

            @Override
            public void onNext(Bitmap bitmap) {
                addImage(bitmap);
            }
        });
我们可以看到数据源bitmapPaths是一个字符串数组,但最后一项是一个null,在map方法中做处理,如不加判断,将会引起异常,然后调用onError方法结束。 现在假设bitmapPaths是一个null呢?是否还会进入onError方法结束? 假设不如来做试验看下,结果App报异常导致崩溃(事实证明,上面的说法并不是完全正确的)。那我们不妨来看看RxJava的源码,为何在这里会报错吧。 Observable的from方法:
public static  Observable from(T[] array) {
        int n = array.length;
        if (n == 0) {
            return empty();
        } else
        if (n == 1) {
            return just(array[0]);
        }
        return create(new OnSubscribeFromArray(array));
    }
可以看到from方法对array只是做了length长度的保护,但没有对null的情况做保护。 还有网友反馈的一种情况就是FileNotFoundException无法进入onError的方法,后经过查看源码发现BitmapFactory.decodeFile(file.getAbsolutePath()); 该方法在读取时做了一个catch保护,所以我们是无法捕获该异常的,请看源码
public static Bitmap decodeFile(String pathName, Options opts) {
        Bitmap bm = null;
        InputStream stream = null;
        try {
            stream = new FileInputStream(pathName);
            bm = decodeStream(stream, null, opts);
        } catch (Exception e) {
            /*  do nothing.
                If the exception happened on open, bm will be null.
            */
            Log.e("BitmapFactory", "Unable to decode stream: " + e);
        } finally {
            if (stream != null) {
                try {
                    stream.close();
                } catch (IOException e) {
                    // do nothing here
                }
            }
        }
        return bm;
    }
只是在遇到异常时,使用Log打印了一句话。 二. onErrorReturn()类方法 该类方法有三个,分别是onErrorReturn(),onExceptionResumeNext(),onErrorResumeNext()。 RxJava中发生错误时,onErrorReturn()方法可以在调用onError方法之前拦截错误,返回一个Bitmap进入onNext方法,并结束整个流程。 onErrorResumeNext()方法返回一个新的Observable,将这个新的Observable的每一项释放到onNext,然后结束整个流程。 onExceptionResumeNext()方法同onErrorResumeNext()类似,但是它只捕获Exception(Exception只是Throwable的一个子类) 还是上面的例子,我们直接来看新增代码部分
onErrorReturn(new Func1() {
            @Override
            public Bitmap call(Throwable throwable) {
                Bitmap errorBitmap = BitmapFactory.decodeResource(ErrorHandlingActivity.this.getResources(),
                        R.drawable.ic_launcher);
                return errorBitmap;
            }
        })
这里有一个地方需要注意,就是onExceptionResumeNext/onErrorResumeNext方法和map一起使用时,如果异常情况发生在map里,有一定的概率会出现前面几张图片也没有显示出来的情况,现在我们不做线程切换,就不会出现该问题。这点是需要大家注意的问题。 三.retry()类方法 该类方法有retry(),retryWhen()。 retry()方法可以设置重试次数,使用方法比较简单。 retryWhen()方法通过Func1方法实现对错误重试的控制。下面用一个例子来说明,每隔2s重试一次,总次数为2。
.retryWhen(new RetryWithDelay(2, 2000))
public class RetryWithDelay implements Func1, Observable> {

        private final int maxRetries;
        private final int retryDelayMillis;
        private int retryCount;

        public RetryWithDelay(int maxRetries, int retryDelayMillis) {
            this.maxRetries = maxRetries;
            this.retryDelayMillis = retryDelayMillis;
        }

        @Override
        public Observable call(Observable attempts) {
            return attempts.flatMap(new Func1>() {
                @Override
                public Observable call(Throwable throwable) {
                    if (++retryCount <= maxRetries) {
                        return Observable.timer(retryDelayMillis, TimeUnit.MILLISECONDS);
                    }
                    // Max retries hit. Just pass the error along.
                    return Observable.error(throwable);
                }
            });
        }
    }

如果要对某一种类型的Throwable做重试,也可以用instanceof来做一个过滤。

四.xxxDelayError()类方法

xxxDelayError()和xxxOrDefault()也是对错误的一种处理,xxxDelayError()可以延迟onError方法的调用;
xxxOrDefault()取出某一个值,当该值为空时,使用默认值;
下面以mergeDelayError为案例做一下讲解 mergeDelayError():他能从一个Observable中继续发射数据即便是其中有一个抛出了错误。当所有的Observables都完成时,mergeDelayError将会发射onError()。
下面有个例子可以看下

Observable observable1 = Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                subscriber.onNext("observable1---------1");
                subscriber.onNext("observable1---------2");
                subscriber.onNext("observable1---------3");
                subscriber.onNext("observable1---------4");
                subscriber.onCompleted();
            }
        });
        Observable observable2 = Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                subscriber.onNext("observable2---------1");
                subscriber.onNext("observable2---------2");
                String s = null;
                s.toString();
                subscriber.onNext("observable2---------3");
                subscriber.onNext("observable2---------4");
                subscriber.onCompleted();
            }
        });
        Observable observable3 = Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                subscriber.onNext("observable3---------1");
                subscriber.onNext("observable3---------2");
                subscriber.onNext("observable3---------3");
                subscriber.onNext("observable3---------4");
                subscriber.onCompleted();
            }
        });

        Observable.mergeDelayError(observable1, observable2, observable3)
            .onErrorReturn(new Func1() {
                    @Override
                    public String call(Throwable throwable) {
                        return "onErrorReturn";
                    }
                }).subscribe(new Subscriber() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
        });

输入结果:

observable1---------1
observable1---------2
observable1---------3
observable1---------4
observable2---------1
observable2---------2
observable3---------1
observable3---------2
observable3---------3
observable3---------4
onErrorReturn

输入预期结果,在输出“observable2---------2”后,程序报错,然后通过延迟错误,在最后输出“onErrorReturn”。

这里需要注意的一点就是,这个错误必须是发生在observable1,observable2,observable3中的,如果是发生在mergeDelayError方法以外的,就会按照onError的流程结束。

这只是我遇到的一些问题,应该还会有其他需要注意的问题,欢迎大家发现问题后一起交流。