Akka ことはじめ : 起動中の Actor に非同期メッセージを送って本処理のパフォーマンスに影響を与えずバッチ処理をしてもらおう

2017.02.22

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

耐障害性に優れた並行処理が記述できる Akka ですが、皆様はどのような用途で利用しているでしょうか。ストリーム処理、分散システムの構築…様々考えられますが、ここでは「バッチ処理を依頼され、それをこなすワーカー」というシンプルな用途での Akka を考えます。また、アプリケーションの作成に Play Framework を選択した状況を前提とします。Play Framework を使うことで、Play アプリケーションと Akka を統合することができ、アプリケーションの一部の処理を Akka Actor に実行させたり、アプリケーションとは独立して動作する Akka Actor を構築することができます。

ユースケース

以下のような状況です。

api_akka

  • ① Controller : 外部からのリクエストを受け付けます
  • ② Service : Controllerの指示に従い、MessageReceiver に対して 非同期メッセージを送ります
  • ③ Supervisor : 配下の Actor の死活監視、オートヒーリングを行います
  • ④ MessageReceiver : バッチ処理を受け付ける Actor です
  • ⑤ MessageMaintainer : バッチ処理を実行する Actor です

つまり、「バッチ処理を受け付けるAPIがおり、リクエストベースで処理を走らせたい」というシーンです。リクエストをしてきたクライアントに対しては、Akka Worker への依頼が終わり次第、すぐさま 202 Accepted を返します。実現するには以下3ステップ踏みます。

  1. Akka Worker を構築する
  2. 起動している Akka Worker の Actor インスタンスを、API側から入手する
  3. バッチ処理を受け付ける Actor (MessageReceiver) に対して非同期メッセージを送る

1. Akka Worker を構築する

まず、Supervisorとなる Actor を定義します。

class BulkPointUpdateSupervisor @Inject() (        // --- ①
    bulkUpdateService: BulkPointUpdateService
) extends Actor {

  private val messageMaintainer = context.actorOf(Props(
    classOf[MessageMaintainer], bulkUpdateService
  ), "apirequestmaintainer")

  private val messageReceiver = context.actorOf(Props(
    classOf[MessageReceiver], messageMaintainer
  ), "messagereceiver")

}
  • ① サービス・設定のDI : @Inject() を宣言することで、「BulkPointUpdateServiceのインスタンス作成/取得方法はDIモジュールに一任する」という意味になります

フィールドとして新しく MessageMaintainerMessageReceiver を生成していることがわかります。あとは、Playアプリケーションが起動したときに、このSupervisorがシングルトンで起動すればOKです。DIを設定します。

@Singleton
class WorkerBootstrap @Inject() (
  @Named("bulkpointupdateprocessor") bulkPresentProcessor: ActorRef     // -- ①
)

class WorkerBootstrapModule extends AbstractModule with AkkaGuiceSupport {
  override def configure(): Unit = {
    bind(classOf[WorkerBootstrap]).asEagerSingleton()    // -- ②
    bindActor[BulkPointUpdateSupervisor]("bulkpointupdateprocessor")    // -- ③
  }
}
  • : "bulkpointupdateprocessor" という名前の ActorRef をDI設定から探してインスタンスを取得します。
  • : Guiceの設定です。アプリケーション起動時に走ります。ここでは、WorkerBootstrap をシングルトンで生成します。
  • : BulkPointUpdateSupervisorに"bulkpointupdateprocessor"という名前をつけます。これで、①のような記述からインスタンスを取得することができます。

これで、アプリケーション起動時に Akka Actor たちがシングルトンで起動するようになりました。

2. 起動している Akka Worker の Actor インスタンスを、API側から入手する

話はAPI側に移ります。リクエストを受け付けたら、先で起動した Akka Worker に処理を委譲したいと思ってます。コードは以下のような流れになるでしょう。

class PointController @Inject() (
    bulkPointUpdateMessengerService: BulkPointUpdateMessengerService
) extends Controller {

  def bulkCsv: Action[JsValue] = Action.async(BodyParsers.parse.json) { req =>
    for {
      body <- jsResultToFuture(req.body.validate[PointUpdateRequest])
      _ <- bulkPointUpdateMessengerService.send(body)
    } yield Accepted
  }
}


class BulkPointUpdateMessengerService @Inject() () {

  // MessageReceiver へ メッセージを送りたい・・・
  def send(body: PointUpdateRequest): Future[Unit] = ???

}

BulkPointUpdateMessengerService#sendでどのような処理を書けばよいでしょうか。 Akkaのドキュメントによると起動している Akka Actor を探す方法方法として、ActorSelectionを用いる方法があるようです。早速書いてみます。

class BulkPointUpdateMessengerService @Inject() (
  system: ActorSystem   // --- ①
) {

  // 作戦1 : ActorSystem を取得して MessageReceiver を探す
  def send(body: PointUpdateRequest): Future[Unit] = {
    val receiver = system.actorSelection("/user/bulkpointupdateprocessor/messagereceiver")  // --- ②
    receiver ! MessageReceiver.MessageReceive(body)
  }
}
  • ① ActorSystemのDI : ActorSystem をDIしています。Play Framework では、 ActorSystem をDIすることでPlayアプリケーションの ActorSystem を自動でDIしてくれます。
  • ② Actorを探す : actorSelectionで名前を指定することで、起動している Actor を探すことができます。

