Elixirのプロセス生成とプロセス間通信

elixir-flame-i-catch

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Elixirは並行処理能力に優れたRubyライクなシンタックスを持つ関数型言語です。
前回のエントリではElixirの概要やシンタックスの紹介を簡単にしました。
今回はElixirの複数プロセスを使ったプログラミングについて書いていきます。
Elixirにはプロセス周りの実装をサポートしてくれるOTPというライブラリがあるのですが、まずはOTPを使用しないプログラミングを扱います。

軽量プロセス

Elixirのプロセスは軽量プロセスと言われていて、OSのプロセスやスレッドではなくGreenスレッドです。
カーネルではなくVMでスケジューリングされるので軽量、コンテキストスイッチが発生しないという利点があります。
1軽量プロセスで約300ワードです。

プロセス生成

Elixirで新しいプロセスを生成するのは簡単です。
spawn関数を呼び出し、戻り値に生成したプロセスIDを受け取ります。

defmodule Hoge do
  def say(message) do
    IO.puts "Hello #{message}!"
  end
end

child_pid = spawn(Hoge, :say, ["World"])

spawn関数の引数は以下の通りです

引数 意味
第一引数 モジュール名
第二引数 生成したプロセスが実行する関数
第三引数 関数に渡す引数(この例の場合はsay関数に渡す文字列)

プロセス間のメッセージ送受信

次のサンプルコードで説明していきます。

# 別プロセスとして生成される処理
defmodule Hoge do
  def say do
    receive do # メインプロセスからのメッセージを待ち続ける
      {sender, msg} ->
      send sender, {:ok, "Hello #{msg}" }
    end
  end
end

# メインプロセスの処理
child_pid = spawn(Hoge, :say, [])
send child_pid, {self, "World!"}

receive do # 生成したプロセスからのメッセージを待ち続ける
  {:ok, message} ->
    IO.puts message
end

処理の流れは次のとおりです。

1. プロセスを生成し、メッセージを送信する

send関数に送信先のプロセスIDと送信するメッセージを指定しています。
この例の場合、直前に生成したプロセスにメッセージを送信します。

child_pid = spawn(Hoge, :say, [])
send child_pid, {self, "World!"}

self関数は自分のプロセスIDを返却します。send関数は第一引数に送信先のプロセスID、第二引数に送信するメッセージを指定します。
つまりここでは子プロセスに自分のプロセスIDと文字列を送信しています。

2. 子プロセスのメッセージ受信、メッセージ送信

次に子プロセスはreceive関数でメインプロセスからのメッセージを待ち続けます。
メッセージを受信するとメインプロセスに対して{:ok, "hello #{msg}"}のタプルを返信します。

def greet do
  receive do # メインプロセスからのメッセージを待ち続ける
    {sender, msg} ->
      send sender, {:ok, "Hello #{msg}" } # メインプロセスにメッセージを送信する(senderはメインプロセスのプロセスID)
  end
end

3. メインプロセスのメッセージ受信

メインプロセスもreceive関数で子プロセスからのメッセージを待ち続けます。
メッセージを受信すると、標準出力にメッセージを出力します。

receive do # 生成したプロセスからのメッセージを待ち続ける
  {:ok, message} ->
    IO.puts message
end

receive関数

プロセスはメッセージを受信するとプロセス毎にあるmailboxにメッセージを保存します。
そしてreceive関数はブロックで指定したパターンにマッチしたメッセージを探し、マッチしない場合はマッチするメッセージを受信するまで待ち続けます。
以下の場合、{:ok, messagge}の形式のメッセージを受信するまで待ち続けることになります。
送信側

send sender, {:ok, "Hello #{msg}" }

受信側

receive do 
  {:ok, message} ->
    IO.puts message
end

プロセスの監視

子プロセスの死を検知するには、プロセスを生成するときにプロセス同士を関連づける必要があります。 関連づけるにはspawnではなくsapwn_link関数を使います。

