🚀

【コードリーディング】 Delayed::Jobのジョブ登録と実行

2021/08/31に公開

はじめに

Ruby on Railsで非同期処理を行う際に、Delayed::Jobを触る機会がありました。
このDelayed::Jobについて動かし方を少し知ってはいたものの、内部処理まではほとんど知らないままでした。
内部構造をほとんど知らなくても動かせることは良いことなのですが、どんな仕組みで動いているのか興味があったので、コードリーディングしながら理解することにしました。

環境

  • Ruby: 2.7.4
  • Ruby on Rails: 6.0.4
  • MySQL: 5.7
  • delayed_job: 4.1.9
  • delayed_job_active_record: 4.1.9

Delayed::Jobとは

https://github.com/collectiveidea/delayed_job

Delayed::Job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background.
It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks. Amongst those tasks are:

  • sending massive newsletters
  • image resizing
  • http downloads
  • updating smart collections
  • updating solr, our search server, after product changes
  • batch imports
  • spam checks

READMEにある通り、バックグラウンドで長いタスクを非同期に実行する一般的なパターンをカプセル化したもの、これがDelayed::Jobとなります。
Shopifyにあったものから抽出したものがDelayed::Jobとなっており、主に、大量のニュースレター送信、イメージリサイズ、ダウンロード、一括インポートなどに使用されていたようです。

ジョブの登録、実行の使い方の詳細に関しては、READMEにありますので、ここでは省略します。

コードリーディング

ここでは主に以下2点についてコードリーディングします。

  • ジョブの登録
  • ワーカーによるジョブの実行

前提

以下Userモデルを作成し、その中でジョブ登録を行っています。

user.rb
class User < ApplicationRecord

  def send_message
    puts '---send_message---'
  end

  def self.send_delay_message
    user = new
    user.save!
    user.delay(priority: 10, run_at: Time.now + 10).send_message 
  end
end

ジョブ登録

それではジョブ登録時の処理についてコードリーディングします。
まずはdelayメソッドからです。

message_sending.rb
    def delay(options = {})
      DelayProxy.new(PerformableMethod, self, options)
    end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/message_sending.rb#L17

PerfomableMethodはDelayed::Jobのライブラリで定義しているクラスとなります。
https://github.com/collectiveidea/delayed_job/blob/master/lib/delayed/performable_method.rb

ここではDelayProxyのインスタンスを作っているのみとなります。
DelayProxyクラスは以下のクラス構造となっており、シンプルなクラスとなっています。

message_sending.rb
  class DelayProxy < Delayed::Compatibility.proxy_object_class
    def initialize(payload_class, target, options)
      @payload_class = payload_class
      @target = target
      @options = options
    end

    # rubocop:disable MethodMissing
    def method_missing(method, *args)
      Job.enqueue({:payload_object => @payload_class.new(@target, method.to_sym, args)}.merge(@options))
    end
    # rubocop:enable MethodMissing
  end

https://github.com/collectiveidea/delayed_job/blob/master/lib/delayed/message_sending.rb

delayメソッドの処理が終了したので、user.delay(priority: 10, run_at: Time.now + 10).send_messageでのsend_message実行となります。
このメソッドはDelayProxyクラスには存在しないので、method_missingメソッドが呼び出されます。

message_sending.rb
    def method_missing(method, *args)
      Job.enqueue({:payload_object => @payload_class.new(@target, method.to_sym, args)}.merge(@options))
    end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/message_sending.rb#L10

payload_classPerformableMethodのため、PerformableMethodのインスタンスを持つハッシュを引数として、enqueueメソッドを呼び出します。
PerformableMethodクラスはアダプターパターンを使用しているため、PerformableMethodクラスのインスタンスを作成することで、指定したクラス(今回は@target、つまりUserクラス)のアダプターとなります。
なお、このJobクラスはdelayed_job_active_recordのライブラリにあるactive_record.rbで定義されており、Delayed::Backend::Baseをincludeしています。

active_record.rb
      class Job < ::ActiveRecord::Base
        include Delayed::Backend::Base

https://github.com/collectiveidea/delayed_job_active_record/blob/d65b0f9900f5b0c78c341c7c0209c2d138d64ec5/lib/delayed/backend/active_record.rb#L33

ではbaseクラスのenqueueメソッドを見てみます。

base.rb
        def enqueue(*args)
          job_options = Delayed::Backend::JobPreparer.new(*args).prepare
          enqueue_job(job_options)
        end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/backend/base.rb#L10

JobPreparerクラスでは、*argsを使用してジョブの各オプションを格納します。格納できるオプションとしては、payload_objectqueuepriorityrun_atがあります。
https://github.com/collectiveidea/delayed_job/blob/master/lib/delayed/backend/job_preparer.rb

