[Firebase][RxJava,RxSwift]Firestoreリッスン時のコールバックがMainスレッドで呼ばれるのをどうにかする

Firestore & ReactiveX 実践ネタです。 まだ未読の方は、先にこちらをどうぞ → [Firebase] ReactiveX(RxJava,RxSwift,RxJS) を使ったFirestoreのクライアント実装

コールバックの処理スレッドを制御する

Firestoreのドキュメントやコレクションをリッスンすると、データの取得時およびアップデートの都度、コールバックが呼ばれますが、このコールバックは必ずメインスレッドで呼び出されます1

ですので、何も考えずにここに処理を繋げるのはNGです。単にtoObject()を使ってMapからオブジェクト化するような処理でも、モバイルのCPUにとっては決して安くはありません。コレクションをクエリするような場合は尚更です。

他方、スレッド制御が絡むと、一般的には実装が複雑になりがちです。「とりあえず」「この程度なら」とそのままメインスレッドでやっちゃう気持ちも良く分かります。ReactiveXを使えば、このような悩みもシンプルに解決できます。

コード

ReactiveX の Schedulers を単独で使ってみる

Android

import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.schedulers.Schedulers
(略)

private fun log(tag: String, text: String) {
  Log.d(tag, "$text: thread-id='${android.os.Process.myTid()}' ${if (Looper.myLooper() == Looper.getMainLooper()) "(main)" else ""}")
}

fun testSchedulers() {
  log("SchedulerTest", "1")

  Schedulers.io().createWorker().schedule {
    log("SchedulerTest", "2")
  }

  AndroidSchedulers.mainThread().createWorker().schedule {
    log("SchedulerTest", "3")
  }

  log("SchedulerTest", "4")
}

使い方は非常に簡単・明快ですね。これだけで別スレッドで処理させる事が可能です。

Log
1: thread-id='16002' (main)
2: thread-id='16086' 
4: thread-id='16002' (main)
3: thread-id='16002' (main)

スレッドが絡むので、ログの出力順は必ずしも上のようにはならないとは思いますが、メインスレッドとバックグラウンドスレッドとで処理が実行されている事が確認できます。

subscribeOn() じゃダメなの?

さて、では改めてFirestoreのコールバックに関して見てみましょう。ReactiveXで処理スレッドを制御するオペレータとしてsubscribeOn()がありますが、これでダメなのかというとダメです。実際にやってみましょう。

Android

fun testSubscribeOn() {
  val roomId = "xxxxxxxxx"
  Observable.create<Int> { emitter ->

    log("SchedulerTest", "without callback")

    FirebaseFirestore.getInstance()
        .collection("chatrooms/$roomId/messages")
        .addSnapshotListener { snapshot, exception ->

          log("SchedulerTest", "within callback")

          if (exception != null) {
            emitter.onError(exception)
          } else {
            emitter.onNext(snapshot!!.size())
          }
        }
  }
  .subscribeOn(Schedulers.newThread())
  .subscribe()
}
Log
without callback: thread-id='19792' 
within callback: thread-id='19709' (main)

Observable.createブロック内はsubscribeOn()の影響を受けていますが、Firestoreのリスナーコールバック内は変わらずメインスレッドです。これは期待する動作ではないですね。考えれば尤もですが、スレッドのスイッチングは自力で実現する必要があります。が、これは上で見たように、ReactiveXのSchedulerを使えば簡単です。

コールバック→Observable化実装にSchedulerを追加する

先のブログで紹介した、FirestoreのリアルタイムアップデートをObservable化する実装です。こちらに、Schedulerによるスレッド制御機構を追加すると以下のようになります。

Android

class CollectionStream {
  companion object {

    @SchedulerSupport(SchedulerSupport.CUSTOM)
    fun <T: Any> from(query: Query, documentClass: Class<T>, scheduler: Scheduler = Schedulers.computation()): Observable<List<T>> {

      return Observable.create { emitter ->
        val disposables = CompositeDisposable()
        val registration = query.addSnapshotListener { snapshot, error ->

          /*
           * 指定されたスケジューラで処理を行う
           *
           * 戻り値であるdisposableをハンドルする事の効果は検証できていない。一応理屈では
           * CollectionのUpdateが高頻度で発生してスレッドにキューが滞留しているような状況で、
           * SubscriberがStreamをdispose()した場合には、まだ処理が始まっていないキューに関してはキャンセルされる(はず)。
           */
          val disposable = scheduler.createWorker().schedule {

            if (error != null) {
              emitter.onError(error)
            } else {
              val list = snapshot!!.documents.map { d ->
                try {
                  d.toObject(documentClass)
                } catch (e: Exception) {
                  null
                }
              }.filterNotNull()
              emitter.onNext(list)
            }

            Disposables.empty()
          }
          disposables.add(disposable)
        }

        emitter.setDisposable(Disposables.fromRunnable {
          disposables.clear()
          registration.remove()
        })
      }
    }
  }
}

iOS

class CollectionStream {

    static func from<T: Document>(query: Query, documentClass: T.Type,
                                  scheduler: SchedulerType = ConcurrentDispatchQueueScheduler(qos: .background)) -> Observable<[T]> {

        return Observable.create { observer in
            let disposables = CompositeDisposable()
            let listener = query.addSnapshotListener() { (snapshot, err) in

                let disposable = scheduler.schedule(()) { _ in    
                    if let err = err {
                        observer.on(.error(err))
                    } else {
                        let list: [T] = snapshot!.documents
                            .map { doc in T(id: doc.documentID, data: doc.data()) }
                            .compactMap { $0 } // remove nil
                        observer.on(.next(list))
                    }

                    return Disposables.create() // nop
                }
                _ = disposables.insert(disposable)
            }
            return Disposables.create {
                listener.remove()
                disposables.dispose()
            }
        }
    }

RxSwiftでも基本構造は変わらないので扱いやすいですね。スケジューラの種類はiOSに馴染みのあるものになっています。

どちらもデフォルトスケジューラにバッググランド処理系を指定しています。必要に応じて observeOn() でメインスレッドに戻す等して下さい。

まとめ

Firestoreのコールバックはかならずメインスレッド上で呼ばれるため、適切にObservable化するにはスレッドの制御は必須となります。 この記事では、ReactiveXの提供する機能を使って、非常に簡単にスレッド制御機構を追加できる事を紹介しました。この実装は一例ですが、Firestoreを使いこなすにあたって参考になれば幸いです。
またReactiveXのスケジューラは、オペレータ引数としてだけでなく、単独で使っても非常に手軽で便利なので、どんどん使っていきましょう。


  1. この挙動に関しては、ドキュメント上で明記されてる箇所は見つけられませんでした。