Active Job の引数に Active Record オブジェクトを渡したときに何が起きているか
Sidekiq はジョブの引数を enqueue 時に JSON.dump
して、dequeue 時に JSON.load
している。このため、引数には string, integer, float, boolean, array, hash などの JSON で表現できる型のオブジェクトしか安全に渡すことができない。
Active Record のオブジェクトは渡すことができないため、代わりにレコードの ID を渡して、ジョブの中で find しなおす形になる。
class SomeJob
include Sidekiq::Worker
def perform(user_id)
user = User.find(user_id)
user.do_something
end
end
SomeJob.perform_async(user_id)
が、Active Job やそれに基づいている Action Mailer の引数には以下のように Active Record のオブジェクトを渡すことができる。
class SomeJob < ApplicationJob
def perform(user)
user.do_something
end
end
SomeJob.perform_later(user)
Active Job の引数に Active Record のオブジェクトを渡したとき、ジョブの enqueue 時, 実行時に具体的には何が起きているのかを調べた。
先に結論
enqueue 時に引数を serialize するとき、Active Record オブジェクトは Global Id というもので保存される。
Global ID はモデルのレコードに Rails アプリケーションの中で一意な ID を URI 形式で付与するライブラリ。
user = User.find(1)
user.to_global_id.to_s
# => "gid://foo-app/User/1"
Redis にはこの "gid://foo-app/User/1"
だけが入る。
ジョブが dequeue されて引数が deserialize される際は、この Global ID を元にモデル名とIDが復元されて、ActiveRecord の find が叩かれて DB からレコードが再取得される。
ジョブ enqueue 時の処理の流れ
ジョブの引数は ActiveJob::Arguments.serialize_argument
の中でオブジェクトの型に応じて再帰的に serialize される。
def serialize_argument(argument)
case argument
when *PERMITTED_TYPES
argument
when GlobalID::Identification
convert_to_global_id_hash(argument)
when Array
argument.map { |arg| serialize_argument(arg) }
when ActiveSupport::HashWithIndifferentAccess
serialize_indifferent_hash(argument)
when Hash
symbol_keys = argument.each_key.grep(Symbol).map!(&:to_s)
aj_hash_key = if Hash.ruby2_keywords_hash?(argument)
RUBY2_KEYWORDS_KEY
else
SYMBOL_KEYS_KEY
end
result = serialize_hash(argument)
result[aj_hash_key] = symbol_keys
result
when -> (arg) { arg.respond_to?(:permitted?) }
serialize_indifferent_hash(argument.to_h)
else
Serializers.serialize(argument)
end
end
def convert_to_global_id_hash(argument)
{ GLOBALID_KEY => argument.to_global_id.to_s }
rescue ...
...
end
ここで GlobalID::Identification
が ActiveRecord に include されている ため、Active Record オブジェクトは Global Id で保存される。具体的には { GLOBALID_KEY => "gid://foo-app/User/1" }
の形に serialize される。
ジョブ実行時の処理の流れ
-
Sidekiq::Processor#execute_job (*1)
-
ActiveJob::QueueAdapters::SidekiqAdapters::JobWrapper#perform
-
ActiveJob::Execution.execute
-
ActiveJob::Core.deserialize
job_data["job_class"].constantize.new
- ActiveJob::Core#deserialize
- *この段階ではジョブがインスタンス化されただけで、引数はまだ serialize された状態のまま
-
ActiveJob::Execution#perform_now
-
ActiveJob::Core#deserialize_arguments_if_needed
-
ActiveJob::Arguments.deserialize
-
ActiveJob::Arguments.deserialize_argument
-
ActiveJob::Arguments.deserialize_global_id
GlobalID::Locator.locate hash[GLOBALID_KEY]
-
ActiveJob::Arguments.deserialize_global_id
-
ActiveJob::Arguments.deserialize_argument
-
ActiveJob::Arguments.deserialize
-
ActiveJob::Core#deserialize_arguments_if_needed
-
ActiveJob::Core.deserialize
-
ActiveJob::Execution.execute
-
ActiveJob::QueueAdapters::SidekiqAdapters::JobWrapper#perform
dequeue して引数が deserialize される際は、ActiveJob::Arguments.deserialize_argument
の中で Global Id で保存されていることを確認して、GlobalID::Locator.locate
を実行する。
def deserialize_argument(argument)
case argument
when String
argument
when *PERMITTED_TYPES
argument
when Array
argument.map { |arg| deserialize_argument(arg) }
when Hash
if serialized_global_id?(argument)
deserialize_global_id argument
elsif custom_serialized?(argument)
Serializers.deserialize(argument)
else
deserialize_hash(argument)
end
else
raise ArgumentError, "Can only deserialize primitive arguments: #{argument.inspect}"
end
end
def serialized_global_id?(hash)
hash.size == 1 && hash.include?(GLOBALID_KEY)
end
def deserialize_global_id(hash)
GlobalID::Locator.locate hash[GLOBALID_KEY]
end
GlobalID::Locator.locate
は最終的にはモデルの find
を呼ぶ。
# https://github.com/rails/globalid/blob/3ddb0f87fd5c22b3330ab2b4e5c41a85953ac886/lib/global_id/locator.rb#L129-L131
def locate(gid)
gid.model_class.find gid.model_id
end
(*1) Sidekiq が queue からジョブを fetch して実行するまでの流れはこちら。
引数で渡したレコードのDBへの保存が commit される前にジョブが実行されるとエラーになる
このように、ジョブの実行時にはDBからレコードが再取得されるため、create した Active Record オブジェクトをジョブの引数に渡したとして、その DB への insert が commit される前にジョブが dequeue・実行されると、レコードが取得できずにエラーになる。
具体的には例えば、transaction の中で Action Mailer の deliver_later
を呼んで、transaction が完了する前にジョブが dequeue されたときなどに起こる。(そもそもメールはロールバックできないので、その意味でも transaction が完了してから呼ばないといけない。)
create に限らず update でも、それが commit される前にジョブが dequeue されてしまえば、そのレコードは update 前の古い状態になる。
Discussion