ちょっと話題の記事

AWSでジョブWorkerを構成するベストプラクティス 〜 Brianの巻

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

よく訓練されたアップル信者、都元です。ここまで「AWSでジョブWorkerを構成するベストプラクティス」と題して2本の記事を書いてまいりました。SQSの巻では、バックグラウンドジョブの実行にはSQSを使ってスケーラビリティを確保しようという、下図中の中段の話をしました。続いてBeanstalk worker tierの巻では、Workerの実装方法として、図中の下段の話をしました。今回は、上段の話です。

sqs

ジョブスケジューリング

ここまでの話では「ジョブは、ユーザからのWebサーバへのリクエストに伴って発生する」という暗黙の前提で進めてきました。具体的には動画のエンコーディングや、少々時間の掛かる集計処理等をイメージして頂いたと思います。

しかし、ジョブというのは「ユーザのリクエスト」に起因して発生するばかりではありません。ユーザからのリクエストに関わらず「定期的な繰り返し」という、スケジュールに基づいた発生があります。例えば日次・月次等の自動集計処理や、定時連絡的なメール送信等が考えられます。

このようなジョブで皆さんが真っ先に思い浮かべるのがcronでしょう。では直感に従って、cronで10分に一度、SQSにメッセージを送るトリガサーバを思い浮かべてください。

トリガサーバの冗長化

ところで、クラウドの売りの一つは「スケーラビリティ(拡張性)」です。これは単純に、HTTPリクエスト処理要求の増加に伴ってWebサーバの数を増やすだけの話ではありません。昨日ご紹介したように、ロードバランサ、キュー、ワーカー、そして(RDSであれば限定的ですが)データベースのように、システム内の各コンポーネントについて、スケールアウトできる構成が期待されます。また「アベイラビリティ(可用性)」の確保のために、各コンポーネントはAZをまたいで冗長化される必要があります。

このcronによるトリガの仕組みは「スケーラビリティ(拡張性)」面において、どうでしょうか。仮にトリガの数と頻度が爆発的に増加しても、スケールアウトによる性能の確保は可能でしょうか。まぁ、1つ1つのcronの処理は小さな負荷ですので、複数のトリガサーバで分担すればどうにかなるかもしれません。が、要するにシャーディングのようなことをするわけですから、簡単にスケールアウトするとは言い切りづらいところがあります。

まぁまぁ、トリガの発火なんて大した負荷ではありませんので、スケーリングが追いつかない程の大量発火はあまり想像できないので、とりあえず良しとしましょう。しかし、では「アベイラビリティ(可用性)」面についてはどうでしょうか。冗長性の確保のため、同じトリガサーバを2つ作った途端、トリガの発火も二重化されることになります。まぁ、SQSの特性上、同じジョブが複数回発火されても良いような作りにしなければならない、というのは以前お話しした通りですが…。それにしてもほぼ必ず2回以上のメッセージが飛んでくる、というのはコレジャナイ感が激しいのではないでしょうか。

以上より、私的には「AWSネイティブな設計を前提にする場合、cronは選択肢から外れる」という結論になりました。まぁ、使うなってことじゃないんですけど、残念な制限事項が残っちゃうことを受け入れる必要がありますね。

AWS上のトリガとして望まれるもの

ではこのようなジョブスケジューリングをAWSネイティブに実装する場合、どうすれば良いのでしょうか。私は「AWSがサービスとして提供してくれる」のが一番良いと考えていますが、残念ながら本稿執筆時点で、このようなサービスはありません。Amazon Elastic Job Schedulerとかがあればなー。cron表記に従って、そのスケジュールでSNS Topicにメッセージを投げてくれるだけでいいんですけど…。

AWSさん、ご検討よろしくお願いします。

Brian

まぁ、無い物ねだりを続けても進歩が無いので。作りました。Brianと呼んでください。

2014-05-16_1502

構成図だけでは伝わりにくいと思うのですがとりあえず。複数のAZにまたがって、複数のEC2インスタンスが起動しています。これらのなかではジョブスケジューラが動いています。すなわちこれらはトリガサーバです。しかし同じトリガが複数台によって発火されてしまわないよう、RDSを介して同期を行っています。発火予定時刻の数秒前に、各トリガサーバが早い者勝ちで発火の担当決めを行います。そしてトリガ発火のタイミングで、SNSにメッセージを投げるようになっています。その先にSQSをつないでおけば、ジョブWorkerまでの道はひらけますね。

