[Rails] Sidekiq の Worker で Amazon SQS のメッセージを処理する

2014.07.01

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Amazon SQS のメッセージを処理する際、Worker を使って行うことが多いかと思います。
Rails で非同期処理を実現する方法の 1 つとして sidekiq という gem がありますが、 今回この sidekiq の Worker を使って SQS のメッセージを処理してみたいと思います。

必要な gem の追加

Rails プロジェクトを作り、Gemfile を編集します。

source 'https://rubygems.org'

gem 'rails', '4.1.2'
gem 'sidekiq'
gem 'aws-sdk-core'
gem 'settingslogic'

必要最低限の指定です。
aws-sdk はバージョン 2 系を使ってます。http://aws.amazon.com/jp/sdkforruby/
settingslogic は定数管理の gem です。SQS へ接続する region と endpoint などを指定するために使います。

モデルの定義

SQS を処理する部分をモデルに書きたいですが、
複数のモデルで共有出来るように concern を使って SQS への操作部分を定義しました。

# app/models/concerns/sqs_usable.rb

module SqsUsable
  extend ActiveSupport::Concern

  def initialize(queue_name)
    @queue_name = queue_name
  end

  def sqs
    @sqs ||= Aws::SQS.new(region: Settings.region, endpoint: Settings.endpoint)
  end

  def receive_message(max_number = 10)
    sqs.receive_message(queue_url: queue_url, max_number_of_messages: max_number)
  end

  def batch_delete(entries)
    sqs.delete_message_batch(queue_url: queue_url, entries: entries)
  end

  def queue_url
    @queue_url ||= "#{Settings.queue_url}/#{@queue_name}"
  end
end

receive_message では max_number_of_messages に一度に取得したいメッセージの最大数を 指定出来ます。(最大 10 個まで)
Settings.xxx の部分は、settingslogic の指定です。

# config/application.yml
defaults: &defaults
  region:     'ap-northeast-1'
  endpoint:   'sqs.ap-southeast-1.amazonaws.com'
  queue_url:  'https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxxxxx'
  queue_name: 'test_queue'

上記で定義した concern をモデルに mix-inしますが、以下のように作りました。

# app/models/message_collection.rb

class MessageCollection
  include SqsUsable

  def initialize(type)
    super
  end

  def receive_message
    res = super
    res.data.messages
  end

  def batch_delete(messages)
    entries = []
    messages.each_with_index do |message, i|
      entries << { id: i.to_s, receipt_handle: message.receipt_handle }
    end
    super(entries)
  end
end
[/ruby]

<p>
今回のこの MessageCollection は SQS のバッチ受信/削除用です。<br/>
必要に応じて、1 つのメッセージ処理用に Message クラス等を追加するといいと思います。<br/>
初期化の際、 type に渡すのはキューの名前になります。<br/>
</p>

<p>そして worker は次のようにしました。</p>

# app/workers/sqs_receive_worker.rb

class SqsReceiveWorker
  include Sidekiq::Worker

  def perform(queue_name)
    collection = MessageCollection.new(queue_name)

    if messages = collection.receive_message
      messages.each do |message|
        # TODO: 何かしらの処理…

      end

      collection.batch_delete(messages)
    end
    SqsReceiveWorker.perform_in(1.second, queue_name)
  end
end

※ サンプルのためエラーハンドリング等は省略しています。
SQS からロングポーリングで受信して、処理を終えた後に纏めて削除するような作りになっています。
一連の処理が終わったら、1 秒後に次の処理を実行します。 sidekiq が新しいジョブをチェックするのは、デフォルトだと15秒間隔のため、initializer に以下の設定を読み込むようにしています。

# config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
  config.poll_interval = 1
end

SQS の設定

SQS の設定としては、ロングポーリングを有効にしておきます。
マネージメントコンソールから、キューを作成し、Receive Message Wait time に 20 を指定しましょう。
これでキューからメッセージを取得する際に、メッセージが無くても最大 20 秒まで待つようになります。

140630-0002

動作確認

後は、sidekiq を起動して、Worker をキックします。

$ bundle exec sidekiq 
         s
          ss
     sss  sss         ss
     s  sss s   ssss sss   ____  _     _      _    _
     s     sssss ssss     / ___|(_) __| | ___| | _(_) __ _
    s         sss         \___ \| |/ _` |/ _ \ |/ / |/ _` |
    s sssss  s             ___) | | (_| |  __/   <| | (_| |
    ss    s  s            |____/|_|\__,_|\___|_|\_\_|\__, |
    s     s s                                           |_|
          s s
         sss
         sss

2014-07-01T05:21:45Z 17127 TID-owbv6zvfk INFO: Running in ruby 2.1.1p76 (2014-02-24 revision 45161) [x86_64-darwin13.0]
2014-07-01T05:21:45Z 17127 TID-owbv6zvfk INFO: See LICENSE and the LGPL-3.0 for licensing details.
2014-07-01T05:21:45Z 17127 TID-owbv6zvfk INFO: Upgrade to Sidekiq Pro for more features and support: http://sidekiq.org/pro
2014-07-01T05:21:45Z 17127 TID-owbv6zvfk INFO: Starting processing, hit Ctrl-C to stop
2014-07-01T05:21:45Z 17127 TID-owbvtuixw INFO: Booting Sidekiq 3.1.4 with redis options {}
[/shell]

$ bundle exec rails c
> SqsReceiveWorker.perform_async(Settings.queue_name)

本番稼働に向けて

以上で、SQS を使った sidekiq のサンプルの紹介は終わりです。
ここからはおまけとしてプロダクション環境で SQS を使う上で、自分なりに感じた注意点をちょっとだけ書いてみたいと思います。

エラーハンドリングを十分考慮する

SQS のメッセージを取得後、削除しなければ、visibility timeout の期間を過ぎてしまうとまた取得出来るようになります。
これは何らかの例外によって削除前に処理が終わってしまうと、無限ループに陥ってしまう場合があります。(Message Retention Period の期間が過ぎた場合は削除されます) 適切なエラーハンドリングを仕込み、メッセージを削除するようにしたり、受信回数が一定数を超えた場合に他のキューへ転送する Dead Letter Queue の仕組みを使うことを検討したほうがいいと思います。

重複をどう扱うか?

Amazon SQS のメッセージはとても僅かな確率ですが、分散キューという特性上重複して取得されるケースがあるので、 重複してしまった場合の事を一度考えておいた方がいいと思います。
対応する方法としては、

  • 重複して処理をしても影響なく動作させる。
  • 重複を除いて処理する。(例えば SQS のメッセージには UUID があるのでこれを重複チェックに利用する)
  • そもそも重複は無視する。(重複する確率は低いので実装コストとのトレードオフして諦める)

などがあるかと思います。

メッセージ数の予測

登録するメッセージの数が多いと、1 つの Worker でさばききれなくなる可能性があります。
Worker の数を増やすことを検討したり、1 メッセージあたりのペイロードに余裕があるようだったら、 トータルのリクエスト数を減らすために、タイミングによってはその余り分データを詰めてもいいかもしれません。(もちろん受信側でも適切に処理してあげる必要がありますが)
Worker の数を増やすについては、メッセージの数をトリガーとさせて、AutoScaling させたりも出来ますね。

最後に

sidekiq を使うことで、Amazon SQS を捌く例とおまけで注意点をご紹介しました。
SQS はとても便利で楽しいので是非皆さんのプロジェクトでも導入を検討してみてください!