👏

DDDとクリーンアーキテクチャをはじめよう-Python編

に公開

背景

ども!池田(ikedadada)です!前回はGolang編でDDD+クリーンアーキテクチャの設計と実装方針をまとめました。本記事では、同じTodo
APIをPython(FastAPI + SQLAlchemy + MySQL)で組むときのキーポイントを紹介します。Node.js編の
AsyncLocalStorage、Go編のcontext.Contextに対応するPython特有の制約、特にSQLAlchemyのセッション管理とトランザクション境界の扱いに注目してください。

実装リポジトリ:

https://github.com/ikedadada/start-ddd-and-clean-architecture

以降はbackend_python/配下を前提に説明します。

全体像と依存方向

レイヤ構成は既存記事と同じく内向き依存を徹底しています。

  • domain: エンティティ/ドメインサービス/リポジトリポート
  • application_service: ユースケースとトランザクションサービス
  • infrastructure: SQLAlchemyベースのアダプタ(リポジトリ実装・トランザクション実装)
  • presentation: FastAPIのルータ、ハンドラ、ミドルウェア
  • main: Composition Root。依存解決とミドルウェア登録を担う

Python版でも「ドメインは技術詳細を知らない」「外側が内側のインターフェースに従う」点を守りつつ、SQLAlchemyの制限をインフラ層に閉じ込めます。

SQLAlchemyの制限とセッション管理

SQLAlchemyのORMセッションはスレッド/asyncセーフではなく、「同じリクエスト内で共有する」「リクエスト境界を出たら破棄する」必要があります。そこでContextProviderインターフェースを用意し、実装側でContextVarにセッションを束縛しています。ネストしたトランザクションはSession.begin_nested()で吸収し、expire_on_commit=Falseでコミット後もドメインオブジェクトの値をそのまま扱えるようにしています。

# backend_python/src/todo_api/infrastructure/repository/context_provider.py
class ContextProviderImpl(ContextProvider[Session]):
    def __init__(self, engine: Engine) -> None:
        self._session_factory = sessionmaker(bind=engine, expire_on_commit=False)
        self._state: ContextVar[Session | None] = ContextVar("session_state", default=None)

    @contextmanager
    def transaction(self) -> Iterator[Session]:
        existing = self._state.get()
        if existing is not None:
            with existing.begin_nested():
                yield existing
            return

        session = self._session_factory()
        token = self._state.set(session)
        try:
            with session.begin():
                yield session
        finally:
            self._state.reset(token)
            session.close()

FastAPI側ではSessionMiddlewareが必ずセッションを張り、レスポンスが4xx/5xxならrollback()を呼びます。これはエラーをハンドラでキャッチしてJSONに変換する場合も、最終的なコミットタイミングをフレームワークに握らせないためです。

# backend_python/src/todo_api/presentation/middleware/session_middleware.py
class SessionMiddleware(BaseHTTPMiddleware):
    async def dispatch(
        self,
        request: Request,
        call_next: Callable[[Request], Awaitable[Response]],
    ) -> Response:
        with self._context_provider.transaction() as session:
            response = await call_next(request)
            if response.status_code >= 400 and session.in_transaction():
                session.rollback()
            return response

ドメインモデル(Todo)

ドメイン層はPythonでも純粋なオブジェクトに徹し、ORMやFastAPIに依存しません。TodoエンティティはUUIDv7でIDを生成し、完了フラグの整合性をメソッドで守ります。SQLAlchemyとの橋渡しにはPydanticベースのTodoDTOを用意し、永続化層との変換を明示しました。

# backend_python/src/todo_api/domain/model/todo.py
class Todo:
    def __init__(self, title: str, description: str | None = None):
        self._id: UUID7 = uuid7()
        self._title = title
        self._description = description
        self._completed = False

    def mark_as_completed(self):
        if self._completed:
            raise TodoAlreadyCompletedError()
        self._completed = True

    def mark_as_uncompleted(self):
        if not self._completed:
            raise TodoNotCompletedError()
        self._completed = False

    def to_dto(self) -> TodoDTO:
        return TodoDTO(
            id=str(self.id),
            title=self.title,
            description=self.description,
            completed=self.completed,
        )

    @classmethod
    def from_dto(cls, dto: TodoDTO) -> "Todo":
        todo = cls.__new__(cls)
        todo._id = parse_uuid7(dto.id)
        todo._title = dto.title
        todo._description = dto.description
        todo._completed = dto.completed
        return todo

__new__を使って__init__をバイパスしているのは、SQLAlchemyから再構築する際に新しいUUIDを生成しないための工夫です。

リポジトリ実装(SQLAlchemyアダプタ)

リポジトリはセッションへ直接アクセスせず、常にContextProvider経由で取得します。session.merge()を使っているのは、ドメインモデルがSQLAlchemyのトラッキング対象外であるためです。mergeは「識別子が同じなら更新、なければINSERT」を実現し、flush()で即時にSQLを発行して整合性を担保します。

