📐

FastAPIでClean Architecture + ADRアプローチを実践する

に公開

はじめに

FastAPIでAPIサーバーを作っていると、ルーター関数にビジネスロジックやDB操作が混在して、だんだん見通しが悪くなった経験はありませんか?

今回、クリーンアーキテクチャとADRアプローチを組み合わせた設計を実践してみました。
結果、かなり保守しやすく、テストも書きやすい構成になったので、その知見を共有します。

よくある実装の問題点

FastAPIで普通にルーターを書いていくと、こんな感じになることが多いと思います。

from fastapi import APIRouter
from sqlalchemy.orm import Session

router = APIRouter()

@router.get("/users/{user_id}")
def get_user(user_id: int, db: Session = Depends(get_db)):
    # DBから直接取得
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404)
    return {"id": user.id, "name": user.name}

@router.post("/users")
def create_user(data: dict, db: Session = Depends(get_db)):
    # ビジネスロジックとDB操作が混在
    if not data.get("email"):
        raise HTTPException(status_code=400)
    user = User(**data)
    db.add(user)
    db.commit()
    return {"id": user.id}

一見シンプルで良さそうですが、規模が大きくなると以下の問題が出てきます。

  • ビジネスロジックが散在: バリデーション、計算、状態変更がルーター関数内に混在
  • DB実装に依存: SQLAlchemyのクエリが直接書かれている
  • テストが難しい: DBをモックするか、実際のDBが必要
  • 再利用できない: 同じロジックを別のエンドポイントで使いたい時に困る

規模が大きくなるほど、この問題は顕著になります。

ADRアプローチという選択肢

ADR(Action-Domain-Responder)は、MVCをWebアプリケーション向けに洗練させたパターンです。

シンプルに言うと、以下の3つに分割します。

  • 1エンドポイント = 1Action: Controllerを細かく分割
  • Domain: ビジネスロジックを独立させる
  • Responder: レスポンス生成を分離

各エンドポイントが完全に独立するので、変更の影響範囲が狭く、テストも書きやすくなります。

実装したディレクトリ構成

Clean Architectureの4層構造をベースに、こんな感じで実装しています。

app/
├── api/
│   ├── presentation/      # プレゼンテーション層(Request/Response)
│   ├── application/       # アプリケーション層(UseCase、Repository Protocol)
│   ├── domain/            # ドメイン層(Entity、ビジネスロジック)
│   ├── infra/             # インフラストラクチャ層(Repository実装、DB、外部API)
│   ├── di/                # 依存性注入
│   └── api_v1/            # APIルーター(エントリーポイント)
├── core/                  # 共通機能
└── models/                # SQLAlchemyモデル

レイヤー間の依存関係

プレゼンテーション層(APIルーター)
    ↓ 依存
アプリケーション層(UseCase + Repository Protocol)
    ↓ 依存
ドメイン層(Entity)

インフラストラクチャ層(Repository実装)
    ↑ Protocolを実装(依存性逆転)
アプリケーション層(Repository Protocol)

依存性逆転の仕組み

  • CalculateRepositoryProtocolはアプリケーション層で定義
  • CalculateRepository(実装)はインフラ層で定義し、上位層のProtocolを実装
  • UseCaseは具体的な実装(CalculateRepository)を知らず、Protocol経由でのみアクセス
  • これにより、インフラ層の変更がアプリケーション層・ドメイン層に影響しない

リクエストが来たときの流れはこうです。

  1. プレゼンテーション層:APIルーターがリクエストを受け取る
  2. アプリケーション層:UseCase(Action)がビジネスロジックを実行
  3. ドメイン層:Entityでビジネスルールを表現
  4. インフラストラクチャ層:DBや外部APIとやりとり
  5. プレゼンテーション層:Presenter(Responder)がレスポンスを整形して返却

各層が疎結合なので、テストも書きやすく、変更時の影響範囲も限定的です。

実装例を見てみる

理論だけだと分かりにくいので、具体的なコード例で説明します。ここではdata_transformツール(データ変換API)を例に、各層がどう実装されるかを見ていきましょう。

このツールは、JSON/CSV/XML/YAML形式間でのデータ変換を行います。

Entity:ビジネスの「モノ」を表現する

# app/api/domain/entities/mcp/data_processing/data_transform_result_entity.py
from dataclasses import dataclass

