【コードリーディング】Active Jobのジョブ登録と実行
はじめに
Railsで非同期処理を触る際、前回執筆したDelayed::Jobに加えて、Active Jobも触る機会がありました。
こちらも内部処理についてコードリーディングしながら理解することにしました。
ActiveJobとは
Railsガイドから引用します。
Active Jobは、ジョブを宣言し、それによってバックエンドでさまざまな方法によるキュー操作を実行するためのフレームワークです。
またActive Jobの目的は以下のとおりです。
Active Jobの主要な目的は、あらゆるRailsアプリケーションにジョブ管理インフラを配置することです。これにより、Delayed JobとResqueなどのように、さまざまなジョブ実行機能のAPIの違いを気にせずにジョブフレームワーク機能やその他のgemを搭載することができるようになります。バックエンドでのキューイング作業では、操作方法以外のことを気にせずに済みます。さらに、ジョブ管理フレームワークを切り替える際にジョブを書き直さずに済みます。
Active Jobは、2014年12月20日にリリースされたRails 4.2から導入されています。
その4.2がリリースされる前から、すでに非同期ライブラリが存在していました。
例として
- Delayed::Job(バージョン:4.0.4)
https://rubygems.org/gems/delayed_job/versions - Sidekiq(バージョン:3.3.0)
https://rubygems.org/gems/sidekiq/versions
があります。
これらのライブラリは呼び出し方がそれぞれ異なりますが、Active Jobは、その呼び出し方の違いを吸収し、呼び出し方法をまとめています。そのため開発者はライブラリを気にすることなく呼び出すことができます。
Active Jobの使い方についてはRailsガイドにありますので、ここでは省略します。
コードリーディング
ここでは主に以下2点についてコードリーディングします。
- デフォルト設定でのジョブ登録と実行
- 外部の非同期ライブラリとの連携
環境
Ruby: 2.7.4
Ruby on Rails: 6.1.0
前提
Railsガイドをもとに、ジョブ実行のクラスを定義します。
class ApplicationJob < ActiveJob::Base
end
class GuestsCleanupJob < ApplicationJob
queue_as :default
def perform(*args)
# 後で実行したい作業をここに書く
end
end
今回、このジョブを実行するコードについて読んでいきます。
なお、本記事ではActive Job以外のコードについては、コードは記載せず概要のみ記載することとしています。
Rails起動時に読み込まれるActiveJobモジュールについて
Rails起動時、ActiveJobモジュールのautoloadで指定している各クラスやモジュールが読み込まれます。
ここでは、本記事に必要な部分だけ記載します。
module ActiveJob
extend ActiveSupport::Autoload
autoload :Base
autoload :QueueAdapters
autoload :Serializers
autoload :ConfiguredJob
autoload :TestCase
autoload :TestHelper
end
Baseクラスを読み込んでいるので、その中身を見てみます。
class Base
include Core
include QueueAdapter
include QueueName
include QueuePriority
include Enqueuing
include Execution
include Callbacks
include Exceptions
include Logging
include Instrumentation
include Timezones
include Translation
ActiveSupport.run_load_hooks(:active_job, self)
end
ジョブの登録、実行に必要な各モジュールがここでincludeされています。
QueueAdapterモジュールの読み込み
QueueAdapterモジュール読み込み時、以下included内部の処理が実行されます。
module ActiveJob
module QueueAdapter #:nodoc:
extend ActiveSupport::Concern
included do
class_attribute :_queue_adapter_name, instance_accessor: false, instance_predicate: false
class_attribute :_queue_adapter, instance_accessor: false, instance_predicate: false
delegate :queue_adapter, to: :class
self.queue_adapter = :async
end
class_attributeはActiveSupportで定義されており、与えられた最初のシンボルをクラスメソッドのセッター/ゲッターメソッドとして動的に定義するメソッドとなります。
そのため、_queue_adapter_nameメソッド、_queue_adapterメソッドが作られ、初期値nilとして定義されます。
その次のdelegateでは、queue_adapterの呼び出しをクラスメソッドとして呼び出すことを表しています。
最後にqueue_adapterに:asyncがセットされます。このセットにおいてqueue_adapterメソッドが呼び出されます。
def queue_adapter=(name_or_adapter)
case name_or_adapter
when Symbol, String
queue_adapter = ActiveJob::QueueAdapters.lookup(name_or_adapter).new
assign_adapter(name_or_adapter.to_s, queue_adapter)
else
if queue_adapter?(name_or_adapter)
adapter_name = "#{name_or_adapter.class.name.demodulize.remove('Adapter').underscore}"
assign_adapter(adapter_name, name_or_adapter)
else
raise ArgumentError
end
end
end
name_or_adapterがシンボルなのでActiveJob::QueueAdapters.lookup(name_or_adapter).newが使用されます。
lookupメソッドを見てみます。
def lookup(name)
const_get(name.to_s.camelize << ADAPTER)
end
ここではアダプタ名に応じたアダプタクラスを取得する処理となります。
ADAPTERはADAPTER = "Adapter"と定義されているため、今回のlookupメソッドではAsyncAdapterを取得します。
そのため、先程のActiveJob::QueueAdapters.lookup(name_or_adapter).newはActiveJob::QueueAdapters.AsyncAdatper.newとなります。
AsncAdapterクラスのインスタンス作成時、初期化メソッドが実行されます。
def initialize(**executor_options)
@scheduler = Scheduler.new(**executor_options)
end
Schedulerクラスのインスタンスを作成しているので、その先を見てみます。
class Scheduler #:nodoc:
DEFAULT_EXECUTOR_OPTIONS = {
min_threads: 0,
max_threads: Concurrent.processor_count,
auto_terminate: true,
idletime: 60, # 1 minute
max_queue: 0, # unlimited
fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
}.freeze
attr_accessor :immediate
def initialize(**options)
self.immediate = false
@immediate_executor = Concurrent::ImmediateExecutor.new
@async_executor = Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge(options))
end
ジョブの即時実行かどうかを表すimmediateはfalseとなっており、その後に即時実行を行うインスタンス(Concurrent::ImmediateExecutor)、非同期での実行を行うインスタンス(Concurrent::ThreadPoolExecutor.new)を作成しています。
immediateが初期値falseとなっていることから、基本的に非同期での実行を意図しているようです。
queue_adapterメソッドの処理に戻ります。上記の後のassign_adapterメソッドは以下のとおりです。
def assign_adapter(adapter_name, queue_adapter)
self._queue_adapter_name = adapter_name
self._queue_adapter = queue_adapter
end
ここではアダプタ名やアダプタクラスのインスタンスをセットする処理となります。
adapter_nameは:async、queue_adapterはAsyncAdatperのインスタンスなので、前述した_queue_adapter_nameメソッド、_queue_adapterにセットされます。
QueueAdapterモジュールの読み込みはここまでとなります。
QueueNameモジュールの読み込み
QueueNameモジュールを見てみます。
module ActiveJob
module QueueName
extend ActiveSupport::Concern
module ClassMethods
mattr_accessor :default_queue_name, default: "default"
・・・
end
・・・
included do
class_attribute :queue_name, instance_accessor: false, default: -> { self.class.default_queue_name }
class_attribute :queue_name_delimiter, instance_accessor: false, default: "_"
class_attribute :queue_name_prefix
end
class_attributesについては上記で見た通りなので、
-
queue_name:self.class.default_queue_name、つまりdefault -
queue_name_delimiter:_ -
queue_name_prefix:nil
がセットされます。
デフォルト設定でのジョブ登録と実行
config/application.rbでバックグラウンドの非同期ライブラリを指定できます。デフォルトでは指定なしとなっています。
以下コードでジョブ登録を行います。
GuestsCleanupJob.perform_later
ここからコードを追っていきます。
perform_laterメソッドは、ActiveJob::BaseにincludeされているEnqueuingモジュールで定義されています。
class Base
include Core
include QueueAdapter
include QueueName
include QueuePriority
include Enqueuing
include Execution
include Callbacks
include Exceptions
include Logging
include Instrumentation
include Timezones
include Translation
ActiveSupport.run_load_hooks(:active_job, self)
end
def perform_later(*args)
job_or_instantiate(*args).enqueue
end
private
def job_or_instantiate(*args) # :doc:
args.first.is_a?(self) ? args.first : new(*args)
end
perform_laterメソッドには引数を渡していないので、job_or_instantiateメソッドで自身のインスタンスを作ります。
このときRails起動時に読み込んでいたBaseクラスのCoreモジュールの初期化メソッドが実行されます。
def initialize(*arguments)
@arguments = arguments
@job_id = SecureRandom.uuid
@queue_name = self.class.queue_name
@priority = self.class.priority
@executions = 0
@exception_executions = {}
@timezone = Time.zone&.name
end
ここではそれぞれインスタンス変数にそれぞれ初期値をセットするのみとなります。
なお@argumentsに関しては、perform_laterメソッド実行時に引数を与えていないためnilとなります。
初期化終了後、job_or_instantiate(*args).enqueueの処理に戻り、ジョブ登録のenqueueメソッドを呼び出します
def enqueue(options = {})
self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
self.priority = options[:priority].to_i if options[:priority]
successfully_enqueued = false
run_callbacks :enqueue do
if scheduled_at
queue_adapter.enqueue_at self, scheduled_at
else
queue_adapter.enqueue self
end
successfully_enqueued = true
end
if successfully_enqueued
self
else
false
end
end
enqueueメソッドでは、キューのアダプタに対してジョブの登録を行っています。
perform_laterメソッドの引数にスケジュールの日時、キュー名、優先度があれば取り出してセットしています。
その後、run_callbacksメソッドのブロック内でキューアダプターに対するenqueueメソッドを実行しています。
なお、run_callbacksメソッドはActive Supportで定義されているメソッドであり、指定したシンボルに対してbeforeやafterの処理をセットすることで、ブロックの処理前後にbeforeやafterの処理を実行できます。
今回perform_laterメソッドでスケジュール日時を引数で指定していないので、キューアダプターのenqueueを呼び出します。
このときのqueue_adapterを見てみます。
def queue_adapter
_queue_adapter
end
前述した_queue_adapterメソッドにAsyncAdatperクラスのインスタンスがセットされているので、queue_adapter.enqueueはAsyncAdatperクラスのインスタンスメソッドであるenqueueの呼び出しとなります。
AsyncAdatperクラスのenqueueメソッドを見てみます。
class AsyncAdapter
def initialize(**executor_options)
@scheduler = Scheduler.new(**executor_options)
end
def enqueue(job) #:nodoc:
@scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name
end
ここでは@schedulerのenqueueメソッドを呼び出すのみとなっています。
なお、@schedulerはRails起動時にAsyncAdapterクラスのインスタンスがセットされています。
引数にセットされているJobWrapperクラスのインスタンス作成では以下のようになっています。
class JobWrapper #:nodoc:
def initialize(job)
job.provider_job_id = SecureRandom.uuid
@job_data = job.serialize
end
特に処理は行っておらず、jobのidのセットと、GuestsCleanupJobクラスを表すjobのシリアライズ情報をセットしているのみとなります。
このserializeメソッドはCoreモジュールで定義されています。
def serialize
{
"job_class" => self.class.name,
"job_id" => job_id,
"provider_job_id" => provider_job_id,
"queue_name" => queue_name,
"priority" => priority,
"arguments" => serialize_arguments_if_needed(arguments),
"executions" => executions,
"exception_executions" => exception_executions,
"locale" => I18n.locale.to_s,
"timezone" => timezone,
"enqueued_at" => Time.now.utc.iso8601
}
end
ここでジョブのキュー名や優先度などをハッシュとして設定しています。
@scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_nameの処理に戻ります。
queue_nameに関しては、Rails起動時にQueueNameモジュールにてセットされたqueue_nameの値、つまりdefaultがセットされています。
それでは、引数や呼び出しメソッドがわかったので、@scheduler.enqueueの処理を見てみます。
def enqueue(job, queue_name:)
executor.post(job, &:perform)
end
executorはメソッドがあり、以下のように即時実行か非同期実行かを選べる処理となります。
def executor
immediate ? @immediate_executor : @async_executor
end
immediateはfalseなので、非同期のexecutor(Concurrent::ThreadPoolExecutor)が選ばれます。
postメソッド内部はRubyのコードになるため、以下内部処理の概略を載せます。
Concurrent::ThreadPoolExecutorのpostメソッドを実行することで、引数に渡したperformがpostメソッド内部でProcにセットされます。
またjobが引数で渡されていることにより、postメソッド内部でProcのcall時に、JobWrapperクラスのperformを呼び出すようになっています。
それではJobWrapperクラスのperformメソッドを見てみます。
def perform
Base.execute @job_data
end
@job_dataは、以前JobWrapperクラスのinitializeメソッドの引数で渡されたjob(GuestsCleanupJobクラスのインスタンス)をシリアライズしたものとなっています。
executeメソッドはExecutionモジュールで定義されているので、Executionモジュールを見てみます。
def execute(job_data) #:nodoc:
ActiveJob::Callbacks.run_callbacks(:execute) do
job = deserialize(job_data)
job.perform_now
end
end
先程のシリアライズした@job_dataをデシリアライズし、perform_nowメソッドを呼び出してすぐさまジョブを実行するメソッドとなっています。
このデシリアライズはCoreモジュールで定義されています。
def deserialize(job_data)
self.job_id = job_data["job_id"]
self.provider_job_id = job_data["provider_job_id"]
self.queue_name = job_data["queue_name"]
self.priority = job_data["priority"]
self.serialized_arguments = job_data["arguments"]
self.executions = job_data["executions"]
self.exception_executions = job_data["exception_executions"]
self.locale = job_data["locale"] || I18n.locale.to_s
self.timezone = job_data["timezone"] || Time.zone&.name
self.enqueued_at = job_data["enqueued_at"]
end
シリアライズメソッドとは逆に、ハッシュとして持っていた情報をそれぞれセットしています。
次のperform_nowメソッドを見てみます。
def perform_now
# Guard against jobs that were persisted before we started counting executions by zeroing out nil counters
self.executions = (executions || 0) + 1
deserialize_arguments_if_needed
run_callbacks :perform do
perform(*arguments)
end
rescue => exception
rescue_with_handler(exception) || raise
end
このメソッドでは、ActiveJobを継承したクラスのperformメソッドに対し、引数をそえて実行する処理となります。
deserialize_arguments_if_neededメソッドはCoreモジュールで定義されています。
def deserialize_arguments_if_needed
if arguments_serialized?
@arguments = deserialize_arguments(@serialized_arguments)
@serialized_arguments = nil
end
end
performメソッドのための引数が事前にセットされていればデシリアライズしてセットするメソッドとなります。
if文の前にarguments_serialized?メソッドがあるので、その呼び出し先を見てみます。
def arguments_serialized?
defined?(@serialized_arguments) && @serialized_arguments
end
引数がセットされていないので、ここはfalseとなります。そのため、deserialize_arguments_if_neededメソッドでの引数のセットは行われません。
perform_nowメソッドに戻り、perform(*arguments)メソッドに移ります。
def perform(*)
fail NotImplementedError
end
ActiveJobクラスの子クラスでperformメソッドを定義していなければ失敗となります。
今回はGuestsCleanupJobクラスでオーバーライドしているので、そのメソッドが呼び出されます。
上記の一連の流れが、perform_laterメソッドを実行してからperformメソッドを実行する流れとなります。
外部の非同期ライブラリとの連携
外部の非同期ライブラリとの連携は、config/application.rbに連携するライブラリ名をシンボルとして記載するのみとなります。
ここでは、Delayed::Jobを例に取ります。
module YourApp
class Application < Rails::Application
config.active_job.queue_adapter = :delayed_job
end
end
前述のコードリーディングで少し触れましたが、このキューアダプタの設定がどこで反映されるのか見てみます。
Rails起動時、上記queue_adapterに:delayed_jobをセットしているため以下メソッドが実行されます。
def queue_adapter=(name_or_adapter)
case name_or_adapter
when Symbol, String
queue_adapter = ActiveJob::QueueAdapters.lookup(name_or_adapter).new
assign_adapter(name_or_adapter.to_s, queue_adapter)
else
if queue_adapter?(name_or_adapter)
adapter_name = "#{name_or_adapter.class.name.demodulize.remove('Adapter').underscore}"
assign_adapter(adapter_name, name_or_adapter)
else
raise ArgumentError
end
end
end
このときActiveJob::QueueAdapters.lookup(name_or_adapter).newにて、DelayedJobAdapterクラスのインスタンスが作られます。
このクラスでDelayed::Jobとのアダプタを実現しています。
このクラスが使用されるのは、Enqueueモジュールのqueueメソッドとなります。
def enqueue(options = {})
self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
self.priority = options[:priority].to_i if options[:priority]
successfully_enqueued = false
run_callbacks :enqueue do
if scheduled_at
queue_adapter.enqueue_at self, scheduled_at
else
queue_adapter.enqueue self
end
successfully_enqueued = true
end
if successfully_enqueued
self
else
false
end
end
この中でenqueue_at、もしくはenqueueメソッドを呼び出したときDelayedJobAdapterクラスのメソッドを呼び出します。
class DelayedJobAdapter
def enqueue(job) #:nodoc:
delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority)
job.provider_job_id = delayed_job.id
delayed_job
end
def enqueue_at(job, timestamp) #:nodoc:
delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority, run_at: Time.at(timestamp))
job.provider_job_id = delayed_job.id
delayed_job
end
これらのメソッドにて、Delayed::JobとActiveJobをつないでいます。
enqueueメソッドを実行したとき、Delayed::Jobでどんな動きをしているのかは、こちらの記事を参照してください。
上記の流れが、外部の非同期ライブラリとの連携となります。
おわりに
Active Jobのジョブ登録と実行、また外部の非同期ライブラリとの連携についてコードリーディングを行いました。
挙動の違いを吸収するアダプターパターンが使われていますが、あくまでインターフェースでしかないので、非同期ライブラリの機能をしっかり使いたいならばそのライブラリを直接呼んだほうが良いのかなと感じました。
この記事が誰かのお役に立てれば幸いです。
Discussion