今回では、

  • priority: 10
  • run_at: Time.now + 10
  • payload_object: @payload_class.new(@target, method.to_sym, args)}.merge(@options)

がジョブのオプションとして格納されています。

その次のenqueue_jobメソッドに移ります。

base.rb
        def enqueue_job(options)
          new(options).tap do |job|
            Delayed::Worker.lifecycle.run_callbacks(:enqueue, job) do
              job.hook(:enqueue)
              Delayed::Worker.delay_job?(job) ? job.save : job.invoke_job
            end
          end
        end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/backend/base.rb#L15

enqueue_jobではジョブを生成し、ジョブ実行前のアクション、ジョブ実行を行っています。
まずnewでは、Delayed::Backend::BaseをincludeしていたJobクラスのインスタンスが生成されます。
その際、baseクラスには代入構文のpayload_objectメソッドが定義されているので、そのメソッドが呼び出されます。

base.rb
      def payload_object=(object)
        @payload_object = object
        self.handler = object.to_yaml
      end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/backend/base.rb#L66

このメソッドでobject、つまりジョブのオプションを持ったPerformableMethodのインスタンスがYAML化されて、handlerにセットされます。

このnewによるインスタンスが作成されると、このインスタンス(job)に対し、Delayed::Worker.lifecycle.run_callbacksを実行します。

まず、Delayed::Workerが呼び出された時点で、resetメソッドが実行されます。

module Delayed
  class Worker
  ・・・
  end
  ・・・
end
・・・
Delayed::Worker.reset

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/worker.rb#L339
このresetメソッドでは、ジョブを登録する上でのデフォルト値がセットされます。

worker.rb
module Delayed
  class Worker # rubocop:disable ClassLength
    DEFAULT_LOG_LEVEL        = 'info'.freeze
    DEFAULT_SLEEP_DELAY      = 5
    DEFAULT_MAX_ATTEMPTS     = 25
    DEFAULT_MAX_RUN_TIME     = 4.hours
    DEFAULT_DEFAULT_PRIORITY = 0
    DEFAULT_DELAY_JOBS       = true
    DEFAULT_QUEUES           = [].freeze
    DEFAULT_QUEUE_ATTRIBUTES = HashWithIndifferentAccess.new.freeze
    DEFAULT_READ_AHEAD       = 5

    cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time,
                   :default_priority, :sleep_delay, :logger, :delay_jobs, :queues,
                   :read_ahead, :plugins, :destroy_failed_jobs, :exit_on_complete,
                   :default_log_level
・・・
    def self.reset
      self.default_log_level = DEFAULT_LOG_LEVEL
      self.sleep_delay       = DEFAULT_SLEEP_DELAY
      self.max_attempts      = DEFAULT_MAX_ATTEMPTS
      self.max_run_time      = DEFAULT_MAX_RUN_TIME
      self.default_priority  = DEFAULT_DEFAULT_PRIORITY
      self.delay_jobs        = DEFAULT_DELAY_JOBS
      self.queues            = DEFAULT_QUEUES
      self.queue_attributes  = DEFAULT_QUEUE_ATTRIBUTES
      self.read_ahead        = DEFAULT_READ_AHEAD
      @lifecycle             = nil
    end
・・・
    self.plugins = [Delayed::Plugins::ClearLocks]
・・・
    self.destroy_failed_jobs = true
・・・
    cattr_accessor :raise_signal_exceptions
    self.raise_signal_exceptions = false

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/worker.rb
この後呼び出されるDelayed::Worker.lifecycleを見てみます。

worker.rb
    def self.lifecycle
      # In case a worker has not been set up, job enqueueing needs a lifecycle.
      setup_lifecycle unless @lifecycle

      @lifecycle
    end

    def self.setup_lifecycle
      @lifecycle = Delayed::Lifecycle.new
      plugins.each { |klass| klass.new }
    end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/worker.rb#L106
lifecycleメソッドではシングルトンパターンを使用し、@lifecycleを管理しています。

@lifecycleにはresetメソッド実行時に値がnilになったので、setup_lifecycleメソッドを呼び出します。
setup_lifecycleメソッドではDelayed::Lifecycleのインスタンスを作成しています。

lifecycle.rb
module Delayed
・・・
  class Lifecycle
    EVENTS = {
      :enqueue    => [:job],
      :execute    => [:worker],
      :loop       => [:worker],
      :perform    => [:worker, :job],
      :error      => [:worker, :job],
      :failure    => [:worker, :job],
      :invoke_job => [:job]
    }.freeze

    def initialize
      @callbacks = EVENTS.keys.each_with_object({}) do |e, hash|
        hash[e] = Callback.new
      end
    end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/lifecycle.rb#L15