# backend_python/src/todo_api/infrastructure/repository/todo_repository.py
class TodoRepositoryImpl(TodoRepository):
    def find_all(self) -> list[Todo]:
        session = self.context_provider.current()
        todos = session.scalars(select(TodoDataModel)).all()
        return [todo.to_domain() for todo in todos]

    def save(self, todo: Todo) -> None:
        session = self.context_provider.current()
        todo_data_model = TodoDataModel.from_domain(todo)
        session.merge(todo_data_model)
        session.flush()

    def delete(self, todo: Todo) -> None:
        session = self.context_provider.current()
        session.execute(delete(TodoDataModel).where(TodoDataModel.id == str(todo.id)))

存在しないリソースはRepositoryNotFoundErrorに正規化し、プレゼンテーション層でHTTP
404にマッピングします。

トランザクション境界とユースケース

複数のリポジトリ操作を束ねるユースケースはTransactionServiceを受け取り、関数をRunでラップします。これにより、インフラ層のセッション制御を意識せずにトランザクション境界を確立できます。単一の書き込みしか行わないCreateだけはセッションミドルウェアに任せ、余計な抽象を増やしていません。

# backend_python/src/todo_api/application_service/usecase/update_todo_usecase.py
class UpdateTodoUsecaseImpl(UpdateTodoUsecase):
    def __init__(
        self,
        todo_repository: TodoRepository,
        transaction_service: TransactionService,
    ) -> None:
        self.todo_repository = todo_repository
        self.transaction_service = transaction_service

    def execute(self, input_dto: UpdateTodoUsecaseInput) -> UpdateTodoUsecaseOutput:
        def func() -> UpdateTodoUsecaseOutput:
            todo = self.todo_repository.find_by_id(input_dto.id)
            todo.update(title=input_dto.title, description=input_dto.description)
            self.todo_repository.save(todo)
            return UpdateTodoUsecaseOutput(todo=todo)

        return self.transaction_service.Run(func)

トランザクション実装側ではContextProvider.transaction()を呼び出すだけなので、依存方向は内向きのままです。

プレゼンテーション層(FastAPI)

FastAPIのハンドラは入出力変換に集中し、ユースケースエラーをHTTPエラーへマッピングします。例としてUpdateTodoHandlerはRepository層のRepositoryNotFoundErrorを404に変換しています。

# backend_python/src/todo_api/presentation/handler/update_todo_handler.py
class UpdateTodoHandler:
    def handle(self, id: UUID7, request: UpdateTodoRequest) -> UpdateTodoResponse:
        try:
            result = self.update_todo_usecase.execute(
                UpdateTodoUsecaseInput(id=id, title=request.title, description=request.description)
            )
        except RepositoryNotFoundError:
            raise NotFoundError(message="Todo not found")
        return UpdateTodoResponse(
            id=str(result.todo.id),
            title=result.todo.title,
            description=result.todo.description,
            completed=result.todo.completed,
        )

main.pyではComposition
Rootとしてすべての依存を組み立て、ミドルウェアを登録します。この構造はNode.js編のmain.ts、Go編のcmd/server/main.goと対応しています。

# backend_python/main.py(抜粋)
engine = create_engine(DATABASE_URL, echo=True)
context_provider = ContextProviderImpl(engine=engine)
todo_repository = TodoRepositoryImpl(context_provider=context_provider)
transaction_service = TransactionServiceImpl(context_provider=context_provider)

create_todo_usecase = CreateTodoUsecaseImpl(todo_repository=todo_repository)
update_todo_usecase = UpdateTodoUsecaseImpl(
    todo_repository=todo_repository, transaction_service=transaction_service
)

app.add_middleware(SessionMiddleware, context_provider=context_provider)
app.add_middleware(ErrorHandler)
app.include_router(todo_router(todo_router_container))

テスト戦略

Python版もレイヤ単位でテストを配置しています。

  • domain: エンティティの状態遷移とDTO変換を確認(tests/domain/test_todo.py
  • application_service: ユースケースの入出力と例外ハンドリングを確認
  • infrastructure: コンテキストプロバイダやトランザクションサービスがコミット/ロールバックすることを実 DB で検証
  • presentation: ミドルウェアとハンドラのエラーマッピング、ルータのHTTP仕様をテスト

特にtests/infrastructure/service/test_transaction_service.pyでは、ネストされたトランザクションが外側に影響しないことを確認しています。SQLAlchemyの制限に対して仕組みが正しく機能するかを自動テストで担保しているのがポイントです。

また、インフラ層ではsession.merge()ON DUPLICATE KEYに相当するMySQL固有の挙動を利用してアップサートを成立させているため、SQLiteなどの簡易DBに切り替えると動作差分が出てしまいます。この理由からtests/infrastructure/conftest.pyではTestcontainersでMySQLを起動し、実際のSQL方言と同じ環境で検証しています。

まとめ

  • SQLAlchemyのセッションはContextVarとミドルウェアでリクエスト境界に閉じ込め、ネスト時はbegin_nested()で吸収
  • ドメインはORMを知らず、DTO経由で永続化層とやり取りすることでテスタビリティと整合性を確保
  • トランザクションはTransactionServiceを介してユースケースに集中させ、FastAPIの例外ハンドリングと連携してHTTPレスポンスを統一

PythonでもDDD+クリーンアーキテクチャの骨格はそのままに、言語固有の制約をインフラレイヤで解消すれば快適に進められます。

次回はJavaでの実装例を紹介します。お楽しみに!

GitHubで編集を提案

Discussion