Elixirのノード接続とメッセージ通信

elixir-flame-i-catch

Elixirでプログラミングをするときは頻繁にプロセス間でメッセージ通信を行います。
Elixirのプロセスはネットワークに透過的なため、別ノード(別VM)へのプロセスへメッセージを送信することができます。
また、別ホスト(別サーバ)で動いているプロセスへも送信できます。

elixir-nodes

今回は異なるVM間、異なるホスト間のメッセージ通信を試してみます。

ノード接続

メッセージ通信を行うノード同士で接続します。
まず最初に名前付きでノードを起動しましょう。
(@以降の内容はホストネームです。hostname -s で確認できます)

    
$ iex --sname foo
iex(foo@kanmo)>

次に別のターミナルウィンドウを開き、同じく名前付きのノードを起動します。

    
$ iex --sname bar
iex(bar@kanmo)>

それでは、ノード同士で接続します。

    
iex(foo@kanmo)> Node.connect(:bar@kanmo)
true

接続できました、確認してみましょう。

    
iex(foo@kanmo)> Node.list
[:bar@kanmo]

別のノードでも確認してみます

    
iex(bar@kanmo)> Node.list
[:foo@kanmo]

両方のノードで接続を確認できました。

ノードの自動接続

ここで面白い機能を一つ、ノードの自動接続をご紹介します。
まずクラスタに新しくノード追加します。

    
$ iex --sname baz
iex(baz@kanmo)> Node.connect(:foo)

新しくノードbazをノードfooに接続しました。 追加したノードbazの接続リストを確認してみましょう。

iex(baz@kanmo)> Node.list
[:foo@kanmo, :bar@kanmo]

接続していないはずの:bar@kanmoとも接続しています。Elixirのノードはクラスタ内のノードに接続すると他のすべてのノードにも自動的に接続します。

メッセージ通信

それではノードから別のノードへメッセージを送信します。事前にモジュールを作成しておきます。

ping_pong.ex

# サーバプロセスのモジュール    
defmodule PingPong do
  def server_start do
    pid = spawn(__MODULE__, :loop, [])
    :global.register_name(:server, pid)
  end
    
  def loop do
    # 2. クライアントプロセスからのメッセージを受信する
    receive do
      {sender, msg} ->
        IO.puts "received message: #{msg}"
        # 3. クライアントプロセスへメッセージを送信する
        send sender, {:ok, "#{msg} Pong!!!!!"} 
    end
    loop
  end
end

# クライアントプロセスのモジュール        
defmodule Client do
  def start do
    # 1. サーバプロセスへメッセージを送信する
    send :global.whereis_name(:server), {self, "Ping"}
    
    # 4. サーバプロセスからメッセージを受信する
    receive do
      {:ok, message} ->
        IO.puts message
    end
  end
end

それぞれVMを立ち上げてコードをコンパイルします。

    
$ iex --sname foo
iex(foo@kanmo)> c("ping_pong.ex")
    
$ iex --sname bar
iex(bar@kanmo)> c("ping_pong.ex")

別ノードに接続をしてから、サーバプロセスを立ち上げます。

iex(foo@kanmo)> Node.connect(:"bar@kanmo")
iex(foo@kanmo)> PingPong.server_start
:yes

それではクライアントプロセスを立ち上げてプログラムを試してみましょう。別ノードで起動します。

    
iex(bar@kanmo)> Client.start
Ping Pong!!!!!
:ok  

サーバプロセスからメッセージを受け取って出力できています。 サーバプロセスを起動しているノードでも、クライアントプロセスからメッセージを受信しているのが分かります。

    
iex(hoge@kanmo)> PingPong.server_start
:yes
received message: Ping

ノードから別ノードへメッセージを送信することができました!

別ホストのノードと接続、メッセージ送信。

次に別ホスト(別サーバ)のノードで動いているプロセスとメッセージ通信を行いましょう。
今回はEC2で試します。

セキュリティグループの設定

最初にノード間が通信するためのポートをセキュリティグループに設定します。
ポートは4369番と9100-9155番をInboundに設定します。

Elixir-SG

(このサンプルは一時的に使用するのみなのでSource IPに0.0.0.0/0 に設定していますが、 実際に使用する場合は適切なセキュリティグループを設定してください。)