@dataclass(frozen=True)
class DataTransformResultEntity:
    """ドメインエンティティ(データ変換結果)"""
    
    success: bool
    transformed_data: str
    from_format: str
    to_format: str
    record_count: int
    field_count: int
    processing_time: float
    applied_mappings: dict[str, str] | None = None
    warnings: list[str] | None = None
    error: str | None = None

Entityは「不変」にしています(frozen=True)。これによって、予期せぬ状態変更を防げます。

ビジネスロジックの結果(変換されたデータ、処理時間、エラーなど)を表現するだけで、どうやってデータベースに保存するかなどのインフラ詳細は含みません。

Repository Protocol:インターフェースを定義

# app/api/application/repositories/mcp/data_processing/data_transform_repository_protocol.py
from typing import Any, Protocol, runtime_checkable
from app.api.domain.entities.mcp.data_processing import DataTransformResultEntity

@runtime_checkable
class DataTransformRepositoryProtocol(Protocol):
    """リポジトリプロトコル(データ変換ツール)"""
    
    async def transform_data(
        self,
        data: str,
        from_format: str,
        to_format: str,
        options: dict[str, Any] | None = None,
        field_mapping: dict[str, str] | None = None,
        include_headers: bool = True,
        delimiter: str = ",",
        encoding: str = "utf-8",
    ) -> DataTransformResultEntity:
        """データ変換を実行する"""
        ...

PythonのProtocolを使って、「こういうメソッドがあればOK」というインターフェースを定義しています。

これがあることで、実装を差し替えやすくなります。テスト時にモックに置き換えたり、将来別の変換ライブラリに切り替えたりが簡単です。

UseCase:ビジネスロジックの本体

# app/api/application/usecases/mcp/data_processing/data_transform_usecase.py
from app.api.application.repositories.mcp.data_processing import DataTransformRepositoryProtocol
from app.api.application.usecases.abstract_async_usecase import AbstractAsyncUsecase
from app.api.application.presenters import PresenterProtocol
from app.api.presentation.request.mcp.data_processing import DataTransformRequest
from app.api.presentation.response.mcp.data_processing import DataTransformResponse

class DataTransformUsecase(
    AbstractAsyncUsecase[
        PresenterProtocol[DataTransformRequest, DataTransformResponse]
    ]
):
    """ユースケース(データ変換ツール)"""
    
    def __init__(self, repository: DataTransformRepositoryProtocol) -> None:
        self._repository = repository
    
    async def execute(
        self,
        presenter: PresenterProtocol[DataTransformRequest, DataTransformResponse],
    ) -> None:
        """ユースケースを実行する"""
        request = presenter.request
        
        # データ変換を実行する(ドメインロジック)
        result = await self._repository.transform_data(
            data=request.data,
            from_format=request.from_format,
            to_format=request.to_format,
            field_mapping=request.field_mapping,
            include_headers=request.include_headers,
            delimiter=request.delimiter,
            encoding=request.encoding,
        )
        
        # エンティティからレスポンスに変換
        presenter.response = DataTransformResponse(
            success=result.success,
            transformed_data=result.transformed_data,
            from_format=result.from_format,
            to_format=result.to_format,
            record_count=result.record_count,
            field_count=result.field_count,
            processing_time=result.processing_time,
            applied_mappings=result.applied_mappings,
            warnings=result.warnings,
            error=result.error,
        )

UseCaseは「この機能で何をするか」を表現します。1つのUseCaseは1つのエンドポイントに対応しています。

Presenterを引数で受け取って、そこからRequestを取得し、Repositoryでビジネスロジックを実行、結果のEntityをResponseに変換してPresenterに設定します。戻り値がNoneなのは、結果をPresenterに詰めて返すためです。

Presenter:データの受け渡し役

# app/api/presentation/presenter.py
from typing import Generic, TypeVar

T = TypeVar("T")  # Request
R = TypeVar("R")  # Response

class Presenter(Generic[T, R]):
    """プレゼンター"""
    
    def __init__(self, request: T, response: R) -> None:
        self._request = request
        self._response = response
    
    @property
    def request(self) -> T:
        """リクエストデータを取得"""
        return self._request
    
    @property
    def response(self) -> R:
        """レスポンスデータを取得"""
        return self._response
    
    @request.setter
    def request(self, request: T) -> None:
        """リクエストデータを設定"""
        self._request = request
    
    @response.setter
    def response(self, response: R) -> None:
        """レスポンスデータを設定"""
        self._response = response
    
    @classmethod
    def create_presenter(cls) -> "Presenter[T, R]":
        """プレゼンターを生成"""
        return cls(cls.request, cls.response)

