symfony/messenger+league/csvで、CSVファイルを使ったバッチジョブを実装する
symfony/messengerとleague/csvを使って、アップロードされたCSVファイルを使ったバッチジョブを実装する手順を解説します。
symfony/messengerとは
symfony/messenger はSymfony 4.1で新たに追加されたコンポーネント で、メッセージキューイングの処理を実装するためのものです。
より具合的な使い方は以下のページに詳しいです。(というかこの記事の内容のほとんどが以下のページの要約です😅)
https://symfony.com/doc/current/messenger.html
league/csvとは
league/csvは、PHPでCSVデータを簡単かつ柔軟に扱うためのライブラリです。
こちらの記事 で詳しく紹介していますので、よろしければあわせてご参照ください。
ジョブキュー機構の実装例
というわけで早速、symfony/messengerを使っていわゆるジョブキューを実装する場合の例を示します。
大まかな流れとしては、
- symfony/messengerを設定する
- ジョブに必要な情報をシリアライズして渡すための
Message
を実装する - 受け取った
Message
を使って実際の処理を行うHandler
を実装する - コントローラなどから
Message
をディスパッチする
となります。
1. symfony/messengerを設定する
config/packages/messenger.yaml
で以下のように
-
transports
を(例えばasync
という名前で)1つ作成し -
routing
によって、特定のMessage
をそのtransports
に流す
という設定をします。
# config/packages/messenger.yaml
framework:
messenger:
transports:
async: '%env(MESSENGER_TRANSPORT_DSN)%'
routing:
App\Messenger\Message\CsvMessage: async
ちなみに、 transports
の設定内容は '%env(MESSENGER_TRANSPORT_DSN)%'
となっていますが、これは .env
にて
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
MESSENGER_TRANSPORT_DSN=doctrine://default
MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
などのように設定しておくことができます。(参考)
Message
を実装する
2. ジョブに必要な情報をシリアライズして渡すための messenger.yaml
の routing
で設定した Message
を実装します。
// src/Messenger/Message/CsvMessage.php
namespace App\Messenger\Message;
use League\Csv\Reader;
class CsvMessage
{
/**
* @var string
*/
private $csvContent;
public function __construct(string $csvContent)
{
$this->csvContent = $csvContent;
}
public function getCsv(): Reader
{
return Reader::createFromString($this->csvContent);
}
}
league/csvを使ってCSVを Message
に持たせるには、CSVファイルの内容を文字列として持たせて、後から Reader::createFromString()
でCSVインスタンスにすればOKです。
Message
を使って実際の処理を行う Handler
を実装する
3. 受け取った Symfony\Component\Messenger\Handler\MessageHandlerInterface
を実装し、 __invoke()
マジックメソッドの引数に目的の Message
クラスを渡すことで、その Message
に対応する Handler
になります。
// src/Messenger/Handler/CsvMessageHandler.php
namespace App\Messenger\Handler\CsvMessageHandler;
use App\Messenger\Message\CsvMessage;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
class CsvMessageHandler implements MessageHandlerInterface
{
public function __invoke(CsvMessage $message)
{
$csv = $message->getCsv();
// CSVを使った処理
}
}
こんな感じで、 Handler
の処理でCSVインスタンスを使用できます。
Message
をディスパッチする
4. コントローラなどから Message
と Handler
の実装と messenger.yaml
での設定が揃っていれば、あとは例えば以下のような感じでコントローラの処理で $this->dispatchMessage()
を使って Message
をディスパッチすれば、symfony/messengerが Message
を Handler
に渡してくれます。
public function index(Request $request)
{
$uploadedFile = request->files->get('csv');
$csv = Reader::createFromPath($uploadedFile->getPathname());
$this->dispatchMessage(new CsvMessage($csv->getContent()));
// ...
}
実際にキューが処理されるには?
実装については前章の内容のとおりで、あとは実際にディスパッチされた Message
が処理されるために、Webサーバーとは別にmessengerのワーカー(ジョブランナー)を起動しておく必要があります。(参考)
$ bin/console messenger:consume
を実行すると、ワーカープロセスが起動します。この状態で、Webサイト側で Message
がディスパッチされると、ワーカープロセス側で Handler
が処理を始めるという流れになります。
$ bin/console messenger:consume async
のように、特定のルーティングだけを処理させることもできます。
まとめ
- symfony/messengerを使うと、ジョブキュー機構を簡単に実装できる
- league/csvを使ってCSVデータを扱うようなジョブを実装したい場合は、
Message
にCSVコンテンツの文字列を渡すようにして、取り出すときにleague/csvのReader
クラスのインスタンスとして取り出すようにしておけばOK -
Message
を処理するワーカープロセスはbin/console messenger:consume
コマンドで起動する
Discussion