📝

Railsのin_batchesを読み解く

2024/03/16に公開

in_batchesは分割してレコードを取得して処理するメソッドになります。
https://railsdoc.com/page/in_batches

User.where("age > 21").in_batches do |relation|
  relation.delete_all
  sleep(10) # Throttle the delete queries
end

分割して取得して削除

User.in_batches.delete_all

分割して取得して更新

User.in_batches.update_all(awesome: true)

また同様なメソッドとして、find)_eachfind_in_batchesがありますが、どちらも内部的にin_batchesが呼ばれています。
この3つのメソッドの違いとしては、どのような形で値を返すかの違いになります。
このメソッドの違いについて別の記事にまとめようと思っています。

では早速、in_batchesのコードを読み解いてていきます。
https://github.com/rails/rails/blob/984c3ef2775781d47efa9f541ce570daa2434a80/activerecord/lib/active_record/relation/batches.rb#L204-L265

def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore: nil, order: :asc)

オプションのそれぞれの内容はこのようになっています。

オプション デフォルト値 説明
of 1000 一度に処理するレコードのバッチサイズを指定する。この値に基づいて、レコードは小さなグループに分割され、各グループが個別に処理される。
start nil バッチ処理を開始するレコードのプライマリーキーの値を指定する。この値を設定すると、指定された値以上のプライマリーキーを持つレコードから処理が開始される。
finish nil バッチ処理を終了するレコードのプライマリーキーの値を指定する。この値を設定すると、指定された値以下のプライマリーキーを持つレコードまでが処理の対象となる。
load false レコードをメモリに事前ロードするかどうかを指定しする。trueに設定すると、各バッチのレコードがデータベースから読み込まれてから処理される。falseの場合は、レコードは必要に応じて後からロードされる。
error_on_ignore nil クエリに設定された順序が無視された場合にエラーを発生させるかどうかを指定する。trueに設定すると、順序が無視された場合にエラーが発生する。
order :asc レコードを処理する順序を指定する。:ascは昇順(プライマリーキーが小さい順)、:descは降順(プライマリーキーが大きい順)を意味する。

これらのオプションを使用することで、in_batchesメソッドの挙動を細かく制御し、大量のデータを効率的に扱うことができます。たとえば、特定の範囲のレコードのみを対象に処理を行ったり、処理の順序を逆にしたりすることが可能です。


relation = self

selfActiveRecord::Batchesモジュールをミックスインしているモデルのクラスインスタンスを指している。ActiveRecord::Batchesモジュールは、ActiveRecord::Baseから派生したモデルによって使用されることを意図しているため、このコンテキストではselfはそのようなモデルのクラスインスタンス(例えばPersonや他のActiveRecordモデル)を参照している


unless block_given?
  return BatchEnumerator.new(of: of, start: start, finish: finish, relation: self)
end

unless block_given?: この行はRubyのblock_given?メソッドを使用している
https://docs.ruby-lang.org/ja/latest/method/Kernel/m/block_given=3f.html
block_given?メソッドは、現在のメソッド呼び出しにブロックが与えられているかどうかをチェックする。もしブロックが与えられていなければ、このunlessブロック内のコードが実行される。

  1. return BatchEnumerator.new(...): ブロックが与えられていない場合、BatchEnumeratorクラスの新しいインスタンスが作成されます。BatchEnumeratorは、バッチ処理のための列挙子(Enumerator)を提供するクラスです。このインスタンスは、in_batchesメソッドに渡された引数(of, start, finish)と、selfin_batchesメソッドを呼び出しているActiveRecordモデルのインスタンス)を使用して初期化されます。
  2. BatchEnumerator.new(of: of, start: start, finish: finish, relation: self): ここではBatchEnumeratorのコンストラクタに、バッチサイズ(of)、開始ID(start)、終了ID(finish)、および関連するレコードの集合(relation)を渡しています。relation: selfは、in_batchesが呼び出されたモデルの現在のクエリ(スコープ)をBatchEnumeratorに渡すために使用されます。

このロジックの主な目的は、in_batchesメソッドがブロックを受け取らずに呼び出された場合(例えばPerson.in_batchesのように)、バッチ処理を制御するための列挙子(Enumerator)を返すことです。これにより、メソッドの呼び出し側は、必要に応じて列挙子を使用して独自の処理を行うことができます。


unless [:asc, :desc].include?(order)
  raise ArgumentError, ":order must be :asc or :desc, got #{order.inspect}"