Presenterは、RequestとResponseを持っているだけのシンプルなクラスです。
Genericsで型を指定しているので、IDEの補完も効きますし、型チェックも通ります。

UseCaseとAPIルーターの間のデータ受け渡しをこのクラスに任せることで、責務が明確になります。

APIルーター:エンドポイントの入り口

# app/api/api_v1/mcp/routers/data_processing.py
from fastapi import APIRouter, Depends

from app.api.application.presenters import PresenterProtocol
from app.api.application.usecases.mcp.data_processing import DataTransformUsecase
from app.api.di.mcp.data_processing import (
    provide_data_transform_presenter,
    provide_data_transform_usecase,
)
from app.api.presentation.request.mcp.data_processing import DataTransformRequest
from app.api.presentation.response.mcp.data_processing import DataTransformResponse
from app.core.auth import verify_api_key

router = APIRouter()

@router.post(
    "/transform",
    operation_id="data_transform",
    description="JSON、CSV、XML、YAML形式間でのデータ変換を行います。",
    summary="データ変換ツール",
)
async def data_transform(
    request: DataTransformRequest,
    api_key: str = Depends(verify_api_key),
    presenter: PresenterProtocol[
        DataTransformRequest,
        DataTransformResponse,
    ] = Depends(provide_data_transform_presenter),
    usecase: DataTransformUsecase = Depends(provide_data_transform_usecase),
) -> DataTransformResponse:
    """
    データ変換ツール(MCPツール)
    
    JSON、CSV、XML、YAML形式間でのデータ変換を実行します。
    """
    # リクエストをPresenterに設定
    presenter.request = request
    
    # UseCaseを実行
    await usecase.execute(presenter)
    
    # レスポンスを返却
    return presenter.response

FastAPIの関数ベースルーターです。

やっていることは単純で、以下の処理フローになっています。

  1. Requestを受け取る
  2. Presenterに詰める
  3. UseCaseを実行
  4. PresenterからResponseを取り出して返す

FastAPIのDependsを使って依存性注入しているので、テストの時は簡単にモックに差し替えられます。

Repository:外部とのやりとり

# app/api/infra/repositories/mcp/data_processing/data_transform_repository.py
import csv
import json
import time
import xml.etree.ElementTree as ET
from io import StringIO
from typing import Any

import yaml

from app.api.application.repositories.mcp.data_processing import (
    DataTransformRepositoryProtocol
)
from app.api.domain.entities.mcp.data_processing import DataTransformResultEntity

class DataTransformRepository(DataTransformRepositoryProtocol):
    """リポジトリ(データ変換ツール)"""
    
    async def transform_data(
        self,
        data: str,
        from_format: str,
        to_format: str,
        field_mapping: dict[str, str] | None = None,
        include_headers: bool = True,
        delimiter: str = ",",
        encoding: str = "utf-8",
    ) -> DataTransformResultEntity:
        """データ変換を実行する"""
        start_time = time.time()
        
        try:
            # データ解析
            parsed_data = await self._parse_data(data, from_format, delimiter)
            
            # フィールドマッピング適用
            if field_mapping:
                parsed_data = await self._apply_mapping(parsed_data, field_mapping)
            
            # データ変換
            transformed = await self._convert_data(parsed_data, to_format, delimiter)
            
            processing_time = time.time() - start_time
            
            return DataTransformResultEntity(
                success=True,
                transformed_data=transformed,
                from_format=from_format,
                to_format=to_format,
                record_count=len(parsed_data) if isinstance(parsed_data, list) else 1,
                field_count=len(parsed_data[0]) if parsed_data else 0,
                processing_time=processing_time,
            )
        except Exception as e:
            return DataTransformResultEntity(
                success=False,
                transformed_data="",
                from_format=from_format,
                to_format=to_format,
                record_count=0,
                field_count=0,
                processing_time=time.time() - start_time,
                error=f"変換エラー: {str(e)}",
            )
    
    async def _parse_data(self, data: str, format_type: str, delimiter: str) -> Any:
        """データを解析する(プライベートメソッド)"""
        if format_type == "json":
            return json.loads(data)
        elif format_type == "csv":
            return list(csv.DictReader(StringIO(data), delimiter=delimiter))
        elif format_type == "yaml":
            return yaml.safe_load(data)
        # ... その他の形式

Repositoryは、先ほど定義したProtocolの実装です。

実際の変換処理(JSON、CSV、XMLのパース・変換)を行いますが、UseCaseからはProtocol越しにしか見えないので、実装の詳細が隠蔽されています。