defmodule Hoge do
  import :timer, only: [ sleep: 1 ]

  def end_func do
    sleep 500
    exit(:boom)
  end

  def run do
    spawn_link(Hoge, :end_func, []) #spawn_linkでプロセスを生成
    receive do
      msg ->
        IO.puts "MESSAGE RECEIVED: #{inspect msg}"
      after 1000 ->
        IO.puts "Nothing happened"
    end
  end
end

Hoge.run
# => ** (EXIT from #PID<0.56.0>), :boom

spawn_linkで子プロセスを生成すると、子プロセスがexit関数を呼び出して処理を終了したときに、メインプロセスはメッセージを受け取ることができます。

モニタリング

プロセスの終了をモニタリングするにはspawn_monitor関数を使います。この関数はプロセスを生成するとプロセスIDとともにプロセスへの参照を返します。

spawn_monitor(Hoge, :end_func, [])
# => {#PID<0.62.0>,#Reference<0.0.3.91>}

プロセスが終了した場合、:DOWNメッセージとともにプロセスへの参照を受け取ります。

defmodule Hoge do
  import :timer, only: [ sleep: 1 ]

  def end_func do
    sleep 500
    exit(:boom)
  end

  def run do
    spawn_monitor(Hoge, :end_func, []) #spawn_monitorでプロセスを生成
    receive do
      msg ->
        IO.puts "MESSAGE RECEIVED: #{inspect msg}"
      after 1000 ->
        IO.puts "Nothing happened"
    end
  end
end

Hoge.run
# => {:DOWN,#Reference<0.0.3.91>,:process,#PID<0.62.0>,:boom}

複数プロセスでフィボナッチ数の計算

最後に複数プロセス処理の複雑な例を見てみます。 Programming Elixirで紹介されているフィボナッチ数列計算の例です。

概要図

だいたいこんな感じです

fib_processes

概要

このプログラムでは計算処理の指示を出すSchedulerとフィボナッチ数の計算処理を行うWorker(Schedulerから生成される)が登場します。複数のフィボナッチ数の計算をするのですが、1プロセスで逐次的に処理をしていては時間がかかるので複数プロセスを使って同時に実行します。
処理の概要は次のとおりです。

順番 処理内容
1 SchedulerはWorkerのプロセスを複数生成する
2 Workerは:readyのメッセージをSchedulerに送信する
3 Schedulerは:readyのメッセージを受信すると:fibのメッセージをWorkerに送信して計算を指示する
4 Workerは:fibのメッセージを受信するとフィボナッチ数を計算する
5 Workerは計算が終わると、:answerのメッセージをSchedulerに送信する
6 Schedulerは:answerのメッセージを受け取ると、フィボナッチ数の結果を配列に追加する
7 ここまでの処理を与えられたフィボナッチ数計算の対象の数だけ、複数プロセスを使って繰り返す

ではコードを見ていきましょう

# Schedulerから生成されるWorker
defmodule FibSolver do
  def fib(scheduler) do
    send scheduler, { :ready, self }
    receive do
      { :fib, n, client } ->
        send client, { :answer, n, fib_calc(n), self }
        fib(scheduler)
      { :shutdown } ->
        exit(:normal)
    end
  end

  defp fib_calc(0), do: 0
  defp fib_calc(1), do: 1
  defp fib_calc(n), do: fib_calc(n-1) + fib_calc(n-2)
end

defmodule Scheduler do
  def run(num_processes, module, func, fib_array) do
    (1..num_processes)
    |> Enum.map(fn(_) -> spawn(module, func, [self]) end)
    |> schedule_processes(fib_array, [])
  end

  defp schedule_processes(processes, queue, results) do
    receive do
      {:ready, pid} when length(queue) > 0 ->
        [ next | tail ] = queue
        send pid, {:fib, next, self}
        schedule_processes(processes, tail, results)

      {:ready, pid} ->
          send pid, {:shutdown}
        if length(processes) > 1 do
          schedule_processes(List.delete(processes, pid), queue, results)
        else
          Enum.sort(results, fn {n1,_}, {n2,_} -> n1 <= n2 end)
        end

      {:answer, number, result, _pid} ->
                schedule_processes(processes, queue, [ {number, result} | results ])
    end
  end
end

fib_array = [37, 37, 37, 37, 37, 37]

