この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
Supervisorによる耐障害性
Akkaでのエラーや障害に対する対処は「let it crash」というアプローチをとっています。 この考え方は障害は発生しうるものという前提に基づき、障害が発生したらプロセスを適切に再スタートさせるという考え方です。 そのためにAkkaではSupervisorとよばれるものがActorを管理し、監視対象のActorに障害が発生した際、 プロセスを再スタートさせ、プロセスを生かし続けます。こうしてAkkaは耐障害性を確保しています。
Supervisorが行う再スタート方法
AkkaのSupervisorは二種類の再スタート方法を持っています。 1.One-For-One 管理対象のうちいずれかのプロセスがクラッシュしたら、その対象のコンポーネントのみを再スタートさせます。 2.All-For-One Supervisorも含む、管理対象のうちいずれかのプロセスがクラッシュしたら、管理しているすべてのコンポーネントを再スタートさせます。
とりあえずサンプルを作成してためしてみましょう。
今回使用した動作環境は以下のとおりです。
- OS : MacOS X 10.7.2
- Java : 1.6.0_26
- Scala : 2.9.1 final
- SBT : 0.11.2
実行環境のセットアップ
#2の記事で作成したAkkaサンプルを使用します。その記事を参考にプロジェクトを作成しておきましょう。
サンプル実行
AllForOne(1つがエラーになったら全部再スタートする)を試してみましょう。 MyAkka.scalaの中身を下記のようにしてください。
import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.Supervisor
import akka.config.Supervision._
/**
* FirstErrorActor用のエラーフラグ
*/
object Flag {
var isError = true
}
/**
* 1回目はエラーになるアクター
*/
class FirstErrorActor extends Actor {
self.start()
def receive = {
case msg => {
println(msg + " by FirstErrorActor")
if(Flag.isError) throw new Exception("FirstErrorActor Error")
println("FirstErrorActor receive done.")
}
}
override def preRestart(reason: Throwable) = {
println("FirstErrorActor pre Restart.")
Flag.isError = false
}
override def postRestart(reason: Throwable) = {
println("FirstErrorActor post Restart.")
}
}
/**
* 通常のアクター
*/
class NomalActor extends Actor {
self.start()
def receive = {
case msg => {
println(msg + " by NomalActor")
}
}
override def preRestart(reason: Throwable) = {
println("NomalActor pre Restart.")
}
override def postRestart(reason: Throwable) = {
println("NomalActor post Restart.")
}
}
object Main extends App {
val myAkkaActor1 = actorOf(new FirstErrorActor())
val myAkkaActor2 = actorOf(new NomalActor())
val supervisor = Supervisor(
SupervisorConfig(
AllForOneStrategy(List(classOf[Exception]), 3, 1000),
Supervise(myAkkaActor1 , Permanent) :: Supervise(myAkkaActor2 , Permanent) :: Nil
)
).start
println("-----NomalActor receive-----")
myAkkaActor2 ! "success!"
Thread.sleep(5000)
println("-----FirstErrorActor receive(Error)-----")
myAkkaActor1 ! "error!"
Thread.sleep(5000)
println("-----FirstErrorActor receive(Success)-----")
myAkkaActor1 ! "success!"
}
まずはアクターの説明です。必要なパッケージをimportしたら、2つのアクターを定義します。 FirstErrorActorは1度目の実行では例外を投げるアクターです。 NomalActorはそのまま問題なく実行可能なアクターです。
/**
* 1回目はエラーになるアクター
*/
class FirstErrorActor extends Actor {
self.start()
def receive = {
case msg => {
println(msg + " by FirstErrorActor")
if(Flag.isError) throw new Exception("FirstErrorActor Error")
println("FirstErrorActor receive done.")
}
}
override def preRestart(reason: Throwable) = {
println("FirstErrorActor pre Restart.")
Flag.isError = false
}
override def postRestart(reason: Throwable) = {
println("FirstErrorActor post Restart.")
}
}
/**
* 通常のアクター
*/
class NomalActor extends Actor {
self.start()
def receive = {
case msg => {
println(msg + " by NomalActor")
}
}
override def preRestart(reason: Throwable) = {
println("NomalActor pre Restart.")
}
override def postRestart(reason: Throwable) = {
println("NomalActor post Restart.")
}
}
それぞれのアクターがオーバーライドしているpreRestartとpostRestartは、Supervisorによって再スタートする前/後に実行されます。
Mainクラスでは2つのアクターを作成したあと、Supervisorに登録しています。
val supervisor = Supervisor(
SupervisorConfig(
AllForOneStrategy(List(classOf[Exception]),//監視対象の例外
3, //再スタートリトライ回数
1000),//指定時間
Supervise(myAkkaActor1 , Permanent)
:: Supervise(myAkkaActor2 , Permanent) :: Nil)
).start
SupervisorConfigでは第1引数にAllForOneStrategy(1つがエラーになったら全部再起動)を指定します。 AllForOneStrategyの第4引数以降はSuperviseを指定します。 Superviseの第1引数にアクターのインスタンスを指定し、第2引数にPermanent(常に再起動)を指定します。 これをListでつなげてSupervisorに渡せば設定が完了します。
設定が完了したら、NomalActor,FirstErrorActor,FirstErrorActorの順番にメッセージを送ります。
println("-----NomalActor receive-----")
myAkkaActor2 ! "success!"
Thread.sleep(5000)
println("-----FirstErrorActor receive(Error)-----")
myAkkaActor1 ! "error!"
Thread.sleep(5000)
println("-----FirstErrorActor receive(Success)-----")
myAkkaActor1 ! "success!"
実行結果下記のようになるはずです。
> run
[info] Running Main
-----NomalActor receive-----
success! by NomalActor
-----FirstErrorActor receive(Error)-----
error! by FirstErrorActor
NomalActor pre Restart.
NomalActor post Restart.
[ERROR] [12/01/31 17:59] [akka:event-driven:dispatcher:global-2] [LocalActorRef] FirstErrorActor Error
java.lang.Exception: FirstErrorActor Error
at FirstErrorActor$$anonfun$receive$1.apply(MyAkka.scala:24)
at FirstErrorActor$$anonfun$receive$1.apply(MyAkka.scala:21)
at akka.actor.Actor$class.apply(Actor.scala:545)
at FirstErrorActor.apply(MyAkka.scala:16)
・・・・・・
・・・・・・
FirstErrorActor pre Restart.
FirstErrorActor post Restart.
-----FirstErrorActor receive(Success)-----
success! by FirstErrorActor
FirstErrorActor receive done.
FirstErrorActorの1回目の実行でエラーが起こった後、NomalActorの再スタートが行われているのがわかります。 その後FirstErrorActorも再スタート処理が行われ、2回目は問題なく処理が終了しています。
まとめ
今回はAkkaのFault Tolerance機能に少しだけふれて見ました。 簡単なサンプルでしたが、この機能を使えばエラー発生時のリカバリ処理が比較的容易に記述できますね。
参考サイトなど
- Scalaで並行処理#1 - Actorを使う:https://dev.classmethod.jp/server-side/scala-actor/
- Scalaで並行処理#2 - AkkaのActorを使う: https://dev.classmethod.jp/etc/scala-akka/
- Scalaで並行処理#3 - Akkaのconfigファイル:https://dev.classmethod.jp/server-side/actor3-config/