
Scalaで並行処理#4 – AkkaのFault Tolerance(耐障害性)
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
Supervisorによる耐障害性
Akkaでのエラーや障害に対する対処は「let it crash」というアプローチをとっています。
この考え方は障害は発生しうるものという前提に基づき、障害が発生したらプロセスを適切に再スタートさせるという考え方です。
そのためにAkkaではSupervisorとよばれるものがActorを管理し、監視対象のActorに障害が発生した際、
プロセスを再スタートさせ、プロセスを生かし続けます。こうしてAkkaは耐障害性を確保しています。
Supervisorが行う再スタート方法
AkkaのSupervisorは二種類の再スタート方法を持っています。
1.One-For-One
管理対象のうちいずれかのプロセスがクラッシュしたら、その対象のコンポーネントのみを再スタートさせます。
2.All-For-One
Supervisorも含む、管理対象のうちいずれかのプロセスがクラッシュしたら、管理しているすべてのコンポーネントを再スタートさせます。
とりあえずサンプルを作成してためしてみましょう。
今回使用した動作環境は以下のとおりです。
- OS : MacOS X 10.7.2
- Java : 1.6.0_26
- Scala : 2.9.1 final
- SBT : 0.11.2
実行環境のセットアップ
#2の記事で作成したAkkaサンプルを使用します。その記事を参考にプロジェクトを作成しておきましょう。
サンプル実行
AllForOne(1つがエラーになったら全部再スタートする)を試してみましょう。
MyAkka.scalaの中身を下記のようにしてください。
import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.Supervisor
import akka.config.Supervision._
/**
 * FirstErrorActor用のエラーフラグ
 */
object Flag {
 var isError = true
}
/**
 * 1回目はエラーになるアクター
 */
class FirstErrorActor extends Actor {
  
  self.start()
  def receive = { 
    case msg => { 
      println(msg + " by FirstErrorActor")
      if(Flag.isError) throw new Exception("FirstErrorActor Error")
      println("FirstErrorActor receive done.")
    }
  }
  override def preRestart(reason: Throwable) = {
	  println("FirstErrorActor pre Restart.")
	  Flag.isError = false
  }
  override def postRestart(reason: Throwable) = {
	  println("FirstErrorActor post Restart.")
  }
}
/**
 * 通常のアクター
 */
class NomalActor extends Actor {
  self.start()
  def receive = { 
    case msg => { 
      println(msg + " by NomalActor")
    }
  }
  override def preRestart(reason: Throwable) = {
	  println("NomalActor pre Restart.")
  }
  override def postRestart(reason: Throwable) = {
	  println("NomalActor post Restart.")
  }
}
object Main extends App {
  val myAkkaActor1 = actorOf(new FirstErrorActor())
  val myAkkaActor2 = actorOf(new NomalActor())
  val supervisor = Supervisor(
	SupervisorConfig(
      AllForOneStrategy(List(classOf[Exception]), 3, 1000),
      Supervise(myAkkaActor1 , Permanent) :: Supervise(myAkkaActor2 , Permanent) :: Nil
	)
  ).start
  println("-----NomalActor receive-----")
  myAkkaActor2 ! "success!" 
  Thread.sleep(5000)
  println("-----FirstErrorActor receive(Error)-----")
  myAkkaActor1 ! "error!" 
  Thread.sleep(5000)
  println("-----FirstErrorActor receive(Success)-----")
  myAkkaActor1 ! "success!" 
}
まずはアクターの説明です。必要なパッケージをimportしたら、2つのアクターを定義します。
FirstErrorActorは1度目の実行では例外を投げるアクターです。
NomalActorはそのまま問題なく実行可能なアクターです。
/**
 * 1回目はエラーになるアクター
 */
class FirstErrorActor extends Actor {
  
