🐥

【コードリーディング】Active Jobのジョブ登録と実行

2021/10/11に公開

はじめに

Railsで非同期処理を触る際、前回執筆したDelayed::Jobに加えて、Active Jobも触る機会がありました。
こちらも内部処理についてコードリーディングしながら理解することにしました。

ActiveJobとは

Railsガイドから引用します。

Active Jobは、ジョブを宣言し、それによってバックエンドでさまざまな方法によるキュー操作を実行するためのフレームワークです。

またActive Jobの目的は以下のとおりです。

Active Jobの主要な目的は、あらゆるRailsアプリケーションにジョブ管理インフラを配置することです。これにより、Delayed JobとResqueなどのように、さまざまなジョブ実行機能のAPIの違いを気にせずにジョブフレームワーク機能やその他のgemを搭載することができるようになります。バックエンドでのキューイング作業では、操作方法以外のことを気にせずに済みます。さらに、ジョブ管理フレームワークを切り替える際にジョブを書き直さずに済みます。

https://railsguides.jp/active_job_basics.html

Active Jobは、2014年12月20日にリリースされたRails 4.2から導入されています。
https://railsguides.jp/4_2_release_notes.html
その4.2がリリースされる前から、すでに非同期ライブラリが存在していました。
例として

があります。

これらのライブラリは呼び出し方がそれぞれ異なりますが、Active Jobは、その呼び出し方の違いを吸収し、呼び出し方法をまとめています。そのため開発者はライブラリを気にすることなく呼び出すことができます。

Active Jobの使い方についてはRailsガイドにありますので、ここでは省略します。

コードリーディング

ここでは主に以下2点についてコードリーディングします。

  • デフォルト設定でのジョブ登録と実行
  • 外部の非同期ライブラリとの連携

環境

Ruby: 2.7.4
Ruby on Rails: 6.1.0

前提

Railsガイドをもとに、ジョブ実行のクラスを定義します。

application_job.rb
class ApplicationJob < ActiveJob::Base
end
guests_cleanup_job.rb
class GuestsCleanupJob < ApplicationJob
  queue_as :default

  def perform(*args)
    # 後で実行したい作業をここに書く
  end
end

今回、このジョブを実行するコードについて読んでいきます。
なお、本記事ではActive Job以外のコードについては、コードは記載せず概要のみ記載することとしています。

Rails起動時に読み込まれるActiveJobモジュールについて

Rails起動時、ActiveJobモジュールのautoloadで指定している各クラスやモジュールが読み込まれます。
ここでは、本記事に必要な部分だけ記載します。

active_job.rb
module ActiveJob
  extend ActiveSupport::Autoload

  autoload :Base
  autoload :QueueAdapters
  autoload :Serializers
  autoload :ConfiguredJob
  autoload :TestCase
  autoload :TestHelper
end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job.rb#L31

Baseクラスを読み込んでいるので、その中身を見てみます。

base.rb
  class Base
    include Core
    include QueueAdapter
    include QueueName
    include QueuePriority
    include Enqueuing
    include Execution
    include Callbacks
    include Exceptions
    include Logging
    include Instrumentation
    include Timezones
    include Translation

    ActiveSupport.run_load_hooks(:active_job, self)
  end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/base.rb#L63

ジョブの登録、実行に必要な各モジュールがここでincludeされています。

QueueAdapterモジュールの読み込み

QueueAdapterモジュール読み込み時、以下included内部の処理が実行されます。

queue_adapter.rb
module ActiveJob
  module QueueAdapter #:nodoc:
    extend ActiveSupport::Concern

    included do
      class_attribute :_queue_adapter_name, instance_accessor: false, instance_predicate: false
      class_attribute :_queue_adapter, instance_accessor: false, instance_predicate: false

      delegate :queue_adapter, to: :class

      self.queue_adapter = :async
    end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/queue_adapter.rb#L11

class_attributeはActiveSupportで定義されており、与えられた最初のシンボルをクラスメソッドのセッター/ゲッターメソッドとして動的に定義するメソッドとなります。
そのため、_queue_adapter_nameメソッド、_queue_adapterメソッドが作られ、初期値nilとして定義されます。

その次のdelegateでは、queue_adapterの呼び出しをクラスメソッドとして呼び出すことを表しています。

