Active Job の引数に Active Record オブジェクトを渡したときに何が起きているか

に公開

Sidekiq はジョブの引数を enqueue 時に JSON.dump して、dequeue 時に JSON.load している。このため、引数には string, integer, float, boolean, array, hash などの JSON で表現できる型のオブジェクトしか安全に渡すことができない

Active Record のオブジェクトは渡すことができないため、代わりにレコードの ID を渡して、ジョブの中で find しなおす形になる。

class SomeJob
  include Sidekiq::Worker

  def perform(user_id)
    user = User.find(user_id)
    user.do_something
  end
end

SomeJob.perform_async(user_id)

が、Active Job やそれに基づいている Action Mailer の引数には以下のように Active Record のオブジェクトを渡すことができる。

class SomeJob < ApplicationJob
  def perform(user)
    user.do_something
  end
end

SomeJob.perform_later(user)

Active Job の引数に Active Record のオブジェクトを渡したとき、ジョブの enqueue 時, 実行時に具体的には何が起きているのかを調べた。

先に結論

enqueue 時に引数を serialize するとき、Active Record オブジェクトは Global Id というもので保存される。

Global ID はモデルのレコードに Rails アプリケーションの中で一意な ID を URI 形式で付与するライブラリ。
https://github.com/rails/globalid

user = User.find(1)
user.to_global_id.to_s
# => "gid://foo-app/User/1"

Redis にはこの "gid://foo-app/User/1" だけが入る。

ジョブが dequeue されて引数が deserialize される際は、この Global ID を元にモデル名とIDが復元されて、ActiveRecord の find が叩かれて DB からレコードが再取得される。

ジョブ enqueue 時の処理の流れ

ジョブの引数は ActiveJob::Arguments.serialize_argument の中でオブジェクトの型に応じて再帰的に serialize される。

  def serialize_argument(argument)
    case argument
    when *PERMITTED_TYPES
      argument
    when GlobalID::Identification
      convert_to_global_id_hash(argument)
    when Array
      argument.map { |arg| serialize_argument(arg) }
    when ActiveSupport::HashWithIndifferentAccess
      serialize_indifferent_hash(argument)
    when Hash
      symbol_keys = argument.each_key.grep(Symbol).map!(&:to_s)
      aj_hash_key = if Hash.ruby2_keywords_hash?(argument)
        RUBY2_KEYWORDS_KEY
      else
        SYMBOL_KEYS_KEY
      end
      result = serialize_hash(argument)
      result[aj_hash_key] = symbol_keys
      result
    when -> (arg) { arg.respond_to?(:permitted?) }
      serialize_indifferent_hash(argument.to_h)
    else
      Serializers.serialize(argument)
    end
  end

  def convert_to_global_id_hash(argument)
    { GLOBALID_KEY => argument.to_global_id.to_s }
  rescue ...
    ...
  end

ここで GlobalID::Identification が ActiveRecord に include されている ため、Active Record オブジェクトは Global Id で保存される。具体的には { GLOBALID_KEY => "gid://foo-app/User/1" } の形に serialize される。

ジョブ実行時の処理の流れ

dequeue して引数が deserialize される際は、ActiveJob::Arguments.deserialize_argument の中で Global Id で保存されていることを確認して、GlobalID::Locator.locate を実行する。

  def deserialize_argument(argument)
    case argument
    when String
      argument
    when *PERMITTED_TYPES
      argument
    when Array
      argument.map { |arg| deserialize_argument(arg) }
    when Hash
      if serialized_global_id?(argument)
        deserialize_global_id argument
      elsif custom_serialized?(argument)
        Serializers.deserialize(argument)
      else
        deserialize_hash(argument)
      end
    else
      raise ArgumentError, "Can only deserialize primitive arguments: #{argument.inspect}"
    end
  end

  def serialized_global_id?(hash)
    hash.size == 1 && hash.include?(GLOBALID_KEY)
  end

  def deserialize_global_id(hash)
    GlobalID::Locator.locate hash[GLOBALID_KEY]
  end

GlobalID::Locator.locate は最終的にはモデルの find を呼ぶ。

# https://github.com/rails/globalid/blob/3ddb0f87fd5c22b3330ab2b4e5c41a85953ac886/lib/global_id/locator.rb#L129-L131
def locate(gid)
  gid.model_class.find gid.model_id
end

(*1) Sidekiq が queue からジョブを fetch して実行するまでの流れはこちら

引数で渡したレコードのDBへの保存が commit される前にジョブが実行されるとエラーになる

このように、ジョブの実行時にはDBからレコードが再取得されるため、create した Active Record オブジェクトをジョブの引数に渡したとして、その DB への insert が commit される前にジョブが dequeue・実行されると、レコードが取得できずにエラーになる。

具体的には例えば、transaction の中で Action Mailer の deliver_later を呼んで、transaction が完了する前にジョブが dequeue されたときなどに起こる。(そもそもメールはロールバックできないので、その意味でも transaction が完了してから呼ばないといけない。)

create に限らず update でも、それが commit される前にジョブが dequeue されてしまえば、そのレコードは update 前の古い状態になる。

Discussion