Rubyを使ってAmazon SNSとSQSを操作する:fanoutパターン
AWS SDK for Ruby
今年の7月にAWS SDK for Rubyがリリースされました。いつもはフルセットで用意されているJavaを使っているのですが、今回はRubyがどの程度簡単に記述できるのか確認するためにサンプルを作りました。また、サンプル作成にあたって、SNSとSQSの連携を試しました。
Amazon LinuxでAWS SDK for Rubyのセットアップ
早速セットアップしましょう。
$ sudo yum install ruby-devel rubygems gcc make libxml2-devel libxslt-devel aws-sdk
Amazon SNSをセットアップする
通知先としてAmazon SNSのトピックを作成します。トピックは、受付窓口として対応します。受け付けられたメッセージはサブスクライバ(購読者)に渡されます。サブスクライバには、HTTP/HTTPS/EMail/Email(JSON)/SQSを複数指定することができます。
早速トピックを作成してみましょう。
Amazon SQSをセットアップする
Amazon SNSに通知されたメッセージをSQSに渡すためにセットアップします。まずは、管理コンソールからSQSを選択してキューの新規作成をします。
SNSとSQSを連携させる
SNSとSQSを関連づけするのも簡単です。メニューから選択します。
既に作成済みのSNSを指定することができます。これは便利ですね。SNS画面からSQSもドロップボックスから指定したいですね。まぁそのうち追加されているのがAWSの凄いところ。
改めてSNSの画面を見ますと、SQSが登録されているのが分かります。
Amazon SNSに通知する
セットアップが終わりましたので、AWS SDK fro Rubyを使ってAmazon SNSにアクセスしたいと思います。短いシェルを書いて呼び出しています。
$ sudo vi sns-put.sh #!/usr/bin/ruby require 'rubygems' require 'aws-sdk' AWS.config(:access_key_id => '************************', :secret_access_key => '************************', :sns_endpoint => 'sns.ap-northeast-1.amazonaws.com') topic = AWS::SNS::Topic.new('arn:aws:sns:ap-northeast-1:***********:'+ARGV[0]) topic.publish('', :subject => ARGV[1], :email => ARGV[2])
作ったシェルを呼び出すときは、引数にトピック名、タイトル、メッセージを記述します。
$ ./sns-put.sh topic-sample "notice" "This is notice from instance-id xxxxxxx"
Amazon SQSのキューからメッセージを取り出す
先ほどSNSに投げたメッセージは、登録されたサブスクライバであるSQSのキューに転送されました。ここでは、キューからメッセージを取り出したいと思います。
$ sudo vi sqs_get.sh #!/usr/bin/ruby require 'rubygems' require 'aws-sdk' AWS.config(:access_key_id => '************************', :secret_access_key => '************************', :sss_endpoint => 'sqs.ap-northeast-1.amazonaws.com') url = 'https://sqs.ap-northeast-1.amazonaws.com/***********/queue-sample' sqs = AWS::SQS.new while true receive = sqs.queues[url].receive_message() if receive message = JSON.parse(receive.body) puts message['Message'] puts message['Timestamp'] receive.delete sleep 1 end puts "Waiting" sleep 1 end
実行してみましょう。
$ ./get_sqs.sh This is notice from instance-id xxxxxxx 2012-11-24T17:15:06.493Z Waiting This is notice from instance-id xxxxxxx 2012-11-24T17:15:02.705Z Waiting This is notice from instance-id xxxxxxx 2012-11-24T17:15:05.779Z Waiting Waiting Waiting
キューから取り出したメッセージの行方
キューからメッセージを取り出すときの特徴をいくつか確認します。SQSは分散キューサービスとして高可用性性を確保しています。そのため、キューから取り出すメッセージの順番が保証されていません。また、同じメッセージが2回が取り出される可能性があります。メッセージは明示的に消さないと残ります。ここで、気になることがあります。複数台のマシンが同時多発的に同じキューのメッセージを取り出すと、みんなで同じメッセージを受け取ってしまわないでしょうか。この点についてSQSはちゃんと考えられています。1回取り出したメッセージは一定時間見えなくなります。これは、Default Visibility Timeoutで設定することができます。
動作を確認してみましょう。以下のスクリプトは、無限ループでキューをチェックしています。メッセージを消していないのでタイムアウトを過ぎると再度取得することができます。このタイムアウト時間は0から12時間まで指定することができます。キューから取り出したメッセージに基づく処理が終わるであろう時間を余裕を持って指定すれば良いかと思います。
$ sudo vi sqs_check.sh #!/usr/bin/ruby require 'rubygems' require 'aws-sdk' AWS.config(:access_key_id => '************************', :secret_access_key => '************************', :sss_endpoint => 'sqs.ap-northeast-1.amazonaws.com') url = 'https://sqs.ap-northeast-1.amazonaws.com/**********/queue-sample' sqs = AWS::SQS.new while true receive = sqs.queues[url].receive_message() if receive message = JSON.parse(receive.body) puts message['Timestamp'] end end
クライアント側の無限ループでメッセージを取り出し続けることができました。ただ、無限ループだとクライアント側の負荷が高いと思いますが、スリープしていてはリアルタイムに処理をすることができません。そこで、メッセージの取り出しをロングポーリングで待ちたいと思います。AWS SDK for Rubyにはpollメソッドが用意されていますので使ってみましょう。
#!/usr/bin/ruby require 'rubygems' require 'aws-sdk' AWS.config(:access_key_id => '************************', :secret_access_key => '************************', :sss_endpoint => 'sqs.ap-northeast-1.amazonaws.com') url = 'https://sqs.ap-northeast-1.amazonaws.com/************/queue-sample' sqs = AWS::SQS.new sqs.queues[url].poll do |msg| message = JSON.parse(msg.body) puts message['Message'] puts message['Timestamp'] end
Rubyのpollメソッドは取り出したタイミングでdeleteしてしまっているようですので、もう少し詳細な指定をするためにreceive_messageメソッドにポーリングするためのWaitTimeSecondsオプションを指定したいと思います。と、思ったらRuby用のSDKには未だそのオプションがありませんので、またの機会としましょう。
まとめ
基本的で小さな機能を疎結合に繋げることができました。それぞれのモジュールに対して詳細な動きを設定することができました。設定自体はブラウザから数クリックで完了しました。様々なプログラミング言語とSDKを使って短いコードで呼び出すことができました。あぁなんて素晴らしいんだ。この小さなモジュールを組合せれば大規模なシステムも作れるはず。短いコードであればテストも容易です。今回のサンプルでは、1つのSNSトピックから1つのサブスクライバ向けのSQSキューを指定しましたが、応用編としては1つのSNSトピックに複数のサブスクライバを指定するデザインパターンがあります。これをファンアウト(fanout)と言います。ファンアウトとは、「展開する」という意味で、専門用語では「デジタルICチップから出力されるデバイス数」という意味があります。1つのSNSメッセージから駆動して複数の処理(SQSとか)に展開される図が浮かびますね。これは実務に使えそう!では、また明日!
参考資料
【AWS発表】SQS キューとSNS 通知 の連携がより簡単になりました!
Getting Started with the AWS SDK for Ruby
AWS SDK for Ruby Developer Guide (Version v1.0.0)