こんにちは、RxJava歴1年くらいのtakuji31です。
これはRxJava Advent Calendar 2015、7日目の記事です。
6日目はk-matsさんの「何となくRxJavaを使ってるけど正直よく分かってない人が読むと良さそうな記事・基礎編」でした。
さて、本日はRxJavaの便利なOperator達を紹介したいと思います。
cache
cache operatorはemitされた値をキャッシュしておくことができるObservableを生成するoperatorです。
以下のコードは単純なループを回しつつ、標準出力にログを出すだけのObservableです。
Observable<Integer> observable = Observable.<Integer>create(subscriber -> { for (int i = 0; i < 5; i++) { System.out.println(String.format(Locale.US, "OnSubscribe %d", i)); subscriber.onNext(i); } subscriber.onCompleted(); }); observable.subscribe(integer -> { System.out.println(String.format(Locale.US, "OnNext 1 - %d", integer)); }); observable.subscribe(integer -> { System.out.println(String.format(Locale.US, "OnNext 2 - %d", integer)); });
これを実行すると以下のように出力されます。
OnSubscribe 0 OnNext 1 - 0 OnSubscribe 1 OnNext 1 - 1 OnSubscribe 2 OnNext 1 - 2 OnSubscribe 3 OnNext 1 - 3 OnSubscribe 4 OnNext 1 - 4 OnSubscribe 0 OnNext 2 - 0 OnSubscribe 1 OnNext 2 - 1 OnSubscribe 2 OnNext 2 - 2 OnSubscribe 3 OnNext 2 - 3 OnSubscribe 4 OnNext 2 - 4
subscribeする度にOnSubscribeが実行されていますね?
この程度の単純なコードだとあまり意味はありませんが、ものすごく重い処理をこの中で実行していて、かつ結果がずっと変わらないようなものだとOnSubscribeが複数実行される必要はないと思います。
この問題を解決してくれるのがcache operatorです。
Observable<Integer> cachedObservable = observable.cache(); cachedObservable.subscribe(integer -> { System.out.println(String.format(Locale.US, "OnNext 1 - %d", integer)); }); cachedObservable.subscribe(integer -> { System.out.println(String.format(Locale.US, "OnNext 2 - %d", integer)); });
これを実行すると以下の通り表示されます。
OnSubscribe 0 OnNext 1 - 0 OnSubscribe 1 OnNext 1 - 1 OnSubscribe 2 OnNext 1 - 2 OnSubscribe 3 OnNext 1 - 3 OnSubscribe 4 OnNext 1 - 4 OnNext 2 - 0 OnNext 2 - 1 OnNext 2 - 2 OnNext 2 - 3 OnNext 2 - 4
無駄な処理が減りましたね?
この後もsubscribeされる度にOnNextだけが表示されます。
retry / retryWhen
APIクライアントをObservableで実現なんてのをやっていると、色々な付随処理を全部Observableでやりたくなるわけですが、
その中で一番単純なのがリトライかと思います。特にモバイルのアプリではネットワークのエラーで通信をやり直すのは日常茶飯事ですね。
そこで登場するのがretry operatorです。
int[] retryCount = new int[1]; retryCount[0] = 0; Observable<Integer> observable = Observable.<Integer>create(subscriber -> { if (retryCount[0] < 5) { subscriber.onError(new RuntimeException(String.format(Locale.US, "Error %d", retryCount[0]))); } else { subscriber.onNext(retryCount[0]); subscriber.onCompleted(); } retryCount[0]++; }); observable.subscribe(integer -> { System.out.println(String.format(Locale.US, "OnNext %d", integer)); }, Throwable::printStackTrace);
5回Subscribeされるまで延々とRuntimeExceptionを吐きつづけるだけのObservableです。
もちろんネットワークのエラーはこんなに単純ではなくて、いつ起きるかわかりませんが、サンプルコードなのでシンプルに。
このコードを実行すると、以下のように表示されます。
java.lang.RuntimeException: Error 0 at jp.takuji31.rxac2015.RetryOperator.lambda$main$0(RetryOperator.java:16) at rx.Observable.subscribe(Observable.java:8191) at rx.Observable.subscribe(Observable.java:8158) at rx.Observable.subscribe(Observable.java:7962) at jp.takuji31.rxac2015.RetryOperator.main(RetryOperator.java:23) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
例外が吐かれますね。
ここは何としても処理を成功してほしいのでretry operatorを使いましょう。
Observable<Integer> retryObservable = observable.retry(); retryObservable.subscribe(integer -> { System.out.println(String.format(Locale.US, "OnNext %d", integer)); }, Throwable::printStackTrace);
実行してみましょう。
OnNext 5
5回の再試行の末にやっと成功しました!
retryは試行回数を制限することもできます。
Observable<Integer> threeTimeRetryObservable = observable.retry(3); threeTimeRetryObservable.subscribe(integer -> { System.out.println(String.format(Locale.US, "OnNext %d", integer)); }, Throwable::printStackTrace);
java.lang.RuntimeException: Error 3 at jp.takuji31.rxac2015.RetryOperator.lambda$main$0(RetryOperator.java:16) at rx.Observable.unsafeSubscribe(Observable.java:8098) at rx.internal.operators.OnSubscribeRedo$2.call(OnSubscribeRedo.java:278) at rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:77) at rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:56) at rx.internal.operators.OnSubscribeRedo$5.request(OnSubscribeRedo.java:366) at rx.Subscriber.setProducer(Subscriber.java:209) at rx.Subscriber.setProducer(Subscriber.java:205) at rx.internal.operators.OnSubscribeRedo.call(OnSubscribeRedo.java:358) at rx.internal.operators.OnSubscribeRedo.call(OnSubscribeRedo.java:55) at rx.Observable.subscribe(Observable.java:8191) at rx.Observable.subscribe(Observable.java:8158) at rx.Observable.subscribe(Observable.java:7962) at jp.takuji31.rxac2015.RetryOperator.main(RetryOperator.java:40) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
3回リトライの末、4回目でエラーを返しました。
また、retryWhenを使えば、リトライの試行を遅らせたりやめさせたりできます。
Observable<Integer> retryWhenObservable = observable.retryWhen(observable1 -> { return observable1.flatMap(o -> Observable.timer(3, TimeUnit.SECONDS)); }); retryWhenObservable.subscribe(integer -> { System.out.println(String.format(Locale.US, "OnNext %d", integer)); }, Throwable::printStackTrace);
こうすることで、毎回エラーが返された時に3秒ずつ待ってから再試行されます。
APIへのリトライを行う時には必須ですね。
onErrorResumeNext / onErrorReturn
onErrorResumeNextはエラーが返された時に特定のObservableを用いて処理を継続できるOperatorです。
Observable<String> observable = Observable.range(0, 5).flatMap(integer -> Observable.error(new RuntimeException("String not found!!!"))); observable.subscribe(s -> { System.out.println(s); }, Throwable::printStackTrace);
エラーを返すだけのObservableです、もちろんsubscribeしてもエラーしか返しませんね?それも5回やっても毎回例外返してくるひどいやつです。
Observable<String> resumeNextObservable = observable.onErrorResumeNext(throwable -> Observable.just("Resume", "Next")); resumeNextObservable.subscribe(s -> { System.out.println(s); }, Throwable::printStackTrace);
こうしてやることで、処理を継続できます。結果は以下の通り。
Resume Next
対してonErrorReturnは単一の値を返します。
Observable<String> returnObservable = observable.onErrorReturn(throwable -> "OnErrorReturn"); returnObservable.subscribe(s -> { System.out.println(s); }, Throwable::printStackTrace);
結果は以下の通り。
OnErrorReturn
サンプルコード
今回のサンプルコードはGithubにあります。
まとめ
map、flatMap、filter等のよく使うOperatorほど存在感はありませんが、便利なOperator達を紹介しました。
かゆいところに手が届くRxJavaの便利なOperator達を是非活用してみてください。
明日はizumin5210さんの「SingleとSingleSubscriber」です、お楽しみに!