Akka と Amazon ECS で構築するワーカー基盤:1万件のメッセージを処理させてみた

2016.12.19

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

この記事は

Scala Advent Calendar 2016 - Qiita19日目の記事です。私は普段、クラスメソッド株式会社でサーバーサイドエンジニアとして働いています。業務のかたわら、「ScalaとAWSを使ってなにができるか」ということを主なテーマにブログを執筆しています:和田祐介 | Developers.IO

今回は、AkkaとAmazon ECSを触ってみたいと思い、実際にワーカーを構築して処理させてみましたので記録を残します。

なにがしたいか

ScalaおよびJavaの並列分散処理フレームワークである Akka におけるアプリ実行環境として、Amazon ECSが選択できることを示します。また、ECSのTask数(コンテナ数)によって処理性能がどう変わるか、ひとつの例を出します。どちらも、単純に使ってみたかったというのが動機です。ただ、Akkaは分散環境で動作することを前提としていますし、Amazon ECS については分散環境をDockerコンテナで手軽に実現できるため、相性が良いのでは、とも考えました。

結論としては、実行環境として申し分なさそうです。キューイングされたメッセージをさばくタイプのワーカーを実装する場合、ひとつの選択肢として考慮しても良いと思います。この記事では、Akka と Amazon ECS を使ったワーカーの実装に触れつつ、構成の例と、テスト実行した結果について主に記載します。コンテナイメージの作成をはじめとした細かい手順は割愛している箇所が多いのでご注意ください。

実施環境

項目
playframework バージョン 2.5.10
scala バージョン 2.11.8
aws-java-sdk バージョン 1.11.70

Akka

JVM上でメッセージ駆動アプリケーションを作るためのツールキットおよびランタイムです。できることは多岐に渡り、そのためのパターンやライブラリが多くあります。基本的な考え方は、①Actorという処理の実行単位を生成し、Actorに対してメッセージを送ることで処理を行行わせる、②Actorを別のActorが管理することで耐障害性に優れたシステムを構築する、という2軸です。以下の資料が参考になると思います。

Amazon ECS

インフラ側の仕組みです。Dockerコンテナに対応しており、EC2インスタンスへのコンテナ自動展開や、起動したコンテナの監視およびオートスケールを提供してくれます。アプリケーションをコンテナに乗せていく場合、実行環境として一考の価値があるサービスだと思います。Akkaアプリケーションが動作するサービスをDockerイメージとして作成し、ECSを使ってそれらを複数展開していく ということをやります。

実装するワーカーサンプル:ユーザーごとのニュース配信バッチ

例として、あるニュース記事のサイトで、ログインしているユーザだけに表示したい記事があるとします。「このユーザーは配信対象かどうか」というデータをDynamoDBで管理しています。配信情報はSQSに流れてきます。そこで、ユーザーごとのニュースを配信するバッチが必要になりました。SQSをポーリングして取得したメッセージをもとにDynamoDBへ保存する処理です。これをAkkaで実装します。

実装したワーカーからDocker Imageを作成し、Amazon ECSを使ってコンテナとして配備します。

akka-ecs

Supervisor Strategy

ワーカーの中の話になりますが、Supervisor Strategy を採用します。簡単にいうと、Actorに役割を与え、それらを組み合わせてひとつの処理を実現するパターンです。あるActor(Supervisor)が別のActorの状態を監視しておき、Actorに問題が発生したら再起動や再処理を行わせることができるようになります。これにより耐障害性の向上が期待できます。

supervisor-strategy

class NewsProcessSupervisor @Inject()(  // -- ①
    receiveService: NewsMessageReceiveService,
    deleteService: NewsMessageDeleteService,
    contentRegisterService: ContentRegisterService,
    config: NewsConfig
) extends Actor {

  val deleter: ActorRef = context.actorOf(Props(
    classOf[MessageDeleter], deleteService
  ), "MessageDeleter")  // -- ②

  val maintainer: ActorRef = context.actorOf(Props(
    classOf[MessageMaintainer], deleter, contentRegisterService
  ), "MessageMaintainer")// -- ③

  val receiver: ActorRef = context.actorOf(Props(
    classOf[MessageReceiver], maintainer, deleter, receiveService
  ), "MessageReceiver") // -- ④

  val pollingScheduler: ActorRef = context.actorOf(Props(
    classOf[PollingScheduler], receiver, config
  ), "PollingScheduler") // -- ⑤

  override def receive: Receive = {
    case Ping => sender ! Pong
  }
}

