[Ruby on Rails]Active Job – 複数のキューを優先順位をつけて実行する

2014.12.25

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

前回に引き続き、Active Jobについてです。今回は複数のキューについてです。前回と同様、Active JobとSidekiqを使用します。

キューについて

前回にも書いたように、Active Jobはキューを実現するためのインターフェースであり、実行にはSidekiqなどの外部のGemを利用します。なのでここでは、Sidekiqのキューについて見てみたいと思います。

By default, sidekiq uses a single queue called "default" in Redis. If you want to use multiple queues, you can either specify them as arguments to the sidekiq command, or set them in the Sidekiq configuration file. Each queue can be configured with an optional weight. A queue with a weight of 2 will be checked twice as often as a queue with a weight of 1:

Sidekiq Advanced Optionsより

Sidekiqのドキュメントより抜粋してみました。纏めると、Sidekiqのキューの特徴は以下のようになるかと思います。

  • デフォルトでは"default"という名のキューを(Redisに入れて)使用する
  • 複数のキーを使いたい場合、コマンドや定義ファイルに指定することができる
  • キューには「重み」をつけることができる。例えば「重み」を2つ付けたキューは、「重み」が1つのキューより2倍チェックされる。


「重み」という訳が適切か微妙ですが・・・、要は優先したい順に大きい値をキューに定義すれば良いということになります。

これらのキューにJobが複数登録され、優先順位に従って実行されることになります。

サンプルソース

複数のキーに対して優先順位を付けて実行するサンプルソースを作成してみました。ファイルの構成や手順自体は前回と同様なので、今回は優先順位付けに必要な部分だけを記述します。

1.sidekiq.yml

今回はSidekiqの定義ファイルに、複数のキーと優先順位の定義を行うことにしました。以下のようになります。

/config/sidekiq.yml
:concurrency: 25
:pidfile: ./tmp/pids/sidekiq.pid
:logfile: ./log/sidekiq.log
:queues:
  - [often, 7]
  - [default, 5]
  - [seldom, 3]

「queues」以下が、キューの定義です。「- [キュー名, 重み]」という書式となります。ここでは"often"、"default"、"seldom"の3つのキューを定義し、上から順に優先となるようにしています。

2.Job

上記で定義したキューに入れる、Jobの定義です。Active Jobでは、Jobが入るキューを「queue_as」で定義することができます。(ちなみに前回は説明しませんでしたが「default」で定義していました)

/app/jobs/my_job.rb
class MyJob < ActiveJob::Base
  queue_as do
    case priority
    when 'often'
      :often
    when 'default'
      :default
    when 'seldom'
      :seldom
    end
  end

  def perform(*args)
    Task.new.exec(message)
  end

  private

  def priority
    self.arguments[0]
  end

  def message
    self.arguments[1]
  end
end
[/ruby]
まず下の2つのprivateメソッド「priority」「message」では、Jobの引数より優先順位とメッセージを取得しています。Active Jobでは「self.arguments」で引数を配列形式で取得できるので、それを利用している形です。
</p>
<p>
上の「queue_as」では先述の通り、Jobを入れるキューを設定しています。priorityメソッドが返した値により、"often"、"default"、"seldom"のいずれかを選択しています。
</p>
<P>
Jobの実行時に呼び出される「perform」では、メッセージをTask.execに渡しているだけです。
</P>
<h3>3.Taskクラス</h3>
<p>
Taskクラスは以下の通りです。
<h5>/app/models/task.rb</h5>

class Task
  def exec(message)
    sleep(5)
    Rails.logger.info('executed : ' + message)
  end
end

5秒のSleep後、引数で受け取ったメッセージをログに出力します。

4.Jobの呼び出し

Jobの呼び出しを行うControllerです。

/app/controllers/start_controller.rb
class StartController < ApplicationController
  def run

    (1..1000).each do |i|
      MyJob.perform_later('seldom',  i.to_s + 'nd seldom task.')
      MyJob.perform_later('default', i.to_s + 'nd default task.')
      MyJob.perform_later('often',   i.to_s + 'nd often task.')
    end

    render :nothing => true
  end
end

1000回、"seldom"、"default"、"often"のキューに入るよう、Jobを呼び出しています。第一引数にはキュー名を、第二引数には回数・キュー名が分かるようなメッセージを渡しています。

実行結果

3つのキューが設定した優先順位通りに実行されるかを、実際に実行してみました。とはいってもControllerで3つ同時に呼び出している訳ではないので、厳密な検証とは言えないとは思います。今回は優先順位の逆順に呼び出し、1000回目に実行されるJobがどうなるのかを見てました。

以下が、ログファイルです。

/log/development.log
I, [2014-12-25T16:03:28.656773 #1102]  INFO -- : [ActiveJob] [MyJob] [d3444b7d-e452-49f7-b75a-51182fcbeeb3] executed : 1000nd often task.
(中略)
I, [2014-12-25T16:05:23.864988 #1102]  INFO -- : [ActiveJob] [MyJob] [2cc9e7ea-5f29-4dd6-a045-deca7fd31cea] executed : 1000nd default task.
(中略)
I, [2014-12-25T16:06:34.048356 #1102]  INFO -- : [ActiveJob] [MyJob] [b3ee7b15-f12a-4784-8b7b-dcbccc61b504] executed : 1000nd seldom task.

"often"、"default"、"seldom"の順になっています。

キューの優先順位を変えて実行してみました。

/config/sidekiq.yml
(中略)
:queues:
  - [often, 7]
  - [default, 5]
  - [seldom, 6]

以下が、ログファイルです。

/log/development.log
I, [2014-12-25T16:28:17.565769 #1172]  INFO -- : [ActiveJob] [MyJob] [6741d2a2-9c9d-4745-a91b-b9d1a3c997f7] executed : 1000nd often task.
(中略)
I, [2014-12-25T16:28:47.396225 #1172]  INFO -- : [ActiveJob] [MyJob] [b12febb9-279f-4fd2-9bf5-4aa77a6a5f26] executed : 1000nd seldom task.
(中略)
I, [2014-12-25T16:29:22.697153 #1172]  INFO -- : [ActiveJob] [MyJob] [e2bdccc9-33e8-405e-9fbe-67aa04428280] executed : 1000nd default task.

こちらも定義した「重み」の順になってますね。

まとめ

イベントを待機して、イベント毎に異なる処理を非同期で行いたい場合や、同時に来たイベントを判定して優先順位を付けて実行したい場合などが使い道として想定できます。何かのときに役立てば幸いです。

参考サイト

How to Integrate Sidekiq With ActiveJob
Sidekiq Advanced Options