Python x DDD x Clean Architecture
はじめに
PythonでDDDでクリーンアーキテクチャで実装してみました。
良くも悪くも、"Pythonでは"あまりネット上で見かけない構成になったので共有しようと思います。
つくったものは、めちゃ簡単なTodoアプリです。
経緯
- DDDについて全く知らないので、とりあえずPythonで軽量DDDやってみよう
- ついでにクリーンアーキテクチャも全く知らないのでコレもやろう
という軽い気持ちで始めました。
参考
nrslibさんの記事を最も参考にさせていただきました。
(というより、実装していくとほぼPythonに書き直しただけのようになりました)
使用ライブラリ
今回作成したものは使用ライブラリに大きく依存するため、先に使用ライブラリを記載しておきます。
DDDのため
今回、メインで紹介する部分です。
-
pydantic
- 型チェックや設定管理に富んだライブラリ
- 個人的にすごくお気に入り
-
injector
- DI(Dependency injection)を楽にするためのライブラリ
- 今回初めて使ってみたがとても便利だった
- pydanticと併用する場合には注意点あり
外部API化のため
主なディレクトリ構成
大きく、以下の二つのモジュールに大別されます。
- コアモジュール
- 外部APIモジュール
コアモジュール群:todo
Todoアプリの実体となるモジュール群です。
ココがほぼメインの話になり、軽量DDD&クリーンアーキテクチャで実装しています。
./src/todo/
├── application # Application Services
│ ├── abc
│ └── tasks
│ ├── commons
│ ├── delete
│ ├── get_all
│ ├── get_by_id
│ ├── register
│ └── update
├── config
├── domain
│ └── models # Domain Models
│ └── tasks
├── infrastructure # Concrete Factory, QueryService and Repository by infrastructure
│ └── in_memory
│ └── tasks
└── injector # IoC Container
いま思えば、applicationという名前が分かりにくいのでuse_casesとかにすれば良かったなと思っています。
外部APIモジュール
todoモジュールをAPI化するため(外部から扱えるようにするため)のラッパーモジュールです。
rest_api
./src/rest_api/
├── main.py # Endpoints
└── models # View Models
├── healthz
└── tasks
├── commons
├── delete
├── get
├── patch
└── post
grpc
TODO: そのうち埋めるかも
共通ルール:基底クラス、抽象基底クラス
各モジュールの中を見ていく前に、どのモジュールでも共通して適用している"ゆるい"ルールがあります。
"ゆるい"とつけているのは、試行錯誤で実装していったため、完全にこのルールに従っている自信がないからです…
- 原則
pydantic.BaseModelを使用 -
pydantic.BaseModelのカスタムコンフィグとしてBaseConfigとBaseFrozenConfigを定義 - Value Objectなど不変なものは
BaseFrozenConfig、Entityなど可変なものはBaseConfig - IFRepositoryやIFQueryServiceなどのインターフェースクラス用に
Interfaceクラスを定義(実質ABCと同義)
BaseConfigとBaseFrozenConfigは、今回以下のように実装しています。
詳しくはpydanticの公式ドキュメントをご参照ください。
from typing import Literal
from pydantic.config import BaseConfig as PydanticBaseConfig
class BaseConfig(PydanticBaseConfig):
validate_all = True
validate_assignment = True
copy_on_model_validation: Literal["none", "deep", "shallow"] = "deep"
class BaseFrozenConfig(PydanticBaseConfig):
validate_all: bool = True
allow_mutation: bool = False
copy_on_model_validation: Literal["none", "deep", "shallow"] = "deep"
DDDの視点から見たtodoモジュール
ここからはtodoモジュール群の中身について、DDDの視点からざっくり見ていきたいと思います。
コード自体がシンプルなため、コードをペタペタ貼りながら説明していきます。
Value Object
まずは、Value Objectの基底クラスとなるValueObjectModelを実装します。
Value Objectは不変なので、BaseFrozenConfigを継承します。
from pydantic import BaseModel
from custom_pydantic.config import BaseFrozenConfig
class ValueObjectModel(BaseModel):
class Config(BaseFrozenConfig):
...
以下が、Value Objectの例です。
from pydantic import StrictStr
from ..base_model import ValueObjectModel
class TaskId(ValueObjectModel):
value: StrictStr
なお、今回は実装していませんが、Value Object同士の比較を容易にするために__eq__()を定義した方が良いと思っています。
Entity
次はEntityの説明です。
まずは、Entityの基底クラスとなるEntityModelを実装します。
Entityは可変なので、BaseConfigを継承します。
from pydantic import BaseModel
from custom_pydantic.config import BaseConfig
class EntityModel(BaseModel):
class Config(BaseConfig):
...
Entityクラスには、以下の特徴があります。
- フィールドのtype hintは
ValueObjectModel - 各フィールドを変更するためのメソッドを定義
from ..base_model import EntityModel
from .task_id import TaskId
from .task_name import TaskName
from .task_status import TaskStatus
class Task(EntityModel):
task_id: TaskId
name: TaskName
status: TaskStatus
def change_name(self, name: TaskName) -> None:
self.name = name
def change_status_to_todo(self) -> None:
self.status = TaskStatus.TODO
def change_status_to_done(self) -> None:
self.status = TaskStatus.DONE
厳密には、Value Objectを勝手に変えられないように各フィールドをprivateにしてsetter, getterを指定した方が良いのかもしれませんが、書くのが面倒なうえにpydanticのうまみが半減するので、そのようにはしていません。
Repository
Repositoryは、以下のルールに従って実装しています。
- ドメインモデルを引数にとり、ドメインモデルを戻り値とする
- IFRepository(Repositoryのインターフェースクラス(抽象基底クラス))は、
domainモジュール配下 - Concrete Repository(Repositoryの具象クラス)は、
infrastructureモジュール配下
import abc
from ....interface import Interface
from .task import Task
from .task_id import TaskId
class IFTaskRepository(Interface):
@abc.abstractmethod
def find_by_id(self, task_id: TaskId) -> Task | None:
...
@abc.abstractmethod
def save(self, task: Task) -> None:
...
@abc.abstractmethod
def delete(self, task_id: TaskId) -> None:
...
from ....domain.models.tasks import IFTaskRepository, Task, TaskId, TaskName, TaskStatus
from .in_memory_data import TaskTuple, data
class InMemoryTaskRepository(IFTaskRepository):
def find_by_id(self, task_id: TaskId) -> Task | None:
task_tuple = data.get(task_id.value, None)
if task_tuple is None:
return None
return Task(
task_id=task_id,
name=TaskName(value=task_tuple.name),
status=TaskStatus(task_tuple.status),
)
def save(self, task: Task) -> None:
data[task.task_id.value] = TaskTuple(
name=task.name.value, status=task.status.value
)
def delete(self, task_id: TaskId) -> None:
data.pop(task_id.value, None)
今回はDictに保存していくInMemoryしか実装していませんが、IFRepositoryを実現していれば、RedisだろうがMySQLだろうがFirestoreだろうが何だってOKです。
Factory
Factoryは便利なものですがあくまで副産物なので、説明は割愛します。
実装方法としてはほぼRepositoryと同様です。
以上がドメインモデルの説明になります。
ほぼコードペタペタ貼っただけで説明がないですが…コードが説明と思ってください!
シーケンス図
さて、ここからドメインモデルを使った実装の全体を見ていきたいのですが、急に複雑になります。
全体の処理の流れを先に見ていただいたほうが分かりやすいと思うので、REST APIのシーケンス図をに載せます。
ざっくり流れを説明すると、
- クライアントからリクエストくる
- リクエストからユースケースに対応する
InputDataインスタンスをつくる -
InputDataインスタンスをInteractorBus.handle()に渡すと、対応するユースケースのInteractorインスタンスに処理を委譲する -
Interactorインスタンスはドメインモデルを使って処理を実行し、OutputDataを返す -
InteractorBusはOutputDataを返す -
OutputDataからレスポンスをつくる - レスポンスを返す
といった感じです。
InteractorBusなど細かい部分は後ほど説明しますが、各ユースケースには対応する以下の三つが必ず存在するようになります。
-
InputData: ユースケースの入力 -
Interactor: ユースケースのビジネスロジック -
OutputData: ユースケースの出力
例:タスクを登録するユースケース
Todoタスクを登録するユースケースを例に説明していきます。
例に漏れず、ソースコードをペタペタ貼っていきます。
TaskRegisterInputData
タスクを登録するために必要な入力を表します。
なお、ABCInputDataは全てのInputDataクラスが継承する抽象基底クラスです。
from pydantic import StrictStr
from ...abc import ABCInputData
class TaskRegisterInputData(ABCInputData):
name: StrictStr
TaskRegisterOutputData
タスク登録後に、作成したタスクのIDや名称、ステータスを結果として出力するようにしています。
なお、ABCOutputDataは全てのOutputDataクラスが継承する抽象基底クラスです。
from pydantic import BaseModel, StrictStr
from custom_pydantic.config import BaseFrozenConfig
class TaskData(BaseModel):
task_id: StrictStr
name: StrictStr
status: StrictStr
class Config(BaseFrozenConfig):
...
from ...abc import ABCOutputData
from ..commons.task_data import TaskData
class TaskRegisterOutputData(ABCOutputData):
data: TaskData
TaskRegisterInteractor
いまの段階では、ひとまずhandle()メソッドの中だけをご覧ください。
- 入力である
TaskRegisterInputDataから、Value ObjectであるTaskNameをつくる -
TaskNameからFactoryクラスを使ってEntityであるTaskをつくる -
TaskをRepositoryクラスに渡し、データを登録する - 登録したデータすなわち
TaskからTaskRegisterOutputDataをつくって返す
という流れになっています。
なお、ABCInteractorは全てのInteractorクラスが継承する抽象基底クラスです。
from typing import Any, cast
from injector import inject, singleton
from pydantic import PrivateAttr
from custom_pydantic.config import BaseFrozenConfig
from ....domain.models.tasks import IFTaskFactory, IFTaskRepository, TaskName
from ...abc import ABCInputData, ABCInteractor, ABCOutputData
from ..commons.task_data import TaskData
from .task_register_input_data import TaskRegisterInputData
from .task_register_output_data import TaskRegisterOutputData
@singleton
class TaskRegisterInteractor(ABCInteractor):
__task_factory: IFTaskFactory = PrivateAttr()
__task_repository: IFTaskRepository = PrivateAttr()
@inject
def __init__(
self,
task_factory: IFTaskFactory,
task_repository: IFTaskRepository,
**data: Any,
) -> None:
super().__init__(**data)
self.__task_factory = task_factory
self.__task_repository = task_repository
def handle(self, input_data: ABCInputData) -> ABCOutputData:
input_data = cast(TaskRegisterInputData, input_data)
name = TaskName(value=input_data.name)
task = self.__task_factory.create(name)
self.__task_repository.save(task)
return TaskRegisterOutputData(
data=TaskData(
task_id=task.task_id.value,
name=task.name.value,
status=task.status.value,
)
)
class Config(BaseFrozenConfig):
...
REST APIエンドポイント
InputData, Interactor, OutputDataをそれぞれ見ていきました。
せっかくなので、このタイミングでAPIへリクエストが来たところの処理も見ておきましょう。
# ...
@app.post(Path.base.value, status_code=status.HTTP_201_CREATED)
async def post_todo(request: TaskPostRequest) -> TaskPostResponse:
input_data = TaskRegisterInputData(name=request.name)
output_data = cast(TaskRegisterOutputData, InteractorBus.handle(input_data))
return TaskPostResponse(
id=output_data.data.task_id,
name=output_data.data.name,
status=output_data.data.status,
)
# ...
まだInteractorBusについて詳しく説明していないので、いま説明している段階のソースコードに書き直すと以下のようになります。
# ...
@app.post(Path.base.value, status_code=status.HTTP_201_CREATED)
async def post_todo(request: TaskPostRequest) -> TaskPostResponse:
input_data = TaskRegisterInputData(name=request.name)
output_data = TaskRegisterInteractor.handle(input_data)
return TaskPostResponse(
id=output_data.data.task_id,
name=output_data.data.name,
status=output_data.data.status,
)
# ...
ビジネスロジック部分を完全にInteractorに任せて、単にルーティングに専念していることが分かります。
一旦まとめ
端的にまとめると、InputData, Interactor, OutputDataを設けることにより以下のメリットが享受できます。
- ドメインに依存するのは
Interactorのみ -
Interactorはクライアントとのインターフェース部分を気にする必要がなく、ビジネスロジックに集中できる - クライアントとのインターフェース部分はビジネスロジックと完全に切り離すことができる
IoC Containerの導入
ここまで説明した内容で、軽量DDDとクリーンアーキテクチャの実装自体は行うことができます。
が、今回はさらにイケてる実装にするため、IoC Containerを導入しました。
injectライブラリによるDependency Injection
まず、injectorライブラリを使用することで、Dependency Injectionが必要なクラスのインスタンス化を簡単にします。
実装の流れは以下のようになります。
ほとんどinjectorライブラリの説明となるため、詳しくは公式のドキュメントをご参照ください。
-
__init__()にinjector.injectのデコレータをつける - バインディングを定義し、
Injectorインスタンス経由で取得する
__init__()にinjector.injectのデコレータをつける
前述のInteractorの説明の際、読み飛ばした部分がこれにあたります。
from typing import Any, cast
from injector import inject, singleton
from pydantic import PrivateAttr
from custom_pydantic.config import BaseFrozenConfig
from ....domain.models.tasks import IFTaskFactory, IFTaskRepository, TaskName
from ...abc import ABCInputData, ABCInteractor, ABCOutputData
from ..commons.task_data import TaskData
from .task_register_input_data import TaskRegisterInputData
from .task_register_output_data import TaskRegisterOutputData
@singleton
class TaskRegisterInteractor(ABCInteractor):
__task_factory: IFTaskFactory = PrivateAttr()
__task_repository: IFTaskRepository = PrivateAttr()
@inject
def __init__(
self,
task_factory: IFTaskFactory,
task_repository: IFTaskRepository,
**data: Any,
) -> None:
super().__init__(**data)
self.__task_factory = task_factory
self.__task_repository = task_repository
# ...
バインディングを定義し、Injectorインスタンス経由で取得する
バインディングの定義は、injector.Moduleクラスを継承して行います。
configure()メソッド内のbind.bind()の部分です。
bind.bind(IFA, A)とすることによって、IFAの箇所にAが使われるようになります。
from injector import Binder, Module
from ..domain.models.tasks import IFTaskFactory, IFTaskRepository
from ..infrastructure.in_memory.tasks import (
InMemoryTaskFactory,
InMemoryTaskQueryService,
InMemoryTaskRepository,
)
class TaskModule(Module):
def configure(self, binder: Binder) -> None:
binder.bind(IFTaskFactory, InMemoryTaskFactory)
binder.bind(IFTaskRepository, InMemoryTaskRepository)
ModuleをInjectorをインスタンス化する際の引数に加えることで、バインディングの設定が反映されます。
from injector import Injector
from .task_module import TaskModule
injector = Injector(
[
TaskModule,
]
)
そして、injector.get(XxxInteractor)を行うと、バインディングされた状態でXxxInteractorインスタンスを取得することができます。
Interactorのクラスに@singletonのデコレータがついていることに気づいた方も多いと思います。
このデコレータを付けておくと、injector.get()を何回実行しても同じインスタンスを使い回してくれるようになります。
from todo.application.tasks.register import TaskRegisterInteractor
from todo.injector import injector
interactor = injector.get(TaskRegisterInteractor)
print(id(interactor) == injector.get(TaskRegisterInteractor)) # True
IoC Containerの実装
上記で説明したinjectorライブラリとpydantic.Settingsを組み合わせることで、IoC Containerを実装しています。
実装方法は以下の流れです。
-
pydantic.Settingsにより、環境変数から情報を読み込む - 読み込んだ情報により、バインディングする設定を変更する
pydantic.Settingsにより、環境変数から情報を読み込む
pydantic.Settingsを使うことで、環境変数から情報を読み込んでくれます。
from pydantic import BaseSettings
class TaskSettings(BaseSettings):
task_adapter: TaskAdapter = "in_memory"
例えば、export TASK_ADAPTER=hogehoge"を設定してから上記のTaskSettingsをインスタンス化すると、task_adapterにはhogehogeが代入されます。 何も設定しないと、デフォルト値のin_memory`が代入されます。
読み込んだ情報により、バインディングする設定を変更する
injectorの説明で載せたTaskModuleを以下のように改修します。
from injector import Binder, Module
from ..application.tasks import IFTaskQueryService
from ..config.task_settings import TaskAdapter, TaskSettings
from ..domain.models.tasks import IFTaskFactory, IFTaskRepository
from ..infrastructure.in_memory.tasks import (
InMemoryTaskFactory,
InMemoryTaskQueryService,
InMemoryTaskRepository,
)
from .exc import AdapterNotFoundError
class TaskModule(Module):
def configure(self, binder: Binder) -> None:
settings = TaskSettings()
match settings.task_adapter:
case TaskAdapter.in_memory:
binder.bind(IFTaskFactory, InMemoryTaskFactory) # type: ignore[type-abstract]
binder.bind(IFTaskRepository, InMemoryTaskRepository) # type: ignore[type-abstract]
binder.bind(IFTaskQueryService, InMemoryTaskQueryService) # type: ignore[type-abstract]
case _:
raise AdapterNotFoundError(
f"{settings.task_adapter} adapter was not found."
)
こうすることで、環境変数に応じてバインディングする情報を変更することができます。
今回in_memoryしか実装していないので少し分かりにくいので補足すると、例えば本番環境ではFirestoreを使用するとなった場合、以下のような手順で対応することができます。
export TASK_ADAPTER=firestore-
TASK_ADAPTER=firestoreの場合にFirestoreTaskFactoryやFirestoreTaskRepositoryなどをバインディングするようにTaskModuleにコードを足す
IoC Containerを使うことで、バインドするInfrastructure等が変わったとしても、API化している部分などの呼び出し側のコードを変更しなくて済むようになります。
Bus Patternの適用
続いて、もう一つの改善点である「Bus Patternの適用」について説明します。
こちらは、nrslibさんの記事を読んで「なるほど!」となったので真似して実装・導入しました。
適用する場所
ここまでのソースコードで、FastAPIのルーティング部分を実装してみると、以下のようになります。
input_data, interactor, output_dataのところにご注目ください。
# ...
@app.get(Path.todo_by_id.value)
async def get_todo_by_id(task_id: str) -> TaskGetResponse:
input_data = TaskGetByIdInputData(task_id=task_id)
interactor = injector.get(TaskGetByIdInteractor)
output_data = interactor.handle(input_data)
if output_data.data is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return TaskGetResponse(
id=output_data.data.task_id,
name=output_data.data.name,
status=output_data.data.status,
)
@app.get(Path.base.value)
async def get_todo_all() -> TaskGetAllResponse:
input_data = TaskGetAllInputData()
interactor = injector.get(TaskGetAllInteractor)
output_data = interactor.handle(input_data)
data = [
TaskGetResponse(id=data.task_id, name=data.name, status=data.status)
for data in output_data.data
]
return TaskGetAllResponse(data=data)
@app.post(Path.base.value, status_code=status.HTTP_201_CREATED)
async def post_todo(request: TaskPostRequest) -> TaskPostResponse:
input_data = TaskRegisterInputData(name=request.name)
interactor = injector.get(TaskRegisterInteractor)
output_data = interactor.handle(input_data)
return TaskPostResponse(
id=output_data.data.task_id,
name=output_data.data.name,
status=output_data.data.status,
)
# ...
使っているクラスはユースケースごとに違っていても、書き方が全く一緒なことが分かると思います。
ここに手を加えてみます。
適用
input_data, interactor, output_dataはユースケースごとに異なります。
言い換えれば、input_dataが決まれば、自ずとinteractorとoutput_dataも決まるということです。
この性質を利用すると、以下のように一箇所に集約することができ、加えてoutput_dataのreturnまでワンラインで実装することができます。
※ワンラインが良いとするかどうかはチームや個人の考え方によりますが
from ..application.abc import ABCInputData, ABCOutputData
from ..application.tasks.delete import TaskDeleteInputData, TaskDeleteInteractor
from ..application.tasks.get_all import TaskGetAllInputData, TaskGetAllInteractor
from ..application.tasks.get_by_id import TaskGetByIdInputData, TaskGetByIdInteractor
from ..application.tasks.register import TaskRegisterInputData, TaskRegisterInteractor
from ..application.tasks.update import TaskUpdateInputData, TaskUpdateInteractor
from .exc import InteractorNotFoundError
from .injector import injector
class InteractorBus:
@staticmethod
def handle(input_data: ABCInputData) -> ABCOutputData:
match input_data.__class__.__name__:
case TaskDeleteInputData.__name__:
return injector.get(TaskDeleteInteractor).handle(input_data)
case TaskGetByIdInputData.__name__:
return injector.get(TaskGetByIdInteractor).handle(input_data)
case TaskGetAllInputData.__name__:
return injector.get(TaskGetAllInteractor).handle(input_data)
case TaskRegisterInputData.__name__:
return injector.get(TaskRegisterInteractor).handle(input_data)
case TaskUpdateInputData.__name__:
return injector.get(TaskUpdateInteractor).handle(input_data)
case _:
raise InteractorNotFoundError(
f"Interactor corresponding to {input_data.__class__.__name__} was not found."
)
このInteractorBusを使うと、ルーティング部分を以下のように書き換えることができます。
# ...
@app.get(Path.todo_by_id.value)
async def get_todo_by_id(task_id: str) -> TaskGetResponse:
input_data = TaskGetByIdInputData(task_id=task_id)
output_data = cast(TaskGetByIdOutputData, InteractorBus.handle(input_data))
if output_data.data is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return TaskGetResponse(
id=output_data.data.task_id,
name=output_data.data.name,
status=output_data.data.status,
)
@app.get(Path.base.value)
async def get_todo_all() -> TaskGetAllResponse:
input_data = TaskGetAllInputData()
output_data = cast(TaskGetAllOutputData, InteractorBus.handle(input_data))
data = [
TaskGetResponse(id=data.task_id, name=data.name, status=data.status)
for data in output_data.data
]
return TaskGetAllResponse(data=data)
@app.post(Path.base.value, status_code=status.HTTP_201_CREATED)
async def post_todo(request: TaskPostRequest) -> TaskPostResponse:
input_data = TaskRegisterInputData(name=request.name)
output_data = cast(TaskRegisterOutputData, InteractorBus.handle(input_data))
return TaskPostResponse(
id=output_data.data.task_id,
name=output_data.data.name,
status=output_data.data.status,
)
# ...
どうでしょうか。
...typing.castを書いているせいで改善されたのかどうか分かりにくいですね!!
結構好き嫌いが分かれると思いますが、これはこれで良いんじゃないかなあといった感じです。
外部APIが増えてきたり、もっと複雑になるとより良さが分かるかもしれません。
おわりに
以上、PythonでDDDでクリーンアーキテクチャで実装してみました。
今回実装してみて、個人的に感じた一番のことは、
ずばり importとcastが面倒 です。
書き疲れたので雑に終わりますが、ここまでPythonで実装しているものをあまり見かけなかったので、今回共有させていただきました。
Discussion