ElixirのOTPでMapReduce処理を実装する

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Elixirには複数プロセスを使ってアプリケーションを簡単に実装するためのフレームワークであるOTPが用意されています。
OTPを使うとプロセス間のメッセージ通信やエラー時の再起動処理、プロセスの状態管理が非常に簡単に実装できます。
今回はOTPを使ってMapReduceのプログラムを実装してみます。

MapReduce

Apache Lucene

MapReduceの仕組みが利用されているソフトウェアをご紹介します。
LuceneはJavaで実装されている全文検索エンジンで、Elasticsearchの内部でも使われています。
このLuceneですが、MapReduceの仕組みを使って単語の出現場所のインデックスを生成しています(Luceneの作者は最初、Lispで実装を試したそうです)。
今回はこのMapReduceで文書のインデックスを生成する処理をElixirで実装してみます。
(簡単のため、文書は形態素解析済みのものを使います)

map関数

map関数は文字列やリストと関数を引数にして、各要素に対して引数で渡した関数を適用します。
map関数の簡単な例

Enum.map([1,2,3,4,5], fn(elem) -> elem * elem end)
#=> [1, 4, 9, 16, 25]

今回の例では、複数の文書に対して、文書の単語を「ドキュメント番号、単語、単語のインデックス」に変換する関数をmap関数を使って適応します。
このようなデータ構造で形態素解析済みの文書があるとして

{:doc1, "ワタシ ハ エリクサー チョット デキル"}

(doc1というのは文書IDとして、それぞれの文書にユニークに割り当てるものとします。)

次のように「ドキュメント番号、単語、単語のインデックス」の形式に変換します

[{:doc1, "ワタシ", 0}, {:doc1, "ハ", 1}, {:doc1, "エリクサー", 2}, {:doc1, "チョット", 3}, {:doc1, "デキル", 4}]

reduce関数

reduceは引数で複数の要素と関数を渡し、引数で渡した要素に対して関数を適応させるのですが、最初の二つの要素に対して適応させ、その結果を3番目の要素と適応させ、さらに4番目、5番目と次々に適応させ最後に返される結果はそれまでの計算処理で縮約されたものになります。
reduce関数の簡単な例

Enum.reduce([1,2,3,4,5], 0, fn(elem, acc) -> elem + acc end) # 第2引数の0は初期値
#=> 15

今回の例では、先ほどのmap関数を適応させた文書のリストに対して、リストを統合するためにreduce関数を使います。
map関数で変換した複数のドキュメントを

[
[{:doc1, "ワタシ", 0}, {:doc1, "ハ", 1}, {:doc1, "エリクサー", 2}, {:doc1, "チョット", 3}, {:doc1, "デキル", 4}],
[{:doc2, "ワタシ", 0}, {:doc2, "ハ", 1}, {:doc2, "ルビー", 2}, {:doc2, "チョット", 3}, {:doc2, "デキル", 4}]
]

次のような転置行列に変換します

%{"エリクサー" => [doc1: 2], "チョット" => [doc1: 3, doc2: 3], "デキル" => [doc1: 4, doc2: 4],
  "ハ" => [doc1: 1, doc2: 1], "ルビー" => [doc2: 2], "ワタシ" => [doc1: 0, doc2: 0]}

ここまで出来れば文書のインデックス化ができているので、検索ワードで検索すればその単語が含まれるドキュメント名(文書ID)と出現位置を取得することができます。

OTPを使ってMapReduce

シングルプロセスの実装

まずはOTPを使わずにシングルプロセスで実装してみます。

