Python x DDD x Clean Architecture
はじめに
PythonでDDDでクリーンアーキテクチャで実装してみました。
良くも悪くも、"Pythonでは"あまりネット上で見かけない構成になったので共有しようと思います。
つくったものは、めちゃ簡単なTodoアプリです。
経緯
- DDDについて全く知らないので、とりあえずPythonで軽量DDDやってみよう
- ついでにクリーンアーキテクチャも全く知らないのでコレもやろう
という軽い気持ちで始めました。
参考
nrslibさんの記事を最も参考にさせていただきました。
(というより、実装していくとほぼPythonに書き直しただけのようになりました)
使用ライブラリ
今回作成したものは使用ライブラリに大きく依存するため、先に使用ライブラリを記載しておきます。
DDDのため
今回、メインで紹介する部分です。
-
pydantic
- 型チェックや設定管理に富んだライブラリ
- 個人的にすごくお気に入り
-
injector
- DI(Dependency injection)を楽にするためのライブラリ
- 今回初めて使ってみたがとても便利だった
- pydanticと併用する場合には注意点あり
外部API化のため
主なディレクトリ構成
大きく、以下の二つのモジュールに大別されます。
- コアモジュール
- 外部APIモジュール
todo
コアモジュール群:Todoアプリの実体となるモジュール群です。
ココがほぼメインの話になり、軽量DDD&クリーンアーキテクチャで実装しています。
./src/todo/
├── application # Application Services
│ ├── abc
│ └── tasks
│ ├── commons
│ ├── delete
│ ├── get_all
│ ├── get_by_id
│ ├── register
│ └── update
├── config
├── domain
│ └── models # Domain Models
│ └── tasks
├── infrastructure # Concrete Factory, QueryService and Repository by infrastructure
│ └── in_memory
│ └── tasks
└── injector # IoC Container
いま思えば、application
という名前が分かりにくいのでuse_casesとかにすれば良かったなと思っています。
外部APIモジュール
todo
モジュールをAPI化するため(外部から扱えるようにするため)のラッパーモジュールです。
rest_api
./src/rest_api/
├── main.py # Endpoints
└── models # View Models
├── healthz
└── tasks
├── commons
├── delete
├── get
├── patch
└── post
grpc
TODO: そのうち埋めるかも
共通ルール:基底クラス、抽象基底クラス
各モジュールの中を見ていく前に、どのモジュールでも共通して適用している"ゆるい"ルールがあります。
"ゆるい"とつけているのは、試行錯誤で実装していったため、完全にこのルールに従っている自信がないからです…
- 原則
pydantic.BaseModel
を使用 -
pydantic.BaseModel
のカスタムコンフィグとしてBaseConfig
とBaseFrozenConfig
を定義 - Value Objectなど不変なものは
BaseFrozenConfig
、Entityなど可変なものはBaseConfig
- IFRepositoryやIFQueryServiceなどのインターフェースクラス用に
Interface
クラスを定義(実質ABCと同義)
BaseConfig
とBaseFrozenConfig
は、今回以下のように実装しています。
詳しくはpydanticの公式ドキュメントをご参照ください。
from typing import Literal
from pydantic.config import BaseConfig as PydanticBaseConfig
class BaseConfig(PydanticBaseConfig):
validate_all = True
validate_assignment = True
copy_on_model_validation: Literal["none", "deep", "shallow"] = "deep"
class BaseFrozenConfig(PydanticBaseConfig):
validate_all: bool = True
allow_mutation: bool = False
copy_on_model_validation: Literal["none", "deep", "shallow"] = "deep"
todo
モジュール
DDDの視点から見たここからはtodo
モジュール群の中身について、DDDの視点からざっくり見ていきたいと思います。
コード自体がシンプルなため、コードをペタペタ貼りながら説明していきます。
Value Object
まずは、Value Objectの基底クラスとなるValueObjectModel
を実装します。
Value Objectは不変なので、BaseFrozenConfig
を継承します。
from pydantic import BaseModel
from custom_pydantic.config import BaseFrozenConfig
class ValueObjectModel(BaseModel):
class Config(BaseFrozenConfig):
...
以下が、Value Objectの例です。
from pydantic import StrictStr
from ..base_model import ValueObjectModel
class TaskId(ValueObjectModel):
value: StrictStr
なお、今回は実装していませんが、Value Object同士の比較を容易にするために__eq__()
を定義した方が良いと思っています。
Entity
次はEntityの説明です。
まずは、Entityの基底クラスとなるEntityModel
を実装します。
Entityは可変なので、BaseConfig
を継承します。
from pydantic import BaseModel
from custom_pydantic.config import BaseConfig
class EntityModel(BaseModel):
class Config(BaseConfig):
...
Entityクラスには、以下の特徴があります。
- フィールドのtype hintは
ValueObjectModel
- 各フィールドを変更するためのメソッドを定義
from ..base_model import EntityModel
from .task_id import TaskId
from .task_name import TaskName
from .task_status import TaskStatus
class Task(EntityModel):
task_id: TaskId
name: TaskName
status: TaskStatus
def change_name(self, name: TaskName) -> None:
self.name = name
def change_status_to_todo(self) -> None:
self.status = TaskStatus.TODO
def change_status_to_done(self) -> None:
self.status = TaskStatus.DONE
厳密には、Value Objectを勝手に変えられないように各フィールドをprivateにしてsetter, getterを指定した方が良いのかもしれませんが、書くのが面倒なうえにpydanticのうまみが半減するので、そのようにはしていません。
Repository
Repositoryは、以下のルールに従って実装しています。
- ドメインモデルを引数にとり、ドメインモデルを戻り値とする
- IFRepository(Repositoryのインターフェースクラス(抽象基底クラス))は、
domain
モジュール配下 - Concrete Repository(Repositoryの具象クラス)は、
infrastructure
モジュール配下
import abc
from ....interface import Interface
from .task import Task
from .task_id import TaskId
class IFTaskRepository(Interface):
@abc.abstractmethod
def find_by_id(self, task_id: TaskId) -> Task | None:
...
@abc.abstractmethod
def save(self, task: Task) -> None:
...
@abc.abstractmethod
def delete(self, task_id: TaskId) -> None:
...
from ....domain.models.tasks import IFTaskRepository, Task, TaskId, TaskName, TaskStatus
from .in_memory_data import TaskTuple, data
class InMemoryTaskRepository(IFTaskRepository):
def find_by_id(self, task_id: TaskId) -> Task | None:
task_tuple = data.get(task_id.value, None)
if task_tuple is None:
return None
return Task(
task_id=task_id,
name=TaskName(value=task_tuple.name),
status=TaskStatus(task_tuple.status),
)
def save(self, task: Task) -> None:
data[task.task_id.value] = TaskTuple(
name=task.name.value, status=task.status.value
)
def delete(self, task_id: TaskId) -> None:
data.pop(task_id.value, None)
今回はDictに保存していくInMemoryしか実装していませんが、IFRepositoryを実現していれば、RedisだろうがMySQLだろうがFirestoreだろうが何だってOKです。
Factory
Factoryは便利なものですがあくまで副産物なので、説明は割愛します。
実装方法としてはほぼRepositoryと同様です。
以上がドメインモデルの説明になります。
ほぼコードペタペタ貼っただけで説明がないですが…コードが説明と思ってください!
シーケンス図
さて、ここからドメインモデルを使った実装の全体を見ていきたいのですが、急に複雑になります。
全体の処理の流れを先に見ていただいたほうが分かりやすいと思うので、REST APIのシーケンス図をに載せます。
ざっくり流れを説明すると、
- クライアントからリクエストくる
- リクエストからユースケースに対応する
InputData
インスタンスをつくる -
InputData
インスタンスをInteractorBus.handle()
に渡すと、対応するユースケースのInteractor
インスタンスに処理を委譲する -
Interactor
インスタンスはドメインモデルを使って処理を実行し、OutputData
を返す -
InteractorBus
はOutputData
を返す -
OutputData
からレスポンスをつくる - レスポンスを返す
といった感じです。
InteractorBus
など細かい部分は後ほど説明しますが、各ユースケースには対応する以下の三つが必ず存在するようになります。
-
InputData
: ユースケースの入力 -
Interactor
: ユースケースのビジネスロジック -
OutputData
: ユースケースの出力
例:タスクを登録するユースケース
Todoタスクを登録するユースケースを例に説明していきます。
例に漏れず、ソースコードをペタペタ貼っていきます。
TaskRegisterInputData
タスクを登録するために必要な入力を表します。
なお、ABCInputData
は全てのInputData
クラスが継承する抽象基底クラスです。
from pydantic import StrictStr
from ...abc import ABCInputData
class TaskRegisterInputData(ABCInputData):
name: StrictStr
TaskRegisterOutputData
タスク登録後に、作成したタスクのIDや名称、ステータスを結果として出力するようにしています。
なお、ABCOutputData
は全てのOutputData
クラスが継承する抽象基底クラスです。
from pydantic import BaseModel, StrictStr
from custom_pydantic.config import BaseFrozenConfig
class TaskData(BaseModel):
task_id: StrictStr
name: StrictStr
status: StrictStr
class Config(BaseFrozenConfig):
...
from ...abc import ABCOutputData
from ..commons.task_data import TaskData
class TaskRegisterOutputData(ABCOutputData):
data: TaskData
TaskRegisterInteractor
いまの段階では、ひとまずhandle()
メソッドの中だけをご覧ください。
- 入力である
TaskRegisterInputData
から、Value ObjectであるTaskName
をつくる -
TaskName
からFactoryクラスを使ってEntityであるTask
をつくる -
Task
をRepositoryクラスに渡し、データを登録する - 登録したデータすなわち
Task
からTaskRegisterOutputData
をつくって返す
という流れになっています。
なお、ABCInteractor
は全てのInteractor
クラスが継承する抽象基底クラスです。
from typing import Any, cast
from injector import inject, singleton
from pydantic import PrivateAttr
from custom_pydantic.config import BaseFrozenConfig
from ....domain.models.tasks import IFTaskFactory, IFTaskRepository, TaskName
from ...abc import ABCInputData, ABCInteractor, ABCOutputData
from ..commons.task_data import TaskData
from .task_register_input_data import TaskRegisterInputData
from .task_register_output_data import TaskRegisterOutputData
@singleton
class TaskRegisterInteractor(ABCInteractor):
__task_factory: IFTaskFactory = PrivateAttr()
__task_repository: IFTaskRepository = PrivateAttr()
@inject
def __init__(
self,
task_factory: IFTaskFactory,
task_repository: IFTaskRepository,
**data: Any,
) -> None:
super().__init__(**data)
self.__task_factory = task_factory
self.__task_repository = task_repository
def handle(self, input_data: ABCInputData) -> ABCOutputData:
input_data = cast(TaskRegisterInputData, input_data)
name = TaskName(value=input_data.name)
task = self.__task_factory.create(name)
self.__task_repository.save(task)
return TaskRegisterOutputData(
data=TaskData(
task_id=task.task_id.value,
name=task.name.value,
status=task.status.value,
)
)
class Config(BaseFrozenConfig):
...
REST APIエンドポイント
InputData
, Interactor
, OutputData
をそれぞれ見ていきました。
せっかくなので、このタイミングでAPIへリクエストが来たところの処理も見ておきましょう。
# ...
@app.post(Path.base.value, status_code=status.HTTP_201_CREATED)
async def post_todo(request: TaskPostRequest) -> TaskPostResponse:
input_data = TaskRegisterInputData(name=request.name)
output_data = cast(TaskRegisterOutputData, InteractorBus.handle(input_data))
return TaskPostResponse(
id=output_data.data.task_id,
name=output_data.data.name,
status=output_data.data.status,
)
# ...
まだInteractorBus
について詳しく説明していないので、いま説明している段階のソースコードに書き直すと以下のようになります。
# ...
@app.post(Path.base.value, status_code=status.HTTP_201_CREATED)
async def post_todo(request: TaskPostRequest) -> TaskPostResponse:
input_data = TaskRegisterInputData(name=request.name)
output_data = TaskRegisterInteractor.handle(input_data)
return TaskPostResponse(
id=output_data.data.task_id,
name=output_data.data.name,
status=output_data.data.status,
)
# ...
ビジネスロジック部分を完全にInteractorに任せて、単にルーティングに専念していることが分かります。
一旦まとめ
端的にまとめると、InputData
, Interactor
, OutputData
を設けることにより以下のメリットが享受できます。
- ドメインに依存するのは
Interactor
のみ -
Interactor
はクライアントとのインターフェース部分を気にする必要がなく、ビジネスロジックに集中できる - クライアントとのインターフェース部分はビジネスロジックと完全に切り離すことができる
IoC Containerの導入
ここまで説明した内容で、軽量DDDとクリーンアーキテクチャの実装自体は行うことができます。
が、今回はさらにイケてる実装にするため、IoC Containerを導入しました。
inject
ライブラリによるDependency Injection
まず、injector
ライブラリを使用することで、Dependency Injectionが必要なクラスのインスタンス化を簡単にします。
実装の流れは以下のようになります。
ほとんどinjector
ライブラリの説明となるため、詳しくは公式のドキュメントをご参照ください。
-
__init__()
にinjector.inject
のデコレータをつける - バインディングを定義し、
Injector
インスタンス経由で取得する
__init__()
にinjector.inject
のデコレータをつける
前述のInteractorの説明の際、読み飛ばした部分がこれにあたります。
from typing import Any, cast
from injector import inject, singleton
from pydantic import PrivateAttr
from custom_pydantic.config import BaseFrozenConfig
from ....domain.models.tasks import IFTaskFactory, IFTaskRepository, TaskName
from ...abc import ABCInputData, ABCInteractor, ABCOutputData
from ..commons.task_data import TaskData
from .task_register_input_data import TaskRegisterInputData
from .task_register_output_data import TaskRegisterOutputData
@singleton
class TaskRegisterInteractor(ABCInteractor):
__task_factory: IFTaskFactory = PrivateAttr()
__task_repository: IFTaskRepository = PrivateAttr()
@inject
def __init__(
self,
task_factory: IFTaskFactory,
task_repository: IFTaskRepository,
**data: Any,
) -> None:
super().__init__(**data)
self.__task_factory = task_factory
self.__task_repository = task_repository
# ...
Injector
インスタンス経由で取得する
バインディングを定義し、バインディングの定義は、injector.Module
クラスを継承して行います。
configure()
メソッド内のbind.bind()
の部分です。
bind.bind(IFA, A)
とすることによって、IFA
の箇所にA
が使われるようになります。
from injector import Binder, Module
from ..domain.models.tasks import IFTaskFactory, IFTaskRepository
from ..infrastructure.in_memory.tasks import (
InMemoryTaskFactory,
InMemoryTaskQueryService,
InMemoryTaskRepository,
)
class TaskModule(Module):
def configure(self, binder: Binder) -> None:
binder.bind(IFTaskFactory, InMemoryTaskFactory)
binder.bind(IFTaskRepository, InMemoryTaskRepository)
Module
をInjector
をインスタンス化する際の引数に加えることで、バインディングの設定が反映されます。
from injector import Injector
from .task_module import TaskModule
injector = Injector(
[
TaskModule,
]
)
そして、injector.get(XxxInteractor)
を行うと、バインディングされた状態でXxxInteractor
インスタンスを取得することができます。
Interactorのクラスに@singleton
のデコレータがついていることに気づいた方も多いと思います。
このデコレータを付けておくと、injector.get()
を何回実行しても同じインスタンスを使い回してくれるようになります。
from todo.application.tasks.register import TaskRegisterInteractor
from todo.injector import injector
interactor = injector.get(TaskRegisterInteractor)
print(id(interactor) == injector.get(TaskRegisterInteractor)) # True
IoC Containerの実装
上記で説明したinjector
ライブラリとpydantic.Settings
を組み合わせることで、IoC Containerを実装しています。
実装方法は以下の流れです。
-
pydantic.Settings
により、環境変数から情報を読み込む - 読み込んだ情報により、バインディングする設定を変更する
pydantic.Settings
により、環境変数から情報を読み込む
pydantic.Settings
を使うことで、環境変数から情報を読み込んでくれます。
from pydantic import BaseSettings
class TaskSettings(BaseSettings):
task_adapter: TaskAdapter = "in_memory"
例えば、export TASK_ADAPTER=hogehoge"を設定してから上記の
TaskSettingsをインスタンス化すると、
task_adapterには
hogehogeが代入されます。 何も設定しないと、デフォルト値の
in_memory`が代入されます。
読み込んだ情報により、バインディングする設定を変更する
injector
の説明で載せたTaskModule
を以下のように改修します。
from injector import Binder, Module
from ..application.tasks import IFTaskQueryService
from ..config.task_settings import TaskAdapter, TaskSettings
from ..domain.models.tasks import IFTaskFactory, IFTaskRepository
from ..infrastructure.in_memory.tasks import (
InMemoryTaskFactory,
InMemoryTaskQueryService,
InMemoryTaskRepository,
)
from .exc import AdapterNotFoundError
class TaskModule(Module):
def configure(self, binder: Binder) -> None:
settings = TaskSettings()
match settings.task_adapter:
case TaskAdapter.in_memory:
binder.bind(IFTaskFactory, InMemoryTaskFactory) # type: ignore[type-abstract]
binder.bind(IFTaskRepository, InMemoryTaskRepository) # type: ignore[type-abstract]
binder.bind(IFTaskQueryService, InMemoryTaskQueryService) # type: ignore[type-abstract]
case _:
raise AdapterNotFoundError(
f"{settings.task_adapter} adapter was not found."
)
こうすることで、環境変数に応じてバインディングする情報を変更することができます。
今回in_memory
しか実装していないので少し分かりにくいので補足すると、例えば本番環境ではFirestoreを使用するとなった場合、以下のような手順で対応することができます。
export TASK_ADAPTER=firestore
-
TASK_ADAPTER=firestore
の場合にFirestoreTaskFactory
やFirestoreTaskRepository
などをバインディングするようにTaskModule
にコードを足す
IoC Containerを使うことで、バインドするInfrastructure等が変わったとしても、API化している部分などの呼び出し側のコードを変更しなくて済むようになります。
Bus Patternの適用
続いて、もう一つの改善点である「Bus Patternの適用」について説明します。
こちらは、nrslibさんの記事を読んで「なるほど!」となったので真似して実装・導入しました。
適用する場所
ここまでのソースコードで、FastAPIのルーティング部分を実装してみると、以下のようになります。
input_data
, interactor
, output_data
のところにご注目ください。
# ...
@app.get(Path.todo_by_id.value)
async def get_todo_by_id(task_id: str) -> TaskGetResponse:
input_data = TaskGetByIdInputData(task_id=task_id)
interactor = injector.get(TaskGetByIdInteractor)
output_data = interactor.handle(input_data)
if output_data.data is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return TaskGetResponse(
id=output_data.data.task_id,
name=output_data.data.name,
status=output_data.data.status,
)
@app.get(Path.base.value)
async def get_todo_all() -> TaskGetAllResponse:
input_data = TaskGetAllInputData()
interactor = injector.get(TaskGetAllInteractor)
output_data = interactor.handle(input_data)
data = [
TaskGetResponse(id=data.task_id, name=data.name, status=data.status)
for data in output_data.data
]
return TaskGetAllResponse(data=data)
@app.post(Path.base.value, status_code=status.HTTP_201_CREATED)
async def post_todo(request: TaskPostRequest) -> TaskPostResponse:
input_data = TaskRegisterInputData(name=request.name)
interactor = injector.get(TaskRegisterInteractor)
output_data = interactor.handle(input_data)
return TaskPostResponse(
id=output_data.data.task_id,
name=output_data.data.name,
status=output_data.data.status,
)
# ...
使っているクラスはユースケースごとに違っていても、書き方が全く一緒なことが分かると思います。
ここに手を加えてみます。
適用
input_data
, interactor
, output_data
はユースケースごとに異なります。
言い換えれば、input_data
が決まれば、自ずとinteractor
とoutput_data
も決まるということです。
この性質を利用すると、以下のように一箇所に集約することができ、加えてoutput_data
のreturnまでワンラインで実装することができます。
※ワンラインが良いとするかどうかはチームや個人の考え方によりますが
from ..application.abc import ABCInputData, ABCOutputData
from ..application.tasks.delete import TaskDeleteInputData, TaskDeleteInteractor
from ..application.tasks.get_all import TaskGetAllInputData, TaskGetAllInteractor
from ..application.tasks.get_by_id import TaskGetByIdInputData, TaskGetByIdInteractor
from ..application.tasks.register import TaskRegisterInputData, TaskRegisterInteractor
from ..application.tasks.update import TaskUpdateInputData, TaskUpdateInteractor
from .exc import InteractorNotFoundError
from .injector import injector
class InteractorBus:
@staticmethod
def handle(input_data: ABCInputData) -> ABCOutputData:
match input_data.__class__.__name__:
case TaskDeleteInputData.__name__:
return injector.get(TaskDeleteInteractor).handle(input_data)
case TaskGetByIdInputData.__name__:
return injector.get(TaskGetByIdInteractor).handle(input_data)
case TaskGetAllInputData.__name__:
return injector.get(TaskGetAllInteractor).handle(input_data)
case TaskRegisterInputData.__name__:
return injector.get(TaskRegisterInteractor).handle(input_data)
case TaskUpdateInputData.__name__:
return injector.get(TaskUpdateInteractor).handle(input_data)
case _:
raise InteractorNotFoundError(
f"Interactor corresponding to {input_data.__class__.__name__} was not found."
)
このInteractorBus
を使うと、ルーティング部分を以下のように書き換えることができます。
# ...
@app.get(Path.todo_by_id.value)
async def get_todo_by_id(task_id: str) -> TaskGetResponse:
input_data = TaskGetByIdInputData(task_id=task_id)
output_data = cast(TaskGetByIdOutputData, InteractorBus.handle(input_data))
if output_data.data is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return TaskGetResponse(
id=output_data.data.task_id,
name=output_data.data.name,
status=output_data.data.status,
)
@app.get(Path.base.value)
async def get_todo_all() -> TaskGetAllResponse:
input_data = TaskGetAllInputData()
output_data = cast(TaskGetAllOutputData, InteractorBus.handle(input_data))
data = [
TaskGetResponse(id=data.task_id, name=data.name, status=data.status)
for data in output_data.data
]
return TaskGetAllResponse(data=data)
@app.post(Path.base.value, status_code=status.HTTP_201_CREATED)
async def post_todo(request: TaskPostRequest) -> TaskPostResponse:
input_data = TaskRegisterInputData(name=request.name)
output_data = cast(TaskRegisterOutputData, InteractorBus.handle(input_data))
return TaskPostResponse(
id=output_data.data.task_id,
name=output_data.data.name,
status=output_data.data.status,
)
# ...
どうでしょうか。
...typing.cast
を書いているせいで改善されたのかどうか分かりにくいですね!!
結構好き嫌いが分かれると思いますが、これはこれで良いんじゃないかなあといった感じです。
外部APIが増えてきたり、もっと複雑になるとより良さが分かるかもしれません。
おわりに
以上、PythonでDDDでクリーンアーキテクチャで実装してみました。
今回実装してみて、個人的に感じた一番のことは、
ずばり importとcastが面倒 です。
書き疲れたので雑に終わりますが、ここまでPythonで実装しているものをあまり見かけなかったので、今回共有させていただきました。
Discussion