このLifecycleクラスのインスタンス作成時、それぞれのイベントに応じてCallbackのインスタンスを作成して保持しています。

lifecycle.rb
  class Callback
    def initialize
      @before = []
      @after = []

      # Identity proc. Avoids special cases when there is no existing around chain.
      @around = lambda { |*args, &block| block.call(*args) }
    end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/lifecycle.rb#L55
Callbackクラスではコールバック実行時や実行前後の準備を行っています。

ではself.setup_lifecycleメソッドの処理に戻り、pluginsのそれぞれに対し、Delayed::Plugins::ClearLocksクラスのインスタンスを作成します。
これにより、setup_lifecycleメソッドの処理がすべて終了するため、Delayed::Worker.lifecycle.run_callbacksのメソッドの中身を見てみます。

lifecycle.rb
    def run_callbacks(event, *args, &block)
      missing_callback(event) unless @callbacks.key?(event)

      unless EVENTS[event].size == args.size
        raise ArgumentError, "Callback #{event} expects #{EVENTS[event].size} parameter(s): #{EVENTS[event].join(', ')}"
      end

      @callbacks[event].execute(*args, &block)
    end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/lifecycle.rb#L33

run_callbacksメソッドでは、主に引数のイベント名(event)がDelayed::Jobライブラリに定義されているならば、そのイベントのブロックを実行する処理となっています。

run_callbacksを呼び出したときのイベント名はenqueueであり、このライブラリに存在するので、@callbacks[event].executeメソッドを実行します。
@callbacksにはLifecycleの初期化時にCallbackのインスタンスがあるので、Callbackexecuteメソッドを実行します。

lifecycle.rb
    def execute(*args, &block)
      @before.each { |c| c.call(*args) }
      result = @around.call(*args, &block)
      @after.each { |c| c.call(*args) }
      result
    end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/lifecycle.rb#L64

今回はジョブ実行前後のbeforeafterで行う処理として何も設定していないので、@around.callのみ実行します。
@around@around = lambda { |*args, &block| block.call(*args) }となっているため、run_callbackのブロックを実行します。

base.rb
        def enqueue_job(options)
          new(options).tap do |job|
            Delayed::Worker.lifecycle.run_callbacks(:enqueue, job) do
              job.hook(:enqueue)
              Delayed::Worker.delay_job?(job) ? job.save : job.invoke_job
            end
          end
        end

ブロックでは、ジョブのフックを行った後にジョブの登録を行っています。
まずはフック処理からです。

base.rb
      def hook(name, *args)
        if payload_object.respond_to?(name)
          method = payload_object.method(name)
          method.arity.zero? ? method.call : method.call(self, *args)
        end
      rescue DeserializationError # rubocop:disable HandleExceptions
      end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/backend/base.rb#L98

payload_objectはbase.rbで定義しているメソッドがあります。

base.rb
      def payload_object
        @payload_object ||= YAML.load_dj(handler)
      rescue TypeError, LoadError, NameError, ArgumentError, SyntaxError, Psych::SyntaxError => e
        raise DeserializationError, "Job failed to load: #{e.message}. Handler: #{handler.inspect}"
      end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/backend/base.rb#L71
ここでのhandlerは、以前作成されたPerformableMethodのインスタンスをYAML化したものがセットされています。
load_djメソッドはsyck_ext.rbで定義されています。

syck_ext.rb
module YAML
  def load_dj(yaml)
    # See https://github.com/dtao/safe_yaml
    # When the method is there, we need to load our YAML like this...
    respond_to?(:unsafe_load) ? load(yaml, :safe => false) : load(yaml)
  end
end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/syck_ext.rb#L37
この処理により、PerformableMethodのインスタンスがpayload_objectにセットされます。

hookメソッドに戻ります。
payload_objectであるPerformableMethodのインスタンスはUserクラスのアダプターのため、Userクラスにname(ここではenqueue)があるかどうか確認します。
そのメソッドがあれば実行となりますが、今回はUserクラスに定義していないので、hookメソッドでは何もしないままとなります。

次に、Delayed::Worker.delay_job?(job) ? job.save : job.invoke_jobです。

worker.rb
    def self.delay_job?(job)
      if delay_jobs.is_a?(Proc)
        delay_jobs.arity == 1 ? delay_jobs.call(job) : delay_jobs.call
      else
        delay_jobs
      end
    end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/worker.rb#L122

delay_jobsDelayed::Workerが最初に呼ばれた時点でデフォルト値がtrueとなっており変わっていないので、そのままdelay_jobsの値が返ります。
そのため、Delayed::Worker.delay_job?(job) ? job.save : job.invoke_jobではjob.saveが実行され、delayed_jobsテーブルに保存されます。

