初めてのSolid Queue
先日、会社メンバーと Kaigi on Rails 2025 に参加してきました。
Solid Queueを実務で本格的に触ったことがなかったのですが、非同期処理に関するトークを聞いているうちに学んでみたい欲が出てきたので、ジョブをキューに登録してから実行されるまでの流れを手元で動かしながら確認し、学びになったと感じたことを簡単にまとめてみました📝
Solid Queueとは?
Active Job用のDBベースのキューイングバックエンド。
基本的な機能
- 通常のジョブのエンキューと処理
- 並行性制御(concurrency controls)
- 繰り返しジョブ(recurring jobs)のサポート など
その他の機能
- キューの一時停止(pausing queues)
- ジョブごとの数値優先度設定
- キュー順序による優先度設定
- バルクエンキュー(enqueue_all for Active Job's perform_all_later) など
特徴
- MySQL、PostgreSQL、SQLiteなどのSQLデータベースに対応
-
FOR UPDATE SKIP LOCKED句を活用してポーリング時のブロッキングを回避 - Active Jobによる再試行、破棄、エラーハンドリング、シリアライゼーション、遅延処理
- Ruby on Railsのマルチスレッド対応
4つのアクター
Solid Queueには4種類のアクターが存在します。
ワーカー(Worker)
役割: 実行準備が整ったジョブをキューから取得し、処理を実行する
-
solid_queue_ready_executionsテーブルからジョブを取得 - 実際にジョブの処理を実行する
- マルチスレッドで並列処理が可能
ディスパッチャ(Dispatcher)
役割: 将来実行予定のジョブのうち、実行時刻が到来したものを選択し、振り分ける
-
solid_queue_scheduled_executionsからsolid_queue_ready_executionsへジョブを移動 - ワーカーがジョブを取得できるように準備
スケジューラ(Scheduler)
役割: 繰り返しタスク(recurring tasks)を管理し、実行時刻が到来したときにジョブをエンキューする
- 定期実行タスクのスケジュール管理
- 実行時刻の監視とジョブの自動生成
スーパーバイザ(Supervisor)
役割: 設定に従ってワーカーとディスパッチャを実行し、制御する
- ワーカーとディスパッチャの起動・停止制御
- ハートビートの管理
- 必要に応じてプロセスの停止・開始
- 各監視対象(ワーカー/ディスパッチャ/スケジューラ)に対して別々のプロセスをフォークして実行
DBに作成されるテーブルと役割
Solid Queueを導入すると、下記のように様々なテーブルがデータベースに作成されます。
それぞれの役割は下記の通りです。
1. solid_queue_jobs
- ジョブのメタデータを格納するメインテーブル
- カラム:
id,queue_name,class_name,arguments,priority,active_job_id,scheduled_at,finished_at,concurrency_key,created_at,updated_at - 役割: ジョブの基本情報(ジョブのクラス名、キュー名、優先度など)を永続化
2. solid_queue_ready_executions
- 実行可能なジョブのキュー
- カラム:
id,job_id,queue_name,priority,created_at - 役割: DispatcherがWorkerに振り分けるジョブを格納
3. solid_queue_scheduled_executions
- 将来実行予定のジョブ
- カラム:
id,job_id,queue_name,priority,scheduled_at,created_at - 役割: 遅延実行やスケジュール実行のジョブを管理
4. solid_queue_claimed_executions
- ワーカーが取得したジョブ
- カラム:
id,job_id,process_id,created_at - 役割: 現在実行中のジョブを追跡
5. solid_queue_blocked_executions
- ブロックされたジョブ
- カラム:
id,job_id,queue_name,priority,concurrency_key,expires_at,created_at - 役割: 同時実行制御によりブロックされたジョブを管理
6. solid_queue_failed_executions
- 失敗したジョブの詳細情報
- カラム:
id,job_id,error,created_at - 役割: エラー情報を保存
7. solid_queue_pauses
- キューやタスクの一時停止状態を管理
- カラム:
id,queue_name,created_at - 役割: 特定のキューやジョブの実行を一時停止する情報を保存
8. solid_queue_processes
- ワーカーやスケジューラーなどのプロセスを監視
- カラム:
id,kind,last_heartbeat_at,supervisor_id,pid,hostname,metadata,created_at,name - 役割: 各プロセスのハートビート情報を記録
9. solid_queue_semaphores
- 同時実行制御(Concurrency Control)のためのセマフォを管理
- カラム:
id,key,value,expires_at,created_at,updated_at - 役割: 特定のジョブやジョブグループに対して同時に実行可能なジョブの数を制御
10. solid_queue_recurring_tasks
- 定期的なタスク(Recurring Tasks)の設定を管理
- カラム:
id,key,schedule,command,class_name,arguments,queue_name,priority,static,description,created_at,updated_at - 役割: 定期実行タスクのキー、ジョブクラス、スケジュール、引数などを保存
11. solid_queue_recurring_executions
- 定期タスクの実行履歴を管理
- カラム:
id,job_id,task_key,run_at,created_at - 役割: 各定期タスクの実行時刻を記録し、タスクが重複して実行されるのを防ぐ
ジョブの実行フロー
Rails8でrails newし、Solid Queueの導入や設定は公式のREADMEに記載されている通り行いました。
スケジューリングされたジョブがどのような流れで処理されるのか、下記のようなシンプルなジョブを作成し、挙動を確認してみました。
# app/jobs/sample_job.rb
class SampleJob < ApplicationJob
queue_as :default
def perform(message)
puts "=" * 50
puts "SampleJob 実行開始: #{Time.current}"
puts "メッセージ: #{message}"
puts "job_id: #{job_id}"
puts "キュー: #{queue_name}"
puts "=" * 50
rescue => e
puts "エラー発生: #{e.message}"
raise e
end
end
ジョブをキューに登録
コンソールで下記を実行すると
SampleJob.set(wait: 60.seconds).perform_later("test")
下記のSQLが発行され、ジョブがキューに登録されます
BEGIN immediate TRANSACTION;
-- 1. solid_queue_jobsテーブルにジョブ情報を格納
INSERT INTO "solid_queue_jobs" (
"queue_name",
"class_name",
"arguments",
"priority",
"active_job_id",
"scheduled_at",
"finished_at",
"concurrency_key",
"created_at",
"updated_at"
) VALUES (
'default',
'SampleJob',
'{
"job_class": "SampleJob",
"job_id": "f9a0a29f-5c06-4759-abab-b4f9c27648ce",
"provider_job_id": null,
"queue_name": "default",
"priority": null,
"arguments": ["test"],
"executions": 0,
"exception_executions": {},
"locale": "en",
"timezone": "UTC",
"enqueued_at": "2025-10-05T05:08:27.583649505Z",
"scheduled_at": "2025-10-05T05:09:27.581574440Z"
}',
0,
'f9a0a29f-5c06-4759-abab-b4f9c27648ce',
'2025-10-05 05:09:27.581574',
NULL,
NULL,
'2025-10-05 05:08:27.591065',
'2025-10-05 05:08:27.591065'
) RETURNING "id";
SAVEPOINT active_record_1;
-- 2. 作成したジョブを取得
SELECT "solid_queue_jobs".*
FROM "solid_queue_jobs"
WHERE "solid_queue_jobs"."id" = 24
LIMIT 1;
-- 3. solid_queue_scheduled_executionsテーブルにスケジュール情報を格納
INSERT INTO "solid_queue_scheduled_executions" (
"job_id",
"queue_name",
"priority",
"scheduled_at",
"created_at"
) VALUES (
24,
'default',
0,
'2025-10-05 05:09:27.581574',
'2025-10-05 05:08:27.633359'
) RETURNING "id";
RELEASE SAVEPOINT active_record_1;
COMMIT TRANSACTION;
Dispatcherによるジョブの振り分け
スケジュールされたジョブが実行時刻に達すると、Dispatcherが該当ジョブをsolid_queue_scheduled_executionsからsolid_queue_ready_executionsへ移動します。
ログは下記の通りで、実際に実行されているSQLは表示されていませんでしたが、
SolidQueue-1.2.1 Dispatch scheduled jobs (29.0ms) batch_size: 500, size: 1
おそらく下記のようなSQLが実行されているはず。
1. 実行時刻が到来したジョブを検索
SELECT "solid_queue_scheduled_executions".*
FROM "solid_queue_scheduled_executions"
WHERE "solid_queue_scheduled_executions"."scheduled_at" <= NOW()
ORDER BY "solid_queue_scheduled_executions"."priority" ASC,
"solid_queue_scheduled_executions"."scheduled_at" ASC
LIMIT 500;
2. 実行可能なジョブをready_executionsに移動
INSERT INTO "solid_queue_ready_executions" (
"job_id",
"queue_name",
"priority",
"created_at"
)
SELECT
"job_id",
"queue_name",
"priority",
NOW()
FROM "solid_queue_scheduled_executions"
WHERE "scheduled_at" <= NOW()
LIMIT 500;
3. scheduled_executionsから削除
DELETE FROM "solid_queue_scheduled_executions"
WHERE "scheduled_at" <= NOW()
LIMIT 500;
Workerによるジョブ実行
solid_queue_ready_executionsに格納されていたジョブは、solid_queue_claimed_executionsに移動し、実行中状態となります。(ログは下記の通り)
SolidQueue-1.2.1 Claim jobs (9.1ms) process_id: 29, job_ids: [24], claimed_job_ids: [24], size: 1
1. ジョブの実行
ジョブの情報が取得され、ジョブが実行されます。
SolidQueue::Job Load (0.2ms) SELECT "solid_queue_jobs".* FROM "solid_queue_jobs" WHERE "solid_queue_jobs"."id" = 24 LIMIT 1
[ActiveJob] [SampleJob] [f9a0a29f-5c06-4759-abab-b4f9c27648ce] Performing SampleJob...
==================================================
SampleJob 実行開始: 2025-10-05 05:09:28 UTC
メッセージ: test
job_id: f9a0a29f-5c06-4759-abab-b4f9c27648ce
キュー: default
==================================================
[ActiveJob] [SampleJob] [f9a0a29f-5c06-4759-abab-b4f9c27648ce] Performed SampleJob... in 3.05ms
2. 実行完了後の処理
ジョブの実行が完了すると、solid_queue_jobsテーブルのfinished_atカラムが更新され、solid_queue_claimed_executionsテーブルに格納されていたジョブ情報が削除されます。
発行されるSQLは下記の通り。
BEGIN immediate TRANSACTION
-- ジョブの実行完了日時を保存
UPDATE "solid_queue_jobs"
SET "updated_at" = '2025-10-05 05:09:28.468817',
"finished_at" = '2025-10-05 05:09:28.468817'
WHERE "solid_queue_jobs"."id" = 24
-- solid_queue_claimed_executionsテーブルに格納していたジョブ情報を削除
DELETE FROM "solid_queue_claimed_executions"
WHERE "solid_queue_claimed_executions"."id" = 24
COMMIT TRANSACTION
処理フロー図
ここまで確認してきた処理の流れを図でも整理してみました。
ジョブをキューに登録
Dispatcherによるジョブの振り分け
Workerによる実行
まとめ
以上、今回学んだことをまとめると下記の通りです。
- Solid QueueはActive Job用のDBベースのキューイングバックエンド。
- 生成される11個のテーブルがそれぞれ異なる役割を持ち、ジョブのライフサイクル全体を管理している
- Solid Queueには4つのアクター(ワーカー、ディスパッチャ、スケジューラ、スーパーバイザ)がある
- ディスパッチャは、実行予約されたJobが実行時期を迎えた際、ワーカーが処理できるよう振り分ける。
- ワーカーは、実行可能になったジョブをキューから選択して処理する。
今回確認できたのはジョブの登録から実行までの基本的な流れのみだけど、今後は下記事項についても学んでいきたい。
- ジョブが失敗した時の挙動
- 同時実行制御(Concurrency Control)の仕組み
- FOR UPDATE SKIP LOCKED
- キューの一時停止
- 定期的なタスクの管理
- Sidekiqとの比較
- スレッドとプロセスの設定
Discussion