最後にqueue_adapter:asyncがセットされます。このセットにおいてqueue_adapterメソッドが呼び出されます。

queue_adatper.rb
      def queue_adapter=(name_or_adapter)
        case name_or_adapter
        when Symbol, String
          queue_adapter = ActiveJob::QueueAdapters.lookup(name_or_adapter).new
          assign_adapter(name_or_adapter.to_s, queue_adapter)
        else
          if queue_adapter?(name_or_adapter)
            adapter_name = "#{name_or_adapter.class.name.demodulize.remove('Adapter').underscore}"
            assign_adapter(adapter_name, name_or_adapter)
          else
            raise ArgumentError
          end
        end
      end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/queue_adapter.rb#L37

name_or_adapterがシンボルなのでActiveJob::QueueAdapters.lookup(name_or_adapter).newが使用されます。
lookupメソッドを見てみます。

queue_adapters.rb
      def lookup(name)
        const_get(name.to_s.camelize << ADAPTER)
      end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/queue_adapters.rb#L136

ここではアダプタ名に応じたアダプタクラスを取得する処理となります。
ADAPTERADAPTER = "Adapter"と定義されているため、今回のlookupメソッドではAsyncAdapterを取得します。
そのため、先程のActiveJob::QueueAdapters.lookup(name_or_adapter).newActiveJob::QueueAdapters.AsyncAdatper.newとなります。

AsncAdapterクラスのインスタンス作成時、初期化メソッドが実行されます。

async_adapter.rb
      def initialize(**executor_options)
        @scheduler = Scheduler.new(**executor_options)
      end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/queue_adapters/async_adapter.rb#L35
Schedulerクラスのインスタンスを作成しているので、その先を見てみます。

async_adapter.rb
      class Scheduler #:nodoc:
        DEFAULT_EXECUTOR_OPTIONS = {
          min_threads:     0,
          max_threads:     Concurrent.processor_count,
          auto_terminate:  true,
          idletime:        60, # 1 minute
          max_queue:       0, # unlimited
          fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
        }.freeze

        attr_accessor :immediate

        def initialize(**options)
          self.immediate = false
          @immediate_executor = Concurrent::ImmediateExecutor.new
          @async_executor = Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge(options))
        end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/queue_adapters/async_adapter.rb#L86

ジョブの即時実行かどうかを表すimmediatefalseとなっており、その後に即時実行を行うインスタンス(Concurrent::ImmediateExecutor)、非同期での実行を行うインスタンス(Concurrent::ThreadPoolExecutor.new)を作成しています。
immediateが初期値falseとなっていることから、基本的に非同期での実行を意図しているようです。

queue_adapterメソッドの処理に戻ります。上記の後のassign_adapterメソッドは以下のとおりです。

queue_adapter.rb
        def assign_adapter(adapter_name, queue_adapter)
          self._queue_adapter_name = adapter_name
          self._queue_adapter = queue_adapter
        end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/queue_adapter.rb#L53

ここではアダプタ名やアダプタクラスのインスタンスをセットする処理となります。
adapter_name:asyncqueue_adapterAsyncAdatperのインスタンスなので、前述した_queue_adapter_nameメソッド、_queue_adapterにセットされます。

QueueAdapterモジュールの読み込みはここまでとなります。

QueueNameモジュールの読み込み

QueueNameモジュールを見てみます。

queue_name.rb
module ActiveJob
  module QueueName
    extend ActiveSupport::Concern

    module ClassMethods
      mattr_accessor :default_queue_name, default: "default"
      ・・・
    end
    ・・・
    included do
      class_attribute :queue_name, instance_accessor: false, default: -> { self.class.default_queue_name }
      class_attribute :queue_name_delimiter, instance_accessor: false, default: "_"
      class_attribute :queue_name_prefix
    end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/queue_name.rb#L55

class_attributesについては上記で見た通りなので、

  • queue_name: self.class.default_queue_name、つまりdefault
  • queue_name_delimiter: _
  • queue_name_prefix: nil
    がセットされます。

デフォルト設定でのジョブ登録と実行

config/application.rbでバックグラウンドの非同期ライブラリを指定できます。デフォルトでは指定なしとなっています。

以下コードでジョブ登録を行います。

GuestsCleanupJob.perform_later