ここまでの処理を行うことで、ジョブの登録が完了します。

ワーカーによるジョブの実行

ジョブの実行については、

$ bundle exec rails jobs:work

でワーカーを実行し、delayed_jobsテーブルに登録されたジョブを実行できます。
この仕組みを追っていきます。

実行する際に、railtie.rbのtaskを実行します。

railtie.rb
    rake_tasks do
      load 'delayed/tasks.rb'
    end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/railtie.rb#L14
これによりtasks.rbが読み込まれ、Delayed::Jobで定義しているタスクを実行できるようになります。

tasks.rb
namespace :jobs do
  desc 'Clear the delayed_job queue.'
  task :clear => :environment do
    Delayed::Job.delete_all
  end

  desc 'Start a delayed_job worker.'
  task :work => :environment_options do
    Delayed::Worker.new(@worker_options).start
  end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/tasks.rb#L8

jobs:workを実行すると、workタスク実行前にenvironment_optionsタスクが呼び出されます。

tasks.rb
  task :environment_options => :environment do
    @worker_options = {
      :min_priority => ENV['MIN_PRIORITY'],
      :max_priority => ENV['MAX_PRIORITY'],
      :queues => (ENV['QUEUES'] || ENV['QUEUE'] || '').split(','),
      :quiet => ENV['QUIET']
    }

    @worker_options[:sleep_delay] = ENV['SLEEP_DELAY'].to_i if ENV['SLEEP_DELAY']
    @worker_options[:read_ahead] = ENV['READ_AHEAD'].to_i if ENV['READ_AHEAD']
  end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/tasks.rb#L17
ここではワーカー実行時に最小・最大の優先度やキュー数などの初期設定を環境変数から読み込んでいます。
今回は何も設定していないので、初期設定は何もありません。

それではDelayed::Worker.new(@worker_options).startを見ていきます。
まず、Delayed::Worker.new(@worker_options)からです。
ジョブの登録時でも記載したとおり、Delayed::Workerクラス呼び出しした際にはresetメソッドが呼ばれ、ジョブの初期設定が行われます。
resetメソッドについては事前に記載しているため、その後のインスタンスについて見てみます。

worker.rb
    def initialize(options = {})
      @quiet = options.key?(:quiet) ? options[:quiet] : true
      @failed_reserve_count = 0

      [:min_priority, :max_priority, :sleep_delay, :read_ahead, :queues, :exit_on_complete].each do |option|
        self.class.send("#{option}=", options[option]) if options.key?(option)
      end

      # Reset lifecycle on the offhand chance that something lazily
      # triggered its creation before all plugins had been registered.
      self.class.setup_lifecycle
    end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/worker.rb#L130
オプションがあればworkerクラスにそれぞれセットします。
その後、以前に記載したsetup_lifecycleメソッドにより、Lifecycleクラスのインスタンスが作成されます。

Delayed::Workerクラスのインスタンス作成による処理はここまでなので、次のstartメソッドを見てみます。

worker.rb
    def start # rubocop:disable CyclomaticComplexity, PerceivedComplexity
      trap('TERM') do
        Thread.new { say 'Exiting...' }
        stop
        raise SignalException, 'TERM' if self.class.raise_signal_exceptions
      end

      trap('INT') do
        Thread.new { say 'Exiting...' }
        stop
        raise SignalException, 'INT' if self.class.raise_signal_exceptions && self.class.raise_signal_exceptions != :term
      end

      say 'Starting job worker'

      self.class.lifecycle.run_callbacks(:execute, self) do
        loop do
          self.class.lifecycle.run_callbacks(:loop, self) do
            @realtime = Benchmark.realtime do
              @result = work_off
            end
          end

          count = @result[0] + @result[1]

          if count.zero?
            if self.class.exit_on_complete
              say 'No more jobs available. Exiting'
              break
            elsif !stop?
              sleep(self.class.sleep_delay)
              reload!
            end
          else
            say format("#{count} jobs processed at %.4f j/s, %d failed", count / @realtime, @result.last)
          end

          break if stop?
        end
      end
    end

    def stop
      @exit = true
    end

    def stop?
      !!@exit
    end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/worker.rb#L156
startメソッドの中盤までは、trapを使用したシグナルに対する中断処理があります。
シグナルについて:https://atmarkit.itmedia.co.jp/ait/articles/1708/04/news015.html

その後、run_callbackのブロックに入りループが開始されます。
このループ処理では、以下2つの条件のどちらかが成り立つと、ループ処理が中断します。

  • 実行対象のジョブがなくなり、ワーカー実施時のオプションexit_on_completeがtrueの場合
  • ワーカー実行中にstopメソッドがどこかで呼ばれた場合

