【FastAPI】で認証システムを構築しよう

2024/09/09に公開

前回の記事では、FastAPIをMySQLに接続する方法を紹介しました。
https://zenn.dev/xronotech/articles/46d6dec78a43ce
今回は、その続きとして認証システムを構築の方法を紹介します。

Cookieを使ったJWT認証の仕組み

Cookieとは

Cookieは、Webサーバーがブラウザに保存する小さなテキストデータです。ブラウザは、そのサーバーに対して後続のリクエストを送る際に、自動的にこのCookieを含めて送信します。

認証フローの概要

  1. ユーザーがログインフォームに認証情報(メールアドレスとパスワード)を入力。
  2. サーバーが認証情報を確認し、正しければセッションIDまたはトークンを生成します。
  3. 生成されたトークンをCookieとしてブラウザに返送します。
  4. 次回以降のリクエストで、ブラウザはこのCookieをサーバーに送信し、ユーザーが認証済みであることを示します。

JWT認証とは

JWT認証は、モダンなWebアプリケーションで広く採用されているステートレスな認証方式です。従来のセッション管理と異なり、サーバー側でユーザーの状態を保持する必要がないため、スケーラビリティに優れています。
仕組みとしてはユーザーは、ログイン後に情報を得たいときなど、サーバー側にHTTPリクエストを送ります。その時に、ログイン時に発行されたJWT(=トークン)を、リクエストと一緒に含めて送ります。 そしてサーバー側はリクエストを受け取る度にそのJWTが有効かを検証することで、登録されているユーザー本人からのリクエストなのかを確認することができるという仕組みです。

トークンには2種類存在します。

アクセストークンとリフレッシュトークン

アクセストークン:

  • 比較的短い有効期限(15-30分程度)
  • API認証に使用
  • ペイロードにユーザーIDや権限情報を含む
  • 署名により改ざんを検知

リフレッシュトークン:

  • 長めの有効期限(数日〜数週間)
  • 新しいアクセストークンの取得にのみ使用
  • データベースで管理し、無効化可能
  • セキュリティリスクを最小限に抑える

全体の認証の流れ

実装

ディレクトリ構成

基本的に前回と同じディレクトリ構成を使いますが、docker-compose.ymlにMySQLサービスを追加します。

fastapi-docker-example/
│
├── app/
│   ├── config
│   │    ├── __init__.py
│   │    ├── security.py # セキュリティ設定
│   │    ├── settings.py # 環境変数と設定
│   │    └── database.py
│   ├── models
│   │    └── user.py
│   ├── middleware/      # ミドルウェア
│   │   ├── __init__.py
│   │   └── auth.py      # 認証ミドルウェア
│   ├── repositories/    # データベース操作
│   │   ├── __init__.py
│   │   ├── admin.py     # 管理者リポジトリ
│   │   └── token.py     # トークンリポジトリ
│   ├── routes
│   │    └── auth.py
│   ├── services/        # ビジネスロジック
│   │   ├── __init__.py
│   │   ├── auth.py      # 認証サービス
│   │   └── token.py     # トークンサービス
│   ├── schemas
│   └── main.py
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
└── README.md

Userモデルの定義

models/user.pyでは、ユーザーモデルを定義します。このモデルには、ユーザーの基本情報(名前、メールアドレス、パスワード)が含まれています。また、スキーマファイルではユーザーのリクエスト形式を定義しています。

models.py
from sqlalchemy import Column, Integer, String
from .database import Base

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50), index=True)
    email = Column(String(50), unique=True, index=True)
    hashed_password = Column(String(100))
    access_tokens = relationship("UserAccessToken", back_populates="user")
    refresh_tokens = relationship("UserRefreshToken", back_populates="user")

class UserAccessToken(Base):
    __tablename__ = "user_access_tokens"
    id: Mapped[str] = mapped_column(
        String(36), primary_key=True, default=lambda: str(uuid.uuid4())
    )
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)
    access_key: Mapped[str] = mapped_column(
        String(250), nullable=True, index=True, default=None
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, server_default=func.now()
    )
    expires_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)

    user = relationship("User", back_populates="access_tokens")


