AWS SDK for Javaを使う#Amazon SQS

2012.02.10

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

いまやクラウドサービスの代表格とも言えるAmazonですが、EC2やS3をはじめとして、さまざまなサービスを提供しています。数年前に私も少しだけEC2やS3を使用したことがあるのですが、最近はあまりさわっていませんでした。 しかし今回AWSについての調査をきっかけに、各種AWSサービスについて復習&AWS SDKでの動作確認をしていきたいと思います。 今回はクラウドベースのメッセージングサービス、Amazon SQSです。

今回使用した動作環境は以下のとおりです。

  • OS : MacOS X 10.7.2
  • Java : 1.6.0_26
  • Scala : 2.9.1 final
  • SBT : 0.11.2

なお、AWSへの登録は終わっているものとします。

Amazon SQS

アプリケーションの実行で時間のかかる処理非同期で行いたいときにはどのようにするでしょうか。 最近は非同期通信用の実装もいろいろと出てきたのでそういったものを使用することもありますが、 もっとシンプルにメッセージキューを使用するという方法があります。

既存のオンプレミス環境でメッセージキューを使用するとした場合、サーバにActiveMQやRabbitMQなどをインストールしていました。 クラウド環境でメッセージキューを使用したい場合、Amazon SQSを使用することができます。 Amazon SQSは、JMSを使用するような感覚で簡単にメッセージキューを使えます。(Amazon SQSはJMS実装ではありません)

Amazon SQS には以下のような特徴があります。

  • 無制限の数のメッセージと共に、無制限の数の Amazon SQS キューを作成可能。
  • 1つのキューに対して複数のプロセスが読み取り/書き込み可能。
  • 料金は、使用した分に対して発生。価格はこのあたりを参照。
  • メッセージ本文には、最大64KBのテキストを任意のフォーマットで含めることが可能。
  • メッセージは、最大14日間キューに保持できる。
  • Amazon SQS を他者と安全に共有することが可能で、キューは他のAWSアカウントと匿名で共有できる。
  • キューの共有は、IP アドレスや時刻によっても制限することができる。

JSMなどに比べ、とにかくシンプルですぐに使うことができます。 それではサンプルを動かしてみましょう。

実行環境のセットアップ

以前と同じくsbt + scala + aws-sdk-javaをセットアップしておきましょう。

SQSサンプル作成

src/main/scalaディレクトリに、SQS.scalaファイルを作成してください。 下記がSQSのサンプルの全文です。キューの作成、キューへの送信と受信、キューの削除を実施しています。 なお、下記プログラムを連続して行うと、同じ名前キューの作成と削除を連続しておこなっているので時間をあけるように言われるかもしれません。その場合はしばらくしてから再度実行してください。

import com.amazonaws._
import com.amazonaws.auth._
import com.amazonaws.services.sqs.AmazonSQS
import com.amazonaws.services.sqs.AmazonSQSClient
import com.amazonaws.services.sqs.model._

import scala.collection.JavaConversions._

object SQSMain extends App {

  //アクセスキー
  val accessKey = "アクセスキー"
  //シークレットキー
  val secretKey = "シークレットキー"

  val credentials = new BasicAWSCredentials(accessKey,secretKey)

  //SQSオブジェクト作成
  println("create AmazonSQSClient ")
  val sqs = new AmazonSQSClient(credentials)

  //キューを作成
  println("Creating a new SQS queue called MyQueue")
  val createQueueRequest = new CreateQueueRequest("MyQueue")
  val myQueueUrl = sqs.createQueue(createQueueRequest).getQueueUrl()
  println("myQueueUrl=" + myQueueUrl)
  
  //キュー一覧を取得
  println("Listing all queues in your account")
  for (queueUrl <- sqs.listQueues().getQueueUrls()) {
    println("QueueUrl: " + queueUrl);
  }

  //キューにメッセージ送信(10件)
  println("Sending a message to MyQueue")
  for(i <- 1 to 10) {
	sqs.sendMessage(new SendMessageRequest(myQueueUrl, "send text. Seq=" + i.toString))
  }

  // メッセージ受信. withMaxNumberOfMessagesで受信するメッセージ数を指定
  println("Receiving messages from MyQueue")
  val receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl).withMaxNumberOfMessages(10)
  val messages = sqs.receiveMessage(receiveMessageRequest).getMessages()
  //10回ループをまわし、メッセージ受信を試みる.
  for(i <- 0 to 10) {    
    for (message <- messages) {
      println(" Message");
      println("    MessageId:     " + message.getMessageId())
      println("    ReceiptHandle: " + message.getReceiptHandle())
      println("    MD5OfBody:     " + message.getMD5OfBody())
      println("    Body:          " + message.getBody())
      for (entry <- message.getAttributes().entrySet()) {
	  println("  Attribute")
  	  println("    Name:  " + entry.getKey())
  	  println("    Value: " + entry.getValue())
      }
    }
    Thread.sleep(1000)
  }

  //キュー削除
  println("Deleting the test queue.\n");
  sqs.deleteQueue(new DeleteQueueRequest(myQueueUrl))
}

プログラムの記述ができたら、sbtを起動してSQSMainクラスを実行してみてください。 キューへのメッセージ送信と受信されたデータが表示されます。

$ sbt
>run

ではポイントの説明です。 SQSに関しても他のサービスと同じく、Clientクラスを作成してそのメソッドを使っています。 AmazonSQSClientのインスタンスを作成したらcreateQueueメソッドで新しいキューを作成します。 なお、同名のキューがすでにあればそのハンドルが返されます。

  val sqs = new AmazonSQSClient(credentials)
  //キューを作成
  println("Creating a new SQS queue called MyQueue")
  val createQueueRequest = new CreateQueueRequest("MyQueue")
  val myQueueUrl = sqs.createQueue(createQueueRequest).getQueueUrl()
  println("myQueueUrl=" + myQueueUrl)

キューへのメッセージ送信はsendMessageメソッドへSendMessageRequestオブジェクトを渡して実現しています。 メッセージ受信はreceiveMessageメソッドを使用して取得します。取得するメッセージの件数は、最大withMaxNumberOfMessagesで指定した数だけ取得できます。

  //キューにメッセージ送信(10件)
  println("Sending a message to MyQueue")
  for(i <- 1 to 10) {
	sqs.sendMessage(new SendMessageRequest(myQueueUrl, "send text. Seq=" + i.toString))
  }

  // メッセージ受信. withMaxNumberOfMessagesで受信するメッセージ数を指定
  println("Receiving messages from MyQueue")
  val receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl).withMaxNumberOfMessages(10)
  val messages = sqs.receiveMessage(receiveMessageRequest).getMessages()
  //10回ループをまわし、メッセージ受信を試みる.
  for(i <- 0 to 10) {    
    for (message <- messages) {
      println(" Message");
      ・・・・・
    }
    Thread.sleep(1000)
  }

実際に実行してみるとわかりますが、一度に10件取得できることもあれば、10回のループ中に受信できないメッセージがあることもあります。また、必ず送信した順番で受信できるとは限りません。 実際にアプリケーションで利用するには、そのあたりを考慮して設計する必要があります。

まとめ

今回はクラウドメッセージングサービス、Amazon SQSを使用してみました。 非常に簡単にキューの作成から送信、受信が使用できました。 SQSは大きいデータを扱えませんが、AWSの他のサービス(SimpelDBなど)と連携することでカバーできるので、幅広い用途で使用できそうです。

参考サイトなど