それ以外の場合、sleep_delayの時間分インターバルをおいて、ループ処理続行となります。
以前のresetメソッド実行時に、sleep_delayがデフォルト5秒となっているため、とくに指定がなければ5秒間隔でループ続行となります。

それではrun_callbackブロック内にある

worker.rb
            @realtime = Benchmark.realtime do
              @result = work_off
            end

を深堀ります。
Benchmark.realtimeはブロックの実行時間の計測を行うものなので、work_offメソッドに移ります。
https://docs.ruby-lang.org/ja/latest/method/Benchmark/m/realtime.html

worker.rb
    def work_off(num = 100)
      success = 0
      failure = 0

      num.times do
        case reserve_and_run_one_job
        when true
          success += 1
        when false
          failure += 1
        else
          break # leave if no work could be done
        end
        break if stop? # leave if we're exiting
      end

      [success, failure]
    end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/worker.rb#L208

ここでは、ジョブの取り出しと実行を行い、その成功・失敗に応じてフラグをカウントアップし、結果を返すものとなります。
numが100と固定になっているため、Delayed::Jobでは一度にジョブを100個取り出して実行する仕組みとなっています。
それでは実際の取り出しと実行について、reserve_and_run_one_jobメソッドを見てみます。

worker.rb
    def reserve_and_run_one_job
      job = reserve_job
      self.class.lifecycle.run_callbacks(:perform, self, job) { run(job) } if job
    end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/worker.rb#L310

ここはメソッド名の通り、ジョブの取り出しと実行を行うのみとなります。
それぞれ処理を見ていきます。

ジョブの取得

worker.rb
    def reserve_job
      job = Delayed::Job.reserve(self)
      @failed_reserve_count = 0
      job
    rescue ::Exception => error # rubocop:disable RescueException
      say "Error while reserving job: #{error}"
      Delayed::Job.recover_from(error)
      @failed_reserve_count += 1
      raise FatalBackendError if @failed_reserve_count >= 10
      nil
    end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/worker.rb#L315

Delayed::Job.reserve(self)を追いかけます。

active_record.rb
        def self.reserve(worker, max_run_time = Worker.max_run_time)
          ready_scope =
            ready_to_run(worker.name, max_run_time)
            .min_priority
            .max_priority
            .for_queues
            .by_priority

          reserve_with_scope(ready_scope, worker, db_time_now)
        end

https://github.com/collectiveidea/delayed_job_active_record/blob/d65b0f9900f5b0c78c341c7c0209c2d138d64ec5/lib/delayed/backend/active_record.rb#L77

このメソッドでは、ジョブの取り出しに使用するスコープを作成して取り出します。
Worker.max_run_timeresetメソッド実行時にデフォルト4時間としてセットされています。
そのため、ワーカー名とmax_run_timeを使用したready_to_runメソッドを読んでいきます。

active_record.rb
        def self.ready_to_run(worker_name, max_run_time)
          where(
            "((run_at <= ? AND (locked_at IS NULL OR locked_at < ?)) OR locked_by = ?) AND failed_at IS NULL",
            db_time_now,
            db_time_now - max_run_time,
            worker_name
          )
        end

https://github.com/collectiveidea/delayed_job_active_record/blob/d65b0f9900f5b0c78c341c7c0209c2d138d64ec5/lib/delayed/backend/active_record.rb#L55
ここではwhere句を組み立てるのみとなります。
なお、db_time_nowはRuby on Railsが動いているプロセスのタイムゾーンの現在時刻を取ってきています。

active_record.rb
        def self.db_time_now
          if Time.zone
            Time.zone.now
          elsif ::ActiveRecord::Base.default_timezone == :utc
            Time.now.utc
          else
            Time.now # rubocop:disable Rails/TimeZone
          end
        end

https://github.com/collectiveidea/delayed_job_active_record/blob/d65b0f9900f5b0c78c341c7c0209c2d138d64ec5/lib/delayed/backend/active_record.rb#L172

それではready_to_runメソッドの呼び出し元に戻り、チェーンされていたメソッドを見ていきます。

          ready_scope =
            ready_to_run(worker.name, max_run_time)
            .min_priority
            .max_priority
            .for_queues
            .by_priority

min_prioritymax_priorityfor_queuesby_priorityはそれぞれscopeとして定義されています。

active_record.rb
        scope :by_priority, lambda { order("priority ASC, run_at ASC") }
        scope :min_priority, lambda { where("priority >= ?", Worker.min_priority) if Worker.min_priority }
        scope :max_priority, lambda { where("priority <= ?", Worker.max_priority) if Worker.max_priority }
        scope :for_queues, lambda { |queues = Worker.queues| where(queue: queues) if Array(queues).any? }

