🔛

Deep Dive Laravel Queue

に公開

概要

Laravelでは キュー 機能を使うことで、Jobという処理単位を非同期処理できる。
Laravelフレームワークをコードリーディングする上で理解しづらい箇所を解説する。

著者の主な実績

  • 自称Laravelバリスタ
  • Laravel歴9年
  • Laravel5.1→8.x、8.x->10.x、10.x->12.xへのアップグレード主導
  • laravel/framework へPR採用

ジョブっぽいクラスたち

Illuminate\Queue\Jobs\Job

RedisJobDatabaseJobなど各キュードライバに対応したジョブクラスが用意されおり、
ジョブの取消などの操作を必要に応じてキュードライバに伝播する実装がされています。

この記事では「パケットクラス」と記載します。

namespace Illuminate\Queue\Jobs;

// コマンドクラスの例
class RedisJob extends Job implements JobContract {
}

make:job で作られるもの

例えば、ProcessPodcastのように具体的な非同期で行う処理内容を記述するクラスです。

ドキュメントで「ジョブクラス」と記載されていますが、Illuminate\Queue\Jobs\Jobを継承しておらず、JobContract(Illuminate\Contracts\Queue\Job)も実装していません。
クラスの構造 で説明されている通り、ShouldQueueを実装したクラスとなります。

この記事では「コマンドクラス」と記載します。

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

// コマンドクラスの例
class HelloJob implements ShouldQueue
{
    use Queueable;

    public function __construct(protected string $message){}

    public function handle(): void
    {
        // 非同期処理で実行したい内容
        \Log::info($this->message);
    }
}

CallQueuedHandler

コマンドクラスをdispatchすると、キューにはコマンドクラスがCallQueuedHandler@callにラップされて登録されます。

dispatch(new HelloJob("hoge"));
namespace Illuminate\Queue;

abstract class Queue {
    /**
     * @param  array  $job これはコマンドクラス
     */
    protected function createObjectPayload($job, $queue)
    {
        return $this->withCreatePayloadHooks($queue, [
            'uuid' => (string) Str::uuid(),
            'job' => 'Illuminate\Queue\CallQueuedHandler@call',
            'timeout' => $job->timeout ?? null,
            'data' => [
                'commandName' => $job,
                'command' => "handle",
            ],
        ]);
    }
}

パケットクラスはgetRawBody()で文字列として取得できる形で上記ペイロードのデータを持っています。
理解の難しい点は、パケットクラスの実装でこのペイロードをjobという名前で持ちがちなことです。

namespace Illuminate\Queue\Jobs;

class RedisJob extends Job implements JobContract
{
    /**
     * これはペイロード
     * @var string
     */
    protected $job;

    /**
     * これはCallQueuedHandler
     */
    protected $instance;

    public function getRawBody()
    {
        return $this->job;
    }

    public function fire()
    {
        $payload = json_decode($this->getRawBody(), true);
        [$class, $method] = JobName::parse($payload['job']);
        ($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
    }
}

RedisJob::$jobの実態は、「コマンドクラスをラップしたCallQueuedHandlerのペイロード表現」です。
ペイロードを解決したCallQueuedHandlerインスタンスは$instanceプロパティに入ります。
コマンドクラスはCallQueuedHandlerの実行時(call)のみ展開してインスタンスになるため、
基本、パケットクラスはコマンドクラスをインスタンスとして持っていないといえます。

Queueable

ShouldQueue自体は単に空のインターフェースであり、Dispatcherがキュー送りにするか即時実行するかを判断するフラグになっています。

interface ShouldQueue
{
    // からっぽ
}
namespace Illuminate\Bus;

class Dispatcher
{
    // dispatchヘルパーメソッドの裏
    public function dispatch($command)
    {
        return $command instanceof ShouldQueue
            ? $this->dispatchToQueue($command)
            : $this->dispatchNow($command);
    }
}

ただし、dispatchでコマンドクラスを処理するためにはQueueableトレイト相当のインタフェースが必要です

namespace Illuminate\Foundation\Bus;

// dispatchヘルパーメソッドの裏
class PendingDispatch
{
    /**
     * @var mixed コマンドクラス
     */
    protected $job;

    public function onConnection($connection)
    {
        // jobはmixedだが、Queueableトレイト相当を組み込んでいる前提で書かれている
        $this->job->onConnection($connection);
        return $this;
    }
}

LaravelフレームワークのShouldQueueをimplementしているクラス(例: BroadcastEvent, ChainedBatchなど)はすべてQueueableを実装しています。
そこからも、ShouldQueueは空のインターフェースですが、実質Queueableトレイト相当の実装が必要であると理解するべきでしょう。

キューの処理の流れ

参考: Readoubleにおけるコマンドの説明

queue:work

