Symfony(Doctrine)で1つのデータベースに対して複数のエンティティマネージャーを用意して使い分ける方法
Symfony(Doctrine)で1つのデータベースに対して複数のエンティティマネージャーを用意して使い分ける方法について解説します。
はじめに
先日、以下のような仕様のコードを書いていました。
- いくつかのエンティティがある
- エンティティそれぞれに対して一括生成のインポート処理がある
- インポート処理はバッチジョブとして非同期に実行する
- ジョブの状態や結果もエンティティとして保存する
まずは、ジョブエンティティのことを考えずに、単純にエンティティのインポート処理を書いてみると、以下のようなコードになるかなと思います。
symfony/messenger の Handler として実装しているイメージです。
class ImportationHandler implements MessageHandlerInterface
{
private $em;
public function __construct(EntityManagerInterface $em)
{
$this->em = $em;
}
public function __invoke(Importation $message)
{
foreach ($message->getCsv() as $row) {
$entity = new Entity();
$entity->setName($row[Csv::COL_NAME]);
$this->em->persist($entity);
}
$this->em->flush();
}
}
このコードに、ジョブの状態や結果をジョブエンティティとして保存する処理を追加してみます。
class ImportationHandler implements MessageHandlerInterface
{
private $em;
public function __construct(EntityManagerInterface $em)
{
$this->em = $em;
}
public function __invoke(Importation $message)
{
// ジョブを「待ち状態」として保存
$job = (new Job())->setState(Job::STATE_QUEUED);
$this->em->persist($job);
$this->em->flush();
foreach ($message->getCsv() as $row) {
$entity = new Entity();
$entity->setName($row[Csv::COL_NAME]);
$this->em->persist($entity);
}
// ジョブを「完了状態」に更新
$job->setState(Job::STATE_COMPLETE);
// インポート処理とジョブの更新をまとめてflush
$this->em->flush();
}
}
この時点でだいぶ良くなさそうな匂いがしていますが、さらにインポート処理が失敗したときのことを考慮しようとすると、
class ImportationHandler implements MessageHandlerInterface
{
private $em;
public function __construct(EntityManagerInterface $em)
{
$this->em = $em;
}
public function __invoke(Importation $message)
{
// ジョブを「待ち状態」として保存
$job = (new Job())->setState(Job::STATE_QUEUED);
$this->em->persist($job);
$this->em->flush();
try {
foreach ($message->getCsv() as $row) {
$entity = new Entity();
$entity->setName($row[Csv::COL_NAME]);
$this->em->persist($entity);
}
// ジョブを「完了状態」に更新
$job->setState(Job::STATE_COMPLETE);
// インポート処理とジョブの更新をまとめてflush
$this->em->flush();
} catch (\Exception $e) {
// ジョブを「エラー状態」に更新
$job->setState(Job::STATE_ERROR);
// ※インポート処理も一緒にflushされてしまう
$this->em->flush();
}
}
}
こんな感じになって詰みます。
以下のように、ジョブをflushする前にエンティティマネージャーをclearしておけばいいのでは?と思うかもしれません。
} catch (\Exception $e) {
$job->setState(Job::STATE_ERROR);
$this->em->clear();
$this->em->persist($job);
$this->em->flush();
}
しかしこれだと、最初に作った「待ち状態」のジョブと最後にpersistした「エラー状態」のジョブが、エンティティマネージャーから見て同一のエンティティではなくなってしまっているため、データベース上に「待ち状態」のジョブが残り続けてしまいます。
ジョブ用にもう1つエンティティマネージャーを用意する
前置きが長くなりましたが、このような問題を解決するために、今回はジョブ用にエンティティマネージャーをもう1つ用意するという方法でそれなりにきれいに対応できたので、その手順をご紹介しようというものです。
そもそもジョブとその他のエンティティが、ドメインが異なるのに同一のデータベースで管理されていることが問題だとか色々ツッコミが聞こえてきそうですが、今回はちょっと手を抜いてこういう対応になっています💨
一旦、そもそも設計が悪いという話はここでは目をつぶってください🙏
というわけで、具体的な方法についてですが、以下のドキュメントにあるとおり、Symfonyではもともと複数のエンティティマネージャーを併用することが想定されています。
How to Work with multiple Entity Managers and Connections
https://symfony.com/doc/current/doctrine/multiple_entity_managers.html
ドキュメントに書かれている例は複数のデータベースがあって、それぞれに対してエンティティマネージャーを用意するという内容ですが、実は対象とするデータベースは同一のものでも大丈夫です。
その場合、 config/packages/doctrine.yaml
の内容は例えば以下のようになります。
doctrine:
dbal:
url: '%env(resolve:DATABASE_URL)%'
# symfony/messenger用の設定
schema_filter: '~^(?!messenger_messages)~'
orm:
auto_generate_proxy_classes: true
default_entity_manager: default
entity_managers:
default:
naming_strategy: doctrine.orm.naming_strategy.underscore_number_aware
auto_mapping: true
mappings:
App:
is_bundle: false
type: annotation
dir: '%kernel.project_dir%/src/Entity'
prefix: 'App\Entity'
alias: App
job:
naming_strategy: doctrine.orm.naming_strategy.underscore_number_aware
mappings:
Job:
is_bundle: false
type: annotation
dir: '%kernel.project_dir%/src/Entity'
prefix: 'App\Entity'
alias: Job
このように、 default
と job
という2つのエンティティマネージャーを、(デフォルトなので明記していませんが)同一のデータベースコネクションに対して作成します。
準備としてはこれだけです。
複数のエンティティマネージャーを使い分ける
新しく作った job
エンティティマネージャーを使うように、先ほどのインポート処理を書き換えてみましょう。
class ImportationHandler implements MessageHandlerInterface
{
private $em;
private $jem;
public function __construct(EntityManagerInterface $em, ManagerRegistry $doctrine)
{
$this->em = $em;
$this->jem = $doctrine->getManager('job');
}
public function __invoke(Importation $message)
{
// ジョブを「待ち状態」として保存
$job = (new Job())->setState(Job::STATE_QUEUED);
$this->jem->persist($job);
$this->jem->flush();
try {
foreach ($message->getCsv() as $row) {
$entity = new Entity();
$entity->setName($row[Csv::COL_NAME]);
$this->em->persist($entity);
}
// インポート処理をflush
$this->em->flush();
// ジョブを「完了状態」に更新してflush
$job->setState(Job::STATE_COMPLETE);
$this->jem->flush();
} catch (\Exception $e) {
// ジョブを「エラー状態」に更新してflush
$job->setState(Job::STATE_ERROR);
$this->jem->flush();
}
}
}
こんな感じで、デフォルトのエンティティマネージャーとは別に job
エンティティマネージャーを取得しておいて、インポート処理とジョブの保存それぞれでエンティティマネージャーを使い分けます。
これで、
- インポート処理が失敗したときは途中までのインポートがflushされてしまうことはない
- 待ち状態のジョブが残り続けてしまうこともない
という要件を満たすことができました。
まとめ
- Symfonyではエンティティマネージャーを複数使うことが可能
- 複数のエンティティマネージャーが同一のデータベースコネクションを持つことも可能
- それを利用すると、インポート処理とジョブの更新という2つのデータベース操作を独立して管理したりできる
Discussion