Railsのin_batchesを読み解く
in_batches
は分割してレコードを取得して処理するメソッドになります。
例
User.where("age > 21").in_batches do |relation|
relation.delete_all
sleep(10) # Throttle the delete queries
end
分割して取得して削除
User.in_batches.delete_all
分割して取得して更新
User.in_batches.update_all(awesome: true)
また同様なメソッドとして、find)_each
やfind_in_batches
がありますが、どちらも内部的にin_batches
が呼ばれています。
この3つのメソッドの違いとしては、どのような形で値を返すかの違いになります。
このメソッドの違いについて別の記事にまとめようと思っています。
では早速、in_batches
のコードを読み解いてていきます。
def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore: nil, order: :asc)
オプションのそれぞれの内容はこのようになっています。
オプション | デフォルト値 | 説明 |
---|---|---|
of |
1000 |
一度に処理するレコードのバッチサイズを指定する。この値に基づいて、レコードは小さなグループに分割され、各グループが個別に処理される。 |
start |
nil |
バッチ処理を開始するレコードのプライマリーキーの値を指定する。この値を設定すると、指定された値以上のプライマリーキーを持つレコードから処理が開始される。 |
finish |
nil |
バッチ処理を終了するレコードのプライマリーキーの値を指定する。この値を設定すると、指定された値以下のプライマリーキーを持つレコードまでが処理の対象となる。 |
load |
false |
レコードをメモリに事前ロードするかどうかを指定しする。true に設定すると、各バッチのレコードがデータベースから読み込まれてから処理される。false の場合は、レコードは必要に応じて後からロードされる。 |
error_on_ignore |
nil |
クエリに設定された順序が無視された場合にエラーを発生させるかどうかを指定する。true に設定すると、順序が無視された場合にエラーが発生する。 |
order |
:asc |
レコードを処理する順序を指定する。:asc は昇順(プライマリーキーが小さい順)、:desc は降順(プライマリーキーが大きい順)を意味する。 |
これらのオプションを使用することで、in_batches
メソッドの挙動を細かく制御し、大量のデータを効率的に扱うことができます。たとえば、特定の範囲のレコードのみを対象に処理を行ったり、処理の順序を逆にしたりすることが可能です。
relation = self
self
はActiveRecord::Batches
モジュールをミックスインしているモデルのクラスインスタンスを指している。ActiveRecord::Batches
モジュールは、ActiveRecord::Base
から派生したモデルによって使用されることを意図しているため、このコンテキストではself
はそのようなモデルのクラスインスタンス(例えばPerson
や他のActiveRecordモデル)を参照している
unless block_given?
return BatchEnumerator.new(of: of, start: start, finish: finish, relation: self)
end
unless block_given?
: この行はRubyのblock_given?
メソッドを使用している
block_given?
メソッドは、現在のメソッド呼び出しにブロックが与えられているかどうかをチェックする。もしブロックが与えられていなければ、このunless
ブロック内のコードが実行される。
-
return BatchEnumerator.new(...)
: ブロックが与えられていない場合、BatchEnumerator
クラスの新しいインスタンスが作成されます。BatchEnumerator
は、バッチ処理のための列挙子(Enumerator)を提供するクラスです。このインスタンスは、in_batches
メソッドに渡された引数(of
,start
,finish
)と、self
(in_batches
メソッドを呼び出しているActiveRecordモデルのインスタンス)を使用して初期化されます。 -
BatchEnumerator.new(of: of, start: start, finish: finish, relation: self)
: ここではBatchEnumerator
のコンストラクタに、バッチサイズ(of
)、開始ID(start
)、終了ID(finish
)、および関連するレコードの集合(relation
)を渡しています。relation: self
は、in_batches
が呼び出されたモデルの現在のクエリ(スコープ)をBatchEnumerator
に渡すために使用されます。
このロジックの主な目的は、in_batches
メソッドがブロックを受け取らずに呼び出された場合(例えばPerson.in_batches
のように)、バッチ処理を制御するための列挙子(Enumerator)を返すことです。これにより、メソッドの呼び出し側は、必要に応じて列挙子を使用して独自の処理を行うことができます。
unless [:asc, :desc].include?(order)
raise ArgumentError, ":order must be :asc or :desc, got #{order.inspect}"
end
:order
オプションが:asc
または:desc
以外の値である場合、ArgumentError
が発生させる
if arel.orders.present?
act_on_ignored_order(error_on_ignore)
end
-
arel.orders.present?
: この行では、arel
オブジェクトのorders
属性が存在し、かつ空でないかどうかをチェックしています。arel
は、Active Recordの下層にあるArel(A Relational Algebra)ライブラリを指し、SQLクエリの生成を担当しています。orders
属性は、レコードの並び順(ORDER BY句)に関する情報を含んでいます。このチェックにより、現在のクエリに何らかの並び順が設定されているかどうかを確認しています。 -
act_on_ignored_order(error_on_ignore)
: もしorders
属性が存在する場合(つまり、並び順がクエリに設定されている場合)、act_on_ignored_order
メソッドが呼び出されます。このメソッドは、バッチ処理において設定された並び順をどのように扱うかを決定します。error_on_ignore
パラメータは、並び順が無視されるべきかどうか、また無視される場合にエラーを発生させるべきかどうかを指定するために使用されます。
バッチ処理では、レコードは主キーの順序で処理されることが一般的です。したがって、ユーザーが設定した並び順は通常無視されます。しかし、この挙動はerror_on_ignore
オプションによってカスタマイズ可能であり、無視されるべき並び順が存在する場合にエラーを発生させることもできます。このように、act_on_ignored_order
メソッドは、並び順に関する設定を適切に処理し、バッチ処理の動作を制御するために重要な役割を果たします。
User.order(:desc).arel.orders.present?
=> true
batch_limit = of
if limit_value
remaining = limit_value
batch_limit = remaining if remaining < batch_limit
end
このコードブロックは、ActiveRecord::Batches
モジュールのin_batches
メソッド内で、バッチ処理を行う際のレコードの最大数を設定するために使用されます。具体的には、指定されたバッチサイズ(of
)と、クエリに設定されたレコードの上限数(limit_value
)を考慮して、実際のバッチ処理におけるレコードの最大数(batch_limit
)を決定します。
-
初期設定:
batch_limit = of
により、バッチ処理の最大レコード数は、初期値としてメソッド呼び出し時に指定されたバッチサイズ(of
)に設定されます。 -
limit_value
の存在チェック: 次に、if limit_value
によって、クエリにレコードの上限数が指定されているかどうかをチェックします。limit_value
は、クエリで.limit
メソッドを使用して設定されたレコードの上限数です。 -
バッチサイズとレコード上限数の比較: もしレコードの上限数(
limit_value
)が指定されている場合、それが現在のバッチサイズ(batch_limit
)よりも小さいかどうかをチェックします。もしlimit_value
がbatch_limit
よりも小さい場合、それはクエリで設定されたレコードの上限数が現在のバッチサイズよりも小さいことを意味します。そのため、バッチ処理におけるレコードの最大数(batch_limit
)をlimit_value
(残りのレコード数)に設定します。
このロジックにより、in_batches
メソッドは、クエリに設定されたレコードの上限数を超えないようにしつつ、指定されたバッチサイズを効率的に利用することができます。特に大量のデータを扱う場合には、この挙動によってメモリ使用量を適切に管理し、パフォーマンスを最適化することが可能になります。
relation = relation.reorder(batch_order(order)).limit(batch_limit)
relation = apply_limits(relation, start, finish, order)
relation.skip_query_cache! # Retaining the results in the query cache would undermine the point of batching
batch_relation = relation
このコードブロックは、ActiveRecord::Batches
モジュールのin_batches
メソッド内で、バッチ処理のためのActive Recordリレーションを準備する過程を示しています。バッチ処理におけるレコードの取得順序、レコードの数、開始点と終了点の制限、そしてクエリキャッシュのスキップなど、処理に必要な設定を適用しています。
-
並び替えと制限の適用:
-
relation.reorder(batch_order(order)).limit(batch_limit)
: 最初に、relation
(現在のActive Recordリレーション)に対して、reorder
メソッドを使用して並び順を設定し直します。batch_order(order)
は、バッチ処理を実行する際の並び順(昇順または降順)を決定するメソッドで、order
引数に基づいています。その後、limit(batch_limit)
を適用して、一度に取得するレコードの最大数をbatch_limit
に設定します。
-
-
開始点と終了点の適用:
-
relation = apply_limits(relation, start, finish, order)
:apply_limits
メソッドを使用して、指定された開始点(start
)と終了点(finish
)に基づいてリレーションにさらなる制限を適用します。これにより、バッチ処理するレコードの範囲を限定します。
-
-
クエリキャッシュのスキップ:
-
relation.skip_query_cache!
: バッチ処理の性質上、取得したレコードをクエリキャッシュに保持すると、メモリ使用量が増加し、バッチ処理の利点が損なわれる可能性があります。この行で、リレーションがクエリキャッシュを使用しないように設定しています。これにより、各バッチの処理が完了するごとにメモリを解放できるため、メモリ使用量を抑えながら大量のレコードを処理することが可能になります。
-
-
バッチ処理用リレーションの設定:
-
batch_relation = relation
: 最後に、上記の設定を適用したrelation
をbatch_relation
に代入します。このbatch_relation
は、続く処理でバッチごとのレコード取得に使用されます。
-
このコードブロックにより、in_batches
メソッドは、指定されたパラメータに基づいて、バッチ処理に最適化されたリレーションを効率的に生成し、大量のレコードをメモリに優しく処理する準備を整えます。
loop do
if load
records = batch_relation.records
ids = records.map(&:id)
yielded_relation = where(primary_key => ids)
yielded_relation.load_records(records)
else
ids = batch_relation.pluck(primary_key)
yielded_relation = where(primary_key => ids)
end
このコードは、ActiveRecord::Batches
モジュールのin_batches
メソッド内で定義されており、データベースからバッチ単位でレコードを取得し、それらを順に処理するためのループを実装しています。特に、load
オプションに基づいてレコードを事前にロードするかどうかを制御し、各バッチで処理するレコードのIDセットを取得します。
-
無限ループの開始:
loop do
は無限ループを開始するRubyの構文です。このループ内でバッチ処理が行われ、条件に応じてループから抜け出します(このコード断片では抜け出す条件が示されていませんが、通常は特定の条件でbreak
が呼ばれることによってループを終了します)。 -
load
オプションの確認:if load
は、メソッド呼び出し時にload: true
が指定されているかどうかをチェックします。load
オプションがtrue
の場合、バッチ内のレコードはデータベースから直接ロードされます。 -
レコードのロードが必要な場合:
-
records = batch_relation.records
は、現在のバッチに対応するレコードをすべて取得します。 -
ids = records.map(&:id)
は、これらのレコードからIDを抽出して配列に格納します。 -
yielded_relation = where(primary_key => ids)
は、これらのIDを使用して、新たなクエリ(yielded_relation
)を作成します。このクエリは、後で使用者に渡される可能性があります。 -
yielded_relation.load_records(records)
は、yielded_relation
に先にロードしたレコードを関連付けます。
-
-
レコードのロードが不要な場合:
-
ids = batch_relation.pluck(primary_key)
は、現在のバッチに対応するレコードのIDのみをデータベースから取得します。 -
yielded_relation = where(primary_key => ids)
は、これらのIDを使用して新たなクエリ(yielded_relation
)を作成します。
-
この処理は、大量のレコードを効率的に扱うために、バッチ単位でデータベースからレコードを取得し、それらを順に処理することを目的としています。load
オプションを使用することで、必要に応じてレコードをメモリに事前ロードするか、IDのみを取得して後でレコードをロードするかを選択できます。これにより、大規模なデータセットを扱う際のメモリ使用量とパフォーマンスを最適化できます。
break if ids.empty?
-
終了条件のチェック:
ids.empty?
は、現在のバッチで処理するレコードのIDが存在するかどうかを確認します。もしids
配列が空(つまり、処理するレコードがこれ以上存在しない)場合、ループから抜け出し(break
)、バッチ処理を終了します。
primary_key_offset = ids.last
raise ArgumentError.new("Primary key not included in the custom select clause") unless primary_key_offset
-
最後のレコードのプライマリーキーを取得:
ids.last
は、現在のバッチで処理されたレコードの中で最後のもののプライマリーキー(ID)を取得します。これは次のバッチの開始点として使用されます。 -
プライマリーキーの存在確認: もし何らかの理由でプライマリーキー(
primary_key_offset
)が取得できなかった場合(例えば、カスタムのselect
句を使用してIDが選択されていない場合)、ArgumentError
が発生します。これは、バッチ処理を正しく進めるためにはプライマリーキーが必須であることを保証します。
yield yielded_relation
-
バッチ処理の実行: この行は、得られた
yielded_relation
(現在のバッチに含まれるレコードのサブセットに対するクエリ)を、in_batches
メソッドにブロックが与えられている場合にそのブロックに渡します。これにより、呼び出し側は各バッチに対して独自の操作(例えば、レコードの更新や削除など)を行うことができます。
break if ids.length < batch_limit
-
バッチサイズに達していない場合の終了条件:
ids.length < batch_limit
は、現在のバッチに含まれるレコードの数が指定されたバッチサイズ(batch_limit
)未満であるかどうかを確認します。もし真(つまり、これ以上処理すべきレコードがない、または次のバッチで処理するレコードが残っていない)であれば、ループから抜け出し(break
)、バッチ処理を終了します。
このように、このコードブロックはバッチ処理の核となる部分であり、各バッチの処理とバッチ処理の終了条件を管理しています。
if limit_value
remaining -= ids.length
if remaining == 0
# Saves a useless iteration when the limit is a multiple of the
# batch size.
break
elsif remaining < batch_limit
relation = relation.limit(remaining)
end
end
このコードブロックは、ActiveRecord::Batches
モジュール内のin_batches
メソッドで使用されるループ内の一部で、バッチ処理中にクエリに設定されたレコードの上限数(limit_value
)を管理し、適切に処理を終了するためのロジックを提供します。
-
limit_value
のチェック: この部分では、クエリにlimit
が設定されているかどうかを確認しています。limit_value
は、クエリによって指定されたレコードの最大取得数です。これが設定されている場合、次のステップに進みます。 -
残りレコード数の計算:
remaining -= ids.length
で、現在のバッチで取得されたレコードの数をremaining
(残りのレコード数)から引きます。これにより、次のバッチを処理する前に処理すべき残りレコード数を更新します。 -
残りレコード数が0の場合:
if remaining == 0
の条件で、すべてのレコードが処理されたかどうかを確認しています。残りレコード数が0になった場合、これ以上処理するレコードがないため、ループをbreak
で終了します。これにより、無駄なイテレーションを避けることができます。 -
残りレコード数がバッチサイズより小さい場合:
elsif remaining < batch_limit
の条件では、残りのレコード数が次のバッチの最大サイズ(batch_limit
)よりも少ない場合に、次のバッチのサイズをremaining
の値に制限します。これにより、最後のバッチで不必要に多くのレコードを処理しようとすることを防ぎ、クエリの効率を最適化します。
このコードブロックは、特に大規模なデータセットをバッチ処理する際に、メモリ使用量と処理時間を効率的に管理するために重要です。レコードの上限数が設定されている場合、このロジックによって正確な数のレコードのみが処理され、リソースの無駄遣いを防ぎます。
batch_relation = relation.where(
predicate_builder[primary_key, primary_key_offset, order == :desc ? :lt : :gt]
)
このコードは、ActiveRecord::Batches
モジュールのin_batches
メソッド内のループの一部です。この部分では、次のバッチを取得するための新しいrelation
(クエリ)を構築しています。具体的には、既に処理されたレコードの後に続くレコードを選択するために、新しいクエリ条件を設定しています。
-
relation.where(...)
: 新しいrelation
(ActiveRecordのクエリオブジェクト)を生成します。このクエリは、特定の条件を満たすレコードを選択するために使用されます。 -
predicate_builder[...]
:predicate_builder
はActiveRecordの内部クラスの一つで、SQLの条件句(WHERE句など)を構築するために使用されます。この場合、predicate_builder
は指定されたプライマリーキーの値を基にした条件を生成します。 -
primary_key, primary_key_offset, order == :desc ? :lt : :gt
: この部分は、バッチ処理の次のステップで処理するレコードを選択するための条件を定義しています。具体的には、primary_key
(テーブルのプライマリーキー)、primary_key_offset
(現在のバッチで処理されたレコードの最後のプライマリーキーの値)、およびレコードの順序を指定するための条件を含んでいます。-
order == :desc ? :lt : :gt
は、order
オプションが降順(:desc
)の場合はプライマリーキーがprimary_key_offset
より小さい(:lt
)レコードを選択し、昇順(:asc
)の場合はプライマリーキーがそれより大きい(:gt
)レコードを選択する条件を設定します。
-
このロジックにより、各イテレーション(ループの繰り返し)で処理されるレコードのセットは、前のバッチで処理されたレコードの直後から開始されます。この方法で、全てのレコードが順番に、指定されたバッチサイズに従って処理されることが保証されます。また、このアプローチは、大量のデータを効率的に扱う際にパフォーマンスとメモリ使用量を最適化するのに役立ちます。
Discussion