end

:orderオプションが:ascまたは:desc以外の値である場合、ArgumentErrorが発生させる


if arel.orders.present?
  act_on_ignored_order(error_on_ignore)
end
  1. arel.orders.present?: この行では、arelオブジェクトのorders属性が存在し、かつ空でないかどうかをチェックしています。arelは、Active Recordの下層にあるArel(A Relational Algebra)ライブラリを指し、SQLクエリの生成を担当しています。orders属性は、レコードの並び順(ORDER BY句)に関する情報を含んでいます。このチェックにより、現在のクエリに何らかの並び順が設定されているかどうかを確認しています。
  2. act_on_ignored_order(error_on_ignore): もしorders属性が存在する場合(つまり、並び順がクエリに設定されている場合)、act_on_ignored_orderメソッドが呼び出されます。このメソッドは、バッチ処理において設定された並び順をどのように扱うかを決定します。error_on_ignoreパラメータは、並び順が無視されるべきかどうか、また無視される場合にエラーを発生させるべきかどうかを指定するために使用されます。

バッチ処理では、レコードは主キーの順序で処理されることが一般的です。したがって、ユーザーが設定した並び順は通常無視されます。しかし、この挙動はerror_on_ignoreオプションによってカスタマイズ可能であり、無視されるべき並び順が存在する場合にエラーを発生させることもできます。このように、act_on_ignored_orderメソッドは、並び順に関する設定を適切に処理し、バッチ処理の動作を制御するために重要な役割を果たします。

User.order(:desc).arel.orders.present?
=> true

batch_limit = of
if limit_value
  remaining = limit_value
  batch_limit = remaining if remaining < batch_limit
end

このコードブロックは、ActiveRecord::Batchesモジュールのin_batchesメソッド内で、バッチ処理を行う際のレコードの最大数を設定するために使用されます。具体的には、指定されたバッチサイズ(of)と、クエリに設定されたレコードの上限数(limit_value)を考慮して、実際のバッチ処理におけるレコードの最大数(batch_limit)を決定します。

  1. 初期設定: batch_limit = ofにより、バッチ処理の最大レコード数は、初期値としてメソッド呼び出し時に指定されたバッチサイズ(of)に設定されます。
  2. limit_valueの存在チェック: 次に、if limit_valueによって、クエリにレコードの上限数が指定されているかどうかをチェックします。limit_valueは、クエリで.limitメソッドを使用して設定されたレコードの上限数です。
  3. バッチサイズとレコード上限数の比較: もしレコードの上限数(limit_value)が指定されている場合、それが現在のバッチサイズ(batch_limit)よりも小さいかどうかをチェックします。もしlimit_valuebatch_limitよりも小さい場合、それはクエリで設定されたレコードの上限数が現在のバッチサイズよりも小さいことを意味します。そのため、バッチ処理におけるレコードの最大数(batch_limit)をlimit_value(残りのレコード数)に設定します。

このロジックにより、in_batchesメソッドは、クエリに設定されたレコードの上限数を超えないようにしつつ、指定されたバッチサイズを効率的に利用することができます。特に大量のデータを扱う場合には、この挙動によってメモリ使用量を適切に管理し、パフォーマンスを最適化することが可能になります。


relation = relation.reorder(batch_order(order)).limit(batch_limit)
relation = apply_limits(relation, start, finish, order)
relation.skip_query_cache! # Retaining the results in the query cache would undermine the point of batching
batch_relation = relation

このコードブロックは、ActiveRecord::Batchesモジュールのin_batchesメソッド内で、バッチ処理のためのActive Recordリレーションを準備する過程を示しています。バッチ処理におけるレコードの取得順序、レコードの数、開始点と終了点の制限、そしてクエリキャッシュのスキップなど、処理に必要な設定を適用しています。

  1. 並び替えと制限の適用:
    • relation.reorder(batch_order(order)).limit(batch_limit): 最初に、relation(現在のActive Recordリレーション)に対して、reorderメソッドを使用して並び順を設定し直します。batch_order(order)は、バッチ処理を実行する際の並び順(昇順または降順)を決定するメソッドで、order引数に基づいています。その後、limit(batch_limit)を適用して、一度に取得するレコードの最大数をbatch_limitに設定します。
  2. 開始点と終了点の適用:
    • relation = apply_limits(relation, start, finish, order): apply_limitsメソッドを使用して、指定された開始点(start)と終了点(finish)に基づいてリレーションにさらなる制限を適用します。これにより、バッチ処理するレコードの範囲を限定します。
  3. クエリキャッシュのスキップ:
    • relation.skip_query_cache!: バッチ処理の性質上、取得したレコードをクエリキャッシュに保持すると、メモリ使用量が増加し、バッチ処理の利点が損なわれる可能性があります。この行で、リレーションがクエリキャッシュを使用しないように設定しています。これにより、各バッチの処理が完了するごとにメモリを解放できるため、メモリ使用量を抑えながら大量のレコードを処理することが可能になります。
  4. バッチ処理用リレーションの設定:
    • batch_relation = relation: 最後に、上記の設定を適用したrelationbatch_relationに代入します。このbatch_relationは、続く処理でバッチごとのレコード取得に使用されます。