class UserRefreshToken(Base):
    __tablename__ = "user_refresh_tokens"
    id: Mapped[str] = mapped_column(
        String(36), primary_key=True, default=lambda: str(uuid.uuid4())
    )
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)
    refresh_key: Mapped[str] = mapped_column(
        String(250), nullable=True, index=True, default=None
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, server_default=func.now()
    )
    expires_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)

    user = relationship("User", back_populates="refresh_tokens")

スキーマの定義(schemas.py)

Userモデルはユーザーの基本情報(名前、メールアドレス、パスワード)を格納し、スキーマファイルではユーザー登録リクエストを定義します。

schemas.py
from pydantic import BaseModel

# ユーザ登録時
class RegisterUserRequest(BaseModel):
    name: str
    email: EmailStr
    password: str

アカウント作成

ユーザーが新規登録を行う際、サーバーはまずメールアドレスが既に登録されていないかを確認します。
次に、パスワードが十分に強力であるかをチェックします。
新しいユーザーIDを生成し、ユーザーアカウントを作成します。
最後に、確認メールが自動的に送信されます。

auth.py
@router.post("/verify", status_code=status.HTTP_200_OK, response_model=MessageResponse)
async def verify_user_account(
    background_tasks: BackgroundTasks,
    data: VerifyUserRequest,
    session: Session = Depends(get_session),
):
    # メールアドレスの重複チェック
    user_exist = session.query(User).filter(User.email == data.email).first()
    if user_exist:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="このメールアドレスは既に登録されています。"
        )

    # パスワード強度のチェック
    if not is_password_strong_enough(data.password):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="パスワードは最低8文字で、大文字小文字、数字、特殊文字を含む必要があります。"
        )

    # ユニークなユーザーIDの生成
    while True:
        user_id = unique_user_id()
        if not user_repository.get_user_by_user_id(session, user_id):
            break

    # ユーザーの作成
    user = user_repository.create_user(session, data, user_id)

    # アカウント確認メールの送信(バックグラウンドタスク)
    await send_account_verification_email(user, background_tasks=background_tasks)

    # 成功レスポンスの返却
    return MessageResponse(
        message="ユーザー登録が完了しました。確認メールをご確認ください。"
    )

ログイン

ログイン時には、ユーザーが入力したメールアドレスとパスワードを確認し、問題がなければJWT(JSON Web Token)を生成してクッキーに設定します。これにより、次回以降のリクエストで認証された状態が維持されます。

auth.py
@router.post("/login", status_code=status.HTTP_200_OK, response_model=bool)
async def user_login(
    response: Response,
    data: OAuth2PasswordRequestForm = Depends(),
    session: Session = Depends(get_session),
):
    # メールアドレスでユーザーを検索
    user: User | None = await user_repository.get_user_by_email(session, data.username)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="このメールアドレスは登録されていません。"
        )

    # パスワードの検証
    if not verify_password(data.password, user.password):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="メールアドレスまたはパスワードが正しくありません。"
        )

    # アカウントの検証状態チェック
    if not user.verified_at:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="アカウントが未検証です。メールをご確認の上、アカウントを検証してください。"
        )

    # アカウントのアクティブ状態チェック
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="アカウントが無効化されています。サポートにお問い合わせください。"
        )

    # JWTトークンの生成とクッキーへの設定
    create_tokens(user.id, session, is_admin=False, response=response)

    return True

JWTの生成とクッキーへの設定

JWTは、ユーザーのセッションを管理するために使われるトークンです。アクセストークンとリフレッシュトークンをそれぞれ生成し、ブラウザにCookieとして設定します。これにより、ユーザーがログインしている間、サーバーはこのトークンを使ってユーザーを認証します。

アクセストークンは短期間の有効期限が設定され、リフレッシュトークンはより長期間有効です。リフレッシュトークンを使用して、アクセストークンの有効期限が切れた場合でも、新しいアクセストークンを取得できます。

