📃

LaravelのJobの進行状況をJob Middlewareを使って簡単に表示する

2024/12/01に公開

今年もアドベントカレンダーの季節がやってきました。
今回はLaravelのQueueにおける、非同期処理をする際に便利なJob Middlewareについての記事です。

Job Middlewareとは

Jobの実行にあたって、

  • Jobの実行を秒間xx回までにスロットルしたい
  • Jobを直列実行したい
  • 特定の条件下でJobの実行をスキップしたい

等々、Jobのメインの処理ではなく、Jobを実行制御等を実装する場合があります。
これらはJobごとで同じコードを書くことになり、コードの重複が生まれる他、Jobが本当に処理したいコードと周辺のコードが混ざることで可読性が低下してしまいます。

そこで、Jobの処理とJobの周辺のコードを分離できるのがJob Middlewareです。

例)Jobの並列実行を防止するWithoutOverlapping

例として、Laravelが提供しているWithoutOverlappingを見てみます。
WithoutOverlappingはジョブのオーバーラップの防止を実現します。


例えば、ユーザーごとに直列で実行したいJobがあるとします。
その場合は、下記のようにJobのmiddlewareメソッドにWithoutOverlappingをキーとともに定義するだけです。

public function middleware(): array
{
    return [new WithoutOverlapping($this->user->id)];
}

これによって、Jobのメインの処理とJobの実行制御を効果的に分離&再利用ができ、複雑なJobの実行制御を簡単に実現できます。

では、WithoutOverlappingの実装を見てみましょう。

WithoutOverlapping.php
// 中略
public function handle($job, $next)
{
    // 1. 任意のキーを使ってロックの取得を試行
    $lock = Container::getInstance()->make(Cache::class)->lock(
        $this->getLockKey($job), $this->expiresAfter
    );

    if ($lock->get()) {
        try {
            // 2-1. 1でロックが取得できたらメイン処理を実行
            $next($job);
        } finally {
            // 2-2. 処理終了後にロックを開放
            $lock->release();
        }
    } elseif (! is_null($this->releaseAfter)) {
        // 3. 1でロックが取得できなければ、Jobをキューに戻す
        $job->release($this->releaseAfter);
    }
}
// 中略

WithoutOverlapping.php

上記のようにJobのメイン処理の前後に実行制御のロジックが書かれているだけです。

Job Middlewareをカスタマイズする

少し慣れるためにWithoutOverlappingをカスタマイズしてみます。
本来直列実行される上記のJobですが、例えばJobの実行時間が想定よりも長くなってしまい、ロックのTTLが切れてしまったらどうしましょう?
この場合、予期せぬ処理を実行される可能性があるため、ログを出して気づきたいですね。

そこで下記のようにWithoutOverlappingextendsしてhandleメソッドをオーバーライドします。

CustomWithoutOverlapping.php
  <?php
  
  namespace App\Jobs\Middleware;
  
  use Illuminate\Container\Container;
  use Illuminate\Contracts\Cache\Repository as Cache;
  use Illuminate\Queue\Middleware\WithoutOverlapping as LaravelWithoutOverlapping;
  use Illuminate\Support\Facades\Log;
  
  class CustomWithoutOverlapping extends LaravelWithoutOverlapping
  {
      public function handle($job, $next)
      {
          $lock = Container::getInstance()->make(Cache::class)->lock(
              $this->getLockKey($job), $this->expiresAfter
          );
  
          if ($lock->get()) {
              try {
                  $next($job);
              } finally {
                  // オリジナルからの変更箇所
-                 $lock->release();
+                 $isReleased = $lock->release();
+                 if (!$isReleased) {
+                     // Job自体が成功していても、意図せずロックが外れてしまっている可能性があるのでログを出す。
+                     Log::alert('ロックが外れていました。意図しない処理が発生していないか確認してください。');
+                 }
              }
          } elseif (! is_null($this->releaseAfter)) {
              $job->release($this->releaseAfter);
          }
      }
  }

こうすることで、Jobを実装する人がWithoutOverlappingを使えば、ロックのTTL超過等を検知でき、運用側としても助かります。

Middlewareを自作してみる

Laravelが提供してくれているMiddleware以外に自分でもMiddlewareを作ってみましょう。
時間のかかる処理を非同期処理にする関係上、ユーザー側に進行状況を表示したいことはよくあると思います。
そこで、今回はユーザーの画面上に非同期処理の進行状況を表示する場合を考えてみます。

Jobの進行状況の表示方法を検討する

さて、ではJobの進行状況をユーザーに通知する方法を検討します。
Jobの進行状況は下記のように考えられるので、

Jobの進行状況(%) 
= (処理済み) / (総数) * 100
= (総数) - (処置待ち + 処理中) / (総数) * 100

必要な数としては下記の3つです。

  • 処理待ちのJob数
  • 処理中のJob数
  • Jobの総数

これらをRedis上で管理することを考えます。
用意するキー空間としては下記の3つです。

  • 処理待ちのJob数→処理待ちSetを用意してJobのIDを格納
  • 処理中のJob数→処理中Setを用意してJobのIDを格納
  • Jobの総数→Stringに格納

上記のキー空間に、下記のタイミングでデータを格納/削除します

  1. Jobのdispatchに、
    1-1. JobのIDを処理待ちSetに格納
    1-2. Jobの総数をStringに格納
  2. Jobの処理開始時に、自身のJobのIDを処理待ちSet→処理中Setに移動
  3. Jobの処理終了後に、
    3-1. 自身のJobのIDを処理中Setから削除
    3-2. 処理待ちSetと処理中Set内に要素が残ってなければ、処理を完了したとみなしJobの総数を破棄

