ElixirとPhoenixでWebSocketを使ったChatアプリケーションを作る

elixir-flame-i-catch

昨今のWebアプリケーションでは、クライアント、サーバが常時接続してメッセージ通信をすることが求められてきています
(リアルタイムWebアプリケーションと呼ばれたりします)。

このアプリケーションには以下の特徴があります

  • クライアント、サーバが常時接続
  • クライアントからではなく、サーバから接続しているクライアントへ情報をプッシュする
  • 通信の頻度が多い

今回はこのような常時接続型のアプリケーションを実現するために作られたWebSocketの説明と、PhoenixでWebSocketをどのように使って常時接続のアプリケーションを作るか説明します。

WebSoketが必要とされてきた背景

今までのWebアプリケーションのリクエスト/レスポンス処理は以下のようなアーキテクチャでした。

  • 全てのリクエストは状態を持たない
  • サーバはリクエストを毎回新しいものとして扱う(HTTPはもともとステートレスなプロトコルなので前回のリクエスト/レスポンスと関係ない)

web-apps-request-response

一方、常時接続型のアプリケーションの場合は以下のようになります。

  • クライアント、サーバで双方向通信可能なコネクションを維持する
  • クライアントは直接サーバと一つのプロセスで接続する
    • クライアントの数だけプロセスを立ち上げる

websocket-apps-connection

インタラクティブなアプリケーションに多い構成です。
このアーキテクチャは通信の頻度が多くなるのでHTTPを使う場合には通信量の大きさが問題になります。また、ステートレスなHTTPでは状態を維持するのが難しいです。
これらの問題を解決する手段としてWebSocketが作られました

WebSocketの特徴

今までのHTTPに対するWebSocketの優位点は以下です

  1. Webで双方向通信する仕組み
  2. HTTPに比べて小さなデータサイズで必要な情報を送ることができる

もともとHTTPで状態を保つためにはCookieを使うなどして実現してきました。
また、サーバからメッセージをクライアントに送信する場合はAjaxを使ったpolingやcometで対応してきましたが、HTTPの制約を受けるので無駄な通信が発生しますし、遅延が発生してリアルタイム性に欠けるなどの問題がありました。

通信方法

  • TCPと同じようにハンドシェイクでコネクション確立(通信の確立時は今まで通りHTTPを使う)
    • WebSocket Openingハンドシェイクと言います
  • コネクション確立後、WebSocketプロトコルに切り替える
  • フレームと呼ばれる単位でデータ送信する
    • 非常に低コスト(payload dataは最大14byte, HTTPだと数100byte)。

コネクション確率時のヘッダーの内容

コネクション開始時のリクエストヘッダー

WebSokectのコネクションを開始するためのリクエストのhttpヘッダーです。
UpgradeヘッダとConnectionヘッダが含まれていて、これらはHTTPからのアップグレードを意味します

GET /resource HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Version: 13
Sec-WebSocket-Key: kH/XISD+rj+uGkkQHBv/Dw==

レスポンスヘッダー

続いてレスポンスヘッダーです。ステータスコード101は switching protocolsを意味します。

HTTP/1.1 101 OK
Connection: Upgrade
Upgrade: websocket
sec-websocket-accept: ZCk6jJdyY474YOdCQNnFwjItvV0=

アプリケーションに残された課題

双方向通信するアプリケーションのアーキテクチャの場合、サーバは1クライアント1接続を維持する必要があります。
この場合、アプリケーションに以下の問題が発生します。

  1. クライアントと接続し続けるので負荷が高い(普通に作るとサーバの台数を増やす必要がある)
  2. リクエスト毎にプロセス、スレッドを立ち上げていくとメモリ消費が多く破綻する
  3. 通信の頻度が多い分エラーも多く発生する

Elixir/Phoenixにはこれらの問題を解決できる以下の機能があります

  • 一つのActor(軽量プロセス)を一つのTCPコネクションに割り当てる
    • 既存の言語は一つのTCPコネクションで一つのプロセスをフォーク or スレッド生成するのでコンテキストスイッチのコストが高い
    • 軽量プロセスはものすごく低コスト(1軽量プロセス300ワード). OSのプロセスではなくGreenスレッドのため
  • ErlangVMの耐障害性、Actor毎のGCなのでFullGCが発生しない