https://github.com/collectiveidea/delayed_job_active_record/blob/d65b0f9900f5b0c78c341c7c0209c2d138d64ec5/lib/delayed/backend/active_record.rb#L41

このうちby_priority以外に関しては、ワーカー実行時にオプションを指定していないので、Worker.min_priorityWorker.max_prioritynilWorker.queue[]となり、スコープが作成されません。
そのため、by_priorityのみを使用してスコープが作られます。

結果として、ready_scopeにセットされるwhere句とorder句は以下のようになります。

(
  (
    run_at <= db_time_now
    AND
    (
      locked_at IS NULL
      OR
      locked_at < (db_time_now - max_run_time)
    )
  )
  OR
  locked_by = worker_name
)
AND failed_at IS NULL
order priority ASC, run_at ASC

スコープが作られたので、reserve_with_scopeメソッドを見ていきます。

active_record.rb
        def self.reserve_with_scope(ready_scope, worker, now)
          case Delayed::Backend::ActiveRecord.configuration.reserve_sql_strategy
          # Optimizations for faster lookups on some common databases
          when :optimized_sql
            reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
          # Slower but in some cases more unproblematic strategy to lookup records
          # See https://github.com/collectiveidea/delayed_job_active_record/pull/89 for more details.
          when :default_sql
            reserve_with_scope_using_default_sql(ready_scope, worker, now)
          end
        end

https://github.com/collectiveidea/delayed_job_active_record/blob/d65b0f9900f5b0c78c341c7c0209c2d138d64ec5/lib/delayed/backend/active_record.rb#L88

ここでは、発行するSQLのストラテジが、最適化したSQLか通常のSQLかによって、対応するメソッドを呼び出しています。
ストラテジについてDelayed::Backend::ActiveRecord.configurationの呼び出しを見てみます。

active_record.rb
  module Backend
    module ActiveRecord
      class Configuration
        attr_reader :reserve_sql_strategy

        def initialize
          self.reserve_sql_strategy = :optimized_sql
        end
・・・
      def self.configuration
        @configuration ||= Configuration.new
      end

https://github.com/collectiveidea/delayed_job_active_record/blob/d65b0f9900f5b0c78c341c7c0209c2d138d64ec5/lib/delayed/backend/active_record.rb#L23

configurationメソッドの呼び出しによりこのクラスのインスタンスを作成しています。
こちらにもシングルトンパターンが使用されています。
インスタンス作成時に、ストラテジのデフォルト値としてoptimized_sqlがセットされます。
今回はストラテジに何もセットしていないので、Delayed::Backend::ActiveRecord.configuration.reserve_sql_strategyoptimized_sqlとなります。

そのため、reserve_with_scope_using_optimized_sqlメソッドを呼び出します。

active_record.rb
        def self.reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
          case connection.adapter_name
          when "PostgreSQL", "PostGIS"
            reserve_with_scope_using_optimized_postgres(ready_scope, worker, now)
          when "MySQL", "Mysql2"
            reserve_with_scope_using_optimized_mysql(ready_scope, worker, now)
          when "MSSQL", "Teradata"
            reserve_with_scope_using_optimized_mssql(ready_scope, worker, now)
          # Fallback for unknown / other DBMS
          else
            reserve_with_scope_using_default_sql(ready_scope, worker, now)
          end
        end

https://github.com/collectiveidea/delayed_job_active_record/blob/d65b0f9900f5b0c78c341c7c0209c2d138d64ec5/lib/delayed/backend/active_record.rb#L100

このメソッドでは、DBの種類に応じて対応するSQL発行のメソッドを呼び出しています。
今回使用しているDBはMySQLなので、reserve_with_scope_using_optimized_mysqlメソッドを使用します。

active_record.rb
        def self.reserve_with_scope_using_optimized_mysql(ready_scope, worker, now)
          # Removing the millisecond precision from now(time object)
          # MySQL 5.6.4 onwards millisecond precision exists, but the
          # datetime object created doesn't have precision, so discarded
          # while updating. But during the where clause, for mysql(>=5.6.4),
          # it queries with precision as well. So removing the precision
          now = now.change(usec: 0)
          # This works on MySQL and possibly some other DBs that support
          # UPDATE...LIMIT. It uses separate queries to lock and return the job
          count = ready_scope.limit(1).update_all(locked_at: now, locked_by: worker.name)
          return nil if count == 0

          where(locked_at: now, locked_by: worker.name, failed_at: nil).first
        end

