【コードリーディング】 Delayed::Jobのジョブ登録と実行
はじめに
Ruby on Railsで非同期処理を行う際に、Delayed::Jobを触る機会がありました。
このDelayed::Jobについて動かし方を少し知ってはいたものの、内部処理まではほとんど知らないままでした。
内部構造をほとんど知らなくても動かせることは良いことなのですが、どんな仕組みで動いているのか興味があったので、コードリーディングしながら理解することにしました。
環境
- Ruby: 2.7.4
- Ruby on Rails: 6.0.4
- MySQL: 5.7
- delayed_job: 4.1.9
- delayed_job_active_record: 4.1.9
Delayed::Jobとは
Delayed::Job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background.
It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks. Amongst those tasks are:
- sending massive newsletters
- image resizing
- http downloads
- updating smart collections
- updating solr, our search server, after product changes
- batch imports
- spam checks
READMEにある通り、バックグラウンドで長いタスクを非同期に実行する一般的なパターンをカプセル化したもの、これがDelayed::Jobとなります。
Shopifyにあったものから抽出したものがDelayed::Jobとなっており、主に、大量のニュースレター送信、イメージリサイズ、ダウンロード、一括インポートなどに使用されていたようです。
ジョブの登録、実行の使い方の詳細に関しては、READMEにありますので、ここでは省略します。
コードリーディング
ここでは主に以下2点についてコードリーディングします。
- ジョブの登録
- ワーカーによるジョブの実行
前提
以下User
モデルを作成し、その中でジョブ登録を行っています。
class User < ApplicationRecord
def send_message
puts '---send_message---'
end
def self.send_delay_message
user = new
user.save!
user.delay(priority: 10, run_at: Time.now + 10).send_message
end
end
ジョブ登録
それではジョブ登録時の処理についてコードリーディングします。
まずはdelay
メソッドからです。
def delay(options = {})
DelayProxy.new(PerformableMethod, self, options)
end
PerfomableMethod
はDelayed::Jobのライブラリで定義しているクラスとなります。
ここではDelayProxy
のインスタンスを作っているのみとなります。
DelayProxy
クラスは以下のクラス構造となっており、シンプルなクラスとなっています。
class DelayProxy < Delayed::Compatibility.proxy_object_class
def initialize(payload_class, target, options)
@payload_class = payload_class
@target = target
@options = options
end
# rubocop:disable MethodMissing
def method_missing(method, *args)
Job.enqueue({:payload_object => @payload_class.new(@target, method.to_sym, args)}.merge(@options))
end
# rubocop:enable MethodMissing
end
delay
メソッドの処理が終了したので、user.delay(priority: 10, run_at: Time.now + 10).send_message
でのsend_message
実行となります。
このメソッドはDelayProxy
クラスには存在しないので、method_missing
メソッドが呼び出されます。
def method_missing(method, *args)
Job.enqueue({:payload_object => @payload_class.new(@target, method.to_sym, args)}.merge(@options))
end
payload_class
がPerformableMethod
のため、PerformableMethod
のインスタンスを持つハッシュを引数として、enqueue
メソッドを呼び出します。
PerformableMethod
クラスはアダプターパターンを使用しているため、PerformableMethod
クラスのインスタンスを作成することで、指定したクラス(今回は@target
、つまりUser
クラス)のアダプターとなります。
なお、このJob
クラスはdelayed_job_active_recordのライブラリにあるactive_record.rbで定義されており、Delayed::Backend::Base
をincludeしています。
class Job < ::ActiveRecord::Base
include Delayed::Backend::Base
ではbaseクラスのenqueue
メソッドを見てみます。
def enqueue(*args)
job_options = Delayed::Backend::JobPreparer.new(*args).prepare
enqueue_job(job_options)
end
JobPreparer
クラスでは、*args
を使用してジョブの各オプションを格納します。格納できるオプションとしては、payload_object
、queue
、priority
、run_at
があります。
今回では、
-
priority
: 10 -
run_at
:Time.now + 10
-
payload_object
:@payload_class.new(@target, method.to_sym, args)}.merge(@options)
がジョブのオプションとして格納されています。
その次のenqueue_job
メソッドに移ります。
def enqueue_job(options)
new(options).tap do |job|
Delayed::Worker.lifecycle.run_callbacks(:enqueue, job) do
job.hook(:enqueue)
Delayed::Worker.delay_job?(job) ? job.save : job.invoke_job
end
end
end
enqueue_job
ではジョブを生成し、ジョブ実行前のアクション、ジョブ実行を行っています。
まずnew
では、Delayed::Backend::Base
をincludeしていたJob
クラスのインスタンスが生成されます。
その際、baseクラスには代入構文のpayload_object
メソッドが定義されているので、そのメソッドが呼び出されます。
def payload_object=(object)
@payload_object = object
self.handler = object.to_yaml
end
このメソッドでobject
、つまりジョブのオプションを持ったPerformableMethod
のインスタンスがYAML化されて、handler
にセットされます。
このnew
によるインスタンスが作成されると、このインスタンス(job
)に対し、Delayed::Worker.lifecycle.run_callbacks
を実行します。
まず、Delayed::Worker
が呼び出された時点で、reset
メソッドが実行されます。
module Delayed
class Worker
・・・
end
・・・
end
・・・
Delayed::Worker.reset
reset
メソッドでは、ジョブを登録する上でのデフォルト値がセットされます。
module Delayed
class Worker # rubocop:disable ClassLength
DEFAULT_LOG_LEVEL = 'info'.freeze
DEFAULT_SLEEP_DELAY = 5
DEFAULT_MAX_ATTEMPTS = 25
DEFAULT_MAX_RUN_TIME = 4.hours
DEFAULT_DEFAULT_PRIORITY = 0
DEFAULT_DELAY_JOBS = true
DEFAULT_QUEUES = [].freeze
DEFAULT_QUEUE_ATTRIBUTES = HashWithIndifferentAccess.new.freeze
DEFAULT_READ_AHEAD = 5
cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time,
:default_priority, :sleep_delay, :logger, :delay_jobs, :queues,
:read_ahead, :plugins, :destroy_failed_jobs, :exit_on_complete,
:default_log_level
・・・
def self.reset
self.default_log_level = DEFAULT_LOG_LEVEL
self.sleep_delay = DEFAULT_SLEEP_DELAY
self.max_attempts = DEFAULT_MAX_ATTEMPTS
self.max_run_time = DEFAULT_MAX_RUN_TIME
self.default_priority = DEFAULT_DEFAULT_PRIORITY
self.delay_jobs = DEFAULT_DELAY_JOBS
self.queues = DEFAULT_QUEUES
self.queue_attributes = DEFAULT_QUEUE_ATTRIBUTES
self.read_ahead = DEFAULT_READ_AHEAD
@lifecycle = nil
end
・・・
self.plugins = [Delayed::Plugins::ClearLocks]
・・・
self.destroy_failed_jobs = true
・・・
cattr_accessor :raise_signal_exceptions
self.raise_signal_exceptions = false
Delayed::Worker.lifecycle
を見てみます。
def self.lifecycle
# In case a worker has not been set up, job enqueueing needs a lifecycle.
setup_lifecycle unless @lifecycle
@lifecycle
end
def self.setup_lifecycle
@lifecycle = Delayed::Lifecycle.new
plugins.each { |klass| klass.new }
end
lifecycle
メソッドではシングルトンパターンを使用し、@lifecycle
を管理しています。
@lifecycle
にはreset
メソッド実行時に値がnil
になったので、setup_lifecycle
メソッドを呼び出します。
setup_lifecycle
メソッドではDelayed::Lifecycle
のインスタンスを作成しています。
module Delayed
・・・
class Lifecycle
EVENTS = {
:enqueue => [:job],
:execute => [:worker],
:loop => [:worker],
:perform => [:worker, :job],
:error => [:worker, :job],
:failure => [:worker, :job],
:invoke_job => [:job]
}.freeze
def initialize
@callbacks = EVENTS.keys.each_with_object({}) do |e, hash|
hash[e] = Callback.new
end
end
Lifecycle
クラスのインスタンス作成時、それぞれのイベントに応じてCallback
のインスタンスを作成して保持しています。
class Callback
def initialize
@before = []
@after = []
# Identity proc. Avoids special cases when there is no existing around chain.
@around = lambda { |*args, &block| block.call(*args) }
end
Callback
クラスではコールバック実行時や実行前後の準備を行っています。
ではself.setup_lifecycle
メソッドの処理に戻り、plugins
のそれぞれに対し、Delayed::Plugins::ClearLocks
クラスのインスタンスを作成します。
これにより、setup_lifecycle
メソッドの処理がすべて終了するため、Delayed::Worker.lifecycle.run_callbacks
のメソッドの中身を見てみます。
def run_callbacks(event, *args, &block)
missing_callback(event) unless @callbacks.key?(event)
unless EVENTS[event].size == args.size
raise ArgumentError, "Callback #{event} expects #{EVENTS[event].size} parameter(s): #{EVENTS[event].join(', ')}"
end
@callbacks[event].execute(*args, &block)
end
run_callbacks
メソッドでは、主に引数のイベント名(event
)がDelayed::Jobライブラリに定義されているならば、そのイベントのブロックを実行する処理となっています。
run_callbacks
を呼び出したときのイベント名はenqueue
であり、このライブラリに存在するので、@callbacks[event].execute
メソッドを実行します。
@callbacks
にはLifecycle
の初期化時にCallback
のインスタンスがあるので、Callback
のexecute
メソッドを実行します。
def execute(*args, &block)
@before.each { |c| c.call(*args) }
result = @around.call(*args, &block)
@after.each { |c| c.call(*args) }
result
end
今回はジョブ実行前後のbefore
、after
で行う処理として何も設定していないので、@around.call
のみ実行します。
@around
は@around = lambda { |*args, &block| block.call(*args) }
となっているため、run_callback
のブロックを実行します。
def enqueue_job(options)
new(options).tap do |job|
Delayed::Worker.lifecycle.run_callbacks(:enqueue, job) do
job.hook(:enqueue)
Delayed::Worker.delay_job?(job) ? job.save : job.invoke_job
end
end
end
ブロックでは、ジョブのフックを行った後にジョブの登録を行っています。
まずはフック処理からです。
def hook(name, *args)
if payload_object.respond_to?(name)
method = payload_object.method(name)
method.arity.zero? ? method.call : method.call(self, *args)
end
rescue DeserializationError # rubocop:disable HandleExceptions
end
payload_object
はbase.rbで定義しているメソッドがあります。
def payload_object
@payload_object ||= YAML.load_dj(handler)
rescue TypeError, LoadError, NameError, ArgumentError, SyntaxError, Psych::SyntaxError => e
raise DeserializationError, "Job failed to load: #{e.message}. Handler: #{handler.inspect}"
end
handler
は、以前作成されたPerformableMethod
のインスタンスをYAML化したものがセットされています。
load_dj
メソッドはsyck_ext.rbで定義されています。
module YAML
def load_dj(yaml)
# See https://github.com/dtao/safe_yaml
# When the method is there, we need to load our YAML like this...
respond_to?(:unsafe_load) ? load(yaml, :safe => false) : load(yaml)
end
end
PerformableMethod
のインスタンスがpayload_object
にセットされます。
hook
メソッドに戻ります。
payload_object
であるPerformableMethod
のインスタンスはUser
クラスのアダプターのため、User
クラスにname
(ここではenqueue
)があるかどうか確認します。
そのメソッドがあれば実行となりますが、今回はUser
クラスに定義していないので、hook
メソッドでは何もしないままとなります。
次に、Delayed::Worker.delay_job?(job) ? job.save : job.invoke_job
です。
def self.delay_job?(job)
if delay_jobs.is_a?(Proc)
delay_jobs.arity == 1 ? delay_jobs.call(job) : delay_jobs.call
else
delay_jobs
end
end
delay_jobs
はDelayed::Worker
が最初に呼ばれた時点でデフォルト値がtrueとなっており変わっていないので、そのままdelay_jobs
の値が返ります。
そのため、Delayed::Worker.delay_job?(job) ? job.save : job.invoke_job
ではjob.save
が実行され、delayed_jobs
テーブルに保存されます。
ここまでの処理を行うことで、ジョブの登録が完了します。
ワーカーによるジョブの実行
ジョブの実行については、
$ bundle exec rails jobs:work
でワーカーを実行し、delayed_jobs
テーブルに登録されたジョブを実行できます。
この仕組みを追っていきます。
実行する際に、railtie.rbのtaskを実行します。
rake_tasks do
load 'delayed/tasks.rb'
end
これによりtasks.rbが読み込まれ、Delayed::Jobで定義しているタスクを実行できるようになります。
namespace :jobs do
desc 'Clear the delayed_job queue.'
task :clear => :environment do
Delayed::Job.delete_all
end
desc 'Start a delayed_job worker.'
task :work => :environment_options do
Delayed::Worker.new(@worker_options).start
end
jobs:work
を実行すると、work
タスク実行前にenvironment_options
タスクが呼び出されます。
task :environment_options => :environment do
@worker_options = {
:min_priority => ENV['MIN_PRIORITY'],
:max_priority => ENV['MAX_PRIORITY'],
:queues => (ENV['QUEUES'] || ENV['QUEUE'] || '').split(','),
:quiet => ENV['QUIET']
}
@worker_options[:sleep_delay] = ENV['SLEEP_DELAY'].to_i if ENV['SLEEP_DELAY']
@worker_options[:read_ahead] = ENV['READ_AHEAD'].to_i if ENV['READ_AHEAD']
end
今回は何も設定していないので、初期設定は何もありません。
それではDelayed::Worker.new(@worker_options).start
を見ていきます。
まず、Delayed::Worker.new(@worker_options)
からです。
ジョブの登録時でも記載したとおり、Delayed::Worker
クラス呼び出しした際にはreset
メソッドが呼ばれ、ジョブの初期設定が行われます。
reset
メソッドについては事前に記載しているため、その後のインスタンスについて見てみます。
def initialize(options = {})
@quiet = options.key?(:quiet) ? options[:quiet] : true
@failed_reserve_count = 0
[:min_priority, :max_priority, :sleep_delay, :read_ahead, :queues, :exit_on_complete].each do |option|
self.class.send("#{option}=", options[option]) if options.key?(option)
end
# Reset lifecycle on the offhand chance that something lazily
# triggered its creation before all plugins had been registered.
self.class.setup_lifecycle
end
worker
クラスにそれぞれセットします。
その後、以前に記載したsetup_lifecycle
メソッドにより、Lifecycle
クラスのインスタンスが作成されます。
Delayed::Worker
クラスのインスタンス作成による処理はここまでなので、次のstart
メソッドを見てみます。
def start # rubocop:disable CyclomaticComplexity, PerceivedComplexity
trap('TERM') do
Thread.new { say 'Exiting...' }
stop
raise SignalException, 'TERM' if self.class.raise_signal_exceptions
end
trap('INT') do
Thread.new { say 'Exiting...' }
stop
raise SignalException, 'INT' if self.class.raise_signal_exceptions && self.class.raise_signal_exceptions != :term
end
say 'Starting job worker'
self.class.lifecycle.run_callbacks(:execute, self) do
loop do
self.class.lifecycle.run_callbacks(:loop, self) do
@realtime = Benchmark.realtime do
@result = work_off
end
end
count = @result[0] + @result[1]
if count.zero?
if self.class.exit_on_complete
say 'No more jobs available. Exiting'
break
elsif !stop?
sleep(self.class.sleep_delay)
reload!
end
else
say format("#{count} jobs processed at %.4f j/s, %d failed", count / @realtime, @result.last)
end
break if stop?
end
end
end
def stop
@exit = true
end
def stop?
!!@exit
end
start
メソッドの中盤までは、trap
を使用したシグナルに対する中断処理があります。
シグナルについて:https://atmarkit.itmedia.co.jp/ait/articles/1708/04/news015.html
その後、run_callback
のブロックに入りループが開始されます。
このループ処理では、以下2つの条件のどちらかが成り立つと、ループ処理が中断します。
- 実行対象のジョブがなくなり、ワーカー実施時のオプション
exit_on_complete
がtrueの場合 - ワーカー実行中に
stop
メソッドがどこかで呼ばれた場合
それ以外の場合、sleep_delay
の時間分インターバルをおいて、ループ処理続行となります。
以前のreset
メソッド実行時に、sleep_delay
がデフォルト5秒となっているため、とくに指定がなければ5秒間隔でループ続行となります。
それではrun_callback
ブロック内にある
@realtime = Benchmark.realtime do
@result = work_off
end
を深堀ります。
Benchmark.realtime
はブロックの実行時間の計測を行うものなので、work_off
メソッドに移ります。
def work_off(num = 100)
success = 0
failure = 0
num.times do
case reserve_and_run_one_job
when true
success += 1
when false
failure += 1
else
break # leave if no work could be done
end
break if stop? # leave if we're exiting
end
[success, failure]
end
ここでは、ジョブの取り出しと実行を行い、その成功・失敗に応じてフラグをカウントアップし、結果を返すものとなります。
num
が100と固定になっているため、Delayed::Jobでは一度にジョブを100個取り出して実行する仕組みとなっています。
それでは実際の取り出しと実行について、reserve_and_run_one_job
メソッドを見てみます。
def reserve_and_run_one_job
job = reserve_job
self.class.lifecycle.run_callbacks(:perform, self, job) { run(job) } if job
end
ここはメソッド名の通り、ジョブの取り出しと実行を行うのみとなります。
それぞれ処理を見ていきます。
ジョブの取得
def reserve_job
job = Delayed::Job.reserve(self)
@failed_reserve_count = 0
job
rescue ::Exception => error # rubocop:disable RescueException
say "Error while reserving job: #{error}"
Delayed::Job.recover_from(error)
@failed_reserve_count += 1
raise FatalBackendError if @failed_reserve_count >= 10
nil
end
Delayed::Job.reserve(self)
を追いかけます。
def self.reserve(worker, max_run_time = Worker.max_run_time)
ready_scope =
ready_to_run(worker.name, max_run_time)
.min_priority
.max_priority
.for_queues
.by_priority
reserve_with_scope(ready_scope, worker, db_time_now)
end
このメソッドでは、ジョブの取り出しに使用するスコープを作成して取り出します。
Worker.max_run_time
はreset
メソッド実行時にデフォルト4時間としてセットされています。
そのため、ワーカー名とmax_run_time
を使用したready_to_run
メソッドを読んでいきます。
def self.ready_to_run(worker_name, max_run_time)
where(
"((run_at <= ? AND (locked_at IS NULL OR locked_at < ?)) OR locked_by = ?) AND failed_at IS NULL",
db_time_now,
db_time_now - max_run_time,
worker_name
)
end
なお、db_time_now
はRuby on Railsが動いているプロセスのタイムゾーンの現在時刻を取ってきています。
def self.db_time_now
if Time.zone
Time.zone.now
elsif ::ActiveRecord::Base.default_timezone == :utc
Time.now.utc
else
Time.now # rubocop:disable Rails/TimeZone
end
end
それではready_to_run
メソッドの呼び出し元に戻り、チェーンされていたメソッドを見ていきます。
ready_scope =
ready_to_run(worker.name, max_run_time)
.min_priority
.max_priority
.for_queues
.by_priority
min_priority
、max_priority
、for_queues
、by_priority
はそれぞれscopeとして定義されています。
scope :by_priority, lambda { order("priority ASC, run_at ASC") }
scope :min_priority, lambda { where("priority >= ?", Worker.min_priority) if Worker.min_priority }
scope :max_priority, lambda { where("priority <= ?", Worker.max_priority) if Worker.max_priority }
scope :for_queues, lambda { |queues = Worker.queues| where(queue: queues) if Array(queues).any? }
このうちby_priority
以外に関しては、ワーカー実行時にオプションを指定していないので、Worker.min_priority
、Worker.max_priority
はnil
、Worker.queue
は[]
となり、スコープが作成されません。
そのため、by_priority
のみを使用してスコープが作られます。
結果として、ready_scope
にセットされるwhere句とorder句は以下のようになります。
(
(
run_at <= db_time_now
AND
(
locked_at IS NULL
OR
locked_at < (db_time_now - max_run_time)
)
)
OR
locked_by = worker_name
)
AND failed_at IS NULL
order priority ASC, run_at ASC
スコープが作られたので、reserve_with_scope
メソッドを見ていきます。
def self.reserve_with_scope(ready_scope, worker, now)
case Delayed::Backend::ActiveRecord.configuration.reserve_sql_strategy
# Optimizations for faster lookups on some common databases
when :optimized_sql
reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
# Slower but in some cases more unproblematic strategy to lookup records
# See https://github.com/collectiveidea/delayed_job_active_record/pull/89 for more details.
when :default_sql
reserve_with_scope_using_default_sql(ready_scope, worker, now)
end
end
ここでは、発行するSQLのストラテジが、最適化したSQLか通常のSQLかによって、対応するメソッドを呼び出しています。
ストラテジについてDelayed::Backend::ActiveRecord.configuration
の呼び出しを見てみます。
module Backend
module ActiveRecord
class Configuration
attr_reader :reserve_sql_strategy
def initialize
self.reserve_sql_strategy = :optimized_sql
end
・・・
def self.configuration
@configuration ||= Configuration.new
end
configuration
メソッドの呼び出しによりこのクラスのインスタンスを作成しています。
こちらにもシングルトンパターンが使用されています。
インスタンス作成時に、ストラテジのデフォルト値としてoptimized_sql
がセットされます。
今回はストラテジに何もセットしていないので、Delayed::Backend::ActiveRecord.configuration.reserve_sql_strategy
がoptimized_sql
となります。
そのため、reserve_with_scope_using_optimized_sql
メソッドを呼び出します。
def self.reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
case connection.adapter_name
when "PostgreSQL", "PostGIS"
reserve_with_scope_using_optimized_postgres(ready_scope, worker, now)
when "MySQL", "Mysql2"
reserve_with_scope_using_optimized_mysql(ready_scope, worker, now)
when "MSSQL", "Teradata"
reserve_with_scope_using_optimized_mssql(ready_scope, worker, now)
# Fallback for unknown / other DBMS
else
reserve_with_scope_using_default_sql(ready_scope, worker, now)
end
end
このメソッドでは、DBの種類に応じて対応するSQL発行のメソッドを呼び出しています。
今回使用しているDBはMySQLなので、reserve_with_scope_using_optimized_mysql
メソッドを使用します。
def self.reserve_with_scope_using_optimized_mysql(ready_scope, worker, now)
# Removing the millisecond precision from now(time object)
# MySQL 5.6.4 onwards millisecond precision exists, but the
# datetime object created doesn't have precision, so discarded
# while updating. But during the where clause, for mysql(>=5.6.4),
# it queries with precision as well. So removing the precision
now = now.change(usec: 0)
# This works on MySQL and possibly some other DBs that support
# UPDATE...LIMIT. It uses separate queries to lock and return the job
count = ready_scope.limit(1).update_all(locked_at: now, locked_by: worker.name)
return nil if count == 0
where(locked_at: now, locked_by: worker.name, failed_at: nil).first
end
メソッド内のコメントでもわかる通り、MySQL 5.6.4以降はdatetimeに小数点がサポートされていますが、デフォルトの精度が0(小数点なし)のため、changeを使用して小数点を削除(0に統一)しています。
次にready_scope
の条件から最初の1件を取り出し、locked_at
を現在時刻、locked_by
をワーカー名で更新しています。
この更新したレコードがなければ実行ジョブなしとしてreturnしています。
もしレコードがあれば、さきほどのupdate_all
メソッドで使用した条件に加え、failed_at
がnil
のレコードを1件取り出します。
ジョブの取得はここまでとなります。
取得したジョブの実行
ジョブを取得したので、self.class.lifecycle.run_callbacks(:perform, self, job) { run(job) } if job
を見ていきます。
def reserve_and_run_one_job
job = reserve_job
self.class.lifecycle.run_callbacks(:perform, self, job) { run(job) } if job
end
run
メソッドは以下のとおりです。
def run(job)
job_say job, 'RUNNING'
runtime = Benchmark.realtime do
Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) { job.invoke_job }
job.destroy
end
job_say job, format('COMPLETED after %.4f', runtime)
return true # did work
rescue DeserializationError => error
job_say job, "FAILED permanently with #{error.class.name}: #{error.message}", 'error'
job.error = error
failed(job)
rescue Exception => error # rubocop:disable RescueException
self.class.lifecycle.run_callbacks(:error, self, job) { handle_failed_job(job, error) }
return false # work failed
end
Benchmark.realtime
メソッドは以前説明したとおりなのでおいておき、そのブロックを確認します。
max_run_time
はデフォルト4時間のため、ジョブのタイムアウト時間は通常4時間となります。
またタイムアウトしたときに出すメッセージはWorkerTimeout
クラスで定義しています。
タイムアウトが設定されたので、job.invoke_job
の中身を見てみます。
def invoke_job
Delayed::Worker.lifecycle.run_callbacks(:invoke_job, self) do
begin
hook :before
payload_object.perform
hook :success
rescue Exception => e # rubocop:disable RescueException
hook :error, e
raise e
ensure
hook :after
end
end
end
hook
メソッドは以前記載したとおり、引数に指定したメソッド名を、PerformableMethod
クラスを経由してアダプター先のクラス(User
)のメソッドとして呼び出します。
ここではperform
メソッドの前後でbefore
、success
メソッドがUser
クラスにあれば呼び出します。
今回は定義していないので、payload_object.perform
のみ行われます。
payload_object
はPerformableMethod
クラスのインスタンスなので、そのクラスのperform
メソッドを見てみます。
def perform
object.send(method_name, *args) if object
end
object
や、method_name、*argsについては、ジョブの登録時にPerformableMethod
クラスのインスタンス作成で引数にセットしていた値が入るので、それぞれUser
クラスのインスタンス、message_sending
、引数なしとなります。
そのためここで、User
クラスのmessage_sending
メソッドが実行されます。
ジョブの実行が終了するので、最後にjob.destroy
を行いジョブのレコードを削除して、完了となります。
ジョブ実行中に例外が発生したときの挙動
「取得したジョブの実行」項目でのrun
メソッドで、ジョブが失敗した場合を追いかけてみます。
def run(job)
job_say job, 'RUNNING'
runtime = Benchmark.realtime do
Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) { job.invoke_job }
job.destroy
end
job_say job, format('COMPLETED after %.4f', runtime)
return true # did work
rescue DeserializationError => error
job_say job, "FAILED permanently with #{error.class.name}: #{error.message}", 'error'
job.error = error
failed(job)
rescue Exception => error # rubocop:disable RescueException
self.class.lifecycle.run_callbacks(:error, self, job) { handle_failed_job(job, error) }
return false # work failed
end
ジョブ実行時の例外として2つ定義されています。
- デシリアライズ時のエラー(handlerからYAMLとして読み込む際のエラー)
- その他の例外
それぞれの例外パターンに対する処理を見ていきます。
デシリアライズ時のエラー
rescue DeserializationError => error
job_say job, "FAILED permanently with #{error.class.name}: #{error.message}", 'error'
job.error = error
failed(job)
job
にエラー情報がセットされ、failed
メソッドを呼び出します。
def failed(job)
self.class.lifecycle.run_callbacks(:failure, self, job) do
begin
job.hook(:failure)
rescue => error
say "Error when running failure callback: #{error}", 'error'
say error.backtrace.join("\n"), 'error'
ensure
job.destroy_failed_jobs? ? job.destroy : job.fail!
end
end
end
hook
メソッドを使用して、User
クラスにfailure
メソッドがあれば呼び出しています。
その後失敗したジョブのレコードを削除するかどうか判断し、その判断に応じて処理を行っています。
判断処理のjob.destroy_failed_jobs?
はbase.rbで定義されています。
def destroy_failed_jobs?
payload_object.respond_to?(:destroy_failed_jobs?) ? payload_object.destroy_failed_jobs? : Delayed::Worker.destroy_failed_jobs
rescue DeserializationError
Delayed::Worker.destroy_failed_jobs
end
User
クラスへのアダプターを持つpayload_object
はdestroy_failed_jobs
メソッドを持っていないためDelayed::Worker.destroy_failed_jobs
が使用されます。
このdestroy_failed_jobs
は、以前のDelayed::Worker呼び出し時にデフォルトとしてtrueとなっているため、destroy_failed_jobs?
がtrueとなります。
その結果、job.destroy_failed_jobs? ? job.destroy : job.fail!
でjob.destroy
を実行し、レコードを削除します。
その他の例外
rescue Exception => error # rubocop:disable RescueException
self.class.lifecycle.run_callbacks(:error, self, job) { handle_failed_job(job, error) }
return false # work failed
end
この例外時では、handle_failed_job
メソッドを呼び出しています。
job.error = error
job_say job, "FAILED (#{job.attempts} prior attempts) with #{error.class.name}: #{error.message}", 'error'
reschedule(job)
end
ジョブのエラー情報をセットし、reschedule
メソッドを呼び出します。
def reschedule(job, time = nil)
if (job.attempts += 1) < max_attempts(job)
time ||= job.reschedule_at
job.run_at = time
job.unlock
job.save!
else
job_say job, "FAILED permanently because of #{job.attempts} consecutive failures", 'error'
failed(job)
end
end
reschedule
メソッドでは、ジョブの失敗回数に応じて再スケジュールするか、そのまま失敗とするかを行っています。
max_attemptsはジョブの最大試行数となっており、以前のDelayed::Worker
呼び出し時にデフォルトで25回がセットされています。
その試行数以内の場合、再スケジュールを行います。
次回実行日時を決めるための、job.reschedule_at
は以下のようになっています。
def reschedule_at
if payload_object.respond_to?(:reschedule_at)
payload_object.reschedule_at(self.class.db_time_now, attempts)
else
self.class.db_time_now + (attempts**4) + 5
end
end
User
クラスへのアダプターを持つpayload_object
を使用し、User
クラスにreschedule_at
メソッドが定義されていれば、そのメソッドから次回実行日時を求めます。
メソッドがない場合、現在の日時に、試行回数の4乗プラスアルファを加えます。
そのため試行回数が多いほど次の再実行までの時間が長くなります。
その後unlock
メソッドでロック日時を解除し、ジョブを保存します。
これにより、再実行日時になるとこのジョブが再実行されます。
ジョブの最大試行回数を超えた場合、failed
メソッドが呼ばれ、以降はデシリアライズ時のエラーと同じ流れ(ジョブのレコード削除)となります。
これらジョブ実行時の失敗を見ると、
- ジョブを再実行しても必ず失敗する(デシリアライズエラー)場合:基本的にレコードを削除
- ジョブを再実行すると成功する可能性がある(その他の例外)の場合:ジョブを再スケジュール
と作られていることがわかります。
おわりに
Delayed::Jobのコードリーディングを行い、ジョブの登録、実行、失敗時の挙動を確認しました。
コードリーディングにより、利用者の視点では気づけなかったジョブの再スケジューリング間隔、一度に実行するジョブ数、気になったところなどを発見できました。
この記事が誰かのお役に立てれば幸いです。
Discussion