defmodule MapReduce do
  # アプリケーションのエントリポイント  
  def entry do
    doc1 = {:doc1, "ワタシ ハ エリクサー チョット デキル"}
    doc2 = {:doc2, "ワタシ ハ ルビー チョット デキル"}

    [doc1, doc2]
    |> Enum.map(fn(doc) -> MapReduce.word_map(doc) end) # map関数で複数のドキュメントにword_map関数を適応
    |> Enum.reduce(%{}, fn(items, acc) -> MapReduce.invert_array(items, acc) end) # reduce関数で複数のドキュメントを一つに縮約
  end

  # map関数で引数に渡される関数。各単語を{:doc1, "ワタシ", 0}の形式に変換します
  def word_map({doc_id, words}) do
    String.split(words)
    |> Enum.with_index
    |> Enum.map(fn(tup) -> Tuple.insert_at(tup, 0, doc_id) end)
  end
  
  # reduce関数で引数に渡される関数。上記word_map関数で変換した文書を縮約します。
  # %{"ワタシ" => [doc1: 0, doc2: 0], ....}の形式のマップを作ります
  def invert_array([{doc_id, word, index} | tail], shuffle_map) do
    invert_array(tail, Map.put(shuffle_map, word, [{doc_id, index} | Map.get(shuffle_map, word, [])]))
  end
  def invert_array([], shuffle_map), do: shuffle_map  
end

IO.inspect MapReduce.entry

出力結果

%{"エリクサー" => [doc1: 2], "チョット" => [doc1: 3, doc2: 3], "デキル" => [doc1: 4, doc2: 4],
  "ハ" => [doc1: 1, doc2: 1], "ルビー" => [doc2: 2], "ワタシ" => [doc1: 0, doc2: 0]}

エラーが起きた場合にどうするか?

今回のサンプルでは対象のドキュメントが少ないので、エラーでアプリケーションが落ちてももう一度起動すればいいのですが、対象ドキュメントが多くサイズも大きい場合はそれでは辛いですね。
map関数で複数のドキュメントを「ドキュメント番号、単語、単語のインデックス」に変換している箇所に状態を保たせて、一つのドキュメントの変換処理が失敗してもそのプロセスだけ再起動し、継続して処理できるようにしたいものです。
そのためにはドキュメントの変換処理が終わったらそれを保持する仕組み(状態を持つ)とプロセスを監視してエラーが起きた場合に再起動させる仕組みが必要です。
これらの仕組みを実装するためにOTPのGenServerとSupervisorを使います。
非常に簡単な説明になりますが、それぞれの概要は以下になります。

名称 概要
GenServer 状態を持ったプロセスを使うアプリケーションを実装するための機能が用意されているモジュール
Supervisor プロセスを監視し、エラーが発生した場合に再起動するなどの機能を提供してくれる仕組み

詳細はElixr公式ページのGenServerの説明Supervisorの説明を参照してください。

Supervision Tree

それではSupervisorの仕組みを使ってMapReduceの各処理とそれを監視するプロセス、状態を保持するプロセスで親子関係を作っていきます。

MapReduce-Supervision Tree

名称 役割
Supervisor 全てのプロセスの親になるプロセス。MapSupervisor, ReduceSupervisorにデータをキャッシュするStashのプロセスIDを渡す
MapSupervisor MapServerを監視するプロセス。MapServerの再起動戦略を設定する。
MapServer ドキュメントを「ドキュメント番号、単語、単語のインデックス」に変換する処理を担当するプロセス。処理が完了するとStashに変換したドキュメントを保存する(そのためにstashのプロセスIDを持っている)。このプロセスは複数プロセス起動する。
ReduceSupervisor ReduceServerを監視するプロセスを監視するプロセス。ReduceServerの再起動戦略を設定する。
ReduceServer map関数で変換した複数のドキュメントをStashから取得し、転置行列に変換する処理を担当するプロセス(Stashからデータを取得するためにStashのプロセスIDを持っている)。

Supervisor

MapSupervisor, ReduceSupervisorに先に起動したStashのプロセスIDを渡します。

defmodule MapReduce.Supervisor do
  use Supervisor

  def start_link do
    result = {:ok, sup } = Supervisor.start_link(__MODULE__, [])
    {:ok, stash_pid} =
      Supervisor.start_child(sup, worker(MapReduce.Stash, []))
    Supervisor.start_child(sup, supervisor(MapReduce.MapSupervisor, [stash_pid]))
    Supervisor.start_child(sup, supervisor(MapReduce.ReduceServer, [stash_pid]))
    result
  end

  def init(_) do
    supervise [], strategy: :one_for_one
  end
end

MapSupervisor, ReduceSupervisor

Supervisorから渡されたStashのプロセスIDをそれぞれ起動する子プロセスに渡します。