このコードブロックにより、in_batchesメソッドは、指定されたパラメータに基づいて、バッチ処理に最適化されたリレーションを効率的に生成し、大量のレコードをメモリに優しく処理する準備を整えます。


loop do
  if load
    records = batch_relation.records
    ids = records.map(&:id)
    yielded_relation = where(primary_key => ids)
    yielded_relation.load_records(records)
  else
    ids = batch_relation.pluck(primary_key)
    yielded_relation = where(primary_key => ids)
  end

このコードは、ActiveRecord::Batchesモジュールのin_batchesメソッド内で定義されており、データベースからバッチ単位でレコードを取得し、それらを順に処理するためのループを実装しています。特に、loadオプションに基づいてレコードを事前にロードするかどうかを制御し、各バッチで処理するレコードのIDセットを取得します。

  1. 無限ループの開始: loop doは無限ループを開始するRubyの構文です。このループ内でバッチ処理が行われ、条件に応じてループから抜け出します(このコード断片では抜け出す条件が示されていませんが、通常は特定の条件でbreakが呼ばれることによってループを終了します)。
  2. loadオプションの確認: if loadは、メソッド呼び出し時にload: trueが指定されているかどうかをチェックします。loadオプションがtrueの場合、バッチ内のレコードはデータベースから直接ロードされます。
  3. レコードのロードが必要な場合:
    • records = batch_relation.recordsは、現在のバッチに対応するレコードをすべて取得します。
    • ids = records.map(&:id)は、これらのレコードからIDを抽出して配列に格納します。
    • yielded_relation = where(primary_key => ids)は、これらのIDを使用して、新たなクエリ(yielded_relation)を作成します。このクエリは、後で使用者に渡される可能性があります。
    • yielded_relation.load_records(records)は、yielded_relationに先にロードしたレコードを関連付けます。
  4. レコードのロードが不要な場合:
    • ids = batch_relation.pluck(primary_key)は、現在のバッチに対応するレコードのIDのみをデータベースから取得します。
    • yielded_relation = where(primary_key => ids)は、これらのIDを使用して新たなクエリ(yielded_relation)を作成します。

この処理は、大量のレコードを効率的に扱うために、バッチ単位でデータベースからレコードを取得し、それらを順に処理することを目的としています。loadオプションを使用することで、必要に応じてレコードをメモリに事前ロードするか、IDのみを取得して後でレコードをロードするかを選択できます。これにより、大規模なデータセットを扱う際のメモリ使用量とパフォーマンスを最適化できます。


break if ids.empty?
  • 終了条件のチェック: ids.empty?は、現在のバッチで処理するレコードのIDが存在するかどうかを確認します。もしids配列が空(つまり、処理するレコードがこれ以上存在しない)場合、ループから抜け出し(break)、バッチ処理を終了します。

primary_key_offset = ids.last
raise ArgumentError.new("Primary key not included in the custom select clause") unless primary_key_offset
  • 最後のレコードのプライマリーキーを取得: ids.lastは、現在のバッチで処理されたレコードの中で最後のもののプライマリーキー(ID)を取得します。これは次のバッチの開始点として使用されます。
  • プライマリーキーの存在確認: もし何らかの理由でプライマリーキー(primary_key_offset)が取得できなかった場合(例えば、カスタムのselect句を使用してIDが選択されていない場合)、ArgumentErrorが発生します。これは、バッチ処理を正しく進めるためにはプライマリーキーが必須であることを保証します。

yield yielded_relation
  • バッチ処理の実行: この行は、得られたyielded_relation(現在のバッチに含まれるレコードのサブセットに対するクエリ)を、in_batchesメソッドにブロックが与えられている場合にそのブロックに渡します。これにより、呼び出し側は各バッチに対して独自の操作(例えば、レコードの更新や削除など)を行うことができます。