object NewsProcessSupervisor {
  case object Ping
  case object Pong
}

Supervisorは、SQSから受け取ったメッセージを扱うActor群を管理します。

  • ① @Inject: Playframeowrkの仕組みを使い、依存性の注入を行っています。
  • ② MessageDeleter: 処理が終わったメッセージをSQSから削除するのが仕事です。
  • ③ MessageMaintainer: SQSメッセージのデータを使って処理を行います。実際にはMessageMaintainerがさらに子Actorをつくり、子ActorがDynamoDBへの保存処理を行うようにさせています。
  • ④ MessageReceiver: SQSからメッセージを受け取り、ワーカーが解釈できるオブジェクトに変換します。
  • ⑤ PollingScheduler: SQSからメッセージをポーリングします。

Docker Image の作成

SBTの Docker Pluginを使えば、簡単に Docker Image が作成できます。

build.sbt(抜粋)

lazy val news = project.in(file("./news"))
    .enablePlugins(PlayScala)
    .enablePlugins(DockerPlugin)
    .settings(
      name := "ecn-worker-news",
      version := "0.1.0"
    )
    .settings(
      maintainer in Docker := "Yusuke Wada <wada.yusuke@classmethod.jp>",
      dockerExposedPorts in Docker := Seq(9000, 9443),
      dockerRepository := Some("cmwadayusuke")
    )
$ bin/activator news/docker:publish

なんとこれでコンテナを立ち上げれば、ワーカーが起動しポーリングが開始するようになります。便利〜。PlayframeworkアプリケーションのDockerコンテナ作成にはこちらを参考にさせていただきました。

Amazon ECS で実行環境の構築

これでワーカー側の準備は整いました。次は実行環境です。Amazon ECS では以下の手順でコンテナの実行環境を構築していきます。

  • Task Definitionの作成
  • ECS Clusterの作成
  • Serviceの定義、Taskの起動

Task Definition

ECSでは、実行環境上で動作するコンテナの単位を Task としています。そのTaskの動作定義を記述するのがTask Definitionです。主に以下のような内容を設定します。

  • Taskを実行するときの IAM Role
  • コンテナに展開するDocker Image
  • コンテナのメモリ上限
  • 環境変数
  • ログ出力

作成したTask Definitionを使って、ServiceがCluster上にTaskを展開します。

ECS Clusterの作成

  • コンテナを展開するEC2インスタンスのタイプ、またその数
  • EC2インスタンスのIAM Role
  • VPC

といったことが設定できます。

Serviceの定義、Taskの起動

  • コンテナ起動に利用するTask Definition
  • Taskをいくつ起動するか
  • Taskのオートスケーリング

これらを決めることが出来ます。起動するTaskの数が1以上になっていれば、Service作成後ただちにTaskが展開されます。Docker Imageがベースとなっているからなのか、起動まで相当早いです。体感だと10秒程度ではないでしょうか。

これで、Akkaで作成したワーカーをECSに展開することができました。次は実際に処理をさせてみます。

1万件のSQSメッセージを処理させるテスト

1万会員分のメッセージがSQSにある状態で、ECSのServiceを使ってTaskを起動し、メッセージをすべて処理し終えるまでどの程度かかるかをみてみます。

条件

項目
SQSメッセージの数 10,000
Akkaワーカーのポーリング間隔 0.1秒
Task あたりのメモリサイズ 1,024MB
ECS ClusterのEC2インスタンスタイプ m3.large
ECS ClusterのEC2インスタンス台数 1
Taskのオートスケーリング なし
リージョン 東京

sqs-10000

テスト内容

変数は同時に実行するコンテナの数だけです。これを、1, 2, 4, 6と変化させて全体としての処理能力をみてみます。つまり、1万件のメッセージに対して、Taskひとつのみで立ち向かうパターンと、複数で処理するパターンで実行します。もちろん、複数で処理する場合の方が処理能力が高くなることを期待しています。