ここからコードを追っていきます。
perform_laterメソッドは、ActiveJob::BaseincludeされているEnqueuingモジュールで定義されています。

base.rb
  class Base
    include Core
    include QueueAdapter
    include QueueName
    include QueuePriority
    include Enqueuing
    include Execution
    include Callbacks
    include Exceptions
    include Logging
    include Instrumentation
    include Timezones
    include Translation

    ActiveSupport.run_load_hooks(:active_job, self)
  end

https://github.com/rails/rails/blob/4b74021e4e2ebc7948b7fd74dff62b2ab309492e/activejob/lib/active_job/base.rb#L68

enqueuing.rb
      def perform_later(*args)
        job_or_instantiate(*args).enqueue
      end

      private
        def job_or_instantiate(*args) # :doc:
          args.first.is_a?(self) ? args.first : new(*args)
        end

https://github.com/rails/rails/blob/4b74021e4e2ebc7948b7fd74dff62b2ab309492e/activejob/lib/active_job/enqueuing.rb#L21

perform_laterメソッドには引数を渡していないので、job_or_instantiateメソッドで自身のインスタンスを作ります。

このときRails起動時に読み込んでいたBaseクラスのCoreモジュールの初期化メソッドが実行されます。

core.rb
    def initialize(*arguments)
      @arguments  = arguments
      @job_id     = SecureRandom.uuid
      @queue_name = self.class.queue_name
      @priority   = self.class.priority
      @executions = 0
      @exception_executions = {}
      @timezone   = Time.zone&.name
    end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/core.rb#L81

ここではそれぞれインスタンス変数にそれぞれ初期値をセットするのみとなります。
なお@argumentsに関しては、perform_laterメソッド実行時に引数を与えていないためnilとなります。

初期化終了後、job_or_instantiate(*args).enqueueの処理に戻り、ジョブ登録のenqueueメソッドを呼び出します

enqueuing.rb
    def enqueue(options = {})
      self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
      self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
      self.queue_name   = self.class.queue_name_from_part(options[:queue]) if options[:queue]
      self.priority     = options[:priority].to_i if options[:priority]
      successfully_enqueued = false

      run_callbacks :enqueue do
        if scheduled_at
          queue_adapter.enqueue_at self, scheduled_at
        else
          queue_adapter.enqueue self
        end

        successfully_enqueued = true
      end

      if successfully_enqueued
        self
      else
        false
      end
    end

https://github.com/rails/rails/blob/4b74021e4e2ebc7948b7fd74dff62b2ab309492e/activejob/lib/active_job/enqueuing.rb#L48

enqueueメソッドでは、キューのアダプタに対してジョブの登録を行っています。

perform_laterメソッドの引数にスケジュールの日時、キュー名、優先度があれば取り出してセットしています。
その後、run_callbacksメソッドのブロック内でキューアダプターに対するenqueueメソッドを実行しています。
なお、run_callbacksメソッドはActive Supportで定義されているメソッドであり、指定したシンボルに対してbeforeやafterの処理をセットすることで、ブロックの処理前後にbeforeやafterの処理を実行できます。

今回perform_laterメソッドでスケジュール日時を引数で指定していないので、キューアダプターのenqueueを呼び出します。
このときのqueue_adapterを見てみます。

queue_adatper.rb
      def queue_adapter
        _queue_adapter
      end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/queue_adapter.rb#L24

前述した_queue_adapterメソッドにAsyncAdatperクラスのインスタンスがセットされているので、queue_adapter.enqueueAsyncAdatperクラスのインスタンスメソッドであるenqueueの呼び出しとなります。

AsyncAdatperクラスのenqueueメソッドを見てみます。

async_adapter.rb
    class AsyncAdapter
      def initialize(**executor_options)
        @scheduler = Scheduler.new(**executor_options)
      end

      def enqueue(job) #:nodoc:
        @scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name
      end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/queue_adapters/async_adapter.rb#L39

ここでは@schedulerenqueueメソッドを呼び出すのみとなっています。
なお、@schedulerはRails起動時にAsyncAdapterクラスのインスタンスがセットされています。

引数にセットされているJobWrapperクラスのインスタンス作成では以下のようになっています。

async_adapter.rb
      class JobWrapper #:nodoc:
        def initialize(job)
          job.provider_job_id = SecureRandom.uuid
          @job_data = job.serialize
        end