def create_tokens(user_id: int, session: Session, response: Response):
    now = datetime.now(timezone.utc)

    # ユニークなキーを生成
    access_key = unique_string(50)
    refresh_key = unique_string(100)

    # トークンの有効期限を設定
    at_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    rt_expires = timedelta(minutes=settings.REFRESH_TOKEN_EXPIRE_MINUTES)

    # データベースにトークンを保存
    at_token, rt_token = user_repository.create_tokens(
        db=session,
        user_id=user_id,
        access_key=access_key,
        refresh_key=refresh_key,
        at_expires=at_expires,
        rt_expires=rt_expires,
    )

    # アクセストークンのペイロードを作成
    at_payload = {
        "sub": str(user_id),
        "jti": str(at_token.id),
        "type": "access",
        "exp": int((now + at_expires).timestamp()),
        "iat": int(now.timestamp()),
    }

    # リフレッシュトークンのペイロードを作成
    rt_payload = {
        "sub": str(user_id),
        "jti": str(rt_token.id),
        "type": "refresh",
        "exp": int((now + rt_expires).timestamp()),
        "iat": int(now.timestamp()),
    }

    # JWTトークンを生成
    at_jwt = generate_token(at_payload, settings.JWT_SECRET)
    rt_jwt = generate_token(rt_payload, settings.JWT_REFRESH_SECRET)

    # クッキーにアクセストークンを設定
    response.set_cookie(
        key="access_token",
        value=f"Bearer {at_jwt}",
        httponly=True,
        secure=True,  # HTTPS環境では True に設定
        samesite="lax",  # 要件に応じて "strict" に変更可能
        max_age=int(at_expires.total_seconds()),
        expires=int((now + at_expires).timestamp()),
    )

    # クッキーにリフレッシュトークンを設定
    response.set_cookie(
        key="refresh_token",
        value=rt_jwt,
        httponly=True,
        secure=True,  # HTTPS環境では True に設定
        samesite="lax",  # 要件に応じて "strict" に変更可能
        max_age=int(rt_expires.total_seconds()),
        expires=int((now + rt_expires).timestamp()),
    )

トークンのリフレッシュ

アクセストークンの有効期限が切れた場合、リフレッシュトークンを使用して新しいアクセストークンを取得する様にしています。このプロセスは自動化されており、ユーザーは再度ログインする必要はありません。

リフレッシュされる流れ

トークンを作る関数

Cokkieを取得する関数

async def get_token_from_cookie(
    access_token: Optional[str] = Cookie(None, alias="access_token")
) -> str:
    if not access_token:
        raise HTTPException(status_code=401, detail="Not authenticated")

    # "Bearer " プレフィックスを削除(必要な場合)
    if access_token.startswith("Bearer "):
        access_token = access_token[7:]

    return access_token


async def get_refresh_token_from_cookie(
    refresh_token: Optional[str] = Cookie(None, alias="refresh_token")
):
    if not refresh_token:
        raise HTTPException(status_code=401, detail="Not authenticated")

    # "Bearer " プレフィックスを削除(必要な場合)
    if refresh_token.startswith("Bearer "):
        refresh_token = refresh_token[7:]

    return refresh_token

トークンのリフレッシュ

@router.post("/refresh", status_code=status.HTTP_200_OK, response_model=bool)
async def user_refresh_token(
    response: Response,
    session: Session = Depends(get_session),
    access_token: str = Depends(get_token_from_cookie),
    refresh_token: str = Depends(get_refresh_token_from_cookie),
) -> bool:
    rt_token_payload = get_token_payload(refresh_token, settings.JWT_REFRESH_SECRET)

    if not rt_token_payload:
        return False

    at_token_payload = get_token_payload(access_token, settings.JWT_SECRET)
    if not at_token_payload:
        return False
    rt_id = rt_token_payload.get("jti")
    at_id = at_token_payload.get("jti")
    if not rt_id or not at_id:
        return False

    user_token = user_repository.invalidate_token(session, at_id, rt_id)
    if not user_token:
        return False
    user: User = user_token.user

    # Generate the JWT token
    create_tokens(user.id, session, is_admin=False, response=response)
    return True

ミドルウェア

ミドルウェアは、HTTPリクエストが目的のエンドポイントに到達する前、またはレスポンスがクライアントに返される前に実行される処理を担当するソフトウェアコンポーネントです。リクエストとレスポンスの「中間」で動作することから、ミドルウェアと呼ばれています。

ミドルウエアの役割

共通処理の一元化
認証・認可
ログ記録
エラーハンドリング
レスポンスの加工
パフォーマンス計測