Phoenixのリアルタイム通信機能

ElixirのWebアプリケーションFWであるPhoenixには、ChannelというWebSocketを使ったアプリケーションを作る仕組みが用意されています。

Channel

  • Phoenixがサポートするリアルタイム通信機能
  • 送信者はトピックに関するメッセージをブロードキャスト、受信者はトピックを購読してメッセージを受信
  • デフォルトではWebSocketプロトコルを使う

接続しているクライアントへbroadcastでメッセージを送信します

  1. clientでイベントが発生
  2. serverへメッセージ送信
  3. serverは接続している全クライアントへメッセージ送信

broadcast

Channelの機能

PhoenixのChannel機能は大きくレイヤー分けすると以下のようになります。 (全機能の詳細はこのページを参考にしてください)

  1. Transport
  2. Channels
  3. PubSub

phoenix-channel

それぞれ機能の概要は以下です

  1. WebサーバとPhoenixは一つの接続を経由して複数のchannel socketへメッセージをディスパッチします
  2. Channelはディスパッチされたクライアントからのメッセージを処理します(MVCのControllerのような役割)
  3. PubSubはクライアントにメッセージをbroadcastする仕組みを提供します

サンプル WebSocketを使ったChatアプリケーション

それではPhoenixのChannel機能とWebSocketを使ったChatアプリケーションの実装を見ながら、どのように実現しているのか確認しましょう。

ソケットをエンドポイントにマウント

以下のようにしてsocektをUserSocketにマウントします。このモジュール(UserSocket)は全ての接続の開始点となります。 lib/hello/endpoint.ex

defmodule Hello.Endpoint do
  use Phoenix.Endpoint

  socket "/socket", Hello.UserSocket
  ...
end

user_socket.exでWebSocketを使用することを宣言しています(transpot :websocket)。
クライアントが"talks:"から始まるトピックのメッセージを送ると、メッセージはTalkChannelへ転送されます。

web/channels/user_socket.ex

defmodule Hello.UserSocket do
  use Phoenix.Socket

  transport :websocket, Phoenix.Transports.WebSocket

  channel "talks:*", Hello.TalkChannel

  @max_age 2 * 7 * 24 * 60 * 60

  def connect(%{"token" => token}, socket) do
    case Phoenix.Token.verify(socket, "user socket", token, max_age: @max_age) do
      {:ok, user_id} ->
        {:ok, assign(socket, :user_id, user_id)}
      {:error, _reason} ->
        :error
    end
  end

  def id(socket), do: "users_socket:#{socket.assigns.user_id}"

UserSocketモジュールではconnectidという二つのメソッドが定義されています。
idはソケットを識別するため(user IDのようなもの)に定義します。
connectは接続要求のあったクライアントと接続を確立するか決めるメソッドです。ここではトークンを確認して接続を確立しています。

トピックへ参加

TalkChannelモジュールで接続してきたクライアントに対して参加を許可します。
{:ok, socket}を返却します

web/channels/talk_channel

defmodule Hello.TalkChannel do
  use Phoenix.Channel

  def join("talks:hello", msg, socket) do
    {:ok, socket}
  end

クライアントの実装

socketの生成

各JSファイルをインポートします。 web/static/js/app.js

import socket from "./socket"
import Talk from "./talk"

Talk.init(socket, document.getElementById("msg-input"))

このファイルでソケットオブジェクト作成しています。 web/static/js/socket.js

import {Socket} from "phoenix"

let socket = new Socket("/socket", {
    params: {token: window.userToken},
    logger: (kind, msg, data) => { console.log(`${kind}: ${msg}`, data); }
});

export default socket

イベントリスナーの登録

ここでは、以下の処理を行っています

  1. サーバとの接続の確率
    • socket.connectでサーバとの接続を確立します。
    • channelオブジェクトを生成しています。これはPhoenixのTalkChannelと接続するために使用します。
  2. サーバへのメッセージ送信とサーバからのメッセージ受信のイベントリスナーの登録

web/static/js/talk.js