特に処理は行っておらず、jobidのセットと、GuestsCleanupJobクラスを表すjobのシリアライズ情報をセットしているのみとなります。

このserializeメソッドはCoreモジュールで定義されています。

core.rb
    def serialize
      {
        "job_class"  => self.class.name,
        "job_id"     => job_id,
        "provider_job_id" => provider_job_id,
        "queue_name" => queue_name,
        "priority"   => priority,
        "arguments"  => serialize_arguments_if_needed(arguments),
        "executions" => executions,
        "exception_executions" => exception_executions,
        "locale"     => I18n.locale.to_s,
        "timezone"   => timezone,
        "enqueued_at" => Time.now.utc.iso8601
      }
    end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/core.rb#L94

ここでジョブのキュー名や優先度などをハッシュとして設定しています。

@scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_nameの処理に戻ります。
queue_nameに関しては、Rails起動時にQueueNameモジュールにてセットされたqueue_nameの値、つまりdefaultがセットされています。

それでは、引数や呼び出しメソッドがわかったので、@scheduler.enqueueの処理を見てみます。

async_adapter.rb
        def enqueue(job, queue_name:)
          executor.post(job, &:perform)
        end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/queue_adapters/async_adapter.rb#L92

executorはメソッドがあり、以下のように即時実行か非同期実行かを選べる処理となります。

async_adapter.rb
        def executor
          immediate ? @immediate_executor : @async_executor
        end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/queue_adapters/async_adapter.rb#L110

immediatefalseなので、非同期のexecutor(Concurrent::ThreadPoolExecutor)が選ばれます。
postメソッド内部はRubyのコードになるため、以下内部処理の概略を載せます。

Concurrent::ThreadPoolExecutorpostメソッドを実行することで、引数に渡したperformpostメソッド内部でProcにセットされます。
またjobが引数で渡されていることにより、postメソッド内部でProccall時に、JobWrapperクラスのperformを呼び出すようになっています。

それではJobWrapperクラスのperformメソッドを見てみます。

async_adapter.rb
        def perform
          Base.execute @job_data
        end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/queue_adapters/async_adapter.rb#L69

@job_dataは、以前JobWrapperクラスのinitializeメソッドの引数で渡されたjobGuestsCleanupJobクラスのインスタンス)をシリアライズしたものとなっています。

executeメソッドはExecutionモジュールで定義されているので、Executionモジュールを見てみます。

execution.rb
      def execute(job_data) #:nodoc:
        ActiveJob::Callbacks.run_callbacks(:execute) do
          job = deserialize(job_data)
          job.perform_now
        end
      end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/execution.rb#L22

先程のシリアライズした@job_dataをデシリアライズし、perform_nowメソッドを呼び出してすぐさまジョブを実行するメソッドとなっています。

このデシリアライズはCoreモジュールで定義されています。

core.rb
    def deserialize(job_data)
      self.job_id               = job_data["job_id"]
      self.provider_job_id      = job_data["provider_job_id"]
      self.queue_name           = job_data["queue_name"]
      self.priority             = job_data["priority"]
      self.serialized_arguments = job_data["arguments"]
      self.executions           = job_data["executions"]
      self.exception_executions = job_data["exception_executions"]
      self.locale               = job_data["locale"] || I18n.locale.to_s
      self.timezone             = job_data["timezone"] || Time.zone&.name
      self.enqueued_at          = job_data["enqueued_at"]
    end

シリアライズメソッドとは逆に、ハッシュとして持っていた情報をそれぞれセットしています。

次のperform_nowメソッドを見てみます。

execution.rb
    def perform_now
      # Guard against jobs that were persisted before we started counting executions by zeroing out nil counters
      self.executions = (executions || 0) + 1

      deserialize_arguments_if_needed

      run_callbacks :perform do
        perform(*arguments)
      end
    rescue => exception
      rescue_with_handler(exception) || raise
    end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/execution.rb#L41

このメソッドでは、ActiveJobを継承したクラスのperformメソッドに対し、引数をそえて実行する処理となります。
deserialize_arguments_if_neededメソッドはCoreモジュールで定義されています。

      def deserialize_arguments_if_needed
        if arguments_serialized?
          @arguments = deserialize_arguments(@serialized_arguments)
          @serialized_arguments = nil
        end
      end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/core.rb#L158

