Active Job Continuationsがリリースされたので調査してみた
こんにちは! 株式会社メドレーでエンジニアをしています、FY25新卒のringchangと申します。
こちらの記事は「MEDLEY Summer Tech Blog Relay」の16日目の記事です🎉
同期の仲間に「FY25新卒エンジニア4人でこのブログの4日間分をジャックしよう!」と誘われたことがきっかけで、今回執筆することになりました。(同期怖い…笑)
というわけで、今日から4日間、FY25新卒エンジニア4人による記事が続きます。
はじめに
私が所属する 医療プラットフォーム本部 CLINICS 開発グループ で使っている Rails に、先日のリリース(Rails 8.1 Beta)で便利そうな新機能 Active Job Continuations が加わりました。
今回は自分の勉強を兼ねて、気になったこの機能を簡単にまとめてみます。
参考:Rails 8.1 Beta 1: Job continuations, structured events, local CI
Active Job Continuations の概要
Rails 8.1 Beta で追加された Active Job Continuations は、長時間実行されるジョブを「途中で中断して、後から続きから再開できる」ようにする仕組みです。
Shopify の job-iteration gem に強く影響を受けているそうです。
コード例
class ProcessImportJob
include ActiveJob::Continuable
def perform(import_id)
@import = Import.find(import_id)
# 初期フォーマット用step
step :initialize do
@import.initialize
end
# カーソル付きのステップ。ジョブが中断されたときに、そのカーソルが保存される。
step :process do |step|
@import.records.find_each(start: step.cursor) do |record|
record.process
step.advance! from: record.id
end
end
# 最終フォーマット用step
step :finalize
private
def finalize
@import.finalize
end
end
end
class ProcessImportJob < ApplicationJob
include ActiveJob::Continuable
ActiveJob::Continuable を include すると、ジョブの処理をstep
単位で書けるようになります。各 step は「完了した位置」を記録でき、失敗や中断から途中再開が可能になります。
この例では、
step :initialize
→ 初期化
step :process
→ レコード処理(途中までのcursor
を保存して再開可能)
step :finalize
→ 最終処理
という流れでジョブが定義されています。
step
上記の例のように、step
メソッドでジョブ内のステップを定義します。ジョブが中断された場合、完了済みのステップはスキップされ、進行中のステップは、最後に記録されたcursor
位置から再開されます。
その他の仕様として以下のものがあります。
- ステップ外のコードは、ジョブが再実行されるたびに毎回実行される
- メソッドにはブロックまたはメソッド名を渡せる
- ブロックには
step
オブジェクトが渡される - メソッドの場合は引数なし、または
step
オブジェクトを1つ受け取る形が可能
cursors
(AIコードエディタではありません)
step
にはオプションでcursor
を使うことができ、ステップ内の進捗を追跡・記録できます。ActiveJob::Base.serialize
でシリアライズ可能なオブジェクトであれば何でも利用でき、デフォルトはnil
です。ステップが再開されると、最後に記録されたcursor
値が復元されます。ステップ内のコードは、このcursor
を使って「どこから再開するか」を制御します。
初期値の設定
ステップ定義時にstart:
で初期のcursor値を指定できます。
step :iterate_items, start: 0 do |step|
items[step.cursor..].each do |item|
process(item)
step.set! step.cursor + 1
end
end
set!
cursor
に特定の値を設定することもできます。
step :iterate_items do |step|
items[step.cursor..].each do |item|
process(item)
step.set! (step.cursor || 0) + 1
end
end
advance!
内部的にcursor.succ
を呼び出して次の値に進めます。
step :iterate_items, start: 0 do |step|
items[step.cursor..].each do |item|
process(item)
step.advance!
end
end
advance!(from: value) オプション
cursor
をレコードIDのように連番でない値に明示的に更新する際に便利です。
step :process_records do |step|
import.records.find_each(start: step.cursor) do |record|
record.process
step.advance! from: record.id
end
end
ネストしたレコードの反復
配列cursorを使うことで二重ループの進捗も管理できます。
step :process_nested_records, start: [ 0, 0 ] do |step|
Account.find_each(start: step.cursor[0]) do |account|
account.records.find_each(start: step.cursor[1]) do |record|
record.process
step.set! [ account.id, record.id + 1 ]
end
step.set! [ account.id + 1, 0 ]
end
end
なにが嬉しいか
仕様の概要をご紹介しましたが、具体的にどのような点がメリットとなる機能でしょうか。
2025/9/4のリリースニュースには、以下のように記載されています。
“Long-running jobs can now be broken into discrete steps that allow execution to continue from the last completed step rather than the beginning after a restart.”
引用元: Rails 8.1 Beta 1: Job continuations, structured events, local CI
つまり、これまでの Active Job では、ジョブは基本的に最初から最後まで一度で処理される前提でした。そのため、処理が途中で失敗した場合は最初からやり直しになり、無駄が発生しやすいという課題がありました。
大量レコードを処理するバッチで途中失敗すると、毎回最初からやり直しになって時間もDB負荷も無駄になる…。こうしたシーンで Active Job Continuations が威力を発揮します。
現状の運用と新機能に対する期待
前段で触れた「処理途中で失敗すると最初からやり直しになってしまう問題」に対して、私が所属する開発グループでは、DBに状態を保存しながら処理を進める方法で対応してきました。
具体的には ApplicationJob 内で以下のようにステート管理を行っています。
class ProcessOrderJob < ApplicationJob
def perform(workflow_id)
wf = OrderWorkflow.find(workflow_id)
case wf.state
when "init"
lock_inventory!(wf)
wf.update!(state: "charged") # DBに状態を保存
Rails.logger.info "Completed: Inventory locked."
when "charged"
charge_customer!(wf)
Rails.logger.info "Completed: Customer charged."
wf.update!(state: "finished") # DBに状態を保存
Rails.logger.info "finished!"
when "finished"
Rails.logger.info "finished!"
end
end
end
class OrderWorkflow < ApplicationRecord
enum state: { init: 0, charged: 1, finished: 2 }
end
この方法では「どこまで進んだか」をDBに書き込みながら進行を管理しますが、
アプリケーション外部に依存する必要がある点が運用上の負担でした。
一方で、Rails 8.1 から導入された Active Job Continuations を使えば、これをRailsアプリケーション内部に完結させられます。
以下は、その対比を示すシンプルな例です。
class ProcessOrderWithContinuationsJob < ApplicationJob
include ActiveJob::Continuable
def perform(workflow_id)
wf = OrderWorkflow.find(workflow_id)
# "init" ステートに対応する処理
step :init do
Rails.logger.info "Starting process for OrderWorkflow ##{wf.id}..."
lock_inventory!(wf)
Rails.logger.info "Step 'init' completed: Inventory locked."
end
# 以前の "charged" ステートに対応する処理
step :charge do
charge_customer!(wf)
Rails.logger.info "Step 'charge' completed: Customer charged."
end
# 以前の "finished" ステートに対応する最終処理
step :finalize do
Rails.logger.info "Process for OrderWorkflow ##{wf.id} finished!"
end
end
end
元々シンプルな例なので、実感が薄いですが、これまで必要だった 「進捗を保存するテーブル」「state管理」「DB更新処理」 が不要になるため、実装がシンプルになり、責務の分離も明確になる点がメリットと言えそうです。
おわりに
今回は扱いませんでしたが、checkpoint!
を利用することでさらにJobの中断再開の粒度を上げることができるようです。Active Job Continuations は、Shopify の job-iteration gem に機能を追いつけるよう、アップデートが継続される予定とのことですので、今後の進化に注目したいところですね。
次回は、人材プラットフォームのホープであり、新卒エンジニア2人目のoginoshikibuさんです!お楽しみに!!👏
Discussion