Active Job の引数に Active Record オブジェクトを渡したときに何が起きているか

2022/01/09に公開約7,600字

Sidekiq はジョブの引数を enqueue 時に JSON.dump して、dequeue 時に JSON.load している。このため、引数には string, integer, float, boolean, array, hash などの JSON で表現できる型のオブジェクトしか安全に渡すことができない

Active Record のオブジェクトは渡すことができないため、代わりにレコードの ID を渡して、ジョブの中で find しなおす形になる。

class SomeJob
  include Sidekiq::Worker

  def perform(user_id)
    user = User.find(user_id)
    user.do_something
  end
end

SomeJob.perform_async(user_id)

が、Active Job やそれに基づいている Action Mailer の引数には以下のように Active Record のオブジェクトを渡すことができる。

class SomeJob < ApplicationJob
  def perform(user)
    user.do_something
  end
end

SomeJob.perform_later(user)

Active Job の引数に Active Record のオブジェクトを渡したとき、ジョブの enqueue 時, 実行時に具体的には何が起きているのかを調べた。

先に結論

enqueue 時に引数を serialize するとき、Active Record オブジェクトは Global Id というもので保存される。

Global ID はモデルのレコードに Rails アプリケーションの中で一意な ID を URI 形式で付与するライブラリ。
https://github.com/rails/globalid

user = User.find(1)
user.to_global_id.to_s
# => "gid://foo-app/User/1"

Redis にはこの "gid://foo-app/User/1" だけが入る。

ジョブが dequeue されて引数が deserialize される際は、この Global ID を元にモデル名とIDが復元されて、ActiveRecord の find が叩かれて DB からレコードが再取得される。

ジョブ enqueue 時の処理の流れ

ジョブの引数は ActiveJob::Arguments.serialize_argument の中でオブジェクトの型に応じて再帰的に serialize される。

  def serialize_argument(argument)
    case argument
    when *PERMITTED_TYPES
      argument
    when GlobalID::Identification
      convert_to_global_id_hash(argument)
    when Array
      argument.map { |arg| serialize_argument(arg) }
    when ActiveSupport::HashWithIndifferentAccess
      serialize_indifferent_hash(argument)
    when Hash
      symbol_keys = argument.each_key.grep(Symbol).map!(&:to_s)
      aj_hash_key = if Hash.ruby2_keywords_hash?(argument)
        RUBY2_KEYWORDS_KEY
      else
        SYMBOL_KEYS_KEY
      end
      result = serialize_hash(argument)
      result[aj_hash_key] = symbol_keys
      result
    when -> (arg) { arg.respond_to?(:permitted?) }
      serialize_indifferent_hash(argument.to_h)
    else
      Serializers.serialize(argument)
    end
  end

  def convert_to_global_id_hash(argument)
    { GLOBALID_KEY => argument.to_global_id.to_s }
  rescue ...
    ...
  end

ここで GlobalID::Identification が ActiveRecord に include されている ため、Active Record オブジェクトは Global Id で保存される。具体的には { GLOBALID_KEY => "gid://foo-app/User/1" } の形に serialize される。

ジョブ実行時の処理の流れ

dequeue して引数が deserialize される際は、ActiveJob::Arguments.deserialize_argument の中で Global Id で保存されていることを確認して、GlobalID::Locator.locate を実行する。

  def deserialize_argument(argument)
    case argument
    when String
      argument
    when *PERMITTED_TYPES
      argument
    when Array
      argument.map { |arg| deserialize_argument(arg) }
    when Hash
      if serialized_global_id?(argument)
        deserialize_global_id argument
      elsif custom_serialized?(argument)
        Serializers.deserialize(argument)
      else
        deserialize_hash(argument)
      end
    else
      raise ArgumentError, "Can only deserialize primitive arguments: #{argument.inspect}"
    end
  end

  def serialized_global_id?(hash)
    hash.size == 1 && hash.include?(GLOBALID_KEY)
  end

  def deserialize_global_id(hash)
    GlobalID::Locator.locate hash[GLOBALID_KEY]
  end

GlobalID::Locator.locate は最終的にはモデルの find を呼ぶ。

# https://github.com/rails/globalid/blob/3ddb0f87fd5c22b3330ab2b4e5c41a85953ac886/lib/global_id/locator.rb#L129-L131
def locate(gid)
  gid.model_class.find gid.model_id
end

(*1) Sidekiq が queue からジョブを fetch して実行するまでの流れはこちら

引数で渡したレコードのDBへの保存が commit される前にジョブが実行されるとエラーになる

このように、ジョブの実行時にはDBからレコードが再取得されるため、create した Active Record オブジェクトをジョブの引数に渡したとして、その DB への insert が commit される前にジョブが dequeue・実行されると、レコードが取得できずにエラーになる。

具体的には例えば、transaction の中で Action Mailer の deliver_later を呼んで、transaction が完了する前にジョブが dequeue されたときなどに起こる。(そもそもメールはロールバックできないので、その意味でも transaction が完了してから呼ばないといけない。)

create に限らず update でも、それが commit される前にジョブが dequeue されてしまえば、そのレコードは update 前の古い状態になる。

Discussion

ログインするとコメントできます