MiddlewareはJobのメイン処理の実行前後で処理されるため、2と3はMiddlewareに記述できそうです。
1はJobをdispatchする側のコードとJobのコンストラクタに記述できそうですね。

実装

実際にコードに起こしてみました。
https://github.com/paix26875/laravel_job

Jobの進行状況を表示するために追加する箇所

Job側にはInterfaceとその実装のTraitmiddleware__consturctを定義します。

ExampleJob.php
- class ExampleJob implements ShouldQueue
+ class ExampleJob implements ShouldQueue, Contracts\ShowProgressInterface
  {
      use Queueable;
+     use Traits\ShowProgressTrait;
  
+     private string $jobProgressId;
  
+     public function middleware(): array
+     {
+         return [new Middleware\ShowProgressMiddleware()];
+     }
+ 
+     public function __construct()
+     {
+         $this->jobProgressId = (string) Str::ulid();
+         $this->markAsWaiting(); // 1-1. JobのIDを処理待ちSetに格納
+     }
  }

Jobのdispatch時にJobの総数をインクリメントします。

QueuingController.php
+ ExampleJob::dispatch();
+ Redis::command('INCR', ['total']); // 1-2. Jobの総数をStringに格納(インクリメント)

進行状況を確認するには、それぞれの値を取得します。

$waitingJobCount = Redis::command('SCARD', ['waiting']); // 処理待ちのJob数
$wipJobCount = Redis::command('SCARD', ['wip']); // 処理中のJob数
$totalJobCount = Redis::command('GET', ['total']); // Jobの総数

($totalJobCount - ($waitingJobCount + $wipJobCount)) / $totalJobCount * 100; // Jobの進行状況(%)

これで任意のJobの進行状況を確認できるようになります。

残りの処理

上記のコードでは2〜4の処理が出てきませんが、それらはMiddlewareで実行します。

  1. Jobの処理開始時に、自身のJobのIDを処理待ちSet→処理中Setに移動
  2. Jobの処理終了後に、
    3-1. 自身のJobのIDを処理中Setから削除
    3-2. 処理待ちSetと処理中Set内に要素が残ってなければ、処理を完了したとみなしJobの総数を破棄
ShowProgressMiddleware.php
public function handle(ShowProgressInterface $job, Closure $next): void
{
    $job->markAsWip(); // 2. 自身のJobのIDを処理待ちSet→処理中Setに移動
    try {
        $next($job);
    } finally {
        $job->markAsFinished(); // 3. Jobの処理終了後の処理
    }
}

Interfaceと具体の処理を記述したTraitは下記になります。

ShowProgressInterface.php
interface ShowProgressInterface
{
    public function markAsWaiting();
    public function markAsWip();
    public function markAsFinished();
}
ShowProgressTraits.php
trait ShowProgressTraits
{
    public function markAsWaiting(): void
    {
        // 1-1. JobのIDを処理待ちSetに格納
        Redis::command('SADD', ['waiting', $this->jobProgressId]);
    }

    public function markAsWip(): void
    {
        // 2. 自身のJobのIDを処理待ちSet→処理中Setに移動
        Redis::command('SMOVE', ['waiting', 'wip', $this->jobProgressId]);
    }

    public function markAsFinished(): void
    {
        // 3-1. 自身のJobのIDを処理中Setから削除
        Redis::command('SREM', ['wip', $this->jobProgressId]);
        
        // 3-2. 処理待ちSetと処理中Set内に要素が残ってなければ、処理を完了したとみなしJobの総数を破棄
        if (Redis::command('SCARD', ['wip']) === 0 && Redis::command('SCARD', ['waiting']) === 0) {
            Redis::command('DEL', ['total']);
        }
    }
}

他にもRedisのキー空間をJobやユーザーごとに切ることで、細かな制御ができそうです。

Middlewareを通らずにJobが終了するパターンを検討する

注意点としては、Jobがfailするときのハンドリングです。

Jobがfailした場合、Middlewareを通らずにJobが失敗したものとみなされます。
処理待ちSetにJobのIDが残った状態でJobが破棄されるため、ユーザーの画面からはJobがずっと処理中に見えます。

この場合のハンドリングはfailedメソッドに処理を書きましょう。

  1. 処理待ちSetからJobのIDを削除
  2. ユーザー側に何かしら通知
  3. エラーログを出す

などの後処理を記述してあげるとよさそうですね。

ShowProgressTraits.php
+ public function failed(?Throwable $exception): void
+ {
+     $this->markAsFinished();
+     Log::debug(get_class($this) . ': 処理が失敗しました。');
+ }

さいごに

書いた後に気づきましたが、Middlewareの自作の件はBatchを使うだけで実現できましたね…。

// バッチの完了率(0-100)
$batch->progress();

(まあMiddlewareなら手軽にオプトインできるということで…。)

LaravelのQueuesのドキュメントを見ると 4.211.x で分量が全く異なります。それだけLaravelのQueuesが進化したということでしょう。非同期処理は考えることが本当に多く自前で実装しようとするとかなり大変になりますが、Laravelがいい感じにサポートしてくれていますね。

ソーシャルデータバンク テックブログ

Discussion