[Firebase] ReactiveX(RxJava,RxSwift,RxJS) を使ったFirestoreのクライアント実装

背景

前職でコミュニケーションサービスをゼロから立ち上げるにあたって、がっつりFiretoreを実戦投入しました。その際に調べた事や挑戦、ノウハウなどをブログとして残そうと思った次第です。
一からの入門記事ではございませんので、そこのトコロはご了承下さい。分かる方はコードブロックだけ眺めて下さればOKです。

APIサーバレスなアーキテクチャ

Firestoreの(ここで触れておきたい)特徴

  • 分散DB
  • クライアントはDBに直接接続する
  • クライアントはローカルにもDBを持つ(単なるReadキャッシュではない)

その他は公式を参照するなりしてください。

困り事

上述のようにクライアントはDBサーバに直接接続する構成を採るため、APIサーバのような抽象化レイヤが介在できません。そのため、抽象化のための実装は、各クライアント上に実現する事になります。つまり

  • APIサーバでやってた処理がクライアントのModel層へ
  • クライアントの種類の分だけ実装が必要

になります。じゃあ、とAndroid、iOS、Webの三つ分をそれぞれ別個に実装していると、後の管理・保守工数含めて大変な事になってしまいますよね? Firestoreは後述する要素も含めて、非常に魅力的なアーキテクチャなのですが、この現実は頂けません。対策必須です。
我々はこの問題に対処するため、 ReactiveX (Reactive Extentions) を採用しました。ReactiveXを使うとどうなるか、それを具体的に紹介するのがこの記事の目的です。

ReactiveXでModel層を(出来るだけ)統一的に実装する

ReactiveXに関しては詳細は公式等参照なりググるなりして頂くとして、

  • 各言語毎に実装があり、そこで使用するオペレータ(の名称や、それが実現する処理の内容)などは同じ
  • よって類似性の高いコードを書くことが出来る
  • それによりトータルの開発工数や管理工数を削減出来る

という三段論法です(加えて四段目として、Firestoreのクライアント実装にReactiveXを使えば色々便利だよ、というのもあります)。
では早速

コード

1. クライアント毎にObservable化する

公式にある通り、クライアントはFirestoreのドキュメントやコレクションを直接リッスンします。これにより、対象がアップデートされる都度、データを受け取る事が出来ます。これを一連のデータストリームとして取り扱うのが、ここでの主眼となります。

先ずは実際に、FirestoreのリアルタイプアップデートをObservable化するコードを、各クライアント毎に書いてみます。任意のQueryでコレクションを取得する単純なサンプルです。実戦的部分はここでは省略(また後日)。
import com.google.firebase.firestore.*
import io.reactivex.Observable
import io.reactivex.disposables.Disposables

