Rubyを使ってAmazon SNSとSQSを操作する:fanoutパターン

Rubyを使ってAmazon SNSとSQSを操作する:fanoutパターン

Clock Icon2012.11.25

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

AWS SDK for Ruby

今年の7月にAWS SDK for Rubyがリリースされました。いつもはフルセットで用意されているJavaを使っているのですが、今回はRubyがどの程度簡単に記述できるのか確認するためにサンプルを作りました。また、サンプル作成にあたって、SNSとSQSの連携を試しました。

Amazon LinuxでAWS SDK for Rubyのセットアップ

早速セットアップしましょう。

$ sudo yum install ruby-devel rubygems gcc make libxml2-devel libxslt-devel aws-sdk

Amazon SNSをセットアップする

通知先としてAmazon SNSのトピックを作成します。トピックは、受付窓口として対応します。受け付けられたメッセージはサブスクライバ(購読者)に渡されます。サブスクライバには、HTTP/HTTPS/EMail/Email(JSON)/SQSを複数指定することができます。

早速トピックを作成してみましょう。

Amazon SQSをセットアップする

Amazon SNSに通知されたメッセージをSQSに渡すためにセットアップします。まずは、管理コンソールからSQSを選択してキューの新規作成をします。

SNSとSQSを連携させる

SNSとSQSを関連づけするのも簡単です。メニューから選択します。

既に作成済みのSNSを指定することができます。これは便利ですね。SNS画面からSQSもドロップボックスから指定したいですね。まぁそのうち追加されているのがAWSの凄いところ。

改めてSNSの画面を見ますと、SQSが登録されているのが分かります。

Amazon SNSに通知する

セットアップが終わりましたので、AWS SDK fro Rubyを使ってAmazon SNSにアクセスしたいと思います。短いシェルを書いて呼び出しています。

$ sudo vi sns-put.sh
#!/usr/bin/ruby

require 'rubygems'
require 'aws-sdk'

AWS.config(:access_key_id => '************************', 
  :secret_access_key => '************************',
  :sns_endpoint => 'sns.ap-northeast-1.amazonaws.com')

topic = AWS::SNS::Topic.new('arn:aws:sns:ap-northeast-1:***********:'+ARGV[0])
topic.publish('', :subject => ARGV[1], :email => ARGV[2])

作ったシェルを呼び出すときは、引数にトピック名、タイトル、メッセージを記述します。

$ ./sns-put.sh topic-sample "notice" "This is notice from instance-id xxxxxxx"

Amazon SQSのキューからメッセージを取り出す

先ほどSNSに投げたメッセージは、登録されたサブスクライバであるSQSのキューに転送されました。ここでは、キューからメッセージを取り出したいと思います。

$ sudo vi sqs_get.sh
#!/usr/bin/ruby

require 'rubygems'
require 'aws-sdk'

AWS.config(:access_key_id => '************************', 
  :secret_access_key => '************************',
  :sss_endpoint => 'sqs.ap-northeast-1.amazonaws.com')

url = 'https://sqs.ap-northeast-1.amazonaws.com/***********/queue-sample'

sqs = AWS::SQS.new

while true
  receive = sqs.queues[url].receive_message()
  if receive
    message = JSON.parse(receive.body)
    puts message['Message']
    puts message['Timestamp']
    receive.delete
    sleep 1
  end
  puts "Waiting"
  sleep 1
end

実行してみましょう。

$ ./get_sqs.sh 
This is notice from instance-id xxxxxxx
2012-11-24T17:15:06.493Z
Waiting
This is notice from instance-id xxxxxxx
2012-11-24T17:15:02.705Z
Waiting
This is notice from instance-id xxxxxxx
2012-11-24T17:15:05.779Z
Waiting
Waiting
Waiting

キューから取り出したメッセージの行方

