Laravelキューの流量と排他の制御とセマフォによる同時実行数の制御
Social Databank Advent Calendar 2025 の2日目です。
概要
Laravelでジョブを非同期処理する際、キュー を使用する。
キューは Supervisor などによって起動された php artisan queue:work プロセスによって処理される。
プロセスがジョブをバランスよく処理するためには流量の制御や排他の制御などが必要になる。
キューの使い分けを使った優先度の制御
php artisan queue:work コマンドでキューを指定した順でジョブが取り出される。
(キューの優先度)
Worker の挙動はシンプルである。
1つ目のキューでpopしてジョブを取得出来たらそれを返す。取得できなければ2つ目のキューに進むというものだ。
namespace Illuminate\Queue;
class Worker
{
/**
* 次処理するジョブを取得
*/
protected function getNextJob(Queue $connection, string $queue): Job|null
{
foreach (explode(',', $queue) as $index => $queue) {
if (! is_null($job = $connection->pop($queue, $index))) {
return $job;
}
}
}
}
前述の例のように、highキューとlowキューを作り、プロセスを占有してしまう恐れがあるジョブ群をlowキューにしてしまえば、
他のジョブが滞留する心配もなくなる。
Laravel Horizonによるプロセス数のバランシング
Laravel Horizon を導入することで、キューを処理するプロセス数自体を制御できる。
実装の紹介は割愛する。
Laravel組み込みミドルウェアを使った制御
Laravelのジョブには ミドルウェア の仕組みがあり、組み込みのミドルウェアを使って流量と排他の制御ができる。
RateLimited ミドルウェアを使った流量制御
RateLimited により、レート制限を実装できる。
導入は ミドルウェアの説明 自体が詳しいのでそちらに譲るが、
ルーティングのレート制限機能 などに用いられる、キャッシュの RateLimiter クラスを用いて流量制御する。
namespace Illuminate\Cache;
class RateLimiter
{
protected Cache\Repository $cache;
/**
* レート制限内ならばコールバックを実行
*
* @param string $key
* @param int $maxAttempts 設定時間内で実行可能な最大回数
* @param \Closure $callback
* @param int $decaySeconds 設定時間
*/
public function attempt($key, $maxAttempts, Closure $callback, $decaySeconds = 60)
{
if ($this->tooManyAttempts($key, $maxAttempts)) {
return ;
}
$callback();
$this->increment($key, $decaySeconds);
}
/**
* レート超過かどうか
*/
public function tooManyAttempts($key, $maxAttempts): bool
{
// $keyには試行回数が格納されている
// 試行回数以内か && タイマーが存在するか
return ($this->cache->get($key, 0) >= $maxAttempts) && $this->cache->has($key.':timer');
}
/**
* 実行回数に+1
*/
public function increment($key, $decaySeconds = 60)
{
// 設定時間内かどうかを判定するタイマーをセット
$this->cache->add(
$key.':timer', $this->availableAt($decaySeconds), $decaySeconds
);
// インクリメント
$this->cache->increment($key, 1);
}
}
前述のミドルウェアの説明の例で登場する Limit::perMinute(50)->by($job->user->id) を考えると、
あるユーザー(ID=1)の、ジョブを処理してから1分以内には計50回までしかジョブを実行できないことになる。
各分0秒~59秒にかかる制限ではないことに注意。
RateLimitedWithRedis ミドルウェアを使った流量制御
RateLimited の補足として
Redisを使用している場合は、
Illuminate\Queue\Middleware\RateLimitedWithRedisミドルウェアを使用できます。これは、Redis用に微調整されており、基本的なレート制限ミドルウェアよりも効率的です。
とあるように、キャッシュにRedisを使っている場合は、RateLimitedWithRedisに差し替えることでより効率的になる。
こちらでは RateLimiter の代わりに DurationLimiter が使われている。
DurationLimiterではLUAスクリプトを使って、時間帯と回数をハッシュとして保存している。
(このLUAスクリプトには、意味の無い返り値や一貫性の無い返り値などが散見されるため、ミリ秒単位での制御など将来の拡張を見越して作られている可能性もある)
WithoutOverlapping ミドルウェアを使った排他制御
ジョブのオーバーラップ、同種のジョブが同時に複数のプロセスによって処理されることを
WithoutOverlapping ミドルウェアにより防止できる。
これはキャッシュのLock機能を使っており、例えばRedisによる実装 RedisLock では SETNX コマンドを使ってロックを実装している。
namespace Illuminate\Cache;
class RedisLock extends Lock
{
protected Redis\Connections\Connection $redis;
protected string $name;
/**
* ロック取得
* @return bool
*/
public function acquire(): bool
{
return $this->redis->setnx($this->name, 1) === 1;
}
/**
* ロック開放
* @return bool
*/
public function release()
{
return (bool) $this->redis->del($this->name);
}
}
セマフォの導入
組み込みミドルウェアを使った流量と排他の制御だけではジョブの並列実行数を制御できない場合がある。
RateLimited ミドルウェアを使った流量制御は、ジョブの開始を制御するのみであるため、
ジョブの処理時間がレートの設定秒数を上回る場合、ジョブがレート制限数以上のプロセスを占有する可能性がある。
WithoutOverlapping ミドルウェアを使った排他制御は、キーを占有できるのは1つのプロセスのみであり、nつのプロセスまで占有を許すといった処理を書けない。
例: RateLimited で1分5つまでに制限しているジョブの処理に2分かかる場合、同時に10つのジョブが処理中となることがある。
常にプロセスで実行されるジョブの数を一定数以下に抑える必要がある場合は セマフォ の導入が必要になる。
RedisをバックエンドとしたセマフォをLaravelに導入しよう。
Redis in Action(2013)におけるセマフォの実装
Redis in Action では、ソート済みセットを用いたタイムアウト処理付きのセマフォの実装が紹介されている。
ただしこの実装はサーバーの時刻同期ズレがある場合、ロックを取得できるサーバーが偏るとして、
ソート済みセットをタイムアウト判定用と、占有判定用の2つ用意した改善版の実装を 6.3.2 Fair Semaphore として紹介している。
注釈を付けて実装を引用する。
# 占有
def acquire_fair_semaphore(conn, semname, limit, timeout=10):
identifier = str(uuid.uuid4())
# semname # タイムアウト判定セット
czset = semname + ':owner' # 占有判定セット
ctr = semname + ':counter' # カウンタ
now = time.time()
pipeline = conn.pipeline(True) # トランザクション処理の開始
pipeline.zremrangebyscore(semname, '-inf', now - timeout) # タイムアウト済みのIDを消去
# タイムアウト処理
# タイムアウト済みのIDを占有判定セットから消去
# (タイムアウト判定セットに登録されているIDのみ残す)
pipeline.zinterstore(czset, {czset: 1, semname: 0})
pipeline.incr(ctr) # カウンタ取得
counter = pipeline.execute()[-1]
pipeline.zadd(semname, identifier, now) # タイムアウト判定セットに登録
pipeline.zadd(czset, identifier, counter) # 占有判定セットに登録
pipeline.zrank(czset, identifier) # 占有判定セットでの順位を取得
if pipeline.execute()[-1] < limit: # 順位が最大実行数を下回るなら占有とする
return identifier
# 占有失敗
pipeline.zrem(semname, identifier) # タイムアウト判定セットから削除
pipeline.zrem(czset, identifier) # 占有判定セットから削除
pipeline.execute()
return None
# 解放
def release_fair_semaphore(conn, semname, identifier):
pipeline = conn.pipeline(True) # トランザクション処理の開始
pipeline.zrem(semname, identifier) # タイムアウトセットから削除
pipeline.zrem(semname + ':owner', identifier) # 占有判定セットから削除
return pipeline.execute()[0]
now がサーバーの時刻ズレによって前後する場合があるので、ctr カウンタへのインクリメント順の並びで占有判定をすることによって、
サーバーの時刻ズレによる占有のしやすさに差が出ないようにしている。
(pipeline.zrank(semname, identifier) で順位を判定した場合、サーバーの時刻が早くズレているサーバーが占有しやすくなってしまう。)
現行のRedisAPIを用いた改善
ただしこのスクリプトはRedis2.x時代のものであることに注意が必要だ。
2.x時代に追加された機能である TIME コマンドを使うとRedisサーバーの時刻で判定されるため、ctr カウンタを用いなくても同様の効果が得られるだろう。
# 占有
def acquire_fair_semaphore(conn, semname, limit, timeout=10):
identifier = str(uuid.uuid4())
# semname # 占有・タイムアウト判定セット
nowsec, nowusec = redis_client.time() # サーバーの秒数を取得
now = nowsec + nowusec / 1_000_000
pipeline = conn.pipeline(True) # トランザクション処理の開始
pipeline.zremrangebyscore(semname, '-inf', now - timeout) # タイムアウト済みのIDを消去
pipeline.zadd(semname, identifier, now) # 判定セットに登録
pipeline.zrank(semname, identifier) # 判定セットでの順位を取得
if pipeline.execute()[-1] < limit:
return identifier
# 占有失敗
conn.zrem(semname, identifier) # 判定セットから削除
return None
# 解放
def release_fair_semaphore(conn, semname, identifier):
return conn.zrem(semname, identifier) # 判定セットから削除
1つのソート済みセットで占有とタイムアウトを両方判定でき、シンプルになった。
複数のタイムアウト時間に対応するアレンジ
先述の例まで、semname セットは、スコアはセマフォ占有時刻である。
占有はソート済みセットの順位で判定し、タイムアウトは一律に古い占有時刻のデータを消すことによって処理していた。
今回のスクリプトはトランザクション内で行われているため、アトミック特性が得られている。
そのためパイプラインの処理中に別の処理が割り込まれない。
semname セットをスコアはタイムアウト時刻にして、占有はソート済みセットの個数で判定することで、複数のタイムアウト時刻に対応できる。
LUAスクリプトで実装しても同様にアトミック特性は得られるので、LUAスクリプトとしてこのアレンジしたものを記載する。
-- KEYS 1=semname 2=identifier 3=timeout 4=limit
-- RETURN 1=acquired 0=failed
local tm = redis.call('TIME')
local now = tonumber(tm[1])
local expired_at = now + KEYS[3]
redis.call('ZREMRANGEBYSCORE', KEYS[1], '-inf', now) -- タイムアウト処理
redis.call('ZADD', KEYS[1], expired_at, KEYS[2]) -- 判定セットに登録
local card = redis.call('ZCARD', KEYS[1])
if card <= tonumber(KEYS[4]) then -- 判定セットのID数が最大実行数を下回るなら占有とする
return 1
end
-- 占有失敗
redis.call('ZREM', KEYS[1], KEYS[2]) -- 判定セットから削除
return 0
ミドルウェアとしてLaravelへ導入
セマフォを RedisSemaphore としてLaravelへ導入したとすると、セマフォを使ってジョブの同時実行数を制御するミドルウェア WithSemaphore はこのように書ける。
interface RedisSemaphore {
// 占有できたかどうか
public function acquire(string $identifier): bool
// 解放
public function release(string $identifier): bool
}
class WithSemaphore
{
protected RedisSemaphore $semaphore; // 作成したセマフォ
protected string $jobId; // ジョブごとに固有のID
protected \DateTimeInterface|int|null $releaseAfter = 0; // 占有失敗時にやり直す時間
/**
* @param mixed $job
* @param callable $next
*/
public function handle($job, $next)
{
if ($this->semaphore->acquire($this->jobId)) {
try {
$next($job);
} finally {
$this->semaphore->release($this->jobId);
}
} elseif (! is_null($this->releaseAfter)) {
$job->release($this->releaseAfter);
}
}
}
まとめ
Laravelに組み込みの機能を使ったジョブの流量の制御や排他の制御の方法を取り上げ、
それらで行えない同時実行数の制御をセマフォの導入によって実現した。
特定のジョブ群によるWorkerプロセスの独占の防止として強力な道具となるだろう。
Discussion