WebSocket のチャットサーバを Akka Streams の MergeHub と BroadcastHub で実装する
ストリームの流れ、来てますね。私も先日のScalaMatsuri2017へ参加したのですが、「Scalaのエコシステム業務利用してみた」という話が多かった印象です。その中でも、Akka Stream はかなり実務レベルで活用されているようでした。今回は、今ノッてる Akka Streams を実際に使ってみて、よくあるサンプルのひとつ、チャットサーバを作ってみたいと思います。Akka Streams の中でも、グラフの動的な操作を行うための MergeHub と BroadcastHub を使ってみましたので、利用を検討している方の参考になれば幸いです。
こんな方を想定しています
- Akka Streams に興味がある
- MergeHub と BroadcastHub を使ったコードがどんなふうになるのか知りたい
バージョン情報
ライブラリ | バージョン |
---|---|
PlayFramework Scala | 2.5.12 |
akka-stream | 2.4.14 (Play 同梱バージョン) |
コード全文はこちらです。
Akka Streams とは
ここ最近、データストリームを扱うための仕組みが充実してきています。ストリームとは「無限のデータ列」を表す言葉で、ストリームデータ処理というと、無限に渡されるデータを変換したり、フィルタしたり、どこか別のところへ書き込んだりすることを言います。この考え方は、たとえばログの処理であったり、従量課金の計算であったり、リアルタイムに発生するデータを、ビジネス上意味のあるかたまりに整理する、という用途でとても有用です。今後も利用が拡大されていくことでしょう。もしかしたら、私たちが消費するコンテンツも、HTTPのリクエスト - レスポンス形式ではなく、蛇口をひねるようにストリームコンテンツを消費する方式が、メジャーになるかもしれませんね(※妄想です)。
Akka Streams は、Java/Scala でストリーム処理を行うためのライブラリです。コード上で、以下のような部品を定義できるのが特長です。
Sourceは例えばログファイルのストリームデータ、Sinkは例えばファイル出力・外部APIのコール、Flowは例えばデータのフィルタ・意味のあるフォーマットに変換、といった具合です。これらの部品を組み合わせて、最終的にグラフをつくり、ストリーム処理系としてピタゴラスイッチのようなものを定義する、というのが Akka Streams の基本的な考え方です。
Akka Streams を使って チャットサーバの"メッセージバス"を実現する
Akka Stremas でチャットサーバを実装するにあたり、下図のような作戦を考えました。
同じチャットルームに Join したユーザは、同室にいるすべてのクライアントのメッセージを受け取れる し、同室にいるすべてのクライアントへメッセージを送信できる という要件があります。実はこれ、普通の Akka Streams の部品を使うだけでは、実現できません。先で述べたように、基本的に Akka Streams のグラフは「閉じている = 実行可能」であり、グラフを構成する部品は、あらかじめ役割が決まっています。ログの振り分けや変換など、一般的な処理にはこの仕組みで十分です。しかし今回つくろうとしているチャットサーバのように、動的に部品へのつなぎ口を増減させる 場合は、別の仕組みが必要になります。それが、MergeHubとBroadcastHubです。これらを使って Publish - Subscribe可能な ストリームバス をつくります。
private def create(roomId: String): ChatRoom = { // MergeHubとBroadcastHubを使って、バスのパーツを作ります。 val (sink, source) = MergeHub.source[ChatMessage](perProducerBufferSize = 16) .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both) .run() //排水口をつくります。 source.runWith(Sink.ignore) val channel = ChatChannel(sink, source) // パーツを使って、チャットメッセージのバスをつくります。 val bus: Flow[ChatMessage, ChatMessage, UniqueKillSwitch] = Flow.fromSinkAndSource(channel.sink, channel.source) .joinMat(KillSwitches.singleBidi[ChatMessage, ChatMessage])(Keep.right) .backpressureTimeout(3.seconds) .map { e => println(s"$e $channel") e } ChatRoom(roomId, bus) }
このコードを実行すると、以下のようなパーツができあがります。このパーツは、全体としてはFlowの部品になることに加え、入り口と出口に動的に接続することができるバスとして振る舞います。チャットサーバには、このFlowを使ってやると良さそうです。
WebSocket の入力 と出力
WebSocketの入出力と、Akka Strems の世界をどのようにつなぎ合わせればよいでしょうか。それには Play Framework の力を借ります。HTTPでのリクエストと同じように、routes
ファイルにパスを、Controller
にリクエストの受け口とレスポンスを書くことになります。Play Framework のドキュメントによると、Controller
の実装はFlowを作ってやると、Playが自動でWebSocketの入力をFlowのinに、FlowのoutをWebSocketの出力としてくれるようです。以下は公式ドキュメントからの引用です。
import play.api.mvc._ import play.api.libs.streams._ class Controller1 @Inject() (implicit system: ActorSystem, materializer: Materializer) { def socket = WebSocket.accept[String, String] { request => // ↓Flow[String, String, _]型 ActorFlow.actorRef(out => MyWebSocketActor.props(out)) } }
非常にシンプルに書けますね。WebSocketの受け入れ口に対しては Akka Streams のFlowを渡してやれば良いということがわかりました。ということで、先程作成したメッセージバスもFlowなので、つなぎこんでやればよさそうです!
文字列とChatMessageの変換処理
さきほど、図で示したメッセージバスは、InとOutの型がどちらもChatMessage
でした。WebSocket.accept[String, String]
という型情報からもわかるとおり、ここではWebSocketの入力と出力は文字列で扱うことにするので、文字列をChatMessage
へ変換する部品が必要です。データを変換する部品といえば、Flowでしたね。入力用と出力用にそれぞれふたつ用意しました。最終的にチャットサーバ用のControllerは以下のようにしました。
# chatroom GET /chat/stream/:roomId controllers.chat.ChatController.start(roomId)
class ChatController @Inject()( implicit val system: ActorSystem, implicit val materializer: Materializer, streamChatService: ChatService ) { def start(roomId: String) = WebSocket.accept[String, String] { request => val userName = request.queryString("user_name").headOption.getOrElse("anon") // 入力側のFlow。ここで文字列からChatMessageへ変換します。 val userInput: Flow[String, ChatMessage, _] = ActorFlow.actorRef[String, ChatMessage](out => ChatRequestActor.props(out, userName)) // チャットルーム。実体はメッセージバスのFlowです。 val room = streamChatService.start(roomId, userName) // 出力側のFlow。ここでChatMessageから文字列へ変換します。 val userOutPut: Flow[ChatMessage, String, _] = ActorFlow.actorRef[ChatMessage, String](out => ChatResponseActor.props(out,userName)) // 変換器とメッセージバスをつなぎます。 userInput.viaMat(room.bus)(Keep.right).viaMat(userOutPut)(Keep.right) } }
最終的には、図で示すと以下のようなイメージになります。
使ってみた
node.js製のWebSocketクライント、wscatをインストールして試しました。
複数のユーザでメッセージバスを共有して、お互いのメッセージが相互にやりとりできていることがわかります。Akka Streams を使って、チャットルームを実装することができました。
まとめ
Akka Streams は無限の可能性を秘めています。今回試したのは文字列を受け付けてそれを流すタイプのストリームでしたが、バイトストリームを流して、それをクライアント側でデコードすることができれば、さまざまなメディアを同じ仕組みで配信することが可能かもしれません。クライアントサイドも Angular2 などで WebSocket クライアントを実装することができるようですので、Akka Streams と連携してどんな情報がやりとりできるのかをこれから試していきたいです。