/userは、ユーザ(プログラマ)が明示的にアプリケーションコードで作成した Actor であることを示すパスです。/bulkpointupdateprocessorはSupervisorの名前です。/messagereceiverは MessageReceiver の名前です。どちらもインスタンスを作る際に、名前をつけていましたね。 MessageReceiver は Supervisor によって生成されたActorであるため、親子関係にある ということで、Actor のパスも階層を下っていってるような形になります。

これでもメッセージを送ることは出来ますが…ちょっと…微妙ですよね…。一番違和感を覚えるのは APIがどこかで生成されたActorの名前を知っている という点です。Actor の階層が変わったり、名前が変わったりした場合、API側のこのコードも修正せねばなりません。この先、複数箇所でactorSelectionを行う未来が可能性としてあることを考えると、あまりとりたくない選択肢です。せっかくDIの仕組みを使っているので、「インスタンスの取得は、DIモジュールに任せる」ようにしましょう!

@Singleton
class WorkerBootstrap @Inject() (
  @Named("bulkpointupdateprocessor") bulkPresentProcessor: ActorRef
)

class WorkerBootstrapModule extends AbstractModule with AkkaGuiceSupport {
  override def configure(): Unit = {
    bind(classOf[WorkerBootstrap]).asEagerSingleton()
    bindActor[BulkPointUpdateSupervisor]("bulkpointupdateprocessor")
  }

  @Provides
  @Named("actor_bulk_point_update_receive")
  def provideBulkPresentReceiver(system: ActorSystem): ActorSelection =
    system.actorSelection("/user/bulkpointupdateprocessor/messagereceiver")
}

WorkerBootstrap モジュールに プロバイダーを追加しました。これで、「"actor_bulk_point_update_receive" という名前で ActorSelection が Inject された場合、ActorSystem から /user/bulkpointupdateprocessor/messagereceiver を探して返す」という動きになります。そうするとAPI側のサービスは以下のように書けます。

class BulkPointUpdateMessengerService @Inject() (
  @Named("actor_bulk_point_update_receive") messageReceiver: ActorSelection
) {

  // 作戦2 : DIモジュールに ActorSelectionをとってきてもらう
  def send(body: PointUpdateRequest): Future[Unit] = {
    messageReceiver ! MessageReceiver.MessageReceive(body)
  }
}

美しい。

3. バッチ処理を受け付ける Actor (MessageReceiver) に対して非同期メッセージを送る

実はもうやっています。

messageReceiver ! MessageReceiver.MessageReceive(body)

これです。! は Actor のメソッドで、「非同期メッセージを送る」ということをやっています。そうなると MessageReceiver.MessageReceive(body) は引数で、メッセージの内容ということになります。あとは MessageReceiver 側で、受け取ったメッセージからリクエスト内容を展開、 MessageMaintainer に処理を渡す流れです。

ちなみに、

messageReceiver ? MessageReceiver.MessageReceive(body)

こうすると MessageReceiver がら返事がくるまで待ちます。投げっぱなしではなく、リクエスト内容を正しく解釈できたかどうかを待つ場合などに用いると良いでしょう。うっかり「バッチ処理が終わるまで待つ」ということにならないようにしてください。実は私がMessageReceiverとMessageMaintainerを分けているのはこういった理由があります。解釈する人と、実行する人を分けるということです。

応用

これまで述べた内容は重い処理を Akka Worker に依頼する例でしたが、同じ原理を利用して「本処理にパフォーマンスを与えず非同期ログを出力する」といったシーンでも使えます。ログ出力する時刻と順序がそれほど問題にならないような、パフォーマンスログなどを出力したい場合に役立ちます。

class LoggerActor extends Actor {
  import LoggerMessage._

  val logger = new play.api.Logger(LoggerFactory.getLogger("application"))

  override def receive: Receive = {
    case Debug(c) => logger.debug(c)
    case Info(c) => logger.info(c)
    case Warn(c) => logger.warn(c)
    case Error(c, e) => logger.error(c, e)
  }
}

//  ロガー用アクターに対するメッセージ
object LoggerMessage {
  case class Debug(content: String)
  case class Info(content: String)
  case class Warn(content: String)
  case class Error(content: String, e: Throwable)
}

特にSupervisor等で監視する必要もないので、object内で Actor を生成することでインスタンスを一つだけつくってやります。

object ActorLogger {

  val system = ActorSystem("ActorLogger", ConfigFactory.load().getConfig("actor-logger"))

  val logger = system.actorOf(Props(classOf[LoggerActor]))    // Actor 生成

  def debug(c: String): Unit = logger ! Debug(c)

  def info(c: String): Unit = logger ! Info(c)

  def warn(c: String): Unit = logger ! Warn(c)

  def error(c: String, e: Throwable): Unit = logger ! Error(c, e)

}

以下のように使えます。

ActorLogger.info("type=Service body=" + Json.toJson(request))

まとめ

時間のかかるバッチ処理を実行するにあたり様々な方法が考えられますが、SQSをはじめとしたキューイングシステム等を使い始めるとなかなかアーキテクチャが大掛かりになりがちです。Java / Scala, とりわけ Play Framework を使っている場合、Akka Actor により非同期処理の依頼と実行をアプリケーション内で完結できるため、選択肢として強力です。Akka は様々な可能性を秘めたライブラリで、その用途は多岐に渡りますが、ひとつ、Supervisorパターンを踏襲したワーカーとして利用できること、ふたつ、ワーカーに対してアプリケーションから非同期にバッチ処理の依頼ができることを示しました。また新しいユースケースを試すことが出来たら、公開していきたいと思います。

参考資料