https://github.com/collectiveidea/delayed_job_active_record/blob/d65b0f9900f5b0c78c341c7c0209c2d138d64ec5/lib/delayed/backend/active_record.rb#L139
メソッド内のコメントでもわかる通り、MySQL 5.6.4以降はdatetimeに小数点がサポートされていますが、デフォルトの精度が0(小数点なし)のため、changeを使用して小数点を削除(0に統一)しています。
https://dev.mysql.com/doc/refman/5.6/ja/date-and-time-type-overview.html

次にready_scopeの条件から最初の1件を取り出し、locked_atを現在時刻、locked_byをワーカー名で更新しています。
この更新したレコードがなければ実行ジョブなしとしてreturnしています。
もしレコードがあれば、さきほどのupdate_allメソッドで使用した条件に加え、failed_atnilのレコードを1件取り出します。

ジョブの取得はここまでとなります。

取得したジョブの実行

ジョブを取得したので、self.class.lifecycle.run_callbacks(:perform, self, job) { run(job) } if jobを見ていきます。

worker.rb
    def reserve_and_run_one_job
      job = reserve_job
      self.class.lifecycle.run_callbacks(:perform, self, job) { run(job) } if job
    end

runメソッドは以下のとおりです。

worker.rb
    def run(job)
      job_say job, 'RUNNING'
      runtime = Benchmark.realtime do
        Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) { job.invoke_job }
        job.destroy
      end
      job_say job, format('COMPLETED after %.4f', runtime)
      return true # did work
    rescue DeserializationError => error
      job_say job, "FAILED permanently with #{error.class.name}: #{error.message}", 'error'

      job.error = error
      failed(job)
    rescue Exception => error # rubocop:disable RescueException
      self.class.lifecycle.run_callbacks(:error, self, job) { handle_failed_job(job, error) }
      return false # work failed
    end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/worker.rb#L227
このメソッドではジョブを実行し、実行後のジョブを削除する処理となります。
Benchmark.realtimeメソッドは以前説明したとおりなのでおいておき、そのブロックを確認します。
max_run_timeはデフォルト4時間のため、ジョブのタイムアウト時間は通常4時間となります。
またタイムアウトしたときに出すメッセージはWorkerTimeoutクラスで定義しています。
https://github.com/collectiveidea/delayed_job/blob/master/lib/delayed/exceptions.rb

タイムアウトが設定されたので、job.invoke_jobの中身を見てみます。

base.rb
      def invoke_job
        Delayed::Worker.lifecycle.run_callbacks(:invoke_job, self) do
          begin
            hook :before
            payload_object.perform
            hook :success
          rescue Exception => e # rubocop:disable RescueException
            hook :error, e
            raise e
          ensure
            hook :after
          end
        end
      end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/backend/base.rb#L77
hookメソッドは以前記載したとおり、引数に指定したメソッド名を、PerformableMethodクラスを経由してアダプター先のクラス(User)のメソッドとして呼び出します。
ここではperformメソッドの前後でbeforesuccessメソッドがUserクラスにあれば呼び出します。
今回は定義していないので、payload_object.performのみ行われます。
payload_objectPerformableMethodクラスのインスタンスなので、そのクラスのperformメソッドを見てみます。

performable_method.rb
    def perform
      object.send(method_name, *args) if object
    end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/performable_method.rb#L25
objectや、method_name、*argsについては、ジョブの登録時にPerformableMethodクラスのインスタンス作成で引数にセットしていた値が入るので、それぞれUserクラスのインスタンス、message_sending、引数なしとなります。
そのためここで、Userクラスのmessage_sendingメソッドが実行されます。

ジョブの実行が終了するので、最後にjob.destroyを行いジョブのレコードを削除して、完了となります。

ジョブ実行中に例外が発生したときの挙動

「取得したジョブの実行」項目でのrunメソッドで、ジョブが失敗した場合を追いかけてみます。

worker.rb
    def run(job)
      job_say job, 'RUNNING'
      runtime = Benchmark.realtime do
        Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) { job.invoke_job }
        job.destroy
      end
      job_say job, format('COMPLETED after %.4f', runtime)
      return true # did work
    rescue DeserializationError => error
      job_say job, "FAILED permanently with #{error.class.name}: #{error.message}", 'error'

      job.error = error
      failed(job)
    rescue Exception => error # rubocop:disable RescueException
      self.class.lifecycle.run_callbacks(:error, self, job) { handle_failed_job(job, error) }
      return false # work failed
    end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/worker.rb#L227

ジョブ実行時の例外として2つ定義されています。

  • デシリアライズ時のエラー(handlerからYAMLとして読み込む際のエラー)
  • その他の例外

それぞれの例外パターンに対する処理を見ていきます。