MapSupervisor

defmodule MapReduce.MapSupervisor do
  use Supervisor

  def start_link(stash_pid) do
    {:ok, _pid} = Supervisor.start_link(__MODULE__, stash_pid)
  end

  def init(stash_pid) do
    child_processes = [ worker(MapReduce.MapServer, [stash_pid]) ]
    supervise child_processes, strategy: :one_for_one
  end
end

ReduceSupervisor

defmodule Mr.ReduceSupervisor do
  use Supervisor

  def start_link(stash_pid) do
    {:ok, _pid} = Supervisor.start_link(__MODULE__, stash_pid)
  end

  def init(stash_pid) do
    child_processes = [ worker(MapReduce.ReduceServer, [stash_pid]) ]
    supervise child_processes, strategy: :one_for_one
  end
end

Stash

StashはMapServerで変換したドキュメントを保持するための状態を持つのでGenServerを使います。
他のプロセスから呼ばれるsave_document、get_documentという関数を定義していますが、それに対してcallback関数を自分で実装する必要があります。
このcallback関数を実装するだけでプロセス間通信ができるようになります。
このプロセスでは起動時に空のリストを作り、save_document関数が他プロセスから呼ばれるとこのリストにデータを追加していきます。

defmodule MapReduce.Stash do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, []) # 起動時に空のリストを作る。ここにドキュメントを追加していく
  end

  def save_document(pid, indexing_doc) do
    GenServer.cast pid, {:save_document, indexing_doc}
  end

  def get_documents(pid) do
    GenServer.call pid, :get_documents
  end

  #####
  # GenServer implementation
  
  def handle_cast({:save_document, indexing_doc}, docs) do
    {:noreply, [indexing_doc|docs]} # このdocsは起動時に定義したリスト。ここで要素を追加していく。
  end

  def handle_call(:get_documents, _from, docs) do
    # このdocsも起動時に定義したリスト。このタプルの2番目の要素は呼び出し元へ返却するメッセージ
    # 3番目の要素はこのプロセスが持ち続けるリスト(状態)
    {:reply, docs, docs}
  end
end

MapServer

このプロセスも起動時にStashのプロセスIDを渡され保持し続けるのでGenServerを使っています。
実際にドキュメントを変換する関数(word_map)の実装は別ファイルに定義しています。
変換するとドキュメントをStashに保存するため、Stash.save_documentを呼び出します。

defmodule MapReduce.MapServer do
  use GenServer

  def start_link(stash_pid) do
    GenServer.start_link(__MODULE__, stash_pid, name: __MODULE__)
  end

  def make_indexing_document(words_list) do
    GenServer.cast(__MODULE__, {:make_indexing_document, words_list})
  end

    # ここが処理の始まり
  def make_indexing_documents(words_list) when is_list(words_list) do
    words_list
    |> Enum.map(&(Task.async(fn -> make_indexing_document(&1) end)))
    |> Enum.map(&Task.await/1)
  end

  def get_documents do
    GenServer.call(__MODULE__, :get_documents)
  end

  def handle_cast({:make_indexing_document, words}, stash_pid) do
    word_map(stash_pid, words)
    {:noreply, stash_pid}
  end

  def handle_call(:get_documents, _from, stash_pid) do
    {:reply, MapReduce.Stash.get_documents(stash_pid), stash_pid}
  end

  # ここでドキュメントを変換して、Stashプロセスに保存する
  defp word_map(stash_pid, {doc_id, words}) do
    indexing_doc = MapReduce.WorkerFunction.word_map({doc_id, words})
    :ok = MapReduce.Stash.save_document(stash_pid, indexing_doc)
  end
end

ReduceServer

MapServerで変換したドキュメントをStashから受け取るためStashのプロセスIDをこのプロセスも持ち続けます。
転置行列にまとめる関数は別ファイルに定義してします。
関数の処理を始める前にStash.get_documents(stash_pid)を呼び出し、対象のドキュメントを取得しています。