キューからメッセージを取り出すときの特徴をいくつか確認します。SQSは分散キューサービスとして高可用性性を確保しています。そのため、キューから取り出すメッセージの順番が保証されていません。また、同じメッセージが2回が取り出される可能性があります。メッセージは明示的に消さないと残ります。ここで、気になることがあります。複数台のマシンが同時多発的に同じキューのメッセージを取り出すと、みんなで同じメッセージを受け取ってしまわないでしょうか。この点についてSQSはちゃんと考えられています。1回取り出したメッセージは一定時間見えなくなります。これは、Default Visibility Timeoutで設定することができます。

動作を確認してみましょう。以下のスクリプトは、無限ループでキューをチェックしています。メッセージを消していないのでタイムアウトを過ぎると再度取得することができます。このタイムアウト時間は0から12時間まで指定することができます。キューから取り出したメッセージに基づく処理が終わるであろう時間を余裕を持って指定すれば良いかと思います。

$ sudo vi sqs_check.sh
#!/usr/bin/ruby

require 'rubygems'
require 'aws-sdk'

AWS.config(:access_key_id => '************************',
  :secret_access_key => '************************',
  :sss_endpoint => 'sqs.ap-northeast-1.amazonaws.com')

url = 'https://sqs.ap-northeast-1.amazonaws.com/**********/queue-sample'

sqs = AWS::SQS.new

while true
  receive = sqs.queues[url].receive_message()
  if receive
    message = JSON.parse(receive.body)
    puts message['Timestamp']
  end
end

クライアント側の無限ループでメッセージを取り出し続けることができました。ただ、無限ループだとクライアント側の負荷が高いと思いますが、スリープしていてはリアルタイムに処理をすることができません。そこで、メッセージの取り出しをロングポーリングで待ちたいと思います。AWS SDK for Rubyにはpollメソッドが用意されていますので使ってみましょう。

#!/usr/bin/ruby

require 'rubygems'
require 'aws-sdk'

AWS.config(:access_key_id => '************************',
  :secret_access_key => '************************',
  :sss_endpoint => 'sqs.ap-northeast-1.amazonaws.com')

url = 'https://sqs.ap-northeast-1.amazonaws.com/************/queue-sample'

sqs = AWS::SQS.new

sqs.queues[url].poll do |msg|
  message = JSON.parse(msg.body)
  puts message['Message']
  puts message['Timestamp']
end

Rubyのpollメソッドは取り出したタイミングでdeleteしてしまっているようですので、もう少し詳細な指定をするためにreceive_messageメソッドにポーリングするためのWaitTimeSecondsオプションを指定したいと思います。と、思ったらRuby用のSDKには未だそのオプションがありませんので、またの機会としましょう。

まとめ

基本的で小さな機能を疎結合に繋げることができました。それぞれのモジュールに対して詳細な動きを設定することができました。設定自体はブラウザから数クリックで完了しました。様々なプログラミング言語とSDKを使って短いコードで呼び出すことができました。あぁなんて素晴らしいんだ。この小さなモジュールを組合せれば大規模なシステムも作れるはず。短いコードであればテストも容易です。今回のサンプルでは、1つのSNSトピックから1つのサブスクライバ向けのSQSキューを指定しましたが、応用編としては1つのSNSトピックに複数のサブスクライバを指定するデザインパターンがあります。これをファンアウト(fanout)と言います。ファンアウトとは、「展開する」という意味で、専門用語では「デジタルICチップから出力されるデバイス数」という意味があります。1つのSNSメッセージから駆動して複数の処理(SQSとか)に展開される図が浮かびますね。これは実務に使えそう!では、また明日!

参考資料

AWS SDK for Ruby

【AWS発表】SQS キューとSNS 通知 の連携がより簡単になりました!

Getting Started with the AWS SDK for Ruby

AWS SDK for Ruby Developer Guide (Version v1.0.0)

Release: Amazon Simple Queue Service on 2012-11-21

SQS Queues and SNS Notifications - Now Best Friends

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.