class CollectionStream {
  companion object {
    fun <T: Any> from(query: Query, documentClass: Class<T>): Observable<List<T>> {
      return Observable.create { emitter ->
        val registration = query.addSnapshotListener { snapshot, error ->
          if (error != null) {
            emitter.onError(error)
          } else {
            val list = snapshot!!.documents.map { d ->
              try {
                d.toObject(documentClass)
              } catch (e: Exception) {
                null
              }
            }.filterNotNull()
            emitter.onNext(list)
          }
        }
        emitter.setDisposable(Disposables.fromRunnable {
          registration.remove()
        })
      }
    }  
※ Kotlinのサンプルでは簡便化のため `toObject()` を使っていますが、そのままではDocumentIDは保持されませんし、制約や前提を十分把握した上で実戦投入する事をお勧めします。私はiOS等と実装を揃える目的もあり、自身でコンバータを実装しましたが、ここではそこまでは踏み込みません。
import Foundation
import FirebaseFirestore
import RxSwift

protocol Document {
  init?(id: String, data: [String : Any])
}

class CollectionStream {
  static func from<T: Document>(query: Query, documentClass: T.Type) -> Observable<[T]> {
    return Observable.create { observer in
      let listener = query.addSnapshotListener() { (snapshot, err) in
        if let err = err {
          observer.on(.error(err))
          return
        }
                
        let list: [T] = snapshot!.documents
            .map { doc in T(id: doc.documentID, data: doc.data()) }
            .compactMap { $0 } // remove nil
        observer.on(.next(list))
      }
      return Disposables.create { listener.remove() }
    }
  }
import firebase from 'firebase/app'
import 'firebase/firestore'
import { Observable } from 'rxjs';

class CollectionStream {
  static from(query) {
    return Observable.create(observer => {
      let dispose;
      dispose = query.onSnapshot(snapshot => {
        observer.next(snapshot.docs.map(doc => ({ id: doc.id, data: doc.data() })));
      }, error => {
        observer.error(error)
      });
      return () => {
        dispose();
      }
    });
  }

さてこうして取り出したObservableには、クラウド(またはローカル)でデータがUpdateされる都度、そのデータが流れてきます。これらの Observable を subscribe() する事で、アップデートをリアルタイムで取得出来るようになりました。

※ FirestoreはローカルにもDBを持ちます。ローカルにデータがある場合は、これも同じストリームに流れて来ます。また更新処理に関しても、まずローカルに対してUpdateが実行され、次に裏でクラウドにSyncされるという順番になるので、それぞれのタイミングでデータが流れます。このあたりは、もう少し具体的に後述します。
次はこれを元に、Model層に色々なIFを用意しましょう。一般的にAPIサーバでやるように、DB層を隠蔽し、色々加工したデータを提供するイメージです。

2. Model層で色々なIFを用意する

ここでは、ReactiveXの色々なオペレータを使って、目的に応じてデータストリームを加工・合成等してゆきます。

例1. チャットルームでの発言回数をSUMる

Firestoreのサンプルと言えばチャット、という事で、ここでもそれに倣いましょう。”チャットの全ての発言のリスト”を取得するオリジナルのデータストリームがあるとして、ReactiveXではそれをベースに、自分の発言回数を取得するストリームを新たに作成します。

class SampleModel {
  companion object {
    fun getMessagesOf(roomId: String): Observable<List<Message>> {
      val query = FirebaseFirestore.getInstance()
          .collection("chatrooms/$roomId/messages")
          .orderBy("createdAt", Query.Direction.DESCENDING)
      return CollectionStream.from(query, Message::class.java)
    }
    
    fun getSpeakCountAt(roomId: String): Observable<Int> {
      return getMessagesOf(roomId)
          .map { messages ->
            messages.filter { message -> message.ownerUid == "self-uid" }
          }
          .map { it.count() }
    }
  }
class SampleModel {
  static func getMessagesOf(roomId: String) -> Observable<[Message]> {
    let query = Firestore.firestore()  
        .collection("chatrooms/\(roomId)/messages")
        .order(by: "createdAt", descending: true)
    return CollectionStream.from(query: query, returningClass: Message.self)
  }
    
  static func getSpeakCountAt(roomId: String) -> Observable<Int> {
    return getMessagesOf(roomId: roomId)
      .map { messages in
        messages.filter { message in message.ownerUid == "self-uid" }
      }
      .map { $0.count }
  }
import { map } from "rxjs/operators";

class SampleModel {
  static getMessagesOf(roomId) {
    const query = firebase.firestore()
        .collection(`chatrooms/${roomId}/messages`)
        .orderBy("createdAt", "desc");
    return CollectionStream.from(query);
  }

  static getSpeakCountAt(roomId) {
    return self.getMessagesOf(roomId)
      .pipe(
        map(messages => {
          return messages.filter(d => d.data["ownerUid"] === "self-uid");
        }),
        map(list => list.size()),
      )
  }
※ サンプルなので無駄にMap処理を繋げましたが、複数の加工処理を繋げて色々出来る、というイメージが伝わればと思います。
またLimit無しに全件取得するようなQueryは、言うまでもなく乱暴です。ページングや定時バッチので集計と組み合わせる等、上手く設計しましょう。

続けてもう一つサンプル。

例2. 発言者の名前をリアルタイムに解決する(非正規化なしで)

吹き出しの横に発言者の名前を表示したいです。どうしますか?

FirestoreではSQLのテーブルジョインのような事が出来ないため、データ構造上「敢えて非正規化する」という事が一つの手法となります。この例なら、Messageドキュメントのフィールドに発言者の名前情報を持たせる、というのが非正規化ですが、当然ながらユーザが自分の名前を変更した場合には、全発言を走査してデータを書き換える処理が別途必要になります。
ここで紹介するは、MessagesとMembersの2つのデータStreamを合成して一つのStreamにする、言ってみればクライアントサイドでのJoinです。
import io.reactivex.functions.BiFunction

class SampleModel {
  companion object {
    fun getMembersOf(roomId: String): Observable<List<Member>> {
      val query = FirebaseFirestore.getInstance().collection("chatrooms/$roomId/members")
      return CollectionStream.from(query, Member::class.java)
    }

    fun getMessagesAndOwnersOf(roomId: String): Observable<List<Pair<Message, Member?>>> {
      val messagesStream = getMessagesOf(roomId)
      val membersStream = getMembersOf(roomId)
      return Observable.combineLatest(messagesStream, membersStream,
          BiFunction { messages, members ->
            messages.map { msg ->
              val owner = members.find { it.id == msg.ownerUid }
              Pair(msg, owner)
            }
          })
    }
class SampleModel {
    static func getMembersOf(roomId: String) -> Observable<[Member]> {
        let query = Firestore.firestore().collection("chatrooms/\(roomId)/members")
        return CollectionStream.from(query: query, documentClass: Member.self)
    }
    
    static func getMessagesAndOwnersOf(roomId: String) -> Observable<[(Message, Member?)]> {
        let messagesStream = getMessagesOf(roomId: roomId)
        let membersStream = getMembersOf(roomId: roomId)
        return Observable.combineLatest(messagesStream, membersStream) { messages, members in
            messages.map { msg in
                let owner = members.first { $0.id == msg.ownerUid }
                return (msg, owner)
            }
        }
    }
import { combineLatest } from "rxjs";

class SampleModel {
    static getMembersOf(roomId) {
        const query = firebase.firestore().collection(`chatrooms/${roomId}/members`);
        return CollectionStream.from(query);
    }

    static getMessagesAndOwnersOf(roomId) {
        return combineLatest(SampleModel.getMessagesOf(roomId), SampleModel.getMembersOf(roomId))
            .pipe(
                map(([ messages, members ]) => {
                    return messages.map(msg => {
                        const member = members.find(m => m.id === msg.data["ownerUid"]);
                        return {
                            message: msg,
                            owner: member,
                        }
                    })
                }),
            )
    }

このように「クライアントサイドでJoin」すれば、非正規化せずとも、Membersストリームから常に最新のデータが取得できます。この手法ですと、Firestoreからデータを取得する際の自由度が大幅に上がるので、とてもお勧めです。

さて改めて各クライアントのコードをご覧頂けると、とても良く似ている、という印象を持って頂けたのではないでしょうか?

  • 「一つの言語で開発してから横展開」が割と現実的
    ※ 実際に私は、IDEの型推測機能が優秀なAndroidStudio+Kotlin環境でまず書いて、それをXcode環境に持って行く(コピペ+α)、というスタイルで作業してました。
  • 見比べれば、実装ロジックが違ってもすぐに気付く事が出来る
  • 但しスレッド周りは除く

というのが私の評価です。ブログをご覧の方々も同じ感触を持って頂けると嬉しいのですが。

3. ViewでObservableをSubscribeする

さて、ここまで見てきたObservableの実装は、処理そのものではなく「データが流れてきたらこうするよ」という処理の定義です。実際にはSubscriber(主にView)がObservableを subscribe() して初めて、Firestoreに対するListenが開始され、データが流れ始めます(ColdとかHotの話はここでは割愛)。

という訳でSubscriberであるViewのサンプルです。キリが無いのでサンプルはvue.jsだけです。
<script>
  import SampleModel from "../models/SampleModel";
  (略)
  export default {
    name: 'SampleChat',
    props: [ "roomId" ],
    data() {
      return {
        room: {},
        messages: [],
        speakCount: 0,
        subscriptions: [],
      }
    },
    created() {
      this.subscriptions.push(SampleModel.getMessagesAndOwnersOf(this.roomId).subscribe(v => {
        this.messages = v;
      }));
      this.subscriptions.push(SampleModel.getSpeakCountAt(this.roomId).subscribe(v => {
        this.speakCount = v;
      }));
      this.subscriptions.push(SampleModel.getInfoOf(this.roomId).subscribe({
        next: (v) => {
          this.room = v;
        },
        error: (e) => console.log("error", e),
      }));
    },
    beforeDestroy() {
      this.subscriptions.forEach(s => s.unsubscribe());
    },
  }
</script>
<template>
  <div>
    <h1>{{ room.data ? room.data.name : "" }}</h1>
    <ul>
      <li v-for="item in messages">
        {{ item.message.data.text }} ({{ item.owner ? item.owner.data.name : "unknown" }})
      </li>
    </ul>
    <p>あなたの発言回数:{{ speakCount }}</p>
  </div>
</template>

だいぶテンプレ感溢れるコードになるので、VueRxをお勧めします。ライフサイクルに合わせて subscribeunsubscribe も自動で行ってくれます。 初期化フェーズで Vue.use(VueRx) しておいて下さい。

  export default {
    name: 'SampleChat',
    props: [ "roomId" ],
    subscriptions() {
      return {
        messages: SampleModel.getMessagesAndOwnersOf(this.roomId).pipe(tap(v => console.log(v))),
        speakCount: SampleModel.getSpeakCountAt(this.roomId).pipe(tap(v => console.log(v))),
        room: SampleModel.getInfoOf(this.roomId).pipe(
          tap(v => console.log(v)),
          startWith({}),
        ),
      }
    },  

これで、あなたや他の誰かが発言すれば直ぐに自動で、一覧や発言回数が更新されます。
ここまで来たら、発言機能も付けましょう。

4. 更新処理もObservableにする

データ更新系の処理も、データ取得系と同様にObservableとして実装する事が可能です。勿論必ずしもObservable化しなくともよいですが、プレチェックを行う際など、Firestoreのデータ状況を参照したい場合には、取得系のObservableと繋げる事で全体をスッキリ記述できます。
ここではネタとして、発言回数に制限を設けましょう。

import {map, mergeMap, take} from "rxjs/operators";
import {combineLatest, Observable, of} from "rxjs";

class SampleModel {
  static sendMessage(roomId, message) {
    const creation = Observable.create(observer => {
      const docRef = firebase.firestore().collection(`chatrooms/${roomId}/messages`).doc();

      console.log("document will be created locally at first.");
      observer.next(docRef.id); // 新規ドキュメントのIDを返却してみる

      docRef.set({
        text: message,
        ownerUid: UserSession.uid(),
        createdAt: firebase.firestore.FieldValue.serverTimestamp(),
      }).then(() => {
        console.log("document has synced to the cloud.");
        observer.complete();
      }).catch(error => {
        observer.error(error)
      });

      return () => {} // do nothing at unsubscribe()
    });

    // このチャットでは、一人10回までしか発言できない!という仕様とする
    return this.getSpeakCountAt(roomId).pipe(
      take(1),
      mergeMap(count => {
        if (count <= 10) {
          return creation;
        } else {
          return of("speak count has reached to the limit.");
        }
      })
    );
  }
※ observer.next()を使えば、subscriberに値を返す事が出来ます。ここでは生成されたDocumentIDを返していますが、そこから更に処理を繋げて最終的な何かを返すという事もあるでしょう。
※ Firestoreの更新系処理のコールバック(あるいはPromiss.then)について注意すべき点は、オフライン状態だといつまでも呼ばれないという事です。このコールバックは `onSynced` とも言うべきもので、ローカルの更新がクラウドに同期されて初めて発火します。後続処理をこのコールバック内で実装すると、オフライン環境だと先に進めなくなりますので注意が必要です。
これはErrorに関しても同様で、例えばクラウド定義のRuleに抵触するような書き込みであっても、オフラインだとエラースローされません。この場合、オンラインに戻ったタイミングでドキュメントはローカルから削除されます。
  export default {
    data() { 
      form: {
        message: "",
      },
    },
    subscriptions() { ...略... },
    methods: {
      sendMessage: function() {
        SampleModel.sendMessage(this.roomId, this.form.message).subscribe({
          next: v => console.log(v),
          complete: () => {}, // nop
          error: e => console.error(e),
        });
        this.form.message = "";
      },
    }
  }
</script>
<template>
  <div>
    略
    <p>あなたの発言回数:{{ speakCount }}</p>
    <p>
      <input type="text" v-model="form.message"><input type="button" value="send" @click="sendMessage">
    </p>
  </div>
</template>

この実装は、オフライン状態でも普通に動作しますので、是非お試し下さい。クラウドにSync出来なくとも、ローカルデータは追加されるので、そのUpdateを受けて一覧画面は更新されます(※)。
当然ながら、オンラインに復帰するまで他の人の発言は見えませんしその逆もそうですが、オフラインを理由にユーザの操作をブロックするよりも、操作自体は受け入れる方が、UXとしては良いのではと思います(但しその結果、一時的な不整合状態が発生するのを甘受する前提ですが)。

※ createdAtなどのフィールドに serverTimestamp を指定した場合、Syncされるまではその値はNullになります。Nullはソート順に影響しますので、そのままでは意図した並び順にならない可能性があります。要件等に応じて適宜対処する必要があります。このあたりもまたの機会に。

まとめ

Firestoreを採用する際の問題は、各クライアント毎に、正確な実装を行う必要がある事だと思います。単純に手数が増えるというだけでなく、各クライアントが全て正しく同じに実装されなければなりません。これは従来APIサーバ一カ所で正確性を担保するだけで良かったのに比べると、大きなプレッシャーです。

他方、クライアントがデータストアに直接アクセスするFirestoreのアーキテクチャは、クライアントに高いリアルタイム性・オフライン耐性を手軽に提供してくれます。それによって、ユーザに高いレベルのUXを簡単に提供できるようになります。勿論インフラも面倒見なくて良いので楽です。

この記事では、開発・管理コストを削減するために、ReactiveXを使って各クライアントで実装を揃える事を提案・実演してみました。他に類似するものがないこの独特なサービス・アーキテクチャと上手く付き合ってゆくにあたって、この記事が幾らかの参考になれば嬉しいです。