AWSでジョブWorkerを構成するベストプラクティス 〜 Brianの巻
よく訓練されたアップル信者、都元です。ここまで「AWSでジョブWorkerを構成するベストプラクティス」と題して2本の記事を書いてまいりました。SQSの巻では、バックグラウンドジョブの実行にはSQSを使ってスケーラビリティを確保しようという、下図中の中段の話をしました。続いてBeanstalk worker tierの巻では、Workerの実装方法として、図中の下段の話をしました。今回は、上段の話です。
ジョブスケジューリング
ここまでの話では「ジョブは、ユーザからのWebサーバへのリクエストに伴って発生する」という暗黙の前提で進めてきました。具体的には動画のエンコーディングや、少々時間の掛かる集計処理等をイメージして頂いたと思います。
しかし、ジョブというのは「ユーザのリクエスト」に起因して発生するばかりではありません。ユーザからのリクエストに関わらず「定期的な繰り返し」という、スケジュールに基づいた発生があります。例えば日次・月次等の自動集計処理や、定時連絡的なメール送信等が考えられます。
このようなジョブで皆さんが真っ先に思い浮かべるのがcronでしょう。では直感に従って、cronで10分に一度、SQSにメッセージを送るトリガサーバを思い浮かべてください。
トリガサーバの冗長化
ところで、クラウドの売りの一つは「スケーラビリティ(拡張性)」です。これは単純に、HTTPリクエスト処理要求の増加に伴ってWebサーバの数を増やすだけの話ではありません。昨日ご紹介したように、ロードバランサ、キュー、ワーカー、そして(RDSであれば限定的ですが)データベースのように、システム内の各コンポーネントについて、スケールアウトできる構成が期待されます。また「アベイラビリティ(可用性)」の確保のために、各コンポーネントはAZをまたいで冗長化される必要があります。
このcronによるトリガの仕組みは「スケーラビリティ(拡張性)」面において、どうでしょうか。仮にトリガの数と頻度が爆発的に増加しても、スケールアウトによる性能の確保は可能でしょうか。まぁ、1つ1つのcronの処理は小さな負荷ですので、複数のトリガサーバで分担すればどうにかなるかもしれません。が、要するにシャーディングのようなことをするわけですから、簡単にスケールアウトするとは言い切りづらいところがあります。
まぁまぁ、トリガの発火なんて大した負荷ではありませんので、スケーリングが追いつかない程の大量発火はあまり想像できないので、とりあえず良しとしましょう。しかし、では「アベイラビリティ(可用性)」面についてはどうでしょうか。冗長性の確保のため、同じトリガサーバを2つ作った途端、トリガの発火も二重化されることになります。まぁ、SQSの特性上、同じジョブが複数回発火されても良いような作りにしなければならない、というのは以前お話しした通りですが…。それにしてもほぼ必ず2回以上のメッセージが飛んでくる、というのはコレジャナイ感が激しいのではないでしょうか。
以上より、私的には「AWSネイティブな設計を前提にする場合、cronは選択肢から外れる」という結論になりました。まぁ、使うなってことじゃないんですけど、残念な制限事項が残っちゃうことを受け入れる必要がありますね。
AWS上のトリガとして望まれるもの
ではこのようなジョブスケジューリングをAWSネイティブに実装する場合、どうすれば良いのでしょうか。私は「AWSがサービスとして提供してくれる」のが一番良いと考えていますが、残念ながら本稿執筆時点で、このようなサービスはありません。Amazon Elastic Job Schedulerとかがあればなー。cron表記に従って、そのスケジュールでSNS Topicにメッセージを投げてくれるだけでいいんですけど…。
AWSさん、ご検討よろしくお願いします。
Brian
まぁ、無い物ねだりを続けても進歩が無いので。作りました。Brianと呼んでください。
構成図だけでは伝わりにくいと思うのですがとりあえず。複数のAZにまたがって、複数のEC2インスタンスが起動しています。これらのなかではジョブスケジューラが動いています。すなわちこれらはトリガサーバです。しかし同じトリガが複数台によって発火されてしまわないよう、RDSを介して同期を行っています。発火予定時刻の数秒前に、各トリガサーバが早い者勝ちで発火の担当決めを行います。そしてトリガ発火のタイミングで、SNSにメッセージを投げるようになっています。その先にSQSをつないでおけば、ジョブWorkerまでの道はひらけますね。
そもそものトリガの登録や削除は、トリガサーバにREST風のWeb APIがありますので、そこを叩くことによって実現します。トリガ情報はRDSに書き込まれるため、複数台ある中の何れか1つのAPIさえ叩ければ登録は完了します。というわけで、InternalのELB(VPC内からしか叩けないELB)を介して操作します。
と、偉そうに説明してみましたが、この仕組みはJavaのオープンソースライブラリQuartzを使って実装しています。トリガの登録や削除の操作をWeb APIでラップし、トリガの発火時の処理としてSNSへのメッセージ送信を実装しただけです。詳細はQuartzのドキュメントを御覧ください(丸投げ)。
やってみよう
ここではまず、ざっとポイントを抑えましょう。細かくは色々あるので、実際にBrianを本格的に使ってみようとする際には、次節の詳細仕様をきちんと押さえておいてください。
まず、BrianはJava7で実装されたWebアプリケーションです。warファイルはS3に置いてあります。そして、Elastic Beanstalk (Web tier) で動かすことを前提に考えられています。まぁ、その他環境で動かすことももちろんできますが、色々メンドクサイと思います。
前述の通り、ELBはInternalにします。Brianには認証機構が何もないので、外部公開するのは危険です。ヘルスチェックのエンドポイントは/healthcheckです。
BrianはSNSにメッセージをpublishします。なので、publish先のSNS Topicが必要です。作っておいて、そのARNを控えておいてください。
また、publish操作のためのAPIキーはEC2のInstance Profile経由で取得します。つまり、Brianが動くインスタンスにはInstance Profileが必要です。ざっくりとPowerUserでも良いですし、下記のとおり、きっちり締めあげても構いません。Resourceの項には、先ほど控えたARNを記述します。
{ "Statement": [ { "Sid": "BrianStmt", "Effect": "Allow", "Action": "sns:Publish", "Resource": "arn:aws:sns:ap-northeast-1:XXXXXXXXXXXX:brian-topic" } ] }
その他、前述の通り、同期用のMySQLが必要なので、RDSで構築する必要があります。
JavaのシステムプロパティとしてJDBC_CONNECTION_STRING, DB_USERNAME, DB_PASSWORD, BRIAN_TOPIC_ARNの4つの値が必要です。それぞれ、JDBC接続URI *1、DBユーザ名、DBパスワード、発火時のメッセージ送信先SNS Topic ARNを指定します。
って素で聞くと嫌になるもんですよね。そういう時のためのCloudFormationです。
パラメータは以下のとおりです。
- VPC: VPC ID(デフォルトVPCのIDを入れましょう)
- VPCCIDR: VPCのCIDRブロック(デフォルトVPCのブロックを入れましょう)
- Subnet1: サブネットIDその1(デフォルトVPCの(ry)
- Subnet2: サブネットIDその2(デフォルトVPCの(ry)
- DBUsername: RDSのマスターユーザ名
- DBPassword: RDSのマスターパスワード
- KeyName: ECのキーペア名。EC2へのSSH接続用。
- BrianVersion: Brianのバージョンです。今のところ0.5で。
あとはデフォルトのままでContinueボタン連打で構いません。構築所要時間は20〜30分 *2です。環境内にbastion(踏み台)が立ち上がりますので、まずはそこにSSH接続します。
続いて、bastionのhome直下に、下記のようなJSONファイルを作って、testEvery20sec.jsonとして保存してください。
{ "triggerName": "testEvery20sec", "scheduleType": "cron", "cronEx": "*/20 * * * * ?", "timeZone": "Universal", "jobData": { "foo": "bar", "baz": "qux" } }
次に、BeanstalkのManagement Consoleを覗いて、ステータスがGreenであることを確認しつつ、beanstalkのURLをひかえておきます。
そのURLを使って、下記のようなコマンドを叩くことによってトリガの登録ができます。上記のJSONをPOSTしているだけですね。
$ curl -v -H "Content-type: application/json" \ -X POST http://brian-demo-XXXXXXXXXX.elasticbeanstalk.com/triggers/DEFAULT \ -d @every20sec.json * About to connect() to brian-demo-XXXXXXXXXX.elasticbeanstalk.com port 80 (#0) * Trying 172.31.24.186... * Connected to brian-demo-XXXXXXXXXX.elasticbeanstalk.com (XX.XX.XX.XX) port 80 (#0) > POST /triggers/DEFAULT HTTP/1.1 > User-Agent: curl/7.29.0 > Host: brian-demo-XXXXXXXXXX.elasticbeanstalk.com > Accept: */* > Content-type: application/json > Content-Length: 166 > * upload completely sent off: 166 out of 166 bytes < HTTP/1.1 201 Created < Content-Type: application/json;charset=UTF-8 < Date: Mon, 19 May 2014 04:43:54 GMT < Server: Apache-Coyote/1.1 < transfer-encoding: chunked < Connection: keep-alive < * Connection #0 to host brian-demo-XXXXXXXXXX.elasticbeanstalk.com left intact {"success":true,"message":null,"content":{"nextFireTime":"2014-05-19T04:44:00Z"}}
JSONでtestEvery20secというトリガ名を指定しています。また、(世界標準時解釈で)20秒に一度発火するトリガーを定義しています。レスポンスとして、次回の発火予定時刻が返っているのが分かると思います。
このPOSTを行った結果、SNSトピックを介して、SQSキューにみるみるメッセージが溜まっていくのが観察できると思います。興味のある方はSNSトピックをemailでsubscribeしてみると、20秒に1回メールが届いて楽しいです。SQSまで届いたメッセージの一つを抜粋・整形して下記に引用しておきます。
{ "Type" : "Notification", "MessageId" : "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX", "TopicArn" : "arn:aws:sns:ap-northeast-1:XXXXXXXXXXXX:brian-demo-BrianTopic-XXXXXXXXXXXXX", "Message" : "{\"brianVersion\":\"0.5\",\"fireTime\":\"2014-05-19T04:44:01Z\",\"scheduledFireTime\":\"2014-05-19T04:44:00Z\",\"nextFireTime\":\"2014-05-19T04:44:20Z\",\"refireCount\":0,\"recovering\":false,\"fireInstanceId\":\"i-XXXXXXXX_1400469148294\",\"trigger\":{\"timeZone\":\"Universal\",\"cronExpression\":\"*/20 * * * * ?\",\"group\":\"DEFAULT\",\"name\":\"testEvery20sec\",\"misfireInstruction\":0,\"startTime\":\"2014-05-19T04:43:54Z\"},\"jobData\":{\"baz\":\"qux\",\"foo\":\"bar\"}}", "Timestamp" : "2014-05-19T04:44:02.719Z", "SignatureVersion" : "1", "Signature" : "BfxBLF9vPfhqeEPOjM2xeBYCJdJ7F2T6MDH7W9MAjuY69VEXAMPLE==", "SigningCertURL" : "https://sns.ap-northeast-1.amazonaws.com/SimpleNotificationService-e372f8ca30337fdb084e8ac449342c77.pem", "UnsubscribeURL" : "https://sns.ap-northeast-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:ap-northeast-1:XXXXXXXXXXXX:brian-demo-BrianTopic-XXXXXXXXXXXXX:XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX" }
このフォーマットはHTTP/HTTPS Notification JSON Formatに定義されたものです。電子署名により、正規のルートから配信されたメッセージであるかどうか検証することも可能です。
で、Messageの値として入れ子のJSONが入っているのでちょっと見づらいのですが。こちらを整形してみると下記の通りです。
{ "brianVersion": "0.5", "fireTime": "2014-05-19T04:44:01Z", "scheduledFireTime": "2014-05-19T04:44:00Z", "nextFireTime": "2014-05-19T04:44:20Z", "refireCount": 0, "recovering": false, "fireInstanceId": "i-XXXXXXXX_1400469148294", "trigger": { "timeZone": "Universal", "cronExpression": "*/20 * * * * ?", "group": "DEFAULT", "name": "testEvery20sec", "misfireInstruction": 0, "startTime": "2014-05-19T04:43:54Z" }, "jobData": { "baz": "qux", "foo": "bar" } }
fireTimeは実際の発火時刻、scheduledFireTimeは本来発火予定だった時刻、です。僅かなズレがありますが、分散型である以上許容せざるを得ない誤差です。またjobDataの値としては、トリガ作成時に登録したパラメータがそのまま送られてきています。任意のデータを設定して、好きに使って良い部分です。
まとめ
ジョブスケジューラは、AWS上でHA実装が難しい仕組みでした。この仕組みを導入することにより、高い信頼性を持ったジョブスケジューラを容易に構築できます。
というわけで、あらためまして、ジョブWorker三部作はこれにて完結です。
- AWSでジョブWorkerを構成するベストプラクティス 〜 SQSの巻
- AWSでジョブWorkerを構成するベストプラクティス 〜 Beanstalk worker tierの巻
- AWSでジョブWorkerを構成するベストプラクティス 〜 Brianの巻 (これ)
詳細仕様
以下、長いので付録だと思ってください。本気で使う人用の情報です。
ロケーション
アプリケーション自体はcm-public-eb-applicationsバケットのbrian/brian-0.5.warに公開状態で配置してあります。ご自由に。
デモ起動用のCloudFormationテンプレートの内容もあわせてご確認ください。
起動時のシステムプロパティ
Brianは、動作に必要なパラメータをシステムプロパティとして受け取ります。
キー | 必須 (default) | 説明 | 典型的な値 |
---|---|---|---|
JDBC_CONNECTION_STRING | Yes | MySQLのJDBC接続URL | jdbc:mysql://localhost:3306/brian |
DB_USERNAME | Yes | DB接続ユーザ名 | |
DB_PASSWORD | Yes | DB接続パスワード | |
BRIAN_TOPIC_ARN | Yes | 発火時のメッセージpublish対象SNSトピックARN | arn:aws:sns:ap-northeast-1:000000000000:brian-topic |
aws.accessKeyId | No | SNS publish に利用するAPIキー。無い場合はInstance Profileを利用。 | AKIAXXXXXXXXXXXXXXXX |
aws.secretKey | No | SNS publish に利用するAPIシークレット。無い場合はInstance Profileを利用。 | |
spring.profiles.active | No (production) | 起動プロファイル。動きの違いは、今のところログ出力の詳細化だけ。 | production / development のいずれか |
logback.configurationFile | No | logbackのログ設定ファイルのフルパス | /etc/brian/logback.xml |
トリガ操作エンドポイント
一般的なリクエスト・レスポンス
時刻はISO形式(例: 2014-05-08T06:50:33Z)を用いる。
各エンドポイントは、特に明示のない限り、下記のようなJSON形式のレスポンスを返す。
キー | 説明 | 典型的な値 |
---|---|---|
success | リクエストが成功したかどうか | true |
message | 何かメッセージがあれば | null |
content | レスポンスの本体 | (エンドポイント毎に異なる) |
例1
{ "success": true, "message": null, "content": ... }
例2
{ "success": false, "message": "404 - Not Found", "content": null }
【グループ一覧】GET /triggers
request parameter
なし
response code
code | 備考 |
---|---|
200 OK | 成功 |
500 Internal Server Error | バグorz |
response body
キー | 説明 | 典型的な値 |
---|---|---|
content | グループ名文字列の配列 | ["foo","bar"] |
例
{ "content": [ "DEFAULT" ], "message": null, "success": true }
【グループ内トリガ一覧】GET /triggers/{triggerGroupName}
request parameter
種類 | 名前 | 必須 | 説明 |
---|---|---|---|
パス | triggerGroupName | Yes | 一覧対象グループ名 |
response code
code | 備考 |
---|---|
200 OK | 成功 |
500 Internal Server Error | バグorz |
response body
キー | 説明 | 典型的な値 |
---|---|---|
content | トリガ名文字列の配列 | ["foo","bar"] |
【トリガ作成】POST /triggers/{triggerGroupName}
request paramter
種類 | 名前 | 必須 | 説明 |
---|---|---|---|
パス | triggerGroupName | Yes | 作成するグループ名 |
request body
キー | 必須 (default値) | 説明 | 典型的な値 |
---|---|---|---|
triggerName | Yes | 作成するトリガ名 | testEvery20sec |
scheduleType | Yes | トリガ発火パターンの表現方法
|
cron / simple / oneshot |
priority | No (5) | 優先度 | 5 |
description | No (null) | 説明(任意) | |
startAt | No (現在時刻) | 有効開始時刻(現在時刻がこの時刻以降である場合トリガが有効となる) | |
endAt | No (null) | 有効終了時刻(現在時刻がこの時刻以前である場合トリガが有効となる) | |
misfireInstruction | No (トリガ毎に異なる) | TODO | |
jobData | No (null) | 発火時メッセージの乗せるデータ | |
cronEx | scheduleType = cronの場合はYes そうでない場合はNo | cron表記 | */20 * * * * ? |
timeZone | No (Universal) | (scheduleType = cronの場合のみ有効)cron表記を解釈するタイムゾーン | Universal |
repeatInterval | No (0) | (scheduleType = simpleの場合のみ有効)繰り返し間隔ミリ秒 | 20000 |
repeatCount | No (1) | (scheduleType = simpleの場合のみ有効)繰り返し回数 | 10 |
例
{ "triggerName": "testEvery20sec", "description": "Sample trigger request - fire every 20 seconds.", "scheduleType": "cron", "misfireInstruction": "DO_NOTHING", "cronEx": "*/20 * * * * ?", "timeZone": "Universal", "endAt": "2015-04-11T20:00:00Z", "jobData": { "batchJobName": "testEvery20sec", "batchJobParameters": { "foo" : "bar", "baz(long)" : "qux" } } }
response code
code | 備考 |
---|---|
201 Created | 作成成功 |
400 Bad Request | リクエストBody内のCron表記異常 |
404 NotFound | リクエストBody異常 |
409 Conflict | 同名の既存トリガがある |
500 Internal Server Error | バグorz |
response body
キー | 説明 | 典型的な値 |
---|---|---|
content.nextFireTime | 次回発火予定時刻 | 2014-05-08T15:16:00Z |
例
{ "content": { "nextFireTime": "2014-05-08T15:16:00Z" }, "message": null, "success": true }
【トリガ更新】PUT /triggers/{triggerGroupName}/{triggerName}
request parameter
種類 | 名前 | 必須 | 説明 |
---|---|---|---|
パス | triggerGroupName | Yes | 更新対象のグループ名 |
パス | triggerName | Yes | 更新対象のトリガ名 |
request body
POST /triggers/{triggerGroupName} と同様
response code
code | 備考 |
---|---|
200 OK | 更新成功 |
400 Bad Request | リクエストBody内のCron表記異常 |
404 NotFound | 更新対象のトリガが無い リクエストBody異常 |
500 Internal Server Error | バグorz |
response body
POST /triggers/{triggerGroupName} と同様
【トリガ情報参照】GET /triggers/{triggerGroupName}/{triggerName}
request parameter
種類 | 名前 | 必須 | 説明 |
---|---|---|---|
パス | triggerGroupName | Yes | 参照対象のグループ名 |
パス | triggerName | Yes | 参照対象のトリガ名 |
response code
code | 備考 |
---|---|
200 OK | 成功 |
500 Internal Server Error | バグorz |
response body
このエンドポイントだけ一般的なresponse bodyではないので注意。
キー | 説明 | 典型的な値 |
---|---|---|
group | トリガグループ名 | |
name | トリガ名 | |
description | 説明文 | |
startTime | 有効開始時刻(現在時刻がこの時刻以降である場合トリガが有効となる) | 2014-05-08T06:50:33Z |
endTime | 有効終了時刻(現在時刻がこの時刻以前である場合トリガが有効となる) | |
nextFireTime | 次回発火予定時刻 | |
jobDataMap | 発火時メッセージの乗せるデータ | |
misfireInstruction | 2 | |
priority | 5 | |
cronEx.cronExpression | (cronトリガの場合のみ)cron表記 |
*/20 * * * * ? |
cronEx.timeZone | (cronトリガの場合のみ)cron表記を解釈するタイムゾーン | Universal |
例
{ "group": "DEFAULT", "name": "testEvery20sec", "description": "Sample trigger request - fire every 20 seconds.", "startTime": "2014-05-08T06:50:33Z", "nextFireTime": "2014-05-08T06:50:40Z", "misfireInstruction": 2, "priority": 5, "cronEx": { "cronExpression": "*/20 * * * * ?", "timeZone": "Universal" }, "jobDataMap": { "batchJobParameters": { "foo": "bar", "baz(long)": "qux" }, "batchJobName": "testEvery20sec" } }
【トリガ削除】DELETE /triggers/{triggerGroupName}/{triggerName}
request parameter
種類 | 名前 | 必須 | 説明 |
---|---|---|---|
パス | triggerGroupName | Yes | 削除対象のグループ名 |
パス | triggerName | Yes | 削除対象のトリガ名 |
response code
code | 備考 |
---|---|
200 OK | 削除成功 |
404 Not Found | 削除対象が見つからなかった |
500 Internal Server Error | バグorz |
response body
キー | 説明 | 典型的な値 |
---|---|---|
content | 特に返す内容は無い | null |
発火時メッセージ
キー | 説明 | 典型的な値 |
---|---|---|
brianVersion | Brianのバージョン | 0.5 |
fireTime | この発火が実際に発生した時刻 | 2014-05-08T06:54:20Z |
scheduledFireTime | この発火が本来予定されていた時刻 | 2014-05-08T06:54:40Z または null |
nextFireTime | この発火源となったトリガの次回発火予定時刻 | 2014-05-08T06:54:40Z |
refireCount | TODO | 0 |
recovering | TODO | false |
fireInstanceId | 複数ある(可能性のある)インスタンスのうち、発火を担当したインスタンスID | LOCAL_1399532031174 i-XXXXXXXX_1399532031174 |
trigger | この発火源となったトリガの情報(トリガの種類によってスキーマが異なる) TODO | |
jobData | トリガに登録されたjobData |
例
{ "brianVersion": "0.5", "fireTime": "2014-05-08T06:54:20Z", "scheduledFireTime": "2014-05-08T06:54:20Z", "nextFireTime": "2014-05-08T06:54:40Z", "refireCount": 0, "recovering": false, "fireInstanceId": "LOCAL_1399532031174", "trigger": { "timeZone": "Universal", "cronExpression": "*/20 * * * * ?", "group": "DEFAULT", "name": "testEvery20sec", "misfireInstruction": 2, "description": "Sample trigger request - fire every 20 seconds.", "startTime": "2014-05-08T06:54:05Z" }, "jobData": { "batchJobParameters": { "foo": "bar", "baz(long)": "qux" }, "batchJobName": "testEvery20sec" } }
以上