😅

Laravel ジョブバッチからデータを取得

2021/11/17に公開

少しレアなケース

ジョブからデータをリターンしたい!と思ったりしたことがありませんか?

通常では、ジョブからデータをリターンすることができませんが、一応同期処理にする(dispatchSync)と可能となります。

// コントローラーとかで

$job = new SendEmail();
$this->dispatchSync($job);
$result = $job->getResult(); // getResultメソッドをジョブクラスに実装

ただ、ほとんどの場合、ジョブを使うのは、時間のかかる操作を非同期で処理することが目的です。

そのため、もし非同期で処理したいが、処理結果をなんとか取得したい場合、まずは、これがタスクキューを使う場面ではないのではないかと考えた方が良いでしょう。例えば、何かのAPIリクエストを並行に操作して結果を取得したい場合、\GuzzleHttp\Promise\allとかに変えた方が正解です。

$client = new Client();
$promises = [];
foreach ($options as $option) {
    $promises[] = $client->requestAsync($method, $url, $option);
}
$responses = \GuzzleHttp\Promise\all($promises)->wait();

今回はそのあまり通常のケースではない、ジョブを並行に処理して結果を取得する、タスクキューを使うやり方を紹介します。要注意するのは、並行に処理して処理時間を短縮するのが目的であれば、複数のワーカーが必須となります。今回の例は、複数のワーカーを前提にしています。

ジョブのバッチを作る

Laravel 8からはbatchのAPIが加えて、複数のジョブを一回のバッチにまとめることが可能となりました。まずはバッチのテーブルを作ります。

php artisan queue:batches-table
php artisan migrate

そして、コントローラとかで、ジョブの配列から、バッチを作ります。

$jobs = [new taskA($user->id), new taskB($user->id)];

$batch = Bus::batch($jobs)->name('test batch')->allowFailures()
    ->then(function (Batch $batch) {
        Log::info("Batch {$batch->id} has completed");
    })
    ->catch(function (Batch $batch) {
        $failedJobs = json_encode($batch->failedJobIds);
        Log::error("Batch {$batch->id} failed on job {$failedJobs}");
    })
    ->dispatch();

ジョブをバッチに追加できるようにするには、batchableのトレイトが必要となります。

 use Illuminate\Bus\Queueable;
 use Illuminate\Bus\Batchable;
 use Illuminate\Contracts\Queue\ShouldBeUnique;
 use Illuminate\Contracts\Queue\ShouldQueue;
 use Illuminate\Foundation\Bus\Dispatchable;
 use Illuminate\Queue\InteractsWithQueue;
 use Illuminate\Queue\SerializesModels;
 
 class TaskA implements ShouldQueue
 {
     use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
 
     private $user_id;
     private $result;

     public function __construct(int $user_id)
     {
         $this->user_id = $user_id;
         $this->result = [];
     }

     public function handle()
     {
         if ($this->batch()->cancelled()) {
             // バッチがキャンセルするとジョブの処理もされません
             return;
         }
     }
 }

一つのバッチの中で、どれかのジョブが失敗すると、残りのジョブがキャンセルとなってしまいます。そのため、allowFailuresをチェインすることで、失敗しても他のジョブをキャンセルしないように設定します。最後にdispatchすると、Batchのインスタンスがリターンされます。

ジョブの処理結果をDBに記録

直接ジョブからリターンすることが無理なので、迂回策として、一旦DBに処理の結果を記録し、全てのジョブが終わればまたDBから取得するようにします。

php artisan make:model JobResult -m
 use Illuminate\Database\Migrations\Migration;
 use Illuminate\Database\Schema\Blueprint;
 use Illuminate\Support\Facades\Schema;
 
 class CreateJobResultsTable extends Migration
 {
     public function up()
     {
         Schema::create('job_results', function (Blueprint $table) {
             $table->id();
             $table->string('job_id', 20);
             $table->string('name', 30);
             $table->boolean('status')->default(false);
             $table->string('result')->default('');
             $table->string('batch_id', 50)->nullable();
             $table->timestamps();
         });
     }

     public function down()
     {
         Schema::dropIfExists('job_results');
     }
 }

モデルファイルは特に変更はありませんが、$protectedまたは$guardedを設定しておきましょう。

 class JobResult extends Model
 {
     use HasFactory;
 
     protected $guarded = ['id'];
 }

ジョブに共通するDBへレコードを更新するメソッドをトレイトとして作ります:

 namespace App\Jobs\Traits;
 
 use App\Models\JobResult;
 use Exception;
 
 trait Trackable
 {
     private function writeResult($result, $status)
     {
         try {
             JobResult::updateOrCreate(
                 [
                     'batch_id' => $this->batchId,
                     'job_id' => $this->job->getJobId(),
                     'name' => __CLASS__,
                 ],
                 [
                     'status' => $status,
                     'result' => json_encode($result), //配列と想定
                 ]
             );
         } catch (Exception $e) {
             //Log::error($e->getMessage());
         }
     }
 }

ジョブクラスに、トレイトとメソッドを追加。

 class TaskA implements ShouldQueue
 {
     use Trackable, Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
 
     // ...
     public function handle()
     {
         $this->writeResult($this->result, false);
         if ($this->batch()->cancelled()) {
             // バッチがキャンセルするとジョブの処理もされません
             return;
         }
	 // 実際の処理
	 $this->result = [
             //...
         ];
	 $status = !empty($this->result);
         $this->writeResult($this->result, $status);
     }
 }

全てのジョブが終わるまで待つ

これまではジョブの実行結果をDBへ記録することができました。
ただ、残りの問題として、

  • ジョブが終わるまで待つにはどうすれば良いか
  • DBからの取得は何を目印にするか
    があります。

バッチは元々このために作られているわけではないので、await/waitとかのメソッドがありませんが、一応全てのジョブが終わるまで待つことが可能です。

while (($batch = $batch->fresh()) && !$batch->finished()) {
    sleep(0.1);
}

バッチを更新しながら進行状況をチェックしていきますので、事実上Promiseのawait/waitと近い効果になります。

次に、DBからの取得ですが、先ほどのマイグレーションファイルに、batch_idとのフィールドをつけていますが、そのためとなります。

// $this->getJobResults($batch->id)で取得
private function getJobResults($batch_id)
{
    $results = DB::table('job_results')->where('batch_id', $batch_id)->pluck('result');
    $res = [];
    if ($results->isNotEmpty()) {
        try {
            foreach ($results as $result) {
                $res += json_decode($result, true);
            }
        } catch (Exception $e) {
            // Log::error($e->getMessage());
        }
    }
    return $res;
}

まとめ

本来タスクキューは時間のかかる作業を非同期で「バックグランド」で処理するものです。同期処理のように結果をリターンすることが元々できません。その結果を反映するには、websocketを使って通知を出すなどの方法が一般的です。今回はあえて、ジョブを並行に処理し、dbから取得する例を挙げました。無理ではありませんが、やはりデメリットも色々とあるので、もしもこの方法を採用しようとする時があれば、よく検討した方が良いでしょう。

使うことを勧めないのになんのために書いたかというと、自分がやってしまったからメモとして残したいです(笑)。

GitHubで編集を提案

Discussion