そもそものトリガの登録や削除は、トリガサーバにREST風のWeb APIがありますので、そこを叩くことによって実現します。トリガ情報はRDSに書き込まれるため、複数台ある中の何れか1つのAPIさえ叩ければ登録は完了します。というわけで、InternalのELB(VPC内からしか叩けないELB)を介して操作します。

と、偉そうに説明してみましたが、この仕組みはJavaのオープンソースライブラリQuartzを使って実装しています。トリガの登録や削除の操作をWeb APIでラップし、トリガの発火時の処理としてSNSへのメッセージ送信を実装しただけです。詳細はQuartzのドキュメントを御覧ください(丸投げ)。

やってみよう

ここではまず、ざっとポイントを抑えましょう。細かくは色々あるので、実際にBrianを本格的に使ってみようとする際には、次節の詳細仕様をきちんと押さえておいてください。

まず、BrianはJava7で実装されたWebアプリケーションです。warファイルはS3に置いてあります。そして、Elastic Beanstalk (Web tier) で動かすことを前提に考えられています。まぁ、その他環境で動かすことももちろんできますが、色々メンドクサイと思います。

前述の通り、ELBはInternalにします。Brianには認証機構が何もないので、外部公開するのは危険です。ヘルスチェックのエンドポイントは/healthcheckです。

BrianはSNSにメッセージをpublishします。なので、publish先のSNS Topicが必要です。作っておいて、そのARNを控えておいてください。

また、publish操作のためのAPIキーはEC2のInstance Profile経由で取得します。つまり、Brianが動くインスタンスにはInstance Profileが必要です。ざっくりとPowerUserでも良いですし、下記のとおり、きっちり締めあげても構いません。Resourceの項には、先ほど控えたARNを記述します。

{
  "Statement": [
    {
      "Sid": "BrianStmt",
      "Effect": "Allow",
      "Action": "sns:Publish",
      "Resource": "arn:aws:sns:ap-northeast-1:XXXXXXXXXXXX:brian-topic"
    }
  ]
}

その他、前述の通り、同期用のMySQLが必要なので、RDSで構築する必要があります。

JavaのシステムプロパティとしてJDBC_CONNECTION_STRING, DB_USERNAME, DB_PASSWORD, BRIAN_TOPIC_ARNの4つの値が必要です。それぞれ、JDBC接続URI *1、DBユーザ名、DBパスワード、発火時のメッセージ送信先SNS Topic ARNを指定します。

って素で聞くと嫌になるもんですよね。そういう時のためのCloudFormationです。

brianのデモを起動