# ここがエントリポイント
Enum.each 1..10, fn num_processes ->
  {time, result} = :timer.tc(Scheduler, :run, [num_processes, FibSolver, :fib, fib_array])
  if num_processes == 1 do
    IO.puts inspect result
    IO.puts "\n #  time (s)"
  end
  :io.format "~2B       ~.2f~n", [num_processes, time/1000000.0]
end

Elixirの特徴が目立つコードです。特に再帰が多く使われています。

Schedulerの処理

run関数

順番 処理内容
1 引数で生成するプロセスの数とプロセス生成するモジュール、関数及び計算するフィボナッチ数の配列を受け取る(この例では10個のプロセスを生成する
2 spawnでプロセスを生成し、schedule_processes関数に生成したプロセスのPIDの配列と計算するフィボナッチ数の配列を渡す
def run(num_processes, module, func, fib_array) do
  (1..num_processes)
  |> Enum.map(fn(_) -> spawn(module, func, [self]) end) # プロセス生成処理を10回繰り返す、戻り値のPIDは次の関数に渡す
  |> schedule_processes(fib_array, []) # パイプでつないでいるので第一引数にPIDの配列が渡される
end

schdule_processes関数

順番 処理内容
1 receive関数でWorkerプロセスからの:readyメッセージを待ち続ける
2 :readyメッセージを受け取るとキュー([37,37,37,37,37,37]のこと)に入っている先頭の数を:fibメッセージでWorkerプロセスに送信する
3 残ったキューを引数に再帰処理に入り、再度receive関数でメッセージを待ち続ける(生成したプロセスの数だけ、:readyメッセージを受ける)
4 Workerプロセスからの:answerメッセージを受け取った場合、計算したフィボナッチ数を結果用の配列に追加し再帰処理に入る
5 :readyメッセージを受け取った時にキューの要素がない場合、Workerに:shutdownメッセージを送信する
6 :shutdownメッセージ送信後、WorkerのPIDをプロセスIDの配列(processes)から削除する
7 :shutdownメッセージ送信後、プロセスIDの配列の要素数が1の場合最後のプロセスからのメッセージなので、結果の配列をソートして処理を終了する

やや複雑ですね。。

Worker(FibSolver)の処理

fib関数

順番 処理内容
1 スケジューラに:readyメッセージを送信しreceive関数でSchedulerからの:fibメッセージを待ち続ける
2 :fibメッセージを受信するとフィボナッチ数の計算をしてSchedulerに:answerメッセージを送信する
3 ここまでの処理を再帰で繰り返す(計算が終わった後、:readyメッセージを再度スケジューラに送信して:fibメッセージもしくは:shutdownメッセージを待ち続ける
4 :shutdownメッセージを受け取ったらexitで終了する

実行結果

実行してみるとこんな感じでフィボナッチ数と各プロセスの処理時間が出力されます

elixir fib.exs
[{37, 24157817}, {37, 24157817}, {37, 24157817}, {37, 24157817}, {37, 24157817}, {37, 24157817}]

 #  time (s)
 1       8.62
 2       4.39
 3       4.34
 4       4.25
 5       4.90
 6       4.91
 7       4.69
 8       5.03
 9       4.74
10       4.61

まとめ

Elixirでは非常に簡単にプロセス生成、プロセス間でのメッセージ通信のコードを実装することができます。
今回の例であげると、子プロセスの終了通知をメインプロセスが受け取る処理が分かりやすいと思います。
他の言語であれば、同時に別のプロセス(もしくはスレッド)がメッセージを送信してきた場合などの考慮が必要になります(おそらく、キュー、セマフォロックなどを使ってメッセージ送受信をすることになる)が、Elixirでは不要です。
このあたりはマルチスレッド、マルチプロセスを使ったプログラミングで辛い思いをしてきた方にはかなりありがたいのではないでしょうか?

とは言っても、最後の例のように複雑な処理を一から実装するのはやはり手間がかかりますしコードもどんどん分かりにくくなっていきます。
次回はプロセス周りの実装をサポートするOTPライブラリを使ったプロセス間通信を説明します。

参考情報

Programming Elixir

AWS Cloud Roadshow 2017 福岡