  1. ループ継続判定
    • シグナルを受け取っていたり、メモリ超過していた場合はWorkerStoppingイベントを終了
  2. スコープのリセット
    • Provider経由でWorker(queue.worker)に設定されたスコープリセット処理を行う
    • デフォルトのProviderでは、ログのコンテキストとDBの設定リセット、ファサードのリセットが行われる。
  3. キューからパケットクラスを取得
  4. パケットクラスからCallQueuedHandlerを取り出し実行
  5. CallQueuedHandler経由でコマンドクラスを実行
  6. sleepが設定されている場合はsleep

注目するべきは、\Queue::beforeなどのキューのイベントはすべてパケットクラスに対して行われる

queue:listen

Readoubleの説明通り、こちらはデバッグ用のコマンドになります。
処理としてはqueue:work --onceをサブプロセスで無限に起動しつづけるものになるため、
コードの変更が逐次反映されますが、アプリケーション起動のオーバーヘッドが大きくなります。

namespace Illuminate\Queue;

class Listener
{
    public function listen($connection, $queue, ListenerOptions $options)
    {
        $process = new Process(["php", "artisan", "queue:work", "--once"]);

        while (true) {            
            $process->run();
            sleep($options->rest);
        }
    }
}

ジョブチェーン

参考: Readoubleにおけるジョブチェーンの説明

\Bus::chain([
        new HelloJob("foo"),
        new HelloJob("bar"),
        new HelloJob("baz"),
])->dispatch();

の実行を考えます。

PendingChain

\Bus::chainによりジョブチェーン処理用のクラスPendingChainに経由で、
1つめのコマンドジョブのchainとして残りのコマンドジョブが付属した形でキューに渡されます。

namespace Illuminate\Foundation\Bus;

// \Bus::chainの裏側
class Dispatcher{
    /**
     * @param array $jobs コマンドクラスの配列
     */
    public function chain($jobs = null)
    {
        $jobs = Collection::wrap($jobs);
        return new PendingChain($jobs->shift(), $jobs->toArray());
    }
}

class PendingChain{
    // 上記でshiftした1つめのコマンドクラス
    public $job;
    // 残りのコマンドクラス
    public $class;
    public function __construct($job, $chain)
    {
        $this->job = $job;
        $this->chain = $chain;
    }
    public function dispatch()
    {
        $firstJob = $this->job;
        $firstJob->chain($this->chain);
        return app(Dispatcher::class)->dispatch($firstJob);
    }
}

コマンドクラスのchainの実装ですが、しれっとQueuableトレイトに同梱されています

namespace Illuminate\Bus;

trait Queueable
{
    public $chained = [];

    /**
     * @param  array  $chain コマンドクラスの配列
     */
    public function chain($chain)
    {
        $this->chained = $chain;
        return $this;
    }
    /**
     * チェーンの1つめを取り出して残りのコマンドクラスをチェーンさせてdispatchする
     */
    public function dispatchNextJobInChain()
    {
        dispatch(tap(unserialize(array_shift($this->chained)), function ($next) {
            $next->chained = $this->chained;
        }));
    }
}

コマンドクラスがCallQueuedHandler経由で実行されるときに、
コマンドクラスの処理後にchainedから次のコマンドクラスを取り出し連鎖させていきます。

namespace Illuminate\Queue;

class CallQueuedHandler{
    /**
     * @param Job $job このJobはパケットクラス
     */
    public function call(Job $job, array $data)
    {
        // コマンドクラスの解決
        $command = $this->setJobInstanceIfNecessary(
            $job, $this->getCommand($data)
        );
        // コマンドクラスの実行
        $command->handle();
    
        if (! $job->hasFailed() && ! $job->isReleased()) {
            // 次のジョブへ連鎖
            $command->dispatchNextJobInChain();
            // ???の処理
        }
    }
}

バッチ

参考: Readoubleにおけるジョブバッチの説明

上記説明のとおり、バッチにはRDBかDynamoDBでジョブバッチを保存するバックエンドが必要になります。
また、コマンドクラスにBatchableを組み込む必要があります。

\Bus::batch([
    new HelloJob("piyopiyo_1"),
    new HelloJob("piyopiyo_2"),
    new HelloJob("piyopiyo_3"),
])->dispatch();

Batchable

コマンドクラスへバッチの情報を持たせます

namespace Illuminate\Bus;

trait Batchable
{
    /**
     * The batch ID (if applicable).
     * @var string
     */
    public $batchId;

    /**
     * @return \Illuminate\Bus\Batch|null
     */
    public function batch()
}

Batch

\Bus::batchによりPendingBatchに経由でBatchクラスが作成され、Batchableコマンドクラスが登録されます。
Batchクラスは上記バックエンドをストアとしたバッチのレコードです。

namespace Illuminate\Bus;

class Batch implements Arrayable, JsonSerializable {
    /**
     * @param array $jobs Batchableコマンドクラスの配列
     */
    public function add($jobs)
    {
        $count = 0;

        // コマンドクラスへバッチ情報登録 
        $jobs = Collection::wrap($jobs)->map(function ($job) use (&$count) {
            // Batchable->withBatchId
            $job->withBatchId($this->id);
            return $job;
        });

        $this->repository->transaction(function () use ($jobs, $count) {
            // バッチのジョブ数増加
            $this->repository->incrementTotalJobs($this->id, $count);
            // コマンドクラスをキューへ送信
            $this->queue->bulk($jobs->all());
        });
    }
}

ジョブチェーンと同様に、コマンドクラスがCallQueuedHandler経由で実行されるときに、
コマンドクラスの処理後にBatchへのインクリメントや完了・失敗処理が行われます。

// コマンドクラスの実行後
if (! $job->hasFailed() && ! $job->isReleased()) {
    // 次のジョブへ連鎖
    $command->dispatchNextJobInChain();
    // バッチの処理
    if (in_array(Batchable::class, class_uses_recursive($command))) {
        if ($batch = $command->batch()) {
            $batch->recordSuccessfulJob($command->job->uuid());
        }
    }
}

まとめ

Laravelのキュー機能は昔(例: 5.1)と比べて非常に高機能になっていますが、
そのどれもが非同期処理システムの運用上で必要になりやすいものだと感じます。
Laravelへの理解を深めて、堂々と運用できるように精進しましょう。

ソーシャルデータバンク テックブログ

Discussion