ElixirとPhoenixでWebSocketを使ったChatアプリケーションを作る
昨今のWebアプリケーションでは、クライアント、サーバが常時接続してメッセージ通信をすることが求められてきています
(リアルタイムWebアプリケーションと呼ばれたりします)。
このアプリケーションには以下の特徴があります
- クライアント、サーバが常時接続
- クライアントからではなく、サーバから接続しているクライアントへ情報をプッシュする
- 通信の頻度が多い
今回はこのような常時接続型のアプリケーションを実現するために作られたWebSocketの説明と、PhoenixでWebSocketをどのように使って常時接続のアプリケーションを作るか説明します。
WebSoketが必要とされてきた背景
今までのWebアプリケーションのリクエスト/レスポンス処理は以下のようなアーキテクチャでした。
- 全てのリクエストは状態を持たない
- サーバはリクエストを毎回新しいものとして扱う(HTTPはもともとステートレスなプロトコルなので前回のリクエスト/レスポンスと関係ない)
一方、常時接続型のアプリケーションの場合は以下のようになります。
- クライアント、サーバで双方向通信可能なコネクションを維持する
- クライアントは直接サーバと一つのプロセスで接続する
- クライアントの数だけプロセスを立ち上げる
インタラクティブなアプリケーションに多い構成です。
このアーキテクチャは通信の頻度が多くなるのでHTTPを使う場合には通信量の大きさが問題になります。また、ステートレスなHTTPでは状態を維持するのが難しいです。
これらの問題を解決する手段としてWebSocketが作られました
WebSocketの特徴
今までのHTTPに対するWebSocketの優位点は以下です
- Webで双方向通信する仕組み
- HTTPに比べて小さなデータサイズで必要な情報を送ることができる
もともとHTTPで状態を保つためにはCookieを使うなどして実現してきました。
また、サーバからメッセージをクライアントに送信する場合はAjaxを使ったpolingやcometで対応してきましたが、HTTPの制約を受けるので無駄な通信が発生しますし、遅延が発生してリアルタイム性に欠けるなどの問題がありました。
通信方法
- TCPと同じようにハンドシェイクでコネクション確立(通信の確立時は今まで通りHTTPを使う)
- WebSocket Openingハンドシェイクと言います
- コネクション確立後、WebSocketプロトコルに切り替える
- フレームと呼ばれる単位でデータ送信する
- 非常に低コスト(payload dataは最大14byte, HTTPだと数100byte)。
コネクション確率時のヘッダーの内容
コネクション開始時のリクエストヘッダー
WebSokectのコネクションを開始するためのリクエストのhttpヘッダーです。
UpgradeヘッダとConnectionヘッダが含まれていて、これらはHTTPからのアップグレードを意味します
GET /resource HTTP/1.1 Host: example.com Upgrade: websocket Connection: Upgrade Sec-WebSocket-Version: 13 Sec-WebSocket-Key: kH/XISD+rj+uGkkQHBv/Dw==
レスポンスヘッダー
続いてレスポンスヘッダーです。ステータスコード101は switching protocolsを意味します。
HTTP/1.1 101 OK Connection: Upgrade Upgrade: websocket sec-websocket-accept: ZCk6jJdyY474YOdCQNnFwjItvV0=
アプリケーションに残された課題
双方向通信するアプリケーションのアーキテクチャの場合、サーバは1クライアント1接続を維持する必要があります。
この場合、アプリケーションに以下の問題が発生します。
- クライアントと接続し続けるので負荷が高い(普通に作るとサーバの台数を増やす必要がある)
- リクエスト毎にプロセス、スレッドを立ち上げていくとメモリ消費が多く破綻する
- 通信の頻度が多い分エラーも多く発生する
Elixir/Phoenixにはこれらの問題を解決できる以下の機能があります
- 一つのActor(軽量プロセス)を一つのTCPコネクションに割り当てる
- 既存の言語は一つのTCPコネクションで一つのプロセスをフォーク or スレッド生成するのでコンテキストスイッチのコストが高い
- 軽量プロセスはものすごく低コスト(1軽量プロセス300ワード). OSのプロセスではなくGreenスレッドのため
- ErlangVMの耐障害性、Actor毎のGCなのでFullGCが発生しない
Phoenixのリアルタイム通信機能
ElixirのWebアプリケーションFWであるPhoenixには、ChannelというWebSocketを使ったアプリケーションを作る仕組みが用意されています。
Channel
- Phoenixがサポートするリアルタイム通信機能
- 送信者はトピックに関するメッセージをブロードキャスト、受信者はトピックを購読してメッセージを受信
- デフォルトではWebSocketプロトコルを使う
接続しているクライアントへbroadcastでメッセージを送信します
- clientでイベントが発生
- serverへメッセージ送信
- serverは接続している全クライアントへメッセージ送信
Channelの機能
PhoenixのChannel機能は大きくレイヤー分けすると以下のようになります。 (全機能の詳細はこのページを参考にしてください)
- Transport
- Channels
- PubSub
それぞれ機能の概要は以下です
- WebサーバとPhoenixは一つの接続を経由して複数のchannel socketへメッセージをディスパッチします
- Channelはディスパッチされたクライアントからのメッセージを処理します(MVCのControllerのような役割)
- PubSubはクライアントにメッセージをbroadcastする仕組みを提供します
サンプル WebSocketを使ったChatアプリケーション
それではPhoenixのChannel機能とWebSocketを使ったChatアプリケーションの実装を見ながら、どのように実現しているのか確認しましょう。
ソケットをエンドポイントにマウント
以下のようにしてsocektをUserSocket
にマウントします。このモジュール(UserSocket)は全ての接続の開始点となります。
lib/hello/endpoint.ex
defmodule Hello.Endpoint do use Phoenix.Endpoint socket "/socket", Hello.UserSocket ... end
user_socket.ex
でWebSocketを使用することを宣言しています(transpot :websocket
)。
クライアントが"talks:"
から始まるトピックのメッセージを送ると、メッセージはTalkChannel
へ転送されます。
web/channels/user_socket.ex
defmodule Hello.UserSocket do use Phoenix.Socket transport :websocket, Phoenix.Transports.WebSocket channel "talks:*", Hello.TalkChannel @max_age 2 * 7 * 24 * 60 * 60 def connect(%{"token" => token}, socket) do case Phoenix.Token.verify(socket, "user socket", token, max_age: @max_age) do {:ok, user_id} -> {:ok, assign(socket, :user_id, user_id)} {:error, _reason} -> :error end end def id(socket), do: "users_socket:#{socket.assigns.user_id}"
UserSocketモジュールではconnect
とid
という二つのメソッドが定義されています。
idはソケットを識別するため(user IDのようなもの)に定義します。
connectは接続要求のあったクライアントと接続を確立するか決めるメソッドです。ここではトークンを確認して接続を確立しています。
トピックへ参加
TalkChannelモジュールで接続してきたクライアントに対して参加を許可します。
{:ok, socket}
を返却します
web/channels/talk_channel
defmodule Hello.TalkChannel do use Phoenix.Channel def join("talks:hello", msg, socket) do {:ok, socket} end
クライアントの実装
socketの生成
各JSファイルをインポートします。 web/static/js/app.js
import socket from "./socket" import Talk from "./talk" Talk.init(socket, document.getElementById("msg-input"))
このファイルでソケットオブジェクト作成しています。 web/static/js/socket.js
import {Socket} from "phoenix" let socket = new Socket("/socket", { params: {token: window.userToken}, logger: (kind, msg, data) => { console.log(`${kind}: ${msg}`, data); } }); export default socket
イベントリスナーの登録
ここでは、以下の処理を行っています
- サーバとの接続の確率
socket.connect
でサーバとの接続を確立します。channel
オブジェクトを生成しています。これはPhoenixのTalkChannelと接続するために使用します。
- サーバへのメッセージ送信とサーバからのメッセージ受信のイベントリスナーの登録
web/static/js/talk.js
let Talk = { // 初期化処理.ソケットに接続します init: function(socket, element) { if (!element) { return;} socket.connect(); this.onReady(socket); }, // イベントリスナーの登録. // channel.pushで入力されたメッセージをソケットを通してサーバへ送信します // channel.onでサーバから送信されたメッセージをソケットを通して受け取り画面に表示します onReady: function(socket) { let channel = socket.channel("talks:hello", {}); let chatInput = $("#msg-input"); let msgContainer = $("#msg-container"); channel.join() .receive("ok", resp => { console.log("Welcom to Phoenix Chat!", resp); }) .receive("error", resp => {console.log("Unable to join", resp); }); chatInput.on("keypress", event => { if (event.keyCode === 13) { channel.push("msg", { body: chatInput.val() }); chatInput.val(""); } }); channel.on("msg", payload => { msgContainer.append(this.messageTemplate(payload)); scrollTo(0, document.body.scrollHeight); }); }, sanitize: function(html) { return $("<div/>").text(html).html(); }, messageTemplate: function(msg) { let username = this.sanitize(msg.user) let body = this.sanitize(msg.body); return(`<p><a href='#'>[${username}]</a> ${body}</p>`) } } export default Talk
メッセージの受信、ブロードキャスト
handle_in
メソッドでメッセージを受信します。受け取ったメッセージはbroadcast!
メソッドで他の接続しているクライアントに送信します
web/channels/talk_channel.ex
defmodule Hello.TalkChannel do use Phoenix.Channel def join("talks:hello", msg, socket) do {:ok, socket} end # jsからは`"msg"`というイベント名で送信しているので、そのイベント名でパターンマッチ def handle_in("msg", params, socket) do user = Hello.Repo.get(Hello.User, socket.assigns.user_id) handle_in("msg", params, user, socket) end # 接続しているユーザーにメッセージをブロードキャストする def handle_in("msg", params, user, socket) do broadcast! socket, "msg", %{user: user.name, body: params["body"]} {:reply, {:ok, %{msg: params["body"]}}, assign(socket, :user, params["user"])} end # ブロードキャストする前にメッセージをカスタマイズするにはこのコールバックを使う # ここでは何もしない def handle_out("msg", payload, socket) do push socket, "msg", payload {:noreply, socket} end end
ソケット認証
今回の本題とは関係ありませんがこのサンプルはソケット接続をするときに接続ユーザーの確認をしています。
HTMLファイルと認証用のモジュールに以下の実装を追加する必要があります。
web/templates/layout/app.html
<script>window.userToken = "<%= assigns[:user_token] %>"</script> <script src="<%= static_path(@conn, "/js/app.js") %>"></script>
web/controllers/auth.ex
defmodule Hello.Auth do ... def call(conn, repo) do user_id = get_session(conn, :user_id) cond do user = conn.assigns[:current_user] -> put_current_user(conn, user) user = user_id && repo.get(Toy.User, user_id) -> put_current_user(conn, user) true -> assign(conn, :current_user, nil) end end defp put_current_user(conn, user) do token = Phoenix.Token.sign(conn, "user socket", user.id) conn |> assign(:current_user, user) |> assign(:user_token, token) end ... end
サンプルの実行
複数ブラウザを立ち上げてメッセージの送受信ができることを確認してみましょう
それぞれのユーザーが送信したメッセージが表示されていることが確認できました。
まとめ
ここまでWebSoketとPhoenixを使った常時接続型のアプリケーションの説明をしてきました。
イベントが発生したタイミングで接続している全クライアントへbroadcastすることが可能になるので、WebSocketを使えば今まで力づくで作っていた部分をより効率の良い実装で置き換えることができるかもしれません。
私が作ったことがあるアプリケーションの機能で思いつくのは、以下です
- お知らせ、クーポンなどのコンテンツの更新
- メッセージ通知(メッセンジャーアプリケーション)
Rails5でもWebSocketを使う機能が追加されましたし、今後は様々なWebフレームワークでも常時接続型のアプリケーションを実現する機能が追加されてくる気がします。
参考情報
Channels
Phoenix Framework - Channel 日本語翻訳
WebSocket
WebSocketについて調べてみた。