Laravel ジョブバッチからデータを取得
少しレアなケース
ジョブからデータをリターンしたい!と思ったりしたことがありませんか?
通常では、ジョブからデータをリターンすることができませんが、一応同期処理にする(dispatchSync)と可能となります。
// コントローラーとかで
$job = new SendEmail();
$this->dispatchSync($job);
$result = $job->getResult(); // getResultメソッドをジョブクラスに実装
ただ、ほとんどの場合、ジョブを使うのは、時間のかかる操作を非同期で処理することが目的です。
そのため、もし非同期で処理したいが、処理結果をなんとか取得したい場合、まずは、これがタスクキューを使う場面ではないのではないかと考えた方が良いでしょう。例えば、何かのAPIリクエストを並行に操作して結果を取得したい場合、\GuzzleHttp\Promise\all
とかに変えた方が正解です。
$client = new Client();
$promises = [];
foreach ($options as $option) {
$promises[] = $client->requestAsync($method, $url, $option);
}
$responses = \GuzzleHttp\Promise\all($promises)->wait();
今回はそのあまり通常のケースではない、ジョブを並行に処理して結果を取得する、タスクキューを使うやり方を紹介します。要注意するのは、並行に処理して処理時間を短縮するのが目的であれば、複数のワーカーが必須となります。今回の例は、複数のワーカーを前提にしています。
ジョブのバッチを作る
Laravel 8からはbatchのAPIが加えて、複数のジョブを一回のバッチにまとめることが可能となりました。まずはバッチのテーブルを作ります。
php artisan queue:batches-table
php artisan migrate
そして、コントローラとかで、ジョブの配列から、バッチを作ります。
$jobs = [new taskA($user->id), new taskB($user->id)];
$batch = Bus::batch($jobs)->name('test batch')->allowFailures()
->then(function (Batch $batch) {
Log::info("Batch {$batch->id} has completed");
})
->catch(function (Batch $batch) {
$failedJobs = json_encode($batch->failedJobIds);
Log::error("Batch {$batch->id} failed on job {$failedJobs}");
})
->dispatch();
ジョブをバッチに追加できるようにするには、batchable
のトレイトが必要となります。
use Illuminate\Bus\Queueable;
use Illuminate\Bus\Batchable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class TaskA implements ShouldQueue
{
use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
private $user_id;
private $result;
public function __construct(int $user_id)
{
$this->user_id = $user_id;
$this->result = [];
}
public function handle()
{
if ($this->batch()->cancelled()) {
// バッチがキャンセルするとジョブの処理もされません
return;
}
}
}
一つのバッチの中で、どれかのジョブが失敗すると、残りのジョブがキャンセルとなってしまいます。そのため、allowFailures
をチェインすることで、失敗しても他のジョブをキャンセルしないように設定します。最後にdispatch
すると、Batch
のインスタンスがリターンされます。
ジョブの処理結果をDBに記録
直接ジョブからリターンすることが無理なので、迂回策として、一旦DBに処理の結果を記録し、全てのジョブが終わればまたDBから取得するようにします。
php artisan make:model JobResult -m
use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;
class CreateJobResultsTable extends Migration
{
public function up()
{
Schema::create('job_results', function (Blueprint $table) {
$table->id();
$table->string('job_id', 20);
$table->string('name', 30);
$table->boolean('status')->default(false);
$table->string('result')->default('');
$table->string('batch_id', 50)->nullable();
$table->timestamps();
});
}
public function down()
{
Schema::dropIfExists('job_results');
}
}
モデルファイルは特に変更はありませんが、$protected
または$guarded
を設定しておきましょう。
class JobResult extends Model
{
use HasFactory;
protected $guarded = ['id'];
}
ジョブに共通するDBへレコードを更新するメソッドをトレイトとして作ります:
namespace App\Jobs\Traits;
use App\Models\JobResult;
use Exception;
trait Trackable
{
private function writeResult($result, $status)
{
try {
JobResult::updateOrCreate(
[
'batch_id' => $this->batchId,
'job_id' => $this->job->getJobId(),
'name' => __CLASS__,
],
[
'status' => $status,
'result' => json_encode($result), //配列と想定
]
);
} catch (Exception $e) {
//Log::error($e->getMessage());
}
}
}
ジョブクラスに、トレイトとメソッドを追加。
class TaskA implements ShouldQueue
{
use Trackable, Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
// ...
public function handle()
{
$this->writeResult($this->result, false);
if ($this->batch()->cancelled()) {
// バッチがキャンセルするとジョブの処理もされません
return;
}
// 実際の処理
$this->result = [
//...
];
$status = !empty($this->result);
$this->writeResult($this->result, $status);
}
}
全てのジョブが終わるまで待つ
これまではジョブの実行結果をDBへ記録することができました。
ただ、残りの問題として、
- ジョブが終わるまで待つにはどうすれば良いか
- DBからの取得は何を目印にするか
があります。
バッチは元々このために作られているわけではないので、await/waitとかのメソッドがありませんが、一応全てのジョブが終わるまで待つことが可能です。
while (($batch = $batch->fresh()) && !$batch->finished()) {
sleep(0.1);
}
バッチを更新しながら進行状況をチェックしていきますので、事実上Promiseのawait/waitと近い効果になります。
次に、DBからの取得ですが、先ほどのマイグレーションファイルに、batch_idとのフィールドをつけていますが、そのためとなります。
// $this->getJobResults($batch->id)で取得
private function getJobResults($batch_id)
{
$results = DB::table('job_results')->where('batch_id', $batch_id)->pluck('result');
$res = [];
if ($results->isNotEmpty()) {
try {
foreach ($results as $result) {
$res += json_decode($result, true);
}
} catch (Exception $e) {
// Log::error($e->getMessage());
}
}
return $res;
}
まとめ
本来タスクキューは時間のかかる作業を非同期で「バックグランド」で処理するものです。同期処理のように結果をリターンすることが元々できません。その結果を反映するには、websocketを使って通知を出すなどの方法が一般的です。今回はあえて、ジョブを並行に処理し、dbから取得する例を挙げました。無理ではありませんが、やはりデメリットも色々とあるので、もしもこの方法を採用しようとする時があれば、よく検討した方が良いでしょう。
使うことを勧めないのになんのために書いたかというと、自分がやってしまったからメモとして残したいです(笑)。
Discussion