symfony/messenger+league/csvで、CSVファイルを使ったバッチジョブを実装する
symfony/messengerとleague/csvを使って、アップロードされたCSVファイルを使ったバッチジョブを実装する手順を解説します。
symfony/messengerとは
symfony/messenger はSymfony 4.1で新たに追加されたコンポーネント で、メッセージキューイングの処理を実装するためのものです。
より具合的な使い方は以下のページに詳しいです。(というかこの記事の内容のほとんどが以下のページの要約です😅)
https://symfony.com/doc/current/messenger.html
league/csvとは
league/csvは、PHPでCSVデータを簡単かつ柔軟に扱うためのライブラリです。
こちらの記事 で詳しく紹介していますので、よろしければあわせてご参照ください。
ジョブキュー機構の実装例
というわけで早速、symfony/messengerを使っていわゆるジョブキューを実装する場合の例を示します。
大まかな流れとしては、
- symfony/messengerを設定する
- ジョブに必要な情報をシリアライズして渡すための
Messageを実装する - 受け取った
Messageを使って実際の処理を行うHandlerを実装する - コントローラなどから
Messageをディスパッチする
となります。
1. symfony/messengerを設定する
config/packages/messenger.yaml で以下のように
-
transportsを(例えばasyncという名前で)1つ作成し -
routingによって、特定のMessageをそのtransportsに流す
という設定をします。
# config/packages/messenger.yaml
framework:
messenger:
transports:
async: '%env(MESSENGER_TRANSPORT_DSN)%'
routing:
App\Messenger\Message\CsvMessage: async
ちなみに、 transports の設定内容は '%env(MESSENGER_TRANSPORT_DSN)%' となっていますが、これは .env にて
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messagesMESSENGER_TRANSPORT_DSN=doctrine://defaultMESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
などのように設定しておくことができます。(参考)
2. ジョブに必要な情報をシリアライズして渡すための Message を実装する
messenger.yaml の routing で設定した Message を実装します。
// src/Messenger/Message/CsvMessage.php
namespace App\Messenger\Message;
use League\Csv\Reader;
class CsvMessage
{
/**
* @var string
*/
private $csvContent;
public function __construct(string $csvContent)
{
$this->csvContent = $csvContent;
}
public function getCsv(): Reader
{
return Reader::createFromString($this->csvContent);
}
}
league/csvを使ってCSVを Message に持たせるには、CSVファイルの内容を文字列として持たせて、後から Reader::createFromString() でCSVインスタンスにすればOKです。
3. 受け取った Message を使って実際の処理を行う Handler を実装する
Symfony\Component\Messenger\Handler\MessageHandlerInterface を実装し、 __invoke() マジックメソッドの引数に目的の Message クラスを渡すことで、その Message に対応する Handler になります。
// src/Messenger/Handler/CsvMessageHandler.php
namespace App\Messenger\Handler\CsvMessageHandler;
use App\Messenger\Message\CsvMessage;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
class CsvMessageHandler implements MessageHandlerInterface
{
public function __invoke(CsvMessage $message)
{
$csv = $message->getCsv();
// CSVを使った処理
}
}
こんな感じで、 Handler の処理でCSVインスタンスを使用できます。
4. コントローラなどから Message をディスパッチする
Message と Handler の実装と messenger.yaml での設定が揃っていれば、あとは例えば以下のような感じでコントローラの処理で $this->dispatchMessage() を使って Message をディスパッチすれば、symfony/messengerが Message を Handler に渡してくれます。
public function index(Request $request)
{
$uploadedFile = request->files->get('csv');
$csv = Reader::createFromPath($uploadedFile->getPathname());
$this->dispatchMessage(new CsvMessage($csv->getContent()));
// ...
}
実際にキューが処理されるには?
実装については前章の内容のとおりで、あとは実際にディスパッチされた Message が処理されるために、Webサーバーとは別にmessengerのワーカー(ジョブランナー)を起動しておく必要があります。(参考)
$ bin/console messenger:consume
を実行すると、ワーカープロセスが起動します。この状態で、Webサイト側で Message がディスパッチされると、ワーカープロセス側で Handler が処理を始めるという流れになります。
$ bin/console messenger:consume async
のように、特定のルーティングだけを処理させることもできます。
まとめ
- symfony/messengerを使うと、ジョブキュー機構を簡単に実装できる
- league/csvを使ってCSVデータを扱うようなジョブを実装したい場合は、
MessageにCSVコンテンツの文字列を渡すようにして、取り出すときにleague/csvのReaderクラスのインスタンスとして取り出すようにしておけばOK -
Messageを処理するワーカープロセスはbin/console messenger:consumeコマンドで起動する
Discussion