let Talk = {
    // 初期化処理.ソケットに接続します
    init: function(socket, element) {
        if (!element) { return;}
        socket.connect();
        this.onReady(socket);
    },

    // イベントリスナーの登録.
    // channel.pushで入力されたメッセージをソケットを通してサーバへ送信します
    // channel.onでサーバから送信されたメッセージをソケットを通して受け取り画面に表示します
    onReady: function(socket) {
        let channel = socket.channel("talks:hello", {});
        let chatInput = $("#msg-input");
        let msgContainer = $("#msg-container");

        channel.join()
            .receive("ok", resp => { console.log("Welcom to Phoenix Chat!", resp); })
            .receive("error", resp => {console.log("Unable to join", resp); });

        chatInput.on("keypress", event => {
            if (event.keyCode === 13) {
                channel.push("msg", { body: chatInput.val() });
                chatInput.val("");
            }
        });

        channel.on("msg", payload => {
            msgContainer.append(this.messageTemplate(payload));
            scrollTo(0, document.body.scrollHeight);
        });
    },

    sanitize: function(html) { return $("<div/>").text(html).html(); },

    messageTemplate: function(msg) {
        let username = this.sanitize(msg.user)
        let body = this.sanitize(msg.body);
        return(`<p><a href='#'>[${username}]</a>&nbsp; ${body}</p>`)

    }
}

export default Talk

メッセージの受信、ブロードキャスト

handle_inメソッドでメッセージを受信します。受け取ったメッセージはbroadcast!メソッドで他の接続しているクライアントに送信します

web/channels/talk_channel.ex

defmodule Hello.TalkChannel do
  use Phoenix.Channel

  def join("talks:hello", msg, socket) do
    {:ok, socket}
  end

  # jsからは`"msg"`というイベント名で送信しているので、そのイベント名でパターンマッチ
  def handle_in("msg", params, socket) do
    user = Hello.Repo.get(Hello.User, socket.assigns.user_id)
    handle_in("msg", params, user, socket)
  end

  # 接続しているユーザーにメッセージをブロードキャストする
  def handle_in("msg", params, user, socket) do
    broadcast! socket, "msg", %{user: user.name, body: params["body"]}
    {:reply, {:ok, %{msg: params["body"]}}, assign(socket, :user, params["user"])}
  end

  # ブロードキャストする前にメッセージをカスタマイズするにはこのコールバックを使う
  # ここでは何もしない
  def handle_out("msg", payload, socket) do
    push socket, "msg", payload
    {:noreply, socket}
  end
end

ソケット認証

今回の本題とは関係ありませんがこのサンプルはソケット接続をするときに接続ユーザーの確認をしています。
HTMLファイルと認証用のモジュールに以下の実装を追加する必要があります。

web/templates/layout/app.html

<script>window.userToken = "<%= assigns[:user_token] %>"</script>
<script src="<%= static_path(@conn, "/js/app.js") %>"></script>

web/controllers/auth.ex

defmodule Hello.Auth do

  ...

  def call(conn, repo) do
    user_id = get_session(conn, :user_id)

    cond do
      user = conn.assigns[:current_user] ->
        put_current_user(conn, user)
      user = user_id && repo.get(Toy.User, user_id) ->
        put_current_user(conn, user)
      true ->
        assign(conn, :current_user, nil)
    end
  end

  defp put_current_user(conn, user) do
    token = Phoenix.Token.sign(conn, "user socket", user.id)

    conn
    |> assign(:current_user, user)
    |> assign(:user_token, token)
  end
  ...
end

サンプルの実行

複数ブラウザを立ち上げてメッセージの送受信ができることを確認してみましょう

それぞれのユーザーが送信したメッセージが表示されていることが確認できました。

まとめ

ここまでWebSoketとPhoenixを使った常時接続型のアプリケーションの説明をしてきました。   イベントが発生したタイミングで接続している全クライアントへbroadcastすることが可能になるので、WebSocketを使えば今まで力づくで作っていた部分をより効率の良い実装で置き換えることができるかもしれません。
私が作ったことがあるアプリケーションの機能で思いつくのは、以下です

  1. お知らせ、クーポンなどのコンテンツの更新
  2. メッセージ通知(メッセンジャーアプリケーション)

Rails5でもWebSocketを使う機能が追加されましたし、今後は様々なWebフレームワークでも常時接続型のアプリケーションを実現する機能が追加されてくる気がします。

参考情報

Channels
Phoenix Framework - Channel 日本語翻訳
WebSocket
WebSocketについて調べてみた。

AWS Cloud Roadshow 2017 福岡