[Android] RxJava〜初心者向け〜

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

背景

RxJava難しいですよね。
何度もネットで調べましたが、何度も挫折しています。
ミニマムで動く物を作り、Rxの雰囲気を掴んでもらえたらと思い書いてみます。

ミニマムの実装

動く物を作った方がモチベーションが沸くのではと考えRxBindingを使います。
WidgetObservableとかViewObservableと呼ばれていたものでRxBindingに全て含まれるようになり、書き方も変更があったようです。
https://github.com/ReactiveX/RxAndroid/wiki#moved
RxBindingについては以前弊社ブログに書きましたので興味があればご覧ください。

まずbuild.gradleに依存関係を記述します。

dependencies {
    compile fileTree(dir: 'libs', include: ['*.jar'])
    testCompile 'junit:junit:4.12'
    compile 'com.android.support:appcompat-v7:23.2.0'
    compile 'io.reactivex:rxjava:1.0.14'
    compile 'io.reactivex:rxandroid:1.0.1'
    compile 'com.jakewharton.rxbinding:rxbinding:0.2.0'
}

Rxの流れは基本的には以下の3つになります。

  • Observableを作る
  • Operatorで加工する
  • Observerをセットする
        EditText editText = (EditText) findViewById(R.id.edit_text);
        
        RxTextView.textChangeEvents(editText)
                .map(new Func1<TextViewTextChangeEvent, String>() {
                    @Override
                    public String call(TextViewTextChangeEvent textViewTextChangeEvent) {
                        return String.valueOf(textViewTextChangeEvent.text().length());
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer textLength) {
                        Log.d("TextLength=", String.valueOf(textLength));
                    }
                });

ミニマムで書けば以上のようになります。

  • 3行目でObservableを作り
  • 4-9行目でOperatorで加工して
  • 10-15行目でObserverをセットしています

RXはメソッドチェーンでたくさん書かれて最初は何をしているかわかりづらいですが、基本はこの形で書かれています。

3行目のRxTextView.textChangeEvents()EditTextをObserveして変更があったときにTextViewTextChangeEvent型のObservableを返します。

public static Observable<TextViewTextChangeEvent> textChangeEvents(@NonNull TextView view) {
    return Observable.create(new TextViewTextEventOnSubscribe(view));
  }

4行目のmap()は受け取ったObservableを他のObservableに変換するメソッドです。今回の例ではTextViewTextChangeEvent型のobservableをString型に変換して返しています。

Func1<T,R>()はT型のObservableをR型のObservableに変換して返すのでRで指定した型が戻り値になります。
Func2<T1,T2,R>,Func3<T1,T2,T3,R>・・・とあり型引数の数に応じてFunc<T・・・>()があります。

scan()メソッドではFunc2<T,T,R>を使います。
以前書いた弊社ブログでもFunc2<T,T,R>が使われていますので気になる方はご覧ください。

10行目のsubscribe()ではObserver()を引数に取ります。最終的にどのような処理をするのかをここで書きます。
Observer()の代わりにAction1()onNext()の処理を書く事もできますが、通信処理など非同期処理を書くときはOnError()のハンドリングが必要になるのでこの書き方はできないようです。

RxではObservbleの変化に対してObserver、Func、Actionとあり、それぞれ何をするのか混乱しますが
FuncとActionの違いは戻り値があるかどうかで、ObserverはonNext()、onCompleted()、onError()とあり処理のハンドリングをする事ができるという違いがあります。
上にも書きましたが、Observerの代わりにAction1を書く事でonNext()のハンドリングをする事ができます。onError()onCompleted()のハンドリングが必要でない場合はObserverの代わりにAction1を引数に取る事ができます。

実装

上のコードをベースにOperatorを追加します。

        EditText editText = (EditText) findViewById(R.id.edit_text);
        
        RxTextView.textChangeEvents(editText)
                .map(new Func1<TextViewTextChangeEvent, Integer>() {
                    @Override
                    public Integer call(TextViewTextChangeEvent textViewTextChangeEvent) {
                        return textViewTextChangeEvent.text().length();
                    }
                })
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        int minimumLimit = 5;
                        return integer > minimumLimit;
                    }
                })
                .take(5)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer textLength) {
                        Log.d("TextLength=", String.valueOf(textLength));
                    }
                });

Operatorを追加しました。
filter()でsubscribeする条件を指定してフィルターします。上の例では5文字以下の時はsubscribeしないようになっています。
take()でsubscribeする回数を指定しています。上の例では最初の5回をsubscribeして、それ以降はsubscribeしないようになっています。

そして次にObserverを追加します。

        EditText editText = (EditText) findViewById(R.id.edit_text);
        
        RxTextView.textChangeEvents(editText)
                .map(new Func1<TextViewTextChangeEvent, Integer>() {
                    @Override
                    public Integer call(TextViewTextChangeEvent textViewTextChangeEvent) {
                        return textViewTextChangeEvent.text().length();
                    }
                })
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        int minimumLimit = 5;
                        return integer > minimumLimit;
                    }
                })
                .take(5)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Integer textLength) {
                        Log.d("TextLength=", String.valueOf(textLength));
                    }
                });

上の例では書いていませんが、onError()onCompleted()のハンドリングが必要であれば記述します。

次にスレッドの指定をします。

        EditText editText = (EditText) findViewById(R.id.edit_text);
        
        RxTextView.textChangeEvents(editText)
                .subscribeOn(AndroidSchedulers.mainThread())
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Func1<TextViewTextChangeEvent, Integer>() {
                    @Override
                    public Integer call(TextViewTextChangeEvent textViewTextChangeEvent) {
                        return textViewTextChangeEvent.text().length();
                    }
                })
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        int minimumLimit = 5;
                        return integer > minimumLimit;
                    }
                })
                .take(5)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Integer textLength) {
                        Log.d("TextLength=", String.valueOf(textLength));
                    }
                });

RxTextView.textChangeEvents()はメインスレッドからしかsubscribeできないので例の4-5行目のように書きますが

                .subscribeOn(AndroidSchedulers.mainThread())
                .observeOn(AndroidSchedulers.mainThread())

通信処理の結果を返すObservableを扱うケースでは以下のようになります。

                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())

最後に

長くなりましたが基本は
- Observableを作る
- Operatorで加工する
- Observerをセットする
という事を理解できればなんとか書けそうな気がしました。