デシリアライズ時のエラー
worker.rb
    rescue DeserializationError => error
      job_say job, "FAILED permanently with #{error.class.name}: #{error.message}", 'error'

      job.error = error
      failed(job)

jobにエラー情報がセットされ、failedメソッドを呼び出します。

worker.rb
    def failed(job)
      self.class.lifecycle.run_callbacks(:failure, self, job) do
        begin
          job.hook(:failure)
        rescue => error
          say "Error when running failure callback: #{error}", 'error'
          say error.backtrace.join("\n"), 'error'
        ensure
          job.destroy_failed_jobs? ? job.destroy : job.fail!
        end
      end
    end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/worker.rb#L259

hookメソッドを使用して、Userクラスにfailureメソッドがあれば呼び出しています。
その後失敗したジョブのレコードを削除するかどうか判断し、その判断に応じて処理を行っています。
判断処理のjob.destroy_failed_jobs?はbase.rbで定義されています。

base.rb
      def destroy_failed_jobs?
        payload_object.respond_to?(:destroy_failed_jobs?) ? payload_object.destroy_failed_jobs? : Delayed::Worker.destroy_failed_jobs
      rescue DeserializationError
        Delayed::Worker.destroy_failed_jobs
      end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/backend/base.rb#L129

Userクラスへのアダプターを持つpayload_objectdestroy_failed_jobsメソッドを持っていないためDelayed::Worker.destroy_failed_jobsが使用されます。
このdestroy_failed_jobsは、以前のDelayed::Worker呼び出し時にデフォルトとしてtrueとなっているため、destroy_failed_jobs?がtrueとなります。

その結果、job.destroy_failed_jobs? ? job.destroy : job.fail!job.destroyを実行し、レコードを削除します。

その他の例外
worker.rb
    rescue Exception => error # rubocop:disable RescueException
      self.class.lifecycle.run_callbacks(:error, self, job) { handle_failed_job(job, error) }
      return false # work failed
    end

この例外時では、handle_failed_jobメソッドを呼び出しています。

worker.rb
      job.error = error
      job_say job, "FAILED (#{job.attempts} prior attempts) with #{error.class.name}: #{error.message}", 'error'
      reschedule(job)
    end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/worker.rb#L302

ジョブのエラー情報をセットし、rescheduleメソッドを呼び出します。

worker.rb
    def reschedule(job, time = nil)
      if (job.attempts += 1) < max_attempts(job)
        time ||= job.reschedule_at
        job.run_at = time
        job.unlock
        job.save!
      else
        job_say job, "FAILED permanently because of #{job.attempts} consecutive failures", 'error'
        failed(job)
      end
    end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/worker.rb#L247

rescheduleメソッドでは、ジョブの失敗回数に応じて再スケジュールするか、そのまま失敗とするかを行っています。
max_attemptsはジョブの最大試行数となっており、以前のDelayed::Worker呼び出し時にデフォルトで25回がセットされています。
その試行数以内の場合、再スケジュールを行います。
次回実行日時を決めるための、job.reschedule_atは以下のようになっています。

base.rb
      def reschedule_at
        if payload_object.respond_to?(:reschedule_at)
          payload_object.reschedule_at(self.class.db_time_now, attempts)
        else
          self.class.db_time_now + (attempts**4) + 5
        end
      end

https://github.com/collectiveidea/delayed_job/blob/baed6e813870e1144e7a4291bc71e06a67a533de/lib/delayed/backend/base.rb#L106
Userクラスへのアダプターを持つpayload_objectを使用し、Userクラスにreschedule_atメソッドが定義されていれば、そのメソッドから次回実行日時を求めます。
メソッドがない場合、現在の日時に、試行回数の4乗プラスアルファを加えます。
そのため試行回数が多いほど次の再実行までの時間が長くなります。

その後unlockメソッドでロック日時を解除し、ジョブを保存します。
これにより、再実行日時になるとこのジョブが再実行されます。

ジョブの最大試行回数を超えた場合、failedメソッドが呼ばれ、以降はデシリアライズ時のエラーと同じ流れ(ジョブのレコード削除)となります。

これらジョブ実行時の失敗を見ると、

  • ジョブを再実行しても必ず失敗する(デシリアライズエラー)場合:基本的にレコードを削除
  • ジョブを再実行すると成功する可能性がある(その他の例外)の場合:ジョブを再スケジュール

と作られていることがわかります。

おわりに

Delayed::Jobのコードリーディングを行い、ジョブの登録、実行、失敗時の挙動を確認しました。
コードリーディングにより、利用者の視点では気づけなかったジョブの再スケジューリング間隔、一度に実行するジョブ数、気になったところなどを発見できました。
この記事が誰かのお役に立てれば幸いです。

Discussion