【コードリーディング】Active Jobのジョブ登録と実行
はじめに
Railsで非同期処理を触る際、前回執筆したDelayed::Jobに加えて、Active Jobも触る機会がありました。
こちらも内部処理についてコードリーディングしながら理解することにしました。
ActiveJobとは
Railsガイドから引用します。
Active Jobは、ジョブを宣言し、それによってバックエンドでさまざまな方法によるキュー操作を実行するためのフレームワークです。
またActive Jobの目的は以下のとおりです。
Active Jobの主要な目的は、あらゆるRailsアプリケーションにジョブ管理インフラを配置することです。これにより、Delayed JobとResqueなどのように、さまざまなジョブ実行機能のAPIの違いを気にせずにジョブフレームワーク機能やその他のgemを搭載することができるようになります。バックエンドでのキューイング作業では、操作方法以外のことを気にせずに済みます。さらに、ジョブ管理フレームワークを切り替える際にジョブを書き直さずに済みます。
Active Jobは、2014年12月20日にリリースされたRails 4.2から導入されています。
例として
- Delayed::Job(バージョン:4.0.4)
https://rubygems.org/gems/delayed_job/versions - Sidekiq(バージョン:3.3.0)
https://rubygems.org/gems/sidekiq/versions
があります。
これらのライブラリは呼び出し方がそれぞれ異なりますが、Active Jobは、その呼び出し方の違いを吸収し、呼び出し方法をまとめています。そのため開発者はライブラリを気にすることなく呼び出すことができます。
Active Jobの使い方についてはRailsガイドにありますので、ここでは省略します。
コードリーディング
ここでは主に以下2点についてコードリーディングします。
- デフォルト設定でのジョブ登録と実行
- 外部の非同期ライブラリとの連携
環境
Ruby: 2.7.4
Ruby on Rails: 6.1.0
前提
Railsガイドをもとに、ジョブ実行のクラスを定義します。
class ApplicationJob < ActiveJob::Base
end
class GuestsCleanupJob < ApplicationJob
queue_as :default
def perform(*args)
# 後で実行したい作業をここに書く
end
end
今回、このジョブを実行するコードについて読んでいきます。
なお、本記事ではActive Job以外のコードについては、コードは記載せず概要のみ記載することとしています。
ActiveJob
モジュールについて
Rails起動時に読み込まれるRails起動時、ActiveJob
モジュールのautoload
で指定している各クラスやモジュールが読み込まれます。
ここでは、本記事に必要な部分だけ記載します。
module ActiveJob
extend ActiveSupport::Autoload
autoload :Base
autoload :QueueAdapters
autoload :Serializers
autoload :ConfiguredJob
autoload :TestCase
autoload :TestHelper
end
Base
クラスを読み込んでいるので、その中身を見てみます。
class Base
include Core
include QueueAdapter
include QueueName
include QueuePriority
include Enqueuing
include Execution
include Callbacks
include Exceptions
include Logging
include Instrumentation
include Timezones
include Translation
ActiveSupport.run_load_hooks(:active_job, self)
end
ジョブの登録、実行に必要な各モジュールがここでinclude
されています。
QueueAdapter
モジュールの読み込み
QueueAdapter
モジュール読み込み時、以下included
内部の処理が実行されます。
module ActiveJob
module QueueAdapter #:nodoc:
extend ActiveSupport::Concern
included do
class_attribute :_queue_adapter_name, instance_accessor: false, instance_predicate: false
class_attribute :_queue_adapter, instance_accessor: false, instance_predicate: false
delegate :queue_adapter, to: :class
self.queue_adapter = :async
end
class_attribute
はActiveSupportで定義されており、与えられた最初のシンボルをクラスメソッドのセッター/ゲッターメソッドとして動的に定義するメソッドとなります。
そのため、_queue_adapter_name
メソッド、_queue_adapter
メソッドが作られ、初期値nil
として定義されます。
その次のdelegate
では、queue_adapter
の呼び出しをクラスメソッドとして呼び出すことを表しています。
最後にqueue_adapter
に:async
がセットされます。このセットにおいてqueue_adapter
メソッドが呼び出されます。
def queue_adapter=(name_or_adapter)
case name_or_adapter
when Symbol, String
queue_adapter = ActiveJob::QueueAdapters.lookup(name_or_adapter).new
assign_adapter(name_or_adapter.to_s, queue_adapter)
else
if queue_adapter?(name_or_adapter)
adapter_name = "#{name_or_adapter.class.name.demodulize.remove('Adapter').underscore}"
assign_adapter(adapter_name, name_or_adapter)
else
raise ArgumentError
end
end
end
name_or_adapter
がシンボルなのでActiveJob::QueueAdapters.lookup(name_or_adapter).new
が使用されます。
lookup
メソッドを見てみます。
def lookup(name)
const_get(name.to_s.camelize << ADAPTER)
end
ここではアダプタ名に応じたアダプタクラスを取得する処理となります。
ADAPTER
はADAPTER = "Adapter"
と定義されているため、今回のlookup
メソッドではAsyncAdapter
を取得します。
そのため、先程のActiveJob::QueueAdapters.lookup(name_or_adapter).new
はActiveJob::QueueAdapters.AsyncAdatper.new
となります。
AsncAdapter
クラスのインスタンス作成時、初期化メソッドが実行されます。
def initialize(**executor_options)
@scheduler = Scheduler.new(**executor_options)
end
Scheduler
クラスのインスタンスを作成しているので、その先を見てみます。
class Scheduler #:nodoc:
DEFAULT_EXECUTOR_OPTIONS = {
min_threads: 0,
max_threads: Concurrent.processor_count,
auto_terminate: true,
idletime: 60, # 1 minute
max_queue: 0, # unlimited
fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
}.freeze
attr_accessor :immediate
def initialize(**options)
self.immediate = false
@immediate_executor = Concurrent::ImmediateExecutor.new
@async_executor = Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge(options))
end
ジョブの即時実行かどうかを表すimmediate
はfalse
となっており、その後に即時実行を行うインスタンス(Concurrent::ImmediateExecutor
)、非同期での実行を行うインスタンス(Concurrent::ThreadPoolExecutor.new
)を作成しています。
immediate
が初期値false
となっていることから、基本的に非同期での実行を意図しているようです。
queue_adapter
メソッドの処理に戻ります。上記の後のassign_adapter
メソッドは以下のとおりです。
def assign_adapter(adapter_name, queue_adapter)
self._queue_adapter_name = adapter_name
self._queue_adapter = queue_adapter
end
ここではアダプタ名やアダプタクラスのインスタンスをセットする処理となります。
adapter_name
は:async
、queue_adapter
はAsyncAdatper
のインスタンスなので、前述した_queue_adapter_name
メソッド、_queue_adapter
にセットされます。
QueueAdapter
モジュールの読み込みはここまでとなります。
QueueName
モジュールの読み込み
QueueName
モジュールを見てみます。
module ActiveJob
module QueueName
extend ActiveSupport::Concern
module ClassMethods
mattr_accessor :default_queue_name, default: "default"
・・・
end
・・・
included do
class_attribute :queue_name, instance_accessor: false, default: -> { self.class.default_queue_name }
class_attribute :queue_name_delimiter, instance_accessor: false, default: "_"
class_attribute :queue_name_prefix
end
class_attributes
については上記で見た通りなので、
-
queue_name
:self.class.default_queue_name
、つまりdefault
-
queue_name_delimiter
:_
-
queue_name_prefix
:nil
がセットされます。
デフォルト設定でのジョブ登録と実行
config/application.rbでバックグラウンドの非同期ライブラリを指定できます。デフォルトでは指定なしとなっています。
以下コードでジョブ登録を行います。
GuestsCleanupJob.perform_later
ここからコードを追っていきます。
perform_later
メソッドは、ActiveJob::Base
にinclude
されているEnqueuing
モジュールで定義されています。
class Base
include Core
include QueueAdapter
include QueueName
include QueuePriority
include Enqueuing
include Execution
include Callbacks
include Exceptions
include Logging
include Instrumentation
include Timezones
include Translation
ActiveSupport.run_load_hooks(:active_job, self)
end
def perform_later(*args)
job_or_instantiate(*args).enqueue
end
private
def job_or_instantiate(*args) # :doc:
args.first.is_a?(self) ? args.first : new(*args)
end
perform_later
メソッドには引数を渡していないので、job_or_instantiate
メソッドで自身のインスタンスを作ります。
このときRails起動時に読み込んでいたBase
クラスのCore
モジュールの初期化メソッドが実行されます。
def initialize(*arguments)
@arguments = arguments
@job_id = SecureRandom.uuid
@queue_name = self.class.queue_name
@priority = self.class.priority
@executions = 0
@exception_executions = {}
@timezone = Time.zone&.name
end
ここではそれぞれインスタンス変数にそれぞれ初期値をセットするのみとなります。
なお@arguments
に関しては、perform_later
メソッド実行時に引数を与えていないためnil
となります。
初期化終了後、job_or_instantiate(*args).enqueue
の処理に戻り、ジョブ登録のenqueue
メソッドを呼び出します
def enqueue(options = {})
self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
self.priority = options[:priority].to_i if options[:priority]
successfully_enqueued = false
run_callbacks :enqueue do
if scheduled_at
queue_adapter.enqueue_at self, scheduled_at
else
queue_adapter.enqueue self
end
successfully_enqueued = true
end
if successfully_enqueued
self
else
false
end
end
enqueue
メソッドでは、キューのアダプタに対してジョブの登録を行っています。
perform_later
メソッドの引数にスケジュールの日時、キュー名、優先度があれば取り出してセットしています。
その後、run_callbacks
メソッドのブロック内でキューアダプターに対するenqueue
メソッドを実行しています。
なお、run_callbacks
メソッドはActive Supportで定義されているメソッドであり、指定したシンボルに対してbeforeやafterの処理をセットすることで、ブロックの処理前後にbeforeやafterの処理を実行できます。
今回perform_later
メソッドでスケジュール日時を引数で指定していないので、キューアダプターのenqueue
を呼び出します。
このときのqueue_adapter
を見てみます。
def queue_adapter
_queue_adapter
end
前述した_queue_adapter
メソッドにAsyncAdatper
クラスのインスタンスがセットされているので、queue_adapter.enqueue
はAsyncAdatper
クラスのインスタンスメソッドであるenqueue
の呼び出しとなります。
AsyncAdatper
クラスのenqueue
メソッドを見てみます。
class AsyncAdapter
def initialize(**executor_options)
@scheduler = Scheduler.new(**executor_options)
end
def enqueue(job) #:nodoc:
@scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name
end
ここでは@scheduler
のenqueue
メソッドを呼び出すのみとなっています。
なお、@scheduler
はRails起動時にAsyncAdapter
クラスのインスタンスがセットされています。
引数にセットされているJobWrapper
クラスのインスタンス作成では以下のようになっています。
class JobWrapper #:nodoc:
def initialize(job)
job.provider_job_id = SecureRandom.uuid
@job_data = job.serialize
end
特に処理は行っておらず、job
のid
のセットと、GuestsCleanupJob
クラスを表すjob
のシリアライズ情報をセットしているのみとなります。
このserialize
メソッドはCore
モジュールで定義されています。
def serialize
{
"job_class" => self.class.name,
"job_id" => job_id,
"provider_job_id" => provider_job_id,
"queue_name" => queue_name,
"priority" => priority,
"arguments" => serialize_arguments_if_needed(arguments),
"executions" => executions,
"exception_executions" => exception_executions,
"locale" => I18n.locale.to_s,
"timezone" => timezone,
"enqueued_at" => Time.now.utc.iso8601
}
end
ここでジョブのキュー名や優先度などをハッシュとして設定しています。
@scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name
の処理に戻ります。
queue_name
に関しては、Rails起動時にQueueName
モジュールにてセットされたqueue_name
の値、つまりdefault
がセットされています。
それでは、引数や呼び出しメソッドがわかったので、@scheduler.enqueue
の処理を見てみます。
def enqueue(job, queue_name:)
executor.post(job, &:perform)
end
executor
はメソッドがあり、以下のように即時実行か非同期実行かを選べる処理となります。
def executor
immediate ? @immediate_executor : @async_executor
end
immediate
はfalse
なので、非同期のexecutor(Concurrent::ThreadPoolExecutor
)が選ばれます。
post
メソッド内部はRubyのコードになるため、以下内部処理の概略を載せます。
Concurrent::ThreadPoolExecutor
のpost
メソッドを実行することで、引数に渡したperform
がpost
メソッド内部でProcにセットされます。
またjob
が引数で渡されていることにより、post
メソッド内部でProc
のcall
時に、JobWrapper
クラスのperform
を呼び出すようになっています。
それではJobWrapper
クラスのperform
メソッドを見てみます。
def perform
Base.execute @job_data
end
@job_data
は、以前JobWrapper
クラスのinitialize
メソッドの引数で渡されたjob
(GuestsCleanupJob
クラスのインスタンス)をシリアライズしたものとなっています。
execute
メソッドはExecution
モジュールで定義されているので、Execution
モジュールを見てみます。
def execute(job_data) #:nodoc:
ActiveJob::Callbacks.run_callbacks(:execute) do
job = deserialize(job_data)
job.perform_now
end
end
先程のシリアライズした@job_data
をデシリアライズし、perform_now
メソッドを呼び出してすぐさまジョブを実行するメソッドとなっています。
このデシリアライズはCore
モジュールで定義されています。
def deserialize(job_data)
self.job_id = job_data["job_id"]
self.provider_job_id = job_data["provider_job_id"]
self.queue_name = job_data["queue_name"]
self.priority = job_data["priority"]
self.serialized_arguments = job_data["arguments"]
self.executions = job_data["executions"]
self.exception_executions = job_data["exception_executions"]
self.locale = job_data["locale"] || I18n.locale.to_s
self.timezone = job_data["timezone"] || Time.zone&.name
self.enqueued_at = job_data["enqueued_at"]
end
シリアライズメソッドとは逆に、ハッシュとして持っていた情報をそれぞれセットしています。
次のperform_now
メソッドを見てみます。
def perform_now
# Guard against jobs that were persisted before we started counting executions by zeroing out nil counters
self.executions = (executions || 0) + 1
deserialize_arguments_if_needed
run_callbacks :perform do
perform(*arguments)
end
rescue => exception
rescue_with_handler(exception) || raise
end
このメソッドでは、ActiveJob
を継承したクラスのperform
メソッドに対し、引数をそえて実行する処理となります。
deserialize_arguments_if_needed
メソッドはCore
モジュールで定義されています。
def deserialize_arguments_if_needed
if arguments_serialized?
@arguments = deserialize_arguments(@serialized_arguments)
@serialized_arguments = nil
end
end
perform
メソッドのための引数が事前にセットされていればデシリアライズしてセットするメソッドとなります。
if文の前にarguments_serialized?
メソッドがあるので、その呼び出し先を見てみます。
def arguments_serialized?
defined?(@serialized_arguments) && @serialized_arguments
end
引数がセットされていないので、ここはfalse
となります。そのため、deserialize_arguments_if_needed
メソッドでの引数のセットは行われません。
perform_now
メソッドに戻り、perform(*arguments)
メソッドに移ります。
def perform(*)
fail NotImplementedError
end
ActiveJob
クラスの子クラスでperform
メソッドを定義していなければ失敗となります。
今回はGuestsCleanupJob
クラスでオーバーライドしているので、そのメソッドが呼び出されます。
上記の一連の流れが、perform_later
メソッドを実行してからperform
メソッドを実行する流れとなります。
外部の非同期ライブラリとの連携
外部の非同期ライブラリとの連携は、config/application.rbに連携するライブラリ名をシンボルとして記載するのみとなります。
ここでは、Delayed::Jobを例に取ります。
module YourApp
class Application < Rails::Application
config.active_job.queue_adapter = :delayed_job
end
end
前述のコードリーディングで少し触れましたが、このキューアダプタの設定がどこで反映されるのか見てみます。
Rails起動時、上記queue_adapter
に:delayed_job
をセットしているため以下メソッドが実行されます。
def queue_adapter=(name_or_adapter)
case name_or_adapter
when Symbol, String
queue_adapter = ActiveJob::QueueAdapters.lookup(name_or_adapter).new
assign_adapter(name_or_adapter.to_s, queue_adapter)
else
if queue_adapter?(name_or_adapter)
adapter_name = "#{name_or_adapter.class.name.demodulize.remove('Adapter').underscore}"
assign_adapter(adapter_name, name_or_adapter)
else
raise ArgumentError
end
end
end
このときActiveJob::QueueAdapters.lookup(name_or_adapter).new
にて、DelayedJobAdapter
クラスのインスタンスが作られます。
このクラスでDelayed::Jobとのアダプタを実現しています。
このクラスが使用されるのは、Enqueue
モジュールのqueue
メソッドとなります。
def enqueue(options = {})
self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
self.priority = options[:priority].to_i if options[:priority]
successfully_enqueued = false
run_callbacks :enqueue do
if scheduled_at
queue_adapter.enqueue_at self, scheduled_at
else
queue_adapter.enqueue self
end
successfully_enqueued = true
end
if successfully_enqueued
self
else
false
end
end
この中でenqueue_at
、もしくはenqueue
メソッドを呼び出したときDelayedJobAdapter
クラスのメソッドを呼び出します。
class DelayedJobAdapter
def enqueue(job) #:nodoc:
delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority)
job.provider_job_id = delayed_job.id
delayed_job
end
def enqueue_at(job, timestamp) #:nodoc:
delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority, run_at: Time.at(timestamp))
job.provider_job_id = delayed_job.id
delayed_job
end
Delayed::Job
とActiveJob
をつないでいます。
enqueue
メソッドを実行したとき、Delayed::Job
でどんな動きをしているのかは、こちらの記事を参照してください。
上記の流れが、外部の非同期ライブラリとの連携となります。
おわりに
Active Jobのジョブ登録と実行、また外部の非同期ライブラリとの連携についてコードリーディングを行いました。
挙動の違いを吸収するアダプターパターンが使われていますが、あくまでインターフェースでしかないので、非同期ライブラリの機能をしっかり使いたいならばそのライブラリを直接呼んだほうが良いのかなと感じました。
この記事が誰かのお役に立てれば幸いです。
Discussion