Amazon SQSをColdFusion10とAWS SDK for javaの組み合わせで操作する #3 | receiveMessage & deleteMessage

2013.08.20

前回はメッセージの送信まででしたので、今回はその続きで実際にメッセージの受信を行い、その後処理が終わったメッセージを削除するまでをやってみたいと思います。

動作させた環境

  • Windows7 64Bit
  • MySQL5.5
  • ColdFusion10 デベロッパー版
  • AWS SDK for java 1.5.3

処理の流れ

  1. receiveMessageを投げる
  2. 受信したメッセージ数分のループをさせる(複数取得できる場合を考慮)
  3. JOB 管理テーブルに対して処理開始時間をアップデートする
  4. なんらかの処理をさせる(ここでは10秒sleepさせて処理した様に待ちを入れる)
  5. JOB 管理テーブルに対して処理終了時間をアップデートする
  6. deleteMessageを投げる
  7. 1〜6までの動作を確認できたらColdFusionの管理画面でスケジュールタスクとして登録する

以下がサンプルコード( aws_sqs_receive.cfm )

<cfsetting enablecfoutputonly="true">
<cfcontent type="text/html; charset=utf-8">
<cfprocessingdirective pageEncoding="utf-8" suppressWhitespace="true">

<cflock scope = "Session" timeout = "30" type = "Exclusive">
    <cfset aws_AK = #accessKeyId#> <!--- Your Access Key --->
	<cfset aws_SK = #secretAccessKey#> <!--- Your Secret Key --->
	<cfset APL_Datasource = #Datasource#>
	<cfset APL_QueueName = #QueueName#>
	<cfset APL_SQL_Url = #SQS_Url#>
</cflock>

<cfoutput>
<html lang="ja">
<head>
    <meta charset="utf-8">
    <title>Coldfusion + AWS SDK for java SQS receiveMessage and deleteMessage Sample</title>
</head>

<body>

<cfset awscreds = createObject("java","com.amazonaws.auth.BasicAWSCredentials").init(aws_AK,aws_SK)>
<cfset SQSClient = createObject("java","com.amazonaws.services.sqs.AmazonSQSClient").init(awscreds)>

<cfset sqs_queue_url = #APL_SQL_Url# & #APL_QueueName#>

<!--- 初期化 --->
<cfset SQSreceiveRequest = createObject("java","com.amazonaws.services.sqs.model.ReceiveMessageRequest").init(sqs_queue_url)>

<!--- メッセージ受信 --->
<cfset sqsReceiveMessage = SQSClient.receiveMessage(SQSreceiveRequest).getMessages()>
<cfset iterator = sqsReceiveMessage.iterator()>

<!--- 受け取ったメッセージの数だけループさせる(基本的には受信は0個か1個ですが念のため) --->
<!--- メッセージを受け取れなかった場合は何もせずに抜ける --->
<cfloop condition="iterator.hasNext()">
    <cfset currentObj = iterator.next() />
    <cfset startDate = dateFormat(now(), 'YYYY年MM月DD日') & TimeFormat(now(), 'HH時mm分ss秒')>

    <!--- SQS JOB管理テーブルに対してmessageIDのレコードを処理中としてアップデートする --->
    <cfquery name="upd_sqs_job" datasource="#APL_Datasource#">
    	update sqs_job set status = 'processing', StartDate = '#startDate#' where sqs_message_id = '#currentObj.getMessageId()#'
    </cfquery>

    <!--- 何かの処理をするロジックを入れればいいのですが、面倒なので10秒のスリープを強制で入れる --->
    <!--- たとえばPDF変換処理や画像変換処理等 --->
    <cfset thread = CreateObject("java", "java.lang.Thread") />
    <cfset thread.sleep(10000) />

    <!--- 処理が終わったと仮定して処理終了時間をセットする --->
    <cfset endDate = dateFormat(now(), 'YYYY年MM月DD日') & TimeFormat(now(), 'HH時mm分ss秒')>
    <cfquery name="upd_sqs_job" datasource="#APL_Datasource#">
    	update sqs_job set status = 'terminated', EndDate = '#endDate#' where sqs_message_id = '#currentObj.getMessageId()#'
    </cfquery>

    <!--- 処理が終了したので、SQSのメッセージを削除する --->
	<cfset messageReceiptHandle = sqsReceiveMessage.get(0).getReceiptHandle()>
	<cfset SQSdeleteRequest = createObject("java","com.amazonaws.services.sqs.model.DeleteMessageRequest").init()>
	<cfset sqsDelete = SQSdeleteRequest.setQueueUrl(sqs_queue_url)>
	<cfset sqsDelete = SQSdeleteRequest.setReceiptHandle(messageReceiptHandle)>

	<cfset sqs_delete_test = SQSClient.deleteMessage(SQSdeleteRequest)>
	メッセージID:#currentObj.getMessageId()# をキューから削除しました。
</cfloop>

</cfoutput>
</cfprocessingdirective>
<cfsetting enablecfoutputonly="false">

</body>
</html>

ColdFusionの管理画面にアクセスし、スケジュールされたタスクに受信用のテンプレート( aws_sqs_receive.cfm )を登録しておきます。

aws_sqs8

まとめ

これで最低限のSQSの機能

  • キューの作成
  • メッセージの受信
  • メッセージの送信
  • メッセージの削除

までを実際に使ってみて理解することができました。画像変換処理やPDF変換処理等をちゃんと組み込んで受信側のテンプレート( aws_sqs_receive.cfm )の実行間隔等(サンプルでは5分間隔で設定)を調整すればSQSと連携したWebシステムが簡単に出来そうです。もしくはJOB管理D/Bでstatusがprocessingになっているものがあれば処理中なので、そのレコードが存在する場合は一旦何もせずに処理を抜けるって言うことでも対応可能かもしれません。
まだまだAWSのサービスはたくさんあるので、次はAmazon SNSをやってみようと思います。