ノードの起動

それではノードを起動しましょう。引数にノードのメッセージ通信用のポート番号を指定します。
また、--cookie オプションも指定します。通信するノードはここで指定するcookieの値を同じ値にする必要があります。
(別のノードからのアクセスに対する基本的なセキュリティ機能です)。

$ iex --name a@198.51.100.0 --cookie pingpong --erl "-kernel inet_dist_listen_min 9100 inet_dist_listen_max 9155"
iex(a@198.51.100.0)1> c("ping_pong.ex")

別のノードでも起動します。

$ iex --name b@203.0.113.0  --cookie pingpong --erl "-kernel inet_dist_listen_min 9100 inet_dist_listen_max 9155"
iex(b@203.0.113.0)1> c("ping_pong.ex")

ノード接続、実行

先ほどと同様にノード間で接続をしてメッセージ通信を行います。
サーバ

iex(a@198.51.100.0)> Node.connect(:"b@203.0.113.0")
iex(a@198.51.100.0)> PingPong.server_start
:yes

クライアントプロセス

    
iex(b@203.0.113.0)> Client.start
Ping Pong!!!!!
:ok  

うまくいきました!別ホストのプロセスともメッセージ通信に成功しています。

Task.async & Task.await

ノード間のメッセージ通信とは直接関係ないですが、Elixirの非同期処理の手法を一つご紹介します。
Elixirでは複数プロセスを立ち上げバックグラウンドでプロセスが処理を行い、結果を待ち受ける非同期処理を簡単に書けます。
以下は階乗計算をバックグラウンドプロセスで実行する例です。

    
defmodule Fact do
  def of(0), do: 1
  def of(n), do: n * Fact.of(n-1)
end  
    
Task.async(fn -> Fact.of(10) end)
|> &Task.await(&1)
|> IO.puts

複数ホスト、複数ノードでの非同期処理

それでは、ここまでで試した機能を使ってもう少し大きなコードを書いてみます。
2台のEC2でそれぞれノードを立ち上げ非同期処理を行います。
コードの内容は何でも良いのですが、このサンプルでは手元にある複数企業の株価コードを元に株価とその関連情報を取得します。

プロジェクトの作成

mix new stocker

クラスタの設定

クラスタを構築するノードを設定します(config/config.ex)。

use Mix.Config

config :stocker, master_node: :'a@198.51.100.0'

config :stocker, slave_nodes: [:'b@203.0.113.0',
                               :'c@203.0.113.0']                              

ノード接続

実行時にそれぞれのノードを接続します。以下のように接続します。

def main do
  Application.get_env(:stocker, :master_node)
  |> Node.start

  Application.get_env(:stocker, :slave_nodes)
  |> Enum.each(&Node.connect(&1))
   
  (略)
end

非同期処理

取得する企業の情報は1000件以上あるので逐次処理でやっては時間がかかり過ぎてしまいます。
Task.async & Task.awaitでプロセスを生成し、非同期で処理をします。

def fetch_stocks(nodes, code_per_node)
  (略)
  
  Enum.zip(nodes, code_per_node)
  |> Enum.flat_map(fn {node, codes} ->
    Enum.map(codes, fn code ->
      Task.Supervisor.async({Stocker.TasksSupervisor, node},
        Stocker.Worker, :start, [String.to_integer(code)])
    end)
  end)
  |> Enum.map(&Task.await(&1, :infinity))
  |> print_results
end  

実行

あとはビジネスロジックを記述して実行します。
今回はビジネスロジックは関係ないので省略します。やっていることはスクレイピングで各企業の株価情報を取得して最後に標準出力するだけです。

elixir-on-ec2s

それぞれのノードでプロセスが起動し処理が行われれば成功です。

まとめ

異なるVM間でメッセージ通信を行うことは普通のプログラミング言語ではなかなか大変です、それが別ホストのVMとなるとさらに実装が大変になります。
Elixirでは通信を行うための複雑な部分はVMで行ってくれるため、他の言語よりも簡単にVM間でのメッセージ通信を実装できます。
興味ある方は是非、メッセージ通信を異なるVM間、ホスト間で試してみてください。

参考情報

The little Elixir & OTP Guidebook
プログラミングElixir

AWS Cloud Roadshow 2017 福岡