Deep Dive Laravel Queue
概要
Laravelでは キュー 機能を使うことで、Jobという処理単位を非同期処理できる。
Laravelフレームワークをコードリーディングする上で理解しづらい箇所を解説する。
著者の主な実績
- 自称Laravelバリスタ
- Laravel歴9年
- Laravel5.1→8.x、8.x->10.x、10.x->12.xへのアップグレード主導
- laravel/framework へPR採用
ジョブっぽいクラスたち
Illuminate\Queue\Jobs\Job
RedisJob
やDatabaseJob
など各キュードライバに対応したジョブクラスが用意されおり、
ジョブの取消などの操作を必要に応じてキュードライバに伝播する実装がされています。
この記事では「パケットクラス」と記載します。
namespace Illuminate\Queue\Jobs;
// コマンドクラスの例
class RedisJob extends Job implements JobContract {
}
make:job
で作られるもの
例えば、ProcessPodcast
のように具体的な非同期で行う処理内容を記述するクラスです。
ドキュメントで「ジョブクラス」と記載されていますが、Illuminate\Queue\Jobs\Job
を継承しておらず、JobContract(Illuminate\Contracts\Queue\Job)
も実装していません。
クラスの構造 で説明されている通り、ShouldQueue
を実装したクラスとなります。
この記事では「コマンドクラス」と記載します。
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
// コマンドクラスの例
class HelloJob implements ShouldQueue
{
use Queueable;
public function __construct(protected string $message){}
public function handle(): void
{
// 非同期処理で実行したい内容
\Log::info($this->message);
}
}
CallQueuedHandler
コマンドクラスをdispatch
すると、キューにはコマンドクラスがCallQueuedHandler@call
にラップされて登録されます。
dispatch(new HelloJob("hoge"));
namespace Illuminate\Queue;
abstract class Queue {
/**
* @param array $job これはコマンドクラス
*/
protected function createObjectPayload($job, $queue)
{
return $this->withCreatePayloadHooks($queue, [
'uuid' => (string) Str::uuid(),
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
'timeout' => $job->timeout ?? null,
'data' => [
'commandName' => $job,
'command' => "handle",
],
]);
}
}
パケットクラスはgetRawBody()
で文字列として取得できる形で上記ペイロードのデータを持っています。
理解の難しい点は、パケットクラスの実装でこのペイロードをjobという名前で持ちがちなことです。
namespace Illuminate\Queue\Jobs;
class RedisJob extends Job implements JobContract
{
/**
* これはペイロード
* @var string
*/
protected $job;
/**
* これはCallQueuedHandler
*/
protected $instance;
public function getRawBody()
{
return $this->job;
}
public function fire()
{
$payload = json_decode($this->getRawBody(), true);
[$class, $method] = JobName::parse($payload['job']);
($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
}
}
RedisJob::$job
の実態は、「コマンドクラスをラップしたCallQueuedHandler
のペイロード表現」です。
ペイロードを解決したCallQueuedHandler
インスタンスは$instance
プロパティに入ります。
コマンドクラスはCallQueuedHandler
の実行時(call
)のみ展開してインスタンスになるため、
基本、パケットクラスはコマンドクラスをインスタンスとして持っていないといえます。
Queueable
ShouldQueue
自体は単に空のインターフェースであり、Dispatcher
がキュー送りにするか即時実行するかを判断するフラグになっています。
interface ShouldQueue
{
// からっぽ
}
namespace Illuminate\Bus;
class Dispatcher
{
// dispatchヘルパーメソッドの裏
public function dispatch($command)
{
return $command instanceof ShouldQueue
? $this->dispatchToQueue($command)
: $this->dispatchNow($command);
}
}
ただし、dispatch
でコマンドクラスを処理するためにはQueueable
トレイト相当のインタフェースが必要です
namespace Illuminate\Foundation\Bus;
// dispatchヘルパーメソッドの裏
class PendingDispatch
{
/**
* @var mixed コマンドクラス
*/
protected $job;
public function onConnection($connection)
{
// jobはmixedだが、Queueableトレイト相当を組み込んでいる前提で書かれている
$this->job->onConnection($connection);
return $this;
}
}
LaravelフレームワークのShouldQueue
をimplementしているクラス(例: BroadcastEvent
, ChainedBatch
など)はすべてQueueable
を実装しています。
そこからも、ShouldQueue
は空のインターフェースですが、実質Queueable
トレイト相当の実装が必要であると理解するべきでしょう。
キューの処理の流れ
queue:work
- ループ継続判定
- シグナルを受け取っていたり、メモリ超過していた場合は
WorkerStopping
イベントを終了
- シグナルを受け取っていたり、メモリ超過していた場合は
- スコープのリセット
- Provider経由で
Worker(queue.worker)
に設定されたスコープリセット処理を行う - デフォルトのProviderでは、ログのコンテキストとDBの設定リセット、ファサードのリセットが行われる。
- Provider経由で
- キューからパケットクラスを取得
- パケットクラスから
CallQueuedHandler
を取り出し実行 -
CallQueuedHandler
経由でコマンドクラスを実行 - sleepが設定されている場合はsleep
注目するべきは、\Queue::before
などのキューのイベントはすべてパケットクラスに対して行われる
queue:listen
Readoubleの説明通り、こちらはデバッグ用のコマンドになります。
処理としてはqueue:work --once
をサブプロセスで無限に起動しつづけるものになるため、
コードの変更が逐次反映されますが、アプリケーション起動のオーバーヘッドが大きくなります。
namespace Illuminate\Queue;
class Listener
{
public function listen($connection, $queue, ListenerOptions $options)
{
$process = new Process(["php", "artisan", "queue:work", "--once"]);
while (true) {
$process->run();
sleep($options->rest);
}
}
}
ジョブチェーン
\Bus::chain([
new HelloJob("foo"),
new HelloJob("bar"),
new HelloJob("baz"),
])->dispatch();
の実行を考えます。
PendingChain
\Bus::chain
によりジョブチェーン処理用のクラスPendingChain
に経由で、
1つめのコマンドジョブのchain
として残りのコマンドジョブが付属した形でキューに渡されます。
namespace Illuminate\Foundation\Bus;
// \Bus::chainの裏側
class Dispatcher{
/**
* @param array $jobs コマンドクラスの配列
*/
public function chain($jobs = null)
{
$jobs = Collection::wrap($jobs);
return new PendingChain($jobs->shift(), $jobs->toArray());
}
}
class PendingChain{
// 上記でshiftした1つめのコマンドクラス
public $job;
// 残りのコマンドクラス
public $class;
public function __construct($job, $chain)
{
$this->job = $job;
$this->chain = $chain;
}
public function dispatch()
{
$firstJob = $this->job;
$firstJob->chain($this->chain);
return app(Dispatcher::class)->dispatch($firstJob);
}
}
コマンドクラスのchain
の実装ですが、しれっとQueuable
トレイトに同梱されています
namespace Illuminate\Bus;
trait Queueable
{
public $chained = [];
/**
* @param array $chain コマンドクラスの配列
*/
public function chain($chain)
{
$this->chained = $chain;
return $this;
}
/**
* チェーンの1つめを取り出して残りのコマンドクラスをチェーンさせてdispatchする
*/
public function dispatchNextJobInChain()
{
dispatch(tap(unserialize(array_shift($this->chained)), function ($next) {
$next->chained = $this->chained;
}));
}
}
コマンドクラスがCallQueuedHandler
経由で実行されるときに、
コマンドクラスの処理後にchained
から次のコマンドクラスを取り出し連鎖させていきます。
namespace Illuminate\Queue;
class CallQueuedHandler{
/**
* @param Job $job このJobはパケットクラス
*/
public function call(Job $job, array $data)
{
// コマンドクラスの解決
$command = $this->setJobInstanceIfNecessary(
$job, $this->getCommand($data)
);
// コマンドクラスの実行
$command->handle();
if (! $job->hasFailed() && ! $job->isReleased()) {
// 次のジョブへ連鎖
$command->dispatchNextJobInChain();
// ???の処理
}
}
}
バッチ
上記説明のとおり、バッチにはRDBかDynamoDBでジョブバッチを保存するバックエンドが必要になります。
また、コマンドクラスにBatchable
を組み込む必要があります。
\Bus::batch([
new HelloJob("piyopiyo_1"),
new HelloJob("piyopiyo_2"),
new HelloJob("piyopiyo_3"),
])->dispatch();
Batchable
コマンドクラスへバッチの情報を持たせます
namespace Illuminate\Bus;
trait Batchable
{
/**
* The batch ID (if applicable).
* @var string
*/
public $batchId;
/**
* @return \Illuminate\Bus\Batch|null
*/
public function batch()
}
Batch
\Bus::batch
によりPendingBatch
に経由でBatch
クラスが作成され、Batchable
コマンドクラスが登録されます。
Batch
クラスは上記バックエンドをストアとしたバッチのレコードです。
namespace Illuminate\Bus;
class Batch implements Arrayable, JsonSerializable {
/**
* @param array $jobs Batchableコマンドクラスの配列
*/
public function add($jobs)
{
$count = 0;
// コマンドクラスへバッチ情報登録
$jobs = Collection::wrap($jobs)->map(function ($job) use (&$count) {
// Batchable->withBatchId
$job->withBatchId($this->id);
return $job;
});
$this->repository->transaction(function () use ($jobs, $count) {
// バッチのジョブ数増加
$this->repository->incrementTotalJobs($this->id, $count);
// コマンドクラスをキューへ送信
$this->queue->bulk($jobs->all());
});
}
}
ジョブチェーンと同様に、コマンドクラスがCallQueuedHandler
経由で実行されるときに、
コマンドクラスの処理後にBatch
へのインクリメントや完了・失敗処理が行われます。
// コマンドクラスの実行後
if (! $job->hasFailed() && ! $job->isReleased()) {
// 次のジョブへ連鎖
$command->dispatchNextJobInChain();
// バッチの処理
if (in_array(Batchable::class, class_uses_recursive($command))) {
if ($batch = $command->batch()) {
$batch->recordSuccessfulJob($command->job->uuid());
}
}
}
まとめ
Laravelのキュー機能は昔(例: 5.1)と比べて非常に高機能になっていますが、
そのどれもが非同期処理システムの運用上で必要になりやすいものだと感じます。
Laravelへの理解を深めて、堂々と運用できるように精進しましょう。
Discussion