  self.start()
  def receive = { 
    case msg => { 
      println(msg + " by FirstErrorActor")
      if(Flag.isError) throw new Exception("FirstErrorActor Error")
      println("FirstErrorActor receive done.")
    }
  }
  override def preRestart(reason: Throwable) = {
	  println("FirstErrorActor pre Restart.")
	  Flag.isError = false
  }
  override def postRestart(reason: Throwable) = {
	  println("FirstErrorActor post Restart.")
  }
}
/**
 * 通常のアクター
 */
class NomalActor extends Actor {
  self.start()
  def receive = { 
    case msg => { 
      println(msg + " by NomalActor")
    }
  }
  override def preRestart(reason: Throwable) = {
	  println("NomalActor pre Restart.")
  }
  override def postRestart(reason: Throwable) = {
	  println("NomalActor post Restart.")
  }
}
それぞれのアクターがオーバーライドしているpreRestartとpostRestartは、Supervisorによって再スタートする前/後に実行されます。
Mainクラスでは2つのアクターを作成したあと、Supervisorに登録しています。
val supervisor = Supervisor(
      SupervisorConfig(
        AllForOneStrategy(List(classOf[Exception]),//監視対象の例外
        3, //再スタートリトライ回数
        1000),//指定時間
        Supervise(myAkkaActor1 , Permanent) 
        :: Supervise(myAkkaActor2 , Permanent) :: Nil)
  ).start
SupervisorConfigでは第1引数にAllForOneStrategy(1つがエラーになったら全部再起動)を指定します。
AllForOneStrategyの第4引数以降はSuperviseを指定します。
Superviseの第1引数にアクターのインスタンスを指定し、第2引数にPermanent(常に再起動)を指定します。
これをListでつなげてSupervisorに渡せば設定が完了します。
設定が完了したら、NomalActor,FirstErrorActor,FirstErrorActorの順番にメッセージを送ります。
  println("-----NomalActor receive-----")
  myAkkaActor2 ! "success!" 
  Thread.sleep(5000)
  println("-----FirstErrorActor receive(Error)-----")
  myAkkaActor1 ! "error!" 
  Thread.sleep(5000)
  println("-----FirstErrorActor receive(Success)-----")
  myAkkaActor1 ! "success!" 
実行結果下記のようになるはずです。
> run [info] Running Main -----NomalActor receive----- success! by NomalActor -----FirstErrorActor receive(Error)----- error! by FirstErrorActor NomalActor pre Restart. NomalActor post Restart. [ERROR] [12/01/31 17:59] [akka:event-driven:dispatcher:global-2] [LocalActorRef] FirstErrorActor Error java.lang.Exception: FirstErrorActor Error at FirstErrorActor$$anonfun$receive$1.apply(MyAkka.scala:24) at FirstErrorActor$$anonfun$receive$1.apply(MyAkka.scala:21) at akka.actor.Actor$class.apply(Actor.scala:545) at FirstErrorActor.apply(MyAkka.scala:16) ・・・・・・ ・・・・・・ FirstErrorActor pre Restart. FirstErrorActor post Restart. -----FirstErrorActor receive(Success)----- success! by FirstErrorActor FirstErrorActor receive done.
FirstErrorActorの1回目の実行でエラーが起こった後、NomalActorの再スタートが行われているのがわかります。
その後FirstErrorActorも再スタート処理が行われ、2回目は問題なく処理が終了しています。
まとめ
今回はAkkaのFault Tolerance機能に少しだけふれて見ました。
簡単なサンプルでしたが、この機能を使えばエラー発生時のリカバリ処理が比較的容易に記述できますね。
参考サイトなど
- Scalaで並行処理#1 - Actorを使う:https://dev.classmethod.jp/server-side/scala-actor/
- Scalaで並行処理#2 - AkkaのActorを使う: https://dev.classmethod.jp/etc/scala-akka/
- Scalaで並行処理#3 - Akkaのconfigファイル:https://dev.classmethod.jp/server-side/actor3-config/