横断的な関心事の分離
ビジネスロジックとインフラストラクチャ的な処理を分離
コードの重複を防ぐ
アプリケーションの保守性を向上

実装

middleware/auth.py
class JWTAuthMiddleware(HTTPBearer):
    def __init__(self, auto_error: bool = True):
        super().__init__(auto_error=auto_error)
        self.settings = Settings()

    async def __call__(
        self,
        request: Request,
        auth_service: AuthService = Depends(get_auth_service),
    ) -> Optional[HTTPAuthorizationCredentials]:
        try:
            # Cookieからトークンを取得
            access_token = request.cookies.get("access_token")
            refresh_token = request.cookies.get("refresh_token")

            if not access_token and not refresh_token:
                if self.auto_error:
                    raise HTTPException(
                        status_code=401,
                        detail="No authentication token provided",
                        headers={"WWW-Authenticate": "Bearer"},
                    )
                return None

            # まずアクセストークンの検証を試みる
            if access_token:
                is_valid = await auth_service.validate_token(access_token)
                if is_valid:
                    return HTTPAuthorizationCredentials(
                        scheme="Bearer", credentials=access_token
                    )

            # アクセストークンが無効な場合、リフレッシュトークンを試す
            if refresh_token:
                is_refreshed = await auth_service.refresh_token(
                    response=request.state.response, token=refresh_token
                )
                if is_refreshed:
                    # 新しいアクセストークンが発行されたので、次のリクエストで使用される
                    return HTTPAuthorizationCredentials(
                        scheme="Bearer", credentials=access_token
                    )

            # どちらのトークンも無効な場合
            if self.auto_error:
                raise HTTPException(
                    status_code=401,
                    detail="Invalid or expired token",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            return None

        except HTTPException:
            raise
        except Exception as e:
            if self.auto_error:
                raise HTTPException(
                    status_code=401,
                    detail="Authentication failed",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            return None

# ミドルウェアを使用するためのDependency
async def get_current_user(
    auth: HTTPAuthorizationCredentials = Depends(JWTAuthMiddleware()),
    auth_service: AuthService = Depends(get_auth_service),
) -> str:
    try:
        # トークンからユーザーIDを取得
        user_id = await auth_service.get_user_id_from_token(auth.credentials)
        if not user_id:
            raise HTTPException(
                status_code=401,
                detail="Invalid authentication credentials",
                headers={"WWW-Authenticate": "Bearer"},
            )
        return user_id
    except Exception:
        raise HTTPException(
            status_code=401,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

このミドルウェアを適応(アプリケーション全体に適用)

main.py

app = FastAPI(
    title="Your API",
    description="Your API description",
    version="1.0.0"
)

# CORSミドルウェアの設定
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 本番環境では適切に制限してください
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 認証が不要なルータ(例:ログイン、登録)
app.include_router(auth.router, prefix="/api/v1")

# 認証が必要なルータ
secured_router = FastAPI(dependencies=[Depends(get_current_user)])
secured_router.include_router(users.router, prefix="/users")
secured_router.include_router(other_routes.router, prefix="/other")

app.mount("/api/v1/secured", secured_router)

ローカル環境でsafariだとうまくいかない問題

secure: trueをするとhttpでの通信ができなくなるのでログインしてもトークンが取得できない問題が発生します
create_tokens時のsecureFalseにしてあげてください

response.set_cookie(
        ...
        secure=False,  # HTTP環境では False に設定
        ...
)

まとめ

この記事では、FastAPIを使ったCookieベースの認証システムの構築方法について解説しました。Cookieを利用することで、ユーザーの認証情報を効率的に管理し、セキュアな認証フローを実現できます。特に、JWTトークンを使ったアクセストークンとリフレッシュトークンの仕組みを導入することで、ユーザー体験を損なわずに認証情報を安全に保持し続けることが可能です。次回は、この認証システムをさらに拡張し、実際の運用に役立つ機能を追加していきます。

参考文献

https://fastapi.tiangolo.com/ja/advanced/response-cookies/?h=coo
https://qiita.com/zksytmkn/items/eb247779cb6cf1f9d36e

株式会社Xronotech

Discussion