【FastAPI】で認証システムを構築しよう

2024/09/09に公開

前回の記事では、FastAPIをMySQLに接続する方法を紹介しました。
https://zenn.dev/xronotech/articles/46d6dec78a43ce
今回は、その続きとして認証システムを構築の方法を紹介します。

Cookieを使った認証の仕組み

Cookieとは

Cookieは、Webサーバーがブラウザに保存する小さなテキストデータです。ブラウザは、そのサーバーに対して後続のリクエストを送る際に、自動的にこのCookieを含めて送信します。

認証フローの概要

  1. ユーザーがログインフォームに認証情報(メールアドレスとパスワード)を入力。
  2. サーバーが認証情報を確認し、正しければセッションIDまたはトークンを生成します。
  3. 生成されたトークンをCookieとしてブラウザに返送します。
  4. 次回以降のリクエストで、ブラウザはこのCookieをサーバーに送信し、ユーザーが認証済みであることを示します。
    下図は、認証フローの流れを示しています:

実装

ディレクトリ構成

基本的に前回と同じディレクトリ構成を使いますが、docker-compose.ymlにMySQLサービスを追加します。

fastapi-docker-example/
│
├── app/
│   ├── config
│   │    └── database.py
│   ├── models
│   │    └── user.py
│   ├── routes
│   │    └── auth.py
│   ├── schemas
│   └── main.py
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
└── README.md

Userモデルの定義

models/user.pyでは、ユーザーモデルを定義します。このモデルには、ユーザーの基本情報(名前、メールアドレス、パスワード)が含まれています。また、スキーマファイルではユーザーのリクエスト形式を定義しています。

models.py
from sqlalchemy import Column, Integer, String
from .database import Base

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50), index=True)
    email = Column(String(50), unique=True, index=True)
    hashed_password = Column(String(100))
    access_tokens = relationship("UserAccessToken", back_populates="user")
    refresh_tokens = relationship("UserRefreshToken", back_populates="user")

class UserAccessToken(Base):
    __tablename__ = "user_access_tokens"
    id: Mapped[str] = mapped_column(
        String(36), primary_key=True, default=lambda: str(uuid.uuid4())
    )
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)
    access_key: Mapped[str] = mapped_column(
        String(250), nullable=True, index=True, default=None
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, server_default=func.now()
    )
    expires_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)

    user = relationship("User", back_populates="access_tokens")


class UserRefreshToken(Base):
    __tablename__ = "user_refresh_tokens"
    id: Mapped[str] = mapped_column(
        String(36), primary_key=True, default=lambda: str(uuid.uuid4())
    )
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)
    refresh_key: Mapped[str] = mapped_column(
        String(250), nullable=True, index=True, default=None
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, server_default=func.now()
    )
    expires_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)

    user = relationship("User", back_populates="refresh_tokens")

スキーマの定義(schemas.py)

Userモデルはユーザーの基本情報(名前、メールアドレス、パスワード)を格納し、スキーマファイルではユーザー登録リクエストを定義します。

schemas.py
from pydantic import BaseModel

# ユーザ登録時
class RegisterUserRequest(BaseModel):
    name: str
    email: EmailStr
    password: str

アカウント作成

ユーザーが新規登録を行う際、サーバーはまずメールアドレスが既に登録されていないかを確認します。
次に、パスワードが十分に強力であるかをチェックします。
新しいユーザーIDを生成し、ユーザーアカウントを作成します。
最後に、確認メールが自動的に送信されます。

auth.py
@router.post("/verify", status_code=status.HTTP_200_OK, response_model=MessageResponse)
async def verify_user_account(
    background_tasks: BackgroundTasks,
    data: VerifyUserRequest,
    session: Session = Depends(get_session),
):
    # メールアドレスの重複チェック
    user_exist = session.query(User).filter(User.email == data.email).first()
    if user_exist:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="このメールアドレスは既に登録されています。"
        )

    # パスワード強度のチェック
    if not is_password_strong_enough(data.password):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="パスワードは最低8文字で、大文字小文字、数字、特殊文字を含む必要があります。"
        )

    # ユニークなユーザーIDの生成
    while True:
        user_id = unique_user_id()
        if not user_repository.get_user_by_user_id(session, user_id):
            break

    # ユーザーの作成
    user = user_repository.create_user(session, data, user_id)

    # アカウント確認メールの送信(バックグラウンドタスク)
    await send_account_verification_email(user, background_tasks=background_tasks)

    # 成功レスポンスの返却
    return MessageResponse(
        message="ユーザー登録が完了しました。確認メールをご確認ください。"
    )

ログイン

ログイン時には、ユーザーが入力したメールアドレスとパスワードを確認し、問題がなければJWT(JSON Web Token)を生成してクッキーに設定します。これにより、次回以降のリクエストで認証された状態が維持されます。

auth.py
@router.post("/login", status_code=status.HTTP_200_OK, response_model=bool)
async def user_login(
    response: Response,
    data: OAuth2PasswordRequestForm = Depends(),
    session: Session = Depends(get_session),
):
    # メールアドレスでユーザーを検索
    user: User | None = await user_repository.get_user_by_email(session, data.username)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="このメールアドレスは登録されていません。"
        )

    # パスワードの検証
    if not verify_password(data.password, user.password):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="メールアドレスまたはパスワードが正しくありません。"
        )

    # アカウントの検証状態チェック
    if not user.verified_at:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="アカウントが未検証です。メールをご確認の上、アカウントを検証してください。"
        )

    # アカウントのアクティブ状態チェック
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="アカウントが無効化されています。サポートにお問い合わせください。"
        )

    # JWTトークンの生成とクッキーへの設定
    create_tokens(user.id, session, is_admin=False, response=response)

    return True