例えば、将来pandasを使った実装に変えたくなっても、Protocolを実装した新しいRepositoryを作るだけでOKです。UseCaseは変更不要です。

依存性注入:パーツを組み立てる

# app/api/di/mcp/data_processing/data_transform_di.py
from fastapi import Depends

from app.api.application.presenters import PresenterProtocol
from app.api.application.repositories.mcp.data_processing import (
    DataTransformRepositoryProtocol
)
from app.api.application.usecases.mcp.data_processing import DataTransformUsecase
from app.api.infra.repositories.mcp.data_processing import DataTransformRepository
from app.api.presentation.presenter import Presenter
from app.api.presentation.request.mcp.data_processing import DataTransformRequest
from app.api.presentation.response.mcp.data_processing import DataTransformResponse

def provide_data_transform_presenter() -> (
    PresenterProtocol[DataTransformRequest, DataTransformResponse]
):
    """プレゼンターを提供する"""
    return Presenter[
        DataTransformRequest,
        DataTransformResponse,
    ].create_presenter()

def provide_repository() -> DataTransformRepositoryProtocol:
    """リポジトリを提供する"""
    return DataTransformRepository()

def provide_data_transform_usecase(
    repository: DataTransformRepositoryProtocol = Depends(provide_repository),
) -> DataTransformUsecase:
    """ユースケースを提供する"""
    return DataTransformUsecase(repository=repository)

各層のインスタンスを生成する関数を定義しています。

FastAPIのDependsで使えるように、関数ベースで実装しているのがポイントです。

依存関係が明示的なので、「このUseCaseは何が必要か」が一目瞭然です。テスト時には、これらの関数をオーバーライドしてモックを注入することもできます。

この構成の良いところ

変更に強い

例えば、データ変換のロジックを変更したいとします。

この構成なら、DataTransformRepositoryだけを修正すればOKです。UseCaseやAPIルーターは一切触る必要がありません。

逆に、レスポンスの形式を変えたい場合はDataTransformResponseだけ修正すればよく、ビジネスロジックに影響しません。

各層が疎結合なので、変更の影響範囲が明確で、安心してリファクタリングできます。

テストが書きやすい

UseCaseのテストを書く場合、以下のように書くことができます。

# モックのRepository
class MockDataTransformRepository:
    async def transform_data(self, data, from_format, to_format, **kwargs):
        return DataTransformResultEntity(
            success=True,
            transformed_data='{"name": "test"}',
            from_format=from_format,
            to_format=to_format,
            record_count=1,
            field_count=1,
            processing_time=0.001,
        )

# テスト
usecase = DataTransformUsecase(MockDataTransformRepository())
presenter = Presenter(request, response)
await usecase.execute(presenter)
assert presenter.response.success == True
assert presenter.response.record_count == 1

Repositoryをモックに差し替えるだけで、実際のJSON/CSV変換ロジックに依存せずテストできます。

新機能の追加が楽

新しいエンドポイントを追加する時も、既存のコードを触らずに済みます。

  1. 新しいEntity、Request、Responseを作る
  2. Repository Protocolと実装を追加
  3. UseCaseを実装
  4. APIルーターに関数を追加
  5. DIで結線

他のエンドポイントとは完全に独立しているので、影響範囲を気にせず実装できます。

他の言語・フレームワークでも同じ

ここまでFastAPI(Python)で説明してきましたが、このアーキテクチャは言語に依存しません

以下に、実装例を挙げてみます。

PHP(Laravel)の場合

// Entity
readonly class DataTransformResultEntity {
    public function __construct(
        public bool $success,
        public string $transformedData,
        public string $fromFormat,
        public string $toFormat,
        public int $recordCount,
        public int $fieldCount,
        public float $processingTime,
        public ?array $appliedMappings = null,
        public ?array $warnings = null,
        public ?string $error = null,
    ) {}
}

// Repository Interface
interface DataTransformRepositoryInterface {
    public function transformData(
        string $data,
        string $fromFormat,
        string $toFormat,
        ?array $fieldMapping = null,
        bool $includeHeaders = true,
        string $delimiter = ',',
    ): DataTransformResultEntity;
}

// UseCase
class DataTransformUsecase {
    public function __construct(
        private DataTransformRepositoryInterface $repository
    ) {}
    
    public function execute(DataTransformPresenter $presenter): void {
        $request = $presenter->getRequest();
        $result = $this->repository->transformData(
            data: $request->data,
            fromFormat: $request->fromFormat,
            toFormat: $request->toFormat,
            fieldMapping: $request->fieldMapping,
        );
        $presenter->setResponse(new DataTransformResponse($result));
    }
}