performメソッドのための引数が事前にセットされていればデシリアライズしてセットするメソッドとなります。
if文の前にarguments_serialized?メソッドがあるので、その呼び出し先を見てみます。

core.rb
      def arguments_serialized?
        defined?(@serialized_arguments) && @serialized_arguments
      end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/core.rb#L173

引数がセットされていないので、ここはfalseとなります。そのため、deserialize_arguments_if_neededメソッドでの引数のセットは行われません。

perform_nowメソッドに戻り、perform(*arguments)メソッドに移ります。

execution.rb
    def perform(*)
      fail NotImplementedError
    end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/execution.rb#L54
ActiveJobクラスの子クラスでperformメソッドを定義していなければ失敗となります。
今回はGuestsCleanupJobクラスでオーバーライドしているので、そのメソッドが呼び出されます。

上記の一連の流れが、perform_laterメソッドを実行してからperformメソッドを実行する流れとなります。

外部の非同期ライブラリとの連携

外部の非同期ライブラリとの連携は、config/application.rbに連携するライブラリ名をシンボルとして記載するのみとなります。
ここでは、Delayed::Jobを例に取ります。

application.rb
module YourApp
  class Application < Rails::Application
    config.active_job.queue_adapter = :delayed_job
  end
end

前述のコードリーディングで少し触れましたが、このキューアダプタの設定がどこで反映されるのか見てみます。
Rails起動時、上記queue_adapter:delayed_jobをセットしているため以下メソッドが実行されます。

queue_adapter.rb
      def queue_adapter=(name_or_adapter)
        case name_or_adapter
        when Symbol, String
          queue_adapter = ActiveJob::QueueAdapters.lookup(name_or_adapter).new
          assign_adapter(name_or_adapter.to_s, queue_adapter)
        else
          if queue_adapter?(name_or_adapter)
            adapter_name = "#{name_or_adapter.class.name.demodulize.remove('Adapter').underscore}"
            assign_adapter(adapter_name, name_or_adapter)
          else
            raise ArgumentError
          end
        end
      end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/queue_adapter.rb#L37

このときActiveJob::QueueAdapters.lookup(name_or_adapter).newにて、DelayedJobAdapterクラスのインスタンスが作られます。
このクラスでDelayed::Jobとのアダプタを実現しています。

このクラスが使用されるのは、Enqueueモジュールのqueueメソッドとなります。

enqueuing.rb
    def enqueue(options = {})
      self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
      self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
      self.queue_name   = self.class.queue_name_from_part(options[:queue]) if options[:queue]
      self.priority     = options[:priority].to_i if options[:priority]
      successfully_enqueued = false

      run_callbacks :enqueue do
        if scheduled_at
          queue_adapter.enqueue_at self, scheduled_at
        else
          queue_adapter.enqueue self
        end

        successfully_enqueued = true
      end

      if successfully_enqueued
        self
      else
        false
      end
    end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/enqueuing.rb#L48

この中でenqueue_at、もしくはenqueueメソッドを呼び出したときDelayedJobAdapterクラスのメソッドを呼び出します。

    class DelayedJobAdapter
      def enqueue(job) #:nodoc:
        delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority)
        job.provider_job_id = delayed_job.id
        delayed_job
      end

      def enqueue_at(job, timestamp) #:nodoc:
        delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority, run_at: Time.at(timestamp))
        job.provider_job_id = delayed_job.id
        delayed_job
      end

https://github.com/rails/rails/blob/c2b701e33470adb1fab15c5e68957facdb26ebb1/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb#L18
これらのメソッドにて、Delayed::JobActiveJobをつないでいます。

enqueueメソッドを実行したとき、Delayed::Jobでどんな動きをしているのかは、こちらの記事を参照してください。
https://zenn.dev/m_yamashii/articles/6eeabce1199da5

上記の流れが、外部の非同期ライブラリとの連携となります。

おわりに

Active Jobのジョブ登録と実行、また外部の非同期ライブラリとの連携についてコードリーディングを行いました。
挙動の違いを吸収するアダプターパターンが使われていますが、あくまでインターフェースでしかないので、非同期ライブラリの機能をしっかり使いたいならばそのライブラリを直接呼んだほうが良いのかなと感じました。
この記事が誰かのお役に立てれば幸いです。

Discussion