ElixirのOTPでMapReduce処理を実装する
Elixirには複数プロセスを使ってアプリケーションを簡単に実装するためのフレームワークであるOTPが用意されています。
OTPを使うとプロセス間のメッセージ通信やエラー時の再起動処理、プロセスの状態管理が非常に簡単に実装できます。
今回はOTPを使ってMapReduceのプログラムを実装してみます。
MapReduce
Apache Lucene
MapReduceの仕組みが利用されているソフトウェアをご紹介します。
LuceneはJavaで実装されている全文検索エンジンで、Elasticsearchの内部でも使われています。
このLuceneですが、MapReduceの仕組みを使って単語の出現場所のインデックスを生成しています(Luceneの作者は最初、Lispで実装を試したそうです)。
今回はこのMapReduceで文書のインデックスを生成する処理をElixirで実装してみます。
(簡単のため、文書は形態素解析済みのものを使います)
map関数
map関数は文字列やリストと関数を引数にして、各要素に対して引数で渡した関数を適用します。
map関数の簡単な例
Enum.map([1,2,3,4,5], fn(elem) -> elem * elem end) #=> [1, 4, 9, 16, 25]
今回の例では、複数の文書に対して、文書の単語を「ドキュメント番号、単語、単語のインデックス」に変換する関数をmap関数を使って適応します。
このようなデータ構造で形態素解析済みの文書があるとして
{:doc1, "ワタシ ハ エリクサー チョット デキル"}
(doc1というのは文書IDとして、それぞれの文書にユニークに割り当てるものとします。)
次のように「ドキュメント番号、単語、単語のインデックス」の形式に変換します
[{:doc1, "ワタシ", 0}, {:doc1, "ハ", 1}, {:doc1, "エリクサー", 2}, {:doc1, "チョット", 3}, {:doc1, "デキル", 4}]
reduce関数
reduceは引数で複数の要素と関数を渡し、引数で渡した要素に対して関数を適応させるのですが、最初の二つの要素に対して適応させ、その結果を3番目の要素と適応させ、さらに4番目、5番目と次々に適応させ最後に返される結果はそれまでの計算処理で縮約されたものになります。
reduce関数の簡単な例
Enum.reduce([1,2,3,4,5], 0, fn(elem, acc) -> elem + acc end) # 第2引数の0は初期値 #=> 15
今回の例では、先ほどのmap関数を適応させた文書のリストに対して、リストを統合するためにreduce関数を使います。
map関数で変換した複数のドキュメントを
[ [{:doc1, "ワタシ", 0}, {:doc1, "ハ", 1}, {:doc1, "エリクサー", 2}, {:doc1, "チョット", 3}, {:doc1, "デキル", 4}], [{:doc2, "ワタシ", 0}, {:doc2, "ハ", 1}, {:doc2, "ルビー", 2}, {:doc2, "チョット", 3}, {:doc2, "デキル", 4}] ]
次のような転置行列に変換します
%{"エリクサー" => [doc1: 2], "チョット" => [doc1: 3, doc2: 3], "デキル" => [doc1: 4, doc2: 4], "ハ" => [doc1: 1, doc2: 1], "ルビー" => [doc2: 2], "ワタシ" => [doc1: 0, doc2: 0]}
ここまで出来れば文書のインデックス化ができているので、検索ワードで検索すればその単語が含まれるドキュメント名(文書ID)と出現位置を取得することができます。
OTPを使ってMapReduce
シングルプロセスの実装
まずはOTPを使わずにシングルプロセスで実装してみます。
defmodule MapReduce do # アプリケーションのエントリポイント def entry do doc1 = {:doc1, "ワタシ ハ エリクサー チョット デキル"} doc2 = {:doc2, "ワタシ ハ ルビー チョット デキル"} [doc1, doc2] |> Enum.map(fn(doc) -> MapReduce.word_map(doc) end) # map関数で複数のドキュメントにword_map関数を適応 |> Enum.reduce(%{}, fn(items, acc) -> MapReduce.invert_array(items, acc) end) # reduce関数で複数のドキュメントを一つに縮約 end # map関数で引数に渡される関数。各単語を{:doc1, "ワタシ", 0}の形式に変換します def word_map({doc_id, words}) do String.split(words) |> Enum.with_index |> Enum.map(fn(tup) -> Tuple.insert_at(tup, 0, doc_id) end) end # reduce関数で引数に渡される関数。上記word_map関数で変換した文書を縮約します。 # %{"ワタシ" => [doc1: 0, doc2: 0], ....}の形式のマップを作ります def invert_array([{doc_id, word, index} | tail], shuffle_map) do invert_array(tail, Map.put(shuffle_map, word, [{doc_id, index} | Map.get(shuffle_map, word, [])])) end def invert_array([], shuffle_map), do: shuffle_map end IO.inspect MapReduce.entry
出力結果
%{"エリクサー" => [doc1: 2], "チョット" => [doc1: 3, doc2: 3], "デキル" => [doc1: 4, doc2: 4], "ハ" => [doc1: 1, doc2: 1], "ルビー" => [doc2: 2], "ワタシ" => [doc1: 0, doc2: 0]}
エラーが起きた場合にどうするか?
今回のサンプルでは対象のドキュメントが少ないので、エラーでアプリケーションが落ちてももう一度起動すればいいのですが、対象ドキュメントが多くサイズも大きい場合はそれでは辛いですね。
map関数で複数のドキュメントを「ドキュメント番号、単語、単語のインデックス」に変換している箇所に状態を保たせて、一つのドキュメントの変換処理が失敗してもそのプロセスだけ再起動し、継続して処理できるようにしたいものです。
そのためにはドキュメントの変換処理が終わったらそれを保持する仕組み(状態を持つ)とプロセスを監視してエラーが起きた場合に再起動させる仕組みが必要です。
これらの仕組みを実装するためにOTPのGenServerとSupervisorを使います。
非常に簡単な説明になりますが、それぞれの概要は以下になります。
名称 | 概要 |
---|---|
GenServer | 状態を持ったプロセスを使うアプリケーションを実装するための機能が用意されているモジュール |
Supervisor | プロセスを監視し、エラーが発生した場合に再起動するなどの機能を提供してくれる仕組み |
詳細はElixr公式ページのGenServerの説明とSupervisorの説明を参照してください。
Supervision Tree
それではSupervisorの仕組みを使ってMapReduceの各処理とそれを監視するプロセス、状態を保持するプロセスで親子関係を作っていきます。
名称 | 役割 |
---|---|
Supervisor | 全てのプロセスの親になるプロセス。MapSupervisor, ReduceSupervisorにデータをキャッシュするStashのプロセスIDを渡す |
MapSupervisor | MapServerを監視するプロセス。MapServerの再起動戦略を設定する。 |
MapServer | ドキュメントを「ドキュメント番号、単語、単語のインデックス」に変換する処理を担当するプロセス。処理が完了するとStashに変換したドキュメントを保存する(そのためにstashのプロセスIDを持っている)。このプロセスは複数プロセス起動する。 |
ReduceSupervisor | ReduceServerを監視するプロセスを監視するプロセス。ReduceServerの再起動戦略を設定する。 |
ReduceServer | map関数で変換した複数のドキュメントをStashから取得し、転置行列に変換する処理を担当するプロセス(Stashからデータを取得するためにStashのプロセスIDを持っている)。 |
Supervisor
MapSupervisor, ReduceSupervisorに先に起動したStashのプロセスIDを渡します。
defmodule MapReduce.Supervisor do use Supervisor def start_link do result = {:ok, sup } = Supervisor.start_link(__MODULE__, []) {:ok, stash_pid} = Supervisor.start_child(sup, worker(MapReduce.Stash, [])) Supervisor.start_child(sup, supervisor(MapReduce.MapSupervisor, [stash_pid])) Supervisor.start_child(sup, supervisor(MapReduce.ReduceServer, [stash_pid])) result end def init(_) do supervise [], strategy: :one_for_one end end
MapSupervisor, ReduceSupervisor
Supervisorから渡されたStashのプロセスIDをそれぞれ起動する子プロセスに渡します。
MapSupervisor
defmodule MapReduce.MapSupervisor do use Supervisor def start_link(stash_pid) do {:ok, _pid} = Supervisor.start_link(__MODULE__, stash_pid) end def init(stash_pid) do child_processes = [ worker(MapReduce.MapServer, [stash_pid]) ] supervise child_processes, strategy: :one_for_one end end
ReduceSupervisor
defmodule Mr.ReduceSupervisor do use Supervisor def start_link(stash_pid) do {:ok, _pid} = Supervisor.start_link(__MODULE__, stash_pid) end def init(stash_pid) do child_processes = [ worker(MapReduce.ReduceServer, [stash_pid]) ] supervise child_processes, strategy: :one_for_one end end
Stash
StashはMapServerで変換したドキュメントを保持するための状態を持つのでGenServerを使います。
他のプロセスから呼ばれるsave_document、get_documentという関数を定義していますが、それに対してcallback関数を自分で実装する必要があります。
このcallback関数を実装するだけでプロセス間通信ができるようになります。
このプロセスでは起動時に空のリストを作り、save_document関数が他プロセスから呼ばれるとこのリストにデータを追加していきます。
defmodule MapReduce.Stash do use GenServer def start_link do GenServer.start_link(__MODULE__, []) # 起動時に空のリストを作る。ここにドキュメントを追加していく end def save_document(pid, indexing_doc) do GenServer.cast pid, {:save_document, indexing_doc} end def get_documents(pid) do GenServer.call pid, :get_documents end ##### # GenServer implementation def handle_cast({:save_document, indexing_doc}, docs) do {:noreply, [indexing_doc|docs]} # このdocsは起動時に定義したリスト。ここで要素を追加していく。 end def handle_call(:get_documents, _from, docs) do # このdocsも起動時に定義したリスト。このタプルの2番目の要素は呼び出し元へ返却するメッセージ # 3番目の要素はこのプロセスが持ち続けるリスト(状態) {:reply, docs, docs} end end
MapServer
このプロセスも起動時にStashのプロセスIDを渡され保持し続けるのでGenServerを使っています。
実際にドキュメントを変換する関数(word_map)の実装は別ファイルに定義しています。
変換するとドキュメントをStashに保存するため、Stash.save_documentを呼び出します。
defmodule MapReduce.MapServer do use GenServer def start_link(stash_pid) do GenServer.start_link(__MODULE__, stash_pid, name: __MODULE__) end def make_indexing_document(words_list) do GenServer.cast(__MODULE__, {:make_indexing_document, words_list}) end # ここが処理の始まり def make_indexing_documents(words_list) when is_list(words_list) do words_list |> Enum.map(&(Task.async(fn -> make_indexing_document(&1) end))) |> Enum.map(&Task.await/1) end def get_documents do GenServer.call(__MODULE__, :get_documents) end def handle_cast({:make_indexing_document, words}, stash_pid) do word_map(stash_pid, words) {:noreply, stash_pid} end def handle_call(:get_documents, _from, stash_pid) do {:reply, MapReduce.Stash.get_documents(stash_pid), stash_pid} end # ここでドキュメントを変換して、Stashプロセスに保存する defp word_map(stash_pid, {doc_id, words}) do indexing_doc = MapReduce.WorkerFunction.word_map({doc_id, words}) :ok = MapReduce.Stash.save_document(stash_pid, indexing_doc) end end
ReduceServer
MapServerで変換したドキュメントをStashから受け取るためStashのプロセスIDをこのプロセスも持ち続けます。
転置行列にまとめる関数は別ファイルに定義してします。
関数の処理を始める前にStash.get_documents(stash_pid)を呼び出し、対象のドキュメントを取得しています。
defmodule MapReduce.ReduceServer do use GenServer def start_link(stash_pid) do GenServer.start_link(__MODULE__, stash_pid, name: __MODULE__) end # ここが処理の始まり def make_inverted_array do GenServer.call(__MODULE__, :make_inverted_array) end # ここでmapプロセスで変換したドキュメントを転置行列にまとめる # 処理の初めにStashプロセスからドキュメントのリストを取得する def handle_call(:make_inverted_array, _from, stash_pid) do indexing_docs = MapReduce.Stash.get_documents(stash_pid) {:reply, invert_array(indexing_docs), stash_pid } end def invert_array(indexing_docs) do MapReduce.WorkerFunction.invert_array(indexing_docs) end end
word_map, invert_arrayの実装
実装は別ファイルに分けています。シングルプロセスの例と特に変わりはありません。
defmodule MapReduce.WorkerFunction do def word_map({doc_id, words}) do String.split(words) |> Enum.with_index |> Enum.map(fn(tup) -> Tuple.insert_at(tup, 0, doc_id) end) end def invert_array(items) do items |> Enum.reduce(%{}, fn(item, acc) -> invert_array(item, acc) end) end defp invert_array([{doc_id, word, index} | tail], shuffle_map) do invert_array(tail, Map.put(shuffle_map, word, [{doc_id, index} | Map.get(shuffle_map, word, [])])) end defp invert_array([], shuffle_map), do: shuffle_map end
実行結果
シングルプロセスの例と変わらないのであまり意味がありませんが実行してみます。
まずはプロジェクトのディレクトリでREPLを起動して、データを用意します。
iex -S mix Erlang/OTP 18 [erts-7.1] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace] Interactive Elixir (1.1.1) - press Ctrl+C to exit (type h() ENTER for help) iex(1)> doc1 = {:doc1, "ワタシ ハ エリクサー チョット デキル"} {:doc1, "ワタシ ハ エリクサー チョット デキル"} iex(2)> doc2 = {:doc2, "ワタシ ハ ルビー チョット デキル"} {:doc2, "ワタシ ハ ルビー チョット デキル"} iex(3)> doc3 = {:doc3, "ワタシ ハ リナックス チョット デキル"} {:doc3, "ワタシ ハ リナックス チョット デキル"}
次にMap処理を実施します。追加後のデータはリストに追加されています。
iex(4)> MapReduce.make_indexing_documents [doc1, doc2, doc3] iex(5)> MapReduce.MapServer.get_documents [[{:doc3, "ワタシ", 0}, {:doc3, "ハ", 1}, {:doc3, "リナックス", 2}, {:doc3, "チョット", 3}, {:doc3, "デキル", 4}], [{:doc2, "ワタシ", 0}, {:doc2, "ハ", 1}, {:doc2, "ルビー", 2}, {:doc2, "チョット", 3}, {:doc2, "デキル", 4}], [{:doc1, "ワタシ", 0}, {:doc1, "ハ", 1}, {:doc1, "エリクサー", 2}, {:doc1, "チョット", 3}, {:doc1, "デキル", 4}]]
最後にReduce処理で転置行列にまとめます。
iex(6)> MapReduce.ReduceServer.make_inverted_array %{"エリクサー" => [doc1: 2], "チョット" => [doc1: 3, doc2: 3, doc3: 3], "デキル" => [doc1: 4, doc2: 4, doc3: 4], "ハ" => [doc1: 1, doc2: 1, doc3: 1], "リナックス" => [doc3: 2], "ルビー" => [doc2: 2], "ワタシ" => [doc1: 0, doc2: 0, doc3: 0]}
まとめ
OTPを使うと非常に簡単にマルチプロセス(マルチスレッド)のアプリケーションが実装できました。
これを他の言語でスレッドやプロセスを使って再起動戦略まで含めて実装するとなるとかなり大変なので、そこまで想像できた時にOTPのありがたみを自分はすごく感じました。
公式ページにもOTPを使ったkey value storeのサンプルがあるので興味がある方は是非試してみてください。
Elixir Supervisor and Application
ElixirのWebフレームワークであるPhoenixもこのOTPを利用してアプリケーションを作っていくようなので、次はPhoenixを使ってみたいと思います。