Takuji->find;

株式会社はてなでアプリケーションエンジニアやってます、技術的な記事を書いているつもり

RxJavaの便利なOperator達 - RxJava Advent Calendar 2015

こんにちは、RxJava歴1年くらいのtakuji31です。

これはRxJava Advent Calendar 2015、7日目の記事です。

qiita.com

6日目はk-matsさんの「何となくRxJavaを使ってるけど正直よく分かってない人が読むと良さそうな記事・基礎編」でした。

qiita.com

さて、本日はRxJavaの便利なOperator達を紹介したいと思います。

cache

cache operatorはemitされた値をキャッシュしておくことができるObservableを生成するoperatorです。

以下のコードは単純なループを回しつつ、標準出力にログを出すだけのObservableです。

Observable<Integer> observable = Observable.<Integer>create(subscriber -> {
    for (int i = 0; i < 5; i++) {
        System.out.println(String.format(Locale.US, "OnSubscribe %d", i));
        subscriber.onNext(i);
    }
    subscriber.onCompleted();
});

observable.subscribe(integer -> {
    System.out.println(String.format(Locale.US, "OnNext 1 - %d", integer));
});

observable.subscribe(integer -> {
    System.out.println(String.format(Locale.US, "OnNext 2 - %d", integer));
});

これを実行すると以下のように出力されます。

OnSubscribe 0
OnNext 1 - 0
OnSubscribe 1
OnNext 1 - 1
OnSubscribe 2
OnNext 1 - 2
OnSubscribe 3
OnNext 1 - 3
OnSubscribe 4
OnNext 1 - 4
OnSubscribe 0
OnNext 2 - 0
OnSubscribe 1
OnNext 2 - 1
OnSubscribe 2
OnNext 2 - 2
OnSubscribe 3
OnNext 2 - 3
OnSubscribe 4
OnNext 2 - 4

subscribeする度にOnSubscribeが実行されていますね?

この程度の単純なコードだとあまり意味はありませんが、ものすごく重い処理をこの中で実行していて、かつ結果がずっと変わらないようなものだとOnSubscribeが複数実行される必要はないと思います。

この問題を解決してくれるのがcache operatorです。

Observable<Integer> cachedObservable = observable.cache();

cachedObservable.subscribe(integer -> {
    System.out.println(String.format(Locale.US, "OnNext 1 - %d", integer));
});

cachedObservable.subscribe(integer -> {
    System.out.println(String.format(Locale.US, "OnNext 2 - %d", integer));
});

これを実行すると以下の通り表示されます。

OnSubscribe 0
OnNext 1 - 0
OnSubscribe 1
OnNext 1 - 1
OnSubscribe 2
OnNext 1 - 2
OnSubscribe 3
OnNext 1 - 3
OnSubscribe 4
OnNext 1 - 4
OnNext 2 - 0
OnNext 2 - 1
OnNext 2 - 2
OnNext 2 - 3
OnNext 2 - 4

無駄な処理が減りましたね?

この後もsubscribeされる度にOnNextだけが表示されます。

retry / retryWhen

APIクライアントをObservableで実現なんてのをやっていると、色々な付随処理を全部Observableでやりたくなるわけですが、

その中で一番単純なのがリトライかと思います。特にモバイルのアプリではネットワークのエラーで通信をやり直すのは日常茶飯事ですね。

そこで登場するのがretry operatorです。

int[] retryCount = new int[1];
retryCount[0] = 0;
Observable<Integer> observable = Observable.<Integer>create(subscriber -> {
    if (retryCount[0] < 5) {
        subscriber.onError(new RuntimeException(String.format(Locale.US, "Error %d", retryCount[0])));
    } else {
        subscriber.onNext(retryCount[0]);
        subscriber.onCompleted();
    }
    retryCount[0]++;
});

observable.subscribe(integer -> {
    System.out.println(String.format(Locale.US, "OnNext %d", integer));
}, Throwable::printStackTrace);

5回Subscribeされるまで延々とRuntimeExceptionを吐きつづけるだけのObservableです。

もちろんネットワークのエラーはこんなに単純ではなくて、いつ起きるかわかりませんが、サンプルコードなのでシンプルに。

このコードを実行すると、以下のように表示されます。