パラメータは以下のとおりです。

  • VPC: VPC ID(デフォルトVPCのIDを入れましょう)
  • VPCCIDR: VPCのCIDRブロック(デフォルトVPCのブロックを入れましょう)
  • Subnet1: サブネットIDその1(デフォルトVPCの(ry)
  • Subnet2: サブネットIDその2(デフォルトVPCの(ry)
  • DBUsername: RDSのマスターユーザ名
  • DBPassword: RDSのマスターパスワード
  • KeyName: ECのキーペア名。EC2へのSSH接続用。
  • BrianVersion: Brianのバージョンです。今のところ0.5で。

あとはデフォルトのままでContinueボタン連打で構いません。構築所要時間は20〜30分 *2です。環境内にbastion(踏み台)が立ち上がりますので、まずはそこにSSH接続します。

続いて、bastionのhome直下に、下記のようなJSONファイルを作って、testEvery20sec.jsonとして保存してください。

{
  "triggerName": "testEvery20sec",
  "scheduleType": "cron",
  "cronEx": "*/20 * * * * ?",
  "timeZone": "Universal",
  "jobData": {
    "foo": "bar",
    "baz": "qux"
  }
}

次に、BeanstalkのManagement Consoleを覗いて、ステータスがGreenであることを確認しつつ、beanstalkのURLをひかえておきます。

2014-05-20_1554

そのURLを使って、下記のようなコマンドを叩くことによってトリガの登録ができます。上記のJSONをPOSTしているだけですね。

$ curl -v -H "Content-type: application/json" \
    -X POST http://brian-demo-XXXXXXXXXX.elasticbeanstalk.com/triggers/DEFAULT \
    -d @every20sec.json
* About to connect() to brian-demo-XXXXXXXXXX.elasticbeanstalk.com port 80 (#0)
*   Trying 172.31.24.186...
* Connected to brian-demo-XXXXXXXXXX.elasticbeanstalk.com (XX.XX.XX.XX) port 80 (#0)
> POST /triggers/DEFAULT HTTP/1.1
> User-Agent: curl/7.29.0
> Host: brian-demo-XXXXXXXXXX.elasticbeanstalk.com
> Accept: */*
> Content-type: application/json
> Content-Length: 166
>
* upload completely sent off: 166 out of 166 bytes
< HTTP/1.1 201 Created
< Content-Type: application/json;charset=UTF-8
< Date: Mon, 19 May 2014 04:43:54 GMT
< Server: Apache-Coyote/1.1
< transfer-encoding: chunked
< Connection: keep-alive
<
* Connection #0 to host brian-demo-XXXXXXXXXX.elasticbeanstalk.com left intact
{"success":true,"message":null,"content":{"nextFireTime":"2014-05-19T04:44:00Z"}}

JSONでtestEvery20secというトリガ名を指定しています。また、(世界標準時解釈で)20秒に一度発火するトリガーを定義しています。レスポンスとして、次回の発火予定時刻が返っているのが分かると思います。

このPOSTを行った結果、SNSトピックを介して、SQSキューにみるみるメッセージが溜まっていくのが観察できると思います。興味のある方はSNSトピックをemailでsubscribeしてみると、20秒に1回メールが届いて楽しいです。SQSまで届いたメッセージの一つを抜粋・整形して下記に引用しておきます。

{
  "Type" : "Notification",
  "MessageId" : "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
  "TopicArn" : "arn:aws:sns:ap-northeast-1:XXXXXXXXXXXX:brian-demo-BrianTopic-XXXXXXXXXXXXX",
  "Message" : "{\"brianVersion\":\"0.5\",\"fireTime\":\"2014-05-19T04:44:01Z\",\"scheduledFireTime\":\"2014-05-19T04:44:00Z\",\"nextFireTime\":\"2014-05-19T04:44:20Z\",\"refireCount\":0,\"recovering\":false,\"fireInstanceId\":\"i-XXXXXXXX_1400469148294\",\"trigger\":{\"timeZone\":\"Universal\",\"cronExpression\":\"*/20 * * * * ?\",\"group\":\"DEFAULT\",\"name\":\"testEvery20sec\",\"misfireInstruction\":0,\"startTime\":\"2014-05-19T04:43:54Z\"},\"jobData\":{\"baz\":\"qux\",\"foo\":\"bar\"}}",
  "Timestamp" : "2014-05-19T04:44:02.719Z",
  "SignatureVersion" : "1",
  "Signature" : "BfxBLF9vPfhqeEPOjM2xeBYCJdJ7F2T6MDH7W9MAjuY69VEXAMPLE==",
  "SigningCertURL" : "https://sns.ap-northeast-1.amazonaws.com/SimpleNotificationService-e372f8ca30337fdb084e8ac449342c77.pem",
  "UnsubscribeURL" : "https://sns.ap-northeast-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:ap-northeast-1:XXXXXXXXXXXX:brian-demo-BrianTopic-XXXXXXXXXXXXX:XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
}

このフォーマットはHTTP/HTTPS Notification JSON Formatに定義されたものです。電子署名により、正規のルートから配信されたメッセージであるかどうか検証することも可能です。

で、Messageの値として入れ子のJSONが入っているのでちょっと見づらいのですが。こちらを整形してみると下記の通りです。

{
  "brianVersion": "0.5",
  "fireTime": "2014-05-19T04:44:01Z",
  "scheduledFireTime": "2014-05-19T04:44:00Z",
  "nextFireTime": "2014-05-19T04:44:20Z",
  "refireCount": 0,
  "recovering": false,
  "fireInstanceId": "i-XXXXXXXX_1400469148294",
  "trigger": {
    "timeZone": "Universal",
    "cronExpression": "*/20 * * * * ?",
    "group": "DEFAULT",
    "name": "testEvery20sec",
    "misfireInstruction": 0,
    "startTime": "2014-05-19T04:43:54Z"
  },
  "jobData": {
    "baz": "qux",
    "foo": "bar"
  }
}

fireTimeは実際の発火時刻、scheduledFireTimeは本来発火予定だった時刻、です。僅かなズレがありますが、分散型である以上許容せざるを得ない誤差です。またjobDataの値としては、トリガ作成時に登録したパラメータがそのまま送られてきています。任意のデータを設定して、好きに使って良い部分です。

まとめ

ジョブスケジューラは、AWS上でHA実装が難しい仕組みでした。この仕組みを導入することにより、高い信頼性を持ったジョブスケジューラを容易に構築できます。

というわけで、あらためまして、ジョブWorker三部作はこれにて完結です。

詳細仕様

以下、長いので付録だと思ってください。本気で使う人用の情報です。

ロケーション

アプリケーション自体はcm-public-eb-applicationsバケットのbrian/brian-0.5.warに公開状態で配置してあります。ご自由に。

デモ起動用のCloudFormationテンプレートの内容もあわせてご確認ください。

起動時のシステムプロパティ

Brianは、動作に必要なパラメータをシステムプロパティとして受け取ります。

キー 必須 (default) 説明 典型的な値
JDBC_CONNECTION_STRING Yes MySQLのJDBC接続URL jdbc:mysql://localhost:3306/brian
DB_USERNAME Yes DB接続ユーザ名
DB_PASSWORD Yes DB接続パスワード
BRIAN_TOPIC_ARN Yes 発火時のメッセージpublish対象SNSトピックARN arn:aws:sns:ap-northeast-1:000000000000:brian-topic
aws.accessKeyId No SNS publish に利用するAPIキー。無い場合はInstance Profileを利用。 AKIAXXXXXXXXXXXXXXXX
aws.secretKey No SNS publish に利用するAPIシークレット。無い場合はInstance Profileを利用。
spring.profiles.active No (production) 起動プロファイル。動きの違いは、今のところログ出力の詳細化だけ。 production / development のいずれか
logback.configurationFile No logbackのログ設定ファイルのフルパス /etc/brian/logback.xml

トリガ操作エンドポイント

一般的なリクエスト・レスポンス

時刻はISO形式(例: 2014-05-08T06:50:33Z)を用いる。

各エンドポイントは、特に明示のない限り、下記のようなJSON形式のレスポンスを返す。

キー 説明 典型的な値
success リクエストが成功したかどうか true
message 何かメッセージがあれば null
content レスポンスの本体 (エンドポイント毎に異なる)

例1

{
  "success": true,
  "message": null,
  "content": ...
}

例2

{
  "success": false,
  "message": "404 - Not Found",
  "content": null
}

【グループ一覧】GET /triggers

request parameter

なし

response code
code 備考
200 OK 成功
500 Internal Server Error バグorz
response body
キー 説明 典型的な値
content グループ名文字列の配列 ["foo","bar"]

{
  "content": [
    "DEFAULT"
  ],
  "message": null,
  "success": true
}

【グループ内トリガ一覧】GET /triggers/{triggerGroupName}

request parameter
種類 名前 必須 説明
パス triggerGroupName Yes 一覧対象グループ名
response code
code 備考
200 OK 成功
500 Internal Server Error バグorz
response body
キー 説明 典型的な値
content トリガ名文字列の配列 ["foo","bar"]

【トリガ作成】POST /triggers/{triggerGroupName}

request paramter
種類 名前 必須 説明
パス triggerGroupName Yes 作成するグループ名
request body
キー 必須 (default値) 説明 典型的な値
triggerName Yes 作成するトリガ名 testEvery20sec
scheduleType Yes トリガ発火パターンの表現方法

  • cron : cron表記による表現
  • simple : 間隔と回数による表現
  • oneshot : startAtのみによる単発表現(単純な単回遅延実行)
cron / simple / oneshot
priority No (5) 優先度 5
description No (null) 説明(任意)
startAt No (現在時刻) 有効開始時刻(現在時刻がこの時刻以降である場合トリガが有効となる)
endAt No (null) 有効終了時刻(現在時刻がこの時刻以前である場合トリガが有効となる)
misfireInstruction No (トリガ毎に異なる) TODO
jobData No (null) 発火時メッセージの乗せるデータ
cronEx scheduleType = cronの場合はYes そうでない場合はNo cron表記 */20 * * * * ?
timeZone No (Universal) (scheduleType = cronの場合のみ有効)cron表記を解釈するタイムゾーン Universal
repeatInterval No (0) (scheduleType = simpleの場合のみ有効)繰り返し間隔ミリ秒 20000
repeatCount No (1) (scheduleType = simpleの場合のみ有効)繰り返し回数 10

{
  "triggerName": "testEvery20sec",
  "description": "Sample trigger request - fire every 20 seconds.",
  "scheduleType": "cron",
  "misfireInstruction": "DO_NOTHING",
  "cronEx": "*/20 * * * * ?",
  "timeZone": "Universal",
  "endAt": "2015-04-11T20:00:00Z",
  "jobData": {
    "batchJobName": "testEvery20sec",
    "batchJobParameters": {
      "foo" : "bar",
      "baz(long)" : "qux"
    }
  }
}
response code
code 備考
201 Created 作成成功
400 Bad Request リクエストBody内のCron表記異常
404 NotFound リクエストBody異常
409 Conflict 同名の既存トリガがある
500 Internal Server Error バグorz
response body
キー 説明 典型的な値
content.nextFireTime 次回発火予定時刻 2014-05-08T15:16:00Z

{
  "content": {
    "nextFireTime": "2014-05-08T15:16:00Z"
  },
  "message": null,
  "success": true
}

【トリガ更新】PUT /triggers/{triggerGroupName}/{triggerName}

request parameter
種類 名前 必須 説明
パス triggerGroupName Yes 更新対象のグループ名
パス triggerName Yes 更新対象のトリガ名
request body

POST /triggers/{triggerGroupName} と同様

response code
code 備考
200 OK 更新成功
400 Bad Request リクエストBody内のCron表記異常
404 NotFound 更新対象のトリガが無い リクエストBody異常
500 Internal Server Error バグorz
response body

POST /triggers/{triggerGroupName} と同様

【トリガ情報参照】GET /triggers/{triggerGroupName}/{triggerName}

request parameter
種類 名前 必須 説明
パス triggerGroupName Yes 参照対象のグループ名
パス triggerName Yes 参照対象のトリガ名
response code
code 備考
200 OK 成功
500 Internal Server Error バグorz
response body

このエンドポイントだけ一般的なresponse bodyではないので注意。

キー 説明 典型的な値
group トリガグループ名
name トリガ名
description 説明文
startTime 有効開始時刻(現在時刻がこの時刻以降である場合トリガが有効となる) 2014-05-08T06:50:33Z
endTime 有効終了時刻(現在時刻がこの時刻以前である場合トリガが有効となる)
nextFireTime 次回発火予定時刻
jobDataMap 発火時メッセージの乗せるデータ
misfireInstruction 2
priority 5
cronEx.cronExpression (cronトリガの場合のみ)cron表記
*/20 * * * * ?
cronEx.timeZone (cronトリガの場合のみ)cron表記を解釈するタイムゾーン Universal

{
   "group": "DEFAULT",
   "name": "testEvery20sec",
   "description": "Sample trigger request - fire every 20 seconds.",
   "startTime": "2014-05-08T06:50:33Z",
   "nextFireTime": "2014-05-08T06:50:40Z",
   "misfireInstruction": 2,
   "priority": 5,
   "cronEx": {
      "cronExpression": "*/20 * * * * ?",
      "timeZone": "Universal"
   },
   "jobDataMap": {
      "batchJobParameters": {
         "foo": "bar",
         "baz(long)": "qux"
      },
      "batchJobName": "testEvery20sec"
   }
}

【トリガ削除】DELETE /triggers/{triggerGroupName}/{triggerName}

request parameter
種類 名前 必須 説明
パス triggerGroupName Yes 削除対象のグループ名
パス triggerName Yes 削除対象のトリガ名
response code
code 備考
200 OK 削除成功
404 Not Found 削除対象が見つからなかった
500 Internal Server Error バグorz
response body
キー 説明 典型的な値
content 特に返す内容は無い null

発火時メッセージ

キー 説明 典型的な値
brianVersion Brianのバージョン 0.5
fireTime この発火が実際に発生した時刻 2014-05-08T06:54:20Z
scheduledFireTime この発火が本来予定されていた時刻 2014-05-08T06:54:40Z または null
nextFireTime この発火源となったトリガの次回発火予定時刻 2014-05-08T06:54:40Z
refireCount TODO 0
recovering TODO false
fireInstanceId 複数ある(可能性のある)インスタンスのうち、発火を担当したインスタンスID LOCAL_1399532031174 i-XXXXXXXX_1399532031174
trigger この発火源となったトリガの情報(トリガの種類によってスキーマが異なる) TODO
jobData トリガに登録されたjobData

{
   "brianVersion": "0.5",
   "fireTime": "2014-05-08T06:54:20Z",
   "scheduledFireTime": "2014-05-08T06:54:20Z",
   "nextFireTime": "2014-05-08T06:54:40Z",
   "refireCount": 0,
   "recovering": false,
   "fireInstanceId": "LOCAL_1399532031174",
   "trigger": {
      "timeZone": "Universal",
      "cronExpression": "*/20 * * * * ?",
      "group": "DEFAULT",
      "name": "testEvery20sec",
      "misfireInstruction": 2,
      "description": "Sample trigger request - fire every 20 seconds.",
      "startTime": "2014-05-08T06:54:05Z"
   },
   "jobData": {
      "batchJobParameters": {
         "foo": "bar",
         "baz(long)": "qux"
      },
      "batchJobName": "testEvery20sec"
   }
}

以上

脚注

  1. 「jdbc:mysql://DBホスト名:3306/brian」など。
  2. RDSを作るので少々長め。