JWTの生成とクッキーへの設定

JWTは、ユーザーのセッションを管理するために使われるトークンです。アクセストークンとリフレッシュトークンをそれぞれ生成し、ブラウザにCookieとして設定します。これにより、ユーザーがログインしている間、サーバーはこのトークンを使ってユーザーを認証します。

アクセストークンは短期間の有効期限が設定され、リフレッシュトークンはより長期間有効です。リフレッシュトークンを使用して、アクセストークンの有効期限が切れた場合でも、新しいアクセストークンを取得できます。

def create_tokens(user_id: int, session: Session, response: Response):
    now = datetime.now(timezone.utc)

    # ユニークなキーを生成
    access_key = unique_string(50)
    refresh_key = unique_string(100)

    # トークンの有効期限を設定
    at_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    rt_expires = timedelta(minutes=settings.REFRESH_TOKEN_EXPIRE_MINUTES)

    # データベースにトークンを保存
    at_token, rt_token = user_repository.create_tokens(
        db=session,
        user_id=user_id,
        access_key=access_key,
        refresh_key=refresh_key,
        at_expires=at_expires,
        rt_expires=rt_expires,
    )

    # アクセストークンのペイロードを作成
    at_payload = {
        "sub": str(user_id),
        "jti": str(at_token.id),
        "type": "access",
        "exp": int((now + at_expires).timestamp()),
        "iat": int(now.timestamp()),
    }

    # リフレッシュトークンのペイロードを作成
    rt_payload = {
        "sub": str(user_id),
        "jti": str(rt_token.id),
        "type": "refresh",
        "exp": int((now + rt_expires).timestamp()),
        "iat": int(now.timestamp()),
    }

    # JWTトークンを生成
    at_jwt = generate_token(at_payload, settings.JWT_SECRET)
    rt_jwt = generate_token(rt_payload, settings.JWT_REFRESH_SECRET)

    # クッキーにアクセストークンを設定
    response.set_cookie(
        key="access_token",
        value=f"Bearer {at_jwt}",
        httponly=True,
        secure=True,  # HTTPS環境では True に設定
        samesite="lax",  # 要件に応じて "strict" に変更可能
        max_age=int(at_expires.total_seconds()),
        expires=int((now + at_expires).timestamp()),
    )

    # クッキーにリフレッシュトークンを設定
    response.set_cookie(
        key="refresh_token",
        value=rt_jwt,
        httponly=True,
        secure=True,  # HTTPS環境では True に設定
        samesite="lax",  # 要件に応じて "strict" に変更可能
        max_age=int(rt_expires.total_seconds()),
        expires=int((now + rt_expires).timestamp()),
    )

トークンのリフレッシュ

アクセストークンの有効期限が切れた場合、リフレッシュトークンを使用して新しいアクセストークンを取得する様にしています。このプロセスは自動化されており、ユーザーは再度ログインする必要はありません。

リフレッシュされるの流れ

トークンを作る関数

Cokkieを取得する関数

async def get_token_from_cookie(
    access_token: Optional[str] = Cookie(None, alias="access_token")
) -> str:
    if not access_token:
        raise HTTPException(status_code=401, detail="Not authenticated")

    # "Bearer " プレフィックスを削除(必要な場合)
    if access_token.startswith("Bearer "):
        access_token = access_token[7:]

    return access_token


async def get_refresh_token_from_cookie(
    refresh_token: Optional[str] = Cookie(None, alias="refresh_token")
):
    if not refresh_token:
        raise HTTPException(status_code=401, detail="Not authenticated")

    # "Bearer " プレフィックスを削除(必要な場合)
    if refresh_token.startswith("Bearer "):
        refresh_token = refresh_token[7:]

    return refresh_token

トークンのリフレッシュ

@router.post("/refresh", status_code=status.HTTP_200_OK, response_model=bool)
async def user_refresh_token(
    response: Response,
    session: Session = Depends(get_session),
    access_token: str = Depends(get_token_from_cookie),
    refresh_token: str = Depends(get_refresh_token_from_cookie),
) -> bool:
    rt_token_payload = get_token_payload(refresh_token, settings.JWT_REFRESH_SECRET)

    if not rt_token_payload:
        return False

    at_token_payload = get_token_payload(access_token, settings.JWT_SECRET)
    if not at_token_payload:
        return False
    rt_id = rt_token_payload.get("jti")
    at_id = at_token_payload.get("jti")
    if not rt_id or not at_id:
        return False

    user_token = user_repository.invalidate_token(session, at_id, rt_id)
    if not user_token:
        return False
    user: User = user_token.user

    # Generate the JWT token
    create_tokens(user.id, session, is_admin=False, response=response)
    return True

ローカル環境でsafariだとうまくいかない問題

secure: trueをするとhttpでの通信ができなくなるのでログインしてもトークンが取得できない問題が発生します
create_tokens時のsecureFalseにしてあげてください

response.set_cookie(
        ...
        secure=False,  # HTTP環境では False に設定
        ...
)

まとめ

この記事では、FastAPIを使ったCookieベースの認証システムの構築方法について解説しました。Cookieを利用することで、ユーザーの認証情報を効率的に管理し、セキュアな認証フローを実現できます。特に、JWTトークンを使ったアクセストークンとリフレッシュトークンの仕組みを導入することで、ユーザー体験を損なわずに認証情報を安全に保持し続けることが可能です。次回は、この認証システムをさらに拡張し、実際の運用に役立つ機能を追加していきます。

参考文献

https://fastapi.tiangolo.com/ja/advanced/response-cookies/?h=coo
https://qiita.com/zksytmkn/items/eb247779cb6cf1f9d36e

株式会社Xronotech

Discussion