🎻

symfony/messenger+league/csvで、CSVファイルを使ったバッチジョブを実装する

2020/03/29に公開

symfony/messengerとleague/csvを使って、アップロードされたCSVファイルを使ったバッチジョブを実装する手順を解説します。

symfony/messengerとは

symfony/messengerSymfony 4.1で新たに追加されたコンポーネント で、メッセージキューイングの処理を実装するためのものです。

より具合的な使い方は以下のページに詳しいです。(というかこの記事の内容のほとんどが以下のページの要約です😅)

https://symfony.com/doc/current/messenger.html

league/csvとは

league/csvは、PHPでCSVデータを簡単かつ柔軟に扱うためのライブラリです。

こちらの記事 で詳しく紹介していますので、よろしければあわせてご参照ください。

ジョブキュー機構の実装例

というわけで早速、symfony/messengerを使っていわゆるジョブキューを実装する場合の例を示します。

大まかな流れとしては、

  1. symfony/messengerを設定する
  2. ジョブに必要な情報をシリアライズして渡すための Message を実装する
  3. 受け取った Message を使って実際の処理を行う Handler を実装する
  4. コントローラなどから Message をディスパッチする

となります。

1. symfony/messengerを設定する

config/packages/messenger.yaml で以下のように

  • transports を(例えば async という名前で)1つ作成し
  • routing によって、特定の Message をその transports に流す

という設定をします。

# config/packages/messenger.yaml

framework:
  messenger:
      transports:
          async: '%env(MESSENGER_TRANSPORT_DSN)%'

      routing:
          App\Messenger\Message\CsvMessage: async

ちなみに、 transports の設定内容は '%env(MESSENGER_TRANSPORT_DSN)%' となっていますが、これは .env にて

  • MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
  • MESSENGER_TRANSPORT_DSN=doctrine://default
  • MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages

などのように設定しておくことができます。(参考

2. ジョブに必要な情報をシリアライズして渡すための Message を実装する

messenger.yamlrouting で設定した Message を実装します。

// src/Messenger/Message/CsvMessage.php

namespace App\Messenger\Message;

use League\Csv\Reader;

class CsvMessage
{
    /**
     * @var string
     */
    private $csvContent;

    public function __construct(string $csvContent)
    {
        $this->csvContent = $csvContent;
    }

    public function getCsv(): Reader
    {
        return Reader::createFromString($this->csvContent);
    }
}

league/csvを使ってCSVを Message に持たせるには、CSVファイルの内容を文字列として持たせて、後から Reader::createFromString() でCSVインスタンスにすればOKです。

3. 受け取った Message を使って実際の処理を行う Handler を実装する

Symfony\Component\Messenger\Handler\MessageHandlerInterface を実装し、 __invoke() マジックメソッドの引数に目的の Message クラスを渡すことで、その Message に対応する Handler になります。

// src/Messenger/Handler/CsvMessageHandler.php

namespace App\Messenger\Handler\CsvMessageHandler;

use App\Messenger\Message\CsvMessage;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class CsvMessageHandler implements MessageHandlerInterface
{
    public function __invoke(CsvMessage $message)
    {
        $csv = $message->getCsv();

        // CSVを使った処理
    }
}

こんな感じで、 Handler の処理でCSVインスタンスを使用できます。

4. コントローラなどから Message をディスパッチする

MessageHandler の実装と messenger.yaml での設定が揃っていれば、あとは例えば以下のような感じでコントローラの処理で $this->dispatchMessage() を使って Message をディスパッチすれば、symfony/messengerが MessageHandler に渡してくれます。

public function index(Request $request)
{
    $uploadedFile = request->files->get('csv');
    $csv = Reader::createFromPath($uploadedFile->getPathname());

    $this->dispatchMessage(new CsvMessage($csv->getContent()));

    // ...
}

実際にキューが処理されるには?

実装については前章の内容のとおりで、あとは実際にディスパッチされた Message が処理されるために、Webサーバーとは別にmessengerのワーカー(ジョブランナー)を起動しておく必要があります。(参考

$ bin/console messenger:consume

を実行すると、ワーカープロセスが起動します。この状態で、Webサイト側で Message がディスパッチされると、ワーカープロセス側で Handler が処理を始めるという流れになります。

$ bin/console messenger:consume async

のように、特定のルーティングだけを処理させることもできます。

まとめ

  • symfony/messengerを使うと、ジョブキュー機構を簡単に実装できる
  • league/csvを使ってCSVデータを扱うようなジョブを実装したい場合は、 Message にCSVコンテンツの文字列を渡すようにして、取り出すときにleague/csvの Reader クラスのインスタンスとして取り出すようにしておけばOK
  • Message を処理するワーカープロセスは bin/console messenger:consume コマンドで起動する
GitHubで編集を提案

Discussion