break if ids.length < batch_limit
  • バッチサイズに達していない場合の終了条件: ids.length < batch_limitは、現在のバッチに含まれるレコードの数が指定されたバッチサイズ(batch_limit)未満であるかどうかを確認します。もし真(つまり、これ以上処理すべきレコードがない、または次のバッチで処理するレコードが残っていない)であれば、ループから抜け出し(break)、バッチ処理を終了します。

このように、このコードブロックはバッチ処理の核となる部分であり、各バッチの処理とバッチ処理の終了条件を管理しています。


if limit_value
  remaining -= ids.length

  if remaining == 0
    # Saves a useless iteration when the limit is a multiple of the
    # batch size.
    break
  elsif remaining < batch_limit
    relation = relation.limit(remaining)
  end
end

このコードブロックは、ActiveRecord::Batchesモジュール内のin_batchesメソッドで使用されるループ内の一部で、バッチ処理中にクエリに設定されたレコードの上限数(limit_value)を管理し、適切に処理を終了するためのロジックを提供します。

  1. limit_valueのチェック: この部分では、クエリにlimitが設定されているかどうかを確認しています。limit_valueは、クエリによって指定されたレコードの最大取得数です。これが設定されている場合、次のステップに進みます。
  2. 残りレコード数の計算: remaining -= ids.lengthで、現在のバッチで取得されたレコードの数をremaining(残りのレコード数)から引きます。これにより、次のバッチを処理する前に処理すべき残りレコード数を更新します。
  3. 残りレコード数が0の場合: if remaining == 0の条件で、すべてのレコードが処理されたかどうかを確認しています。残りレコード数が0になった場合、これ以上処理するレコードがないため、ループをbreakで終了します。これにより、無駄なイテレーションを避けることができます。
  4. 残りレコード数がバッチサイズより小さい場合: elsif remaining < batch_limitの条件では、残りのレコード数が次のバッチの最大サイズ(batch_limit)よりも少ない場合に、次のバッチのサイズをremainingの値に制限します。これにより、最後のバッチで不必要に多くのレコードを処理しようとすることを防ぎ、クエリの効率を最適化します。

このコードブロックは、特に大規模なデータセットをバッチ処理する際に、メモリ使用量と処理時間を効率的に管理するために重要です。レコードの上限数が設定されている場合、このロジックによって正確な数のレコードのみが処理され、リソースの無駄遣いを防ぎます。


batch_relation = relation.where(
  predicate_builder[primary_key, primary_key_offset, order == :desc ? :lt : :gt]
)

このコードは、ActiveRecord::Batchesモジュールのin_batchesメソッド内のループの一部です。この部分では、次のバッチを取得するための新しいrelation(クエリ)を構築しています。具体的には、既に処理されたレコードの後に続くレコードを選択するために、新しいクエリ条件を設定しています。

  1. relation.where(...): 新しいrelation(ActiveRecordのクエリオブジェクト)を生成します。このクエリは、特定の条件を満たすレコードを選択するために使用されます。
  2. predicate_builder[...]: predicate_builderはActiveRecordの内部クラスの一つで、SQLの条件句(WHERE句など)を構築するために使用されます。この場合、predicate_builderは指定されたプライマリーキーの値を基にした条件を生成します。
  3. primary_key, primary_key_offset, order == :desc ? :lt : :gt: この部分は、バッチ処理の次のステップで処理するレコードを選択するための条件を定義しています。具体的には、primary_key(テーブルのプライマリーキー)、primary_key_offset(現在のバッチで処理されたレコードの最後のプライマリーキーの値)、およびレコードの順序を指定するための条件を含んでいます。
    • order == :desc ? :lt : :gtは、orderオプションが降順(:desc)の場合はプライマリーキーがprimary_key_offsetより小さい(:lt)レコードを選択し、昇順(:asc)の場合はプライマリーキーがそれより大きい(:gt)レコードを選択する条件を設定します。

このロジックにより、各イテレーション(ループの繰り返し)で処理されるレコードのセットは、前のバッチで処理されたレコードの直後から開始されます。この方法で、全てのレコードが順番に、指定されたバッチサイズに従って処理されることが保証されます。また、このアプローチは、大量のデータを効率的に扱う際にパフォーマンスとメモリ使用量を最適化するのに役立ちます。

Discussion