// APIルーター
Route::post('/data/transform', function (
    DataTransformRequest $request,
    DataTransformUsecase $usecase,
    DataTransformPresenter $presenter
) {
    $usecase->execute($presenter);
    return $presenter->getResponse();
});

Ruby(Rails)の場合

# Entity
class DataTransformResultEntity
  attr_reader :success, :transformed_data, :from_format, :to_format,
              :record_count, :field_count, :processing_time,
              :applied_mappings, :warnings, :error

  def initialize(success:, transformed_data:, from_format:, to_format:,
                 record_count:, field_count:, processing_time:,
                 applied_mappings: nil, warnings: nil, error: nil)
    @success = success
    @transformed_data = transformed_data
    @from_format = from_format
    @to_format = to_format
    @record_count = record_count
    @field_count = field_count
    @processing_time = processing_time
    @applied_mappings = applied_mappings
    @warnings = warnings
    @error = error
    freeze # イミュータブルに
  end
end

# UseCase
class DataTransformUsecase
  def initialize(repository)
    @repository = repository
  end
  
  def execute(presenter)
    request = presenter.request
    result = @repository.transform_data(
      data: request.data,
      from_format: request.from_format,
      to_format: request.to_format,
      field_mapping: request.field_mapping
    )
    presenter.response = DataTransformResponse.new(result)
  end
end

# APIルーター
post '/data/transform' do
  presenter = DataTransformPresenter.new(request, response)
  usecase = DataTransformUsecase.new(DataTransformRepository.new)
  usecase.execute(presenter)
  presenter.response.to_json
end

TypeScript(NestJS)の場合

// Entity
export class DataTransformResultEntity {
  constructor(
    public readonly success: boolean,
    public readonly transformedData: string,
    public readonly fromFormat: string,
    public readonly toFormat: string,
    public readonly recordCount: number,
    public readonly fieldCount: number,
    public readonly processingTime: number,
    public readonly appliedMappings?: Record<string, string>,
    public readonly warnings?: string[],
    public readonly error?: string,
  ) {}
}

// Repository Interface
export interface DataTransformRepositoryInterface {
  transformData(
    data: string,
    fromFormat: string,
    toFormat: string,
    fieldMapping?: Record<string, string>,
    includeHeaders?: boolean,
    delimiter?: string,
  ): Promise<DataTransformResultEntity>;
}

// UseCase
@Injectable()
export class DataTransformUsecase {
  constructor(
    private readonly repository: DataTransformRepositoryInterface
  ) {}
  
  async execute(presenter: DataTransformPresenter): Promise<void> {
    const request = presenter.request;
    const result = await this.repository.transformData(
      request.data,
      request.fromFormat,
      request.toFormat,
      request.fieldMapping,
    );
    presenter.response = new DataTransformResponse(result);
  }
}

// APIルーター
@Post('/data/transform')
async dataTransform(
  @Body() request: DataTransformRequest,
  @Inject() usecase: DataTransformUsecase,
  @Inject() presenter: DataTransformPresenter
): Promise<DataTransformResponse> {
  presenter.request = request;
  await usecase.execute(presenter);
  return presenter.response;
}

どの言語でも、基本的な構造は同じになります。

  • Entity: データクラス(dataclass, Value Object, interface)
  • Repository Protocol: インターフェース(interface, abstract class, Protocol)
  • UseCase: ビジネスロジックのクラス
  • Presenter: Request/Responseの受け渡し
  • DI: 各言語のDIコンテナ(Laravel Container, Rails DI, NestJS DI など)

このアーキテクチャを一度理解すれば、別の言語に移っても同じ設計思想で実装できるのが大きなメリットです。

まとめ

FastAPIでクリーンアーキテクチャ + ADRアプローチを実践してみて、以下のメリットがあると実感しています。

  • 責務が明確:どこに何があるか迷わない
  • テストしやすい:各層を独立してテストできる
  • 変更に強い:影響範囲が限定的
  • チーム開発向き:各層を並行して開発できる
  • 言語非依存:他の言語でも同じ設計思想で実装可能

最初は「レイヤーが多くて面倒そう」と思うかもしれませんが、規模が大きくなるほどこの構成の恩恵を受けられます。

特に、複数人で長期的に保守していくプロジェクトや、マイクロサービスで複数の言語を使うプロジェクトには向いていると感じました。

Discussion