defmodule MapReduce.ReduceServer do
  use GenServer

  def start_link(stash_pid) do
    GenServer.start_link(__MODULE__, stash_pid, name: __MODULE__)
  end

  # ここが処理の始まり
  def make_inverted_array do
    GenServer.call(__MODULE__, :make_inverted_array)
  end

  # ここでmapプロセスで変換したドキュメントを転置行列にまとめる
  # 処理の初めにStashプロセスからドキュメントのリストを取得する
  def handle_call(:make_inverted_array, _from, stash_pid) do
    indexing_docs = MapReduce.Stash.get_documents(stash_pid)
    {:reply, invert_array(indexing_docs), stash_pid }
  end

  def invert_array(indexing_docs) do
    MapReduce.WorkerFunction.invert_array(indexing_docs)
  end
end

word_map, invert_arrayの実装

実装は別ファイルに分けています。シングルプロセスの例と特に変わりはありません。

defmodule MapReduce.WorkerFunction do
  def word_map({doc_id, words}) do
    String.split(words)
    |> Enum.with_index
    |> Enum.map(fn(tup) -> Tuple.insert_at(tup, 0, doc_id) end)
  end

  def invert_array(items) do
    items
    |> Enum.reduce(%{}, fn(item, acc) -> invert_array(item, acc) end)
  end

  defp invert_array([{doc_id, word, index} | tail], shuffle_map) do
    invert_array(tail, Map.put(shuffle_map, word, [{doc_id, index} | Map.get(shuffle_map, word, [])]))
  end

  defp invert_array([], shuffle_map), do: shuffle_map
end

実行結果

シングルプロセスの例と変わらないのであまり意味がありませんが実行してみます。

まずはプロジェクトのディレクトリでREPLを起動して、データを用意します。

iex -S mix
Erlang/OTP 18 [erts-7.1]  [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]

Interactive Elixir (1.1.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> doc1 = {:doc1, "ワタシ ハ エリクサー チョット デキル"}
{:doc1, "ワタシ ハ エリクサー チョット デキル"}
iex(2)> doc2 = {:doc2, "ワタシ ハ ルビー チョット デキル"}
{:doc2, "ワタシ ハ ルビー チョット デキル"}
iex(3)> doc3 = {:doc3, "ワタシ ハ リナックス チョット デキル"}
{:doc3, "ワタシ ハ リナックス チョット デキル"}

次にMap処理を実施します。追加後のデータはリストに追加されています。

iex(4)> MapReduce.make_indexing_documents [doc1, doc2, doc3]
iex(5)> MapReduce.MapServer.get_documents
[[{:doc3, "ワタシ", 0}, {:doc3, "ハ", 1}, {:doc3, "リナックス", 2},
  {:doc3, "チョット", 3}, {:doc3, "デキル", 4}],
 [{:doc2, "ワタシ", 0}, {:doc2, "ハ", 1}, {:doc2, "ルビー", 2},
  {:doc2, "チョット", 3}, {:doc2, "デキル", 4}],
 [{:doc1, "ワタシ", 0}, {:doc1, "ハ", 1}, {:doc1, "エリクサー", 2},
  {:doc1, "チョット", 3}, {:doc1, "デキル", 4}]]

最後にReduce処理で転置行列にまとめます。

iex(6)> MapReduce.ReduceServer.make_inverted_array
%{"エリクサー" => [doc1: 2], "チョット" => [doc1: 3, doc2: 3, doc3: 3],
  "デキル" => [doc1: 4, doc2: 4, doc3: 4],
  "ハ" => [doc1: 1, doc2: 1, doc3: 1], "リナックス" => [doc3: 2],
  "ルビー" => [doc2: 2], "ワタシ" => [doc1: 0, doc2: 0, doc3: 0]}

まとめ

OTPを使うと非常に簡単にマルチプロセス(マルチスレッド)のアプリケーションが実装できました。
これを他の言語でスレッドやプロセスを使って再起動戦略まで含めて実装するとなるとかなり大変なので、そこまで想像できた時にOTPのありがたみを自分はすごく感じました。
公式ページにもOTPを使ったkey value storeのサンプルがあるので興味がある方は是非試してみてください。
Elixir Supervisor and Application

ElixirのWebフレームワークであるPhoenixもこのOTPを利用してアプリケーションを作っていくようなので、次はPhoenixを使ってみたいと思います。