java.lang.RuntimeException: Error 0
    at jp.takuji31.rxac2015.RetryOperator.lambda$main$0(RetryOperator.java:16)
    at rx.Observable.subscribe(Observable.java:8191)
    at rx.Observable.subscribe(Observable.java:8158)
    at rx.Observable.subscribe(Observable.java:7962)
    at jp.takuji31.rxac2015.RetryOperator.main(RetryOperator.java:23)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

例外が吐かれますね。

ここは何としても処理を成功してほしいのでretry operatorを使いましょう。

Observable<Integer> retryObservable = observable.retry();

retryObservable.subscribe(integer -> {
    System.out.println(String.format(Locale.US, "OnNext %d", integer));
}, Throwable::printStackTrace);

実行してみましょう。

OnNext 5

5回の再試行の末にやっと成功しました!

retryは試行回数を制限することもできます。

Observable<Integer> threeTimeRetryObservable = observable.retry(3);

threeTimeRetryObservable.subscribe(integer -> {
    System.out.println(String.format(Locale.US, "OnNext %d", integer));
}, Throwable::printStackTrace);
java.lang.RuntimeException: Error 3
    at jp.takuji31.rxac2015.RetryOperator.lambda$main$0(RetryOperator.java:16)
    at rx.Observable.unsafeSubscribe(Observable.java:8098)
    at rx.internal.operators.OnSubscribeRedo$2.call(OnSubscribeRedo.java:278)
    at rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:77)
    at rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:56)
    at rx.internal.operators.OnSubscribeRedo$5.request(OnSubscribeRedo.java:366)
    at rx.Subscriber.setProducer(Subscriber.java:209)
    at rx.Subscriber.setProducer(Subscriber.java:205)
    at rx.internal.operators.OnSubscribeRedo.call(OnSubscribeRedo.java:358)
    at rx.internal.operators.OnSubscribeRedo.call(OnSubscribeRedo.java:55)
    at rx.Observable.subscribe(Observable.java:8191)
    at rx.Observable.subscribe(Observable.java:8158)
    at rx.Observable.subscribe(Observable.java:7962)
    at jp.takuji31.rxac2015.RetryOperator.main(RetryOperator.java:40)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

3回リトライの末、4回目でエラーを返しました。

また、retryWhenを使えば、リトライの試行を遅らせたりやめさせたりできます。

Observable<Integer> retryWhenObservable = observable.retryWhen(observable1 -> {
    return observable1.flatMap(o -> Observable.timer(3, TimeUnit.SECONDS));
});

retryWhenObservable.subscribe(integer -> {
    System.out.println(String.format(Locale.US, "OnNext %d", integer));
}, Throwable::printStackTrace);

こうすることで、毎回エラーが返された時に3秒ずつ待ってから再試行されます。

APIへのリトライを行う時には必須ですね。

onErrorResumeNext / onErrorReturn

onErrorResumeNextはエラーが返された時に特定のObservableを用いて処理を継続できるOperatorです。

Observable<String> observable = Observable.range(0, 5).flatMap(integer -> Observable.error(new RuntimeException("String not found!!!")));
observable.subscribe(s -> {
    System.out.println(s);
}, Throwable::printStackTrace);

エラーを返すだけのObservableです、もちろんsubscribeしてもエラーしか返しませんね?それも5回やっても毎回例外返してくるひどいやつです。

Observable<String> resumeNextObservable = observable.onErrorResumeNext(throwable -> Observable.just("Resume", "Next"));
resumeNextObservable.subscribe(s -> {
    System.out.println(s);
}, Throwable::printStackTrace);

こうしてやることで、処理を継続できます。結果は以下の通り。

Resume
Next

対してonErrorReturnは単一の値を返します。

Observable<String> returnObservable = observable.onErrorReturn(throwable -> "OnErrorReturn");
returnObservable.subscribe(s -> {
    System.out.println(s);
}, Throwable::printStackTrace);

結果は以下の通り。

OnErrorReturn

サンプルコード

今回のサンプルコードはGithubにあります。

github.com

まとめ

map、flatMap、filter等のよく使うOperatorほど存在感はありませんが、便利なOperator達を紹介しました。

かゆいところに手が届くRxJavaの便利なOperator達を是非活用してみてください。

明日はizumin5210さんの「SingleとSingleSubscriber」です、お楽しみに!