テスト結果

Task数 処理時間 スループット(処理メッセージ数 / 秒)
1 49秒 204.1
2 50秒 200.0
3 38秒 263.2
4 46秒 217.4

Task数3のときに最良のスループットが出るようですが、どうやらこの条件下では線形にスループットが向上するというわけではなさそうです。ただ、データストアに書き込むタイプの処理1万件が1分以内に完了するというのは、なかなかワーカーとして優秀ではないでしょうか。もちろん、DyanmoDBへはデータ書き込みが完了しています。

dynamodb

感想

Akka と Amazon ECS を組み合わせて、ワーカー基盤を構築することができたのでその点は満足しています。ただ、システム全体の並列性テストについてはまだもう少し確かめる余地がありあそうです。

  • ポーリング間隔を抑えてみて、Task数を増減させたときにどうなるか
  • SQS内のメッセージ数を増やしたときにスループットが変化するか
  • SQSのタイプをFIFOにするとどうなるか(FIFOキュー自体にスループット制限があるため微妙かも)
  • Akka Acotrの配置パターンを変えてみるとどうなるか。SupervisorとMaintainerのコンテナを別にする、など

以上についても試してみたいと思います。

つまったこと

そもそもPollingSchedulerからSQSへ接続できず、拒否される(403)※追記あり

以下、aws-java-sdk の古いバージョン(1.10.1)で実施した際に遭遇した事象

Taskに割り当てたIAMロールはSQSへの接続が許可されており、問題なくメッセージを取得できるはずだと思ったのですが、403が返ってきてメッセージが取得できないという問題です。これで3,4時間はまりました。結局、母体となるEC2インスタンスに割りあてられていたAmazonEC2ContainerServiceforEC2RoleにもSQSへのアクセスポリシーを付与することで解消しました。が…

新しく公開されたECSタスクのためのIAMロールの導入によって、EC2コンテナインスタンスではなくECSタスクに直接IAMロールを紐付けることで、基盤をより安全に保つことができます。この手法によって、1つのタスクはS3にアクセスする特定のIAMロールを使いながら他のタスクはDynamoDBテーブルへにアクセスするIAMロールを使うことができます。

この記述と矛盾するため、疑問に思っていました。

参考:ECSタスクのためのIAMロールによってコンテナ利用アプリケーションをより安全にする | Amazon Web Services ブログ

2016年12月21日(水)追記 : aws-java-sdk 1.11.70 でドキュメント記載通りの挙動となりました

この事象について、私が利用しているAWSのSDKが古い可能性があるというご指摘を頂戴しました。aws-sdk-java のGitHubにて最新バージョンを確認の上、build.sbt を修正しイメージを作り直しました。するとドキュメント記載の通り、EC2のロールについては最小限のポリシーを指定するのみで、コンテナからSQSへ接続することができました。最終的なロールとポリシーは以下の状態となっっています。

ロール名 アタッチしているポリシー 適用対象
ecsInstanceRole AmazonEC2ContainerServiceforEC2Role のみ ECS Cluster を構成するEC2インスタンス
ecnWorkerNewsRole AmazonSQSFullAccess, AmazonDynamoDBFullAccess Taskのコンテナ

Taskを複数起動しようとするとポートが重複し起動できない

Task DefinitionでネットワークタイプをHostにしてしまっていたのが問題でした。BridgeにすることによりEC2インスタンスとは別のIPアドレスを取得することが出来、複数起動することができました。

1万メッセージテスト中、オートスケーリングを無効にしているのにTaskが何度も立ち上がる

Taskあたりのメモリサイズを512MBに指定していたのですが、それよりもHard Limitに設定していたことが問題でした。Hard Limitの場合、メモリ使用量が超えた時点でただちにTaskがKillされます。一方ECSのServiceは一定台数が起動している状態を保とうとするため、Killされたそばから新しいTaskが立ち上がるという状態でした。メモリ上限を引き上げ、かつSoft Limitとすることで解消しました。

ソースコード

参考