【FastAPI】で認証システムを構築しよう
前回の記事では、FastAPIをMySQLに接続する方法を紹介しました。
今回は、その続きとして認証システムを構築の方法を紹介します。Cookieを使った認証の仕組み
Cookieとは
Cookieは、Webサーバーがブラウザに保存する小さなテキストデータです。ブラウザは、そのサーバーに対して後続のリクエストを送る際に、自動的にこのCookieを含めて送信します。
認証フローの概要
- ユーザーがログインフォームに認証情報(メールアドレスとパスワード)を入力。
- サーバーが認証情報を確認し、正しければセッションIDまたはトークンを生成します。
- 生成されたトークンをCookieとしてブラウザに返送します。
- 次回以降のリクエストで、ブラウザはこのCookieをサーバーに送信し、ユーザーが認証済みであることを示します。
下図は、認証フローの流れを示しています:
実装
ディレクトリ構成
基本的に前回と同じディレクトリ構成を使いますが、docker-compose.yml
にMySQLサービスを追加します。
fastapi-docker-example/
│
├── app/
│ ├── config
│ │ └── database.py
│ ├── models
│ │ └── user.py
│ ├── routes
│ │ └── auth.py
│ ├── schemas
│ └── main.py
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
└── README.md
Userモデルの定義
models/user.pyでは、ユーザーモデルを定義します。このモデルには、ユーザーの基本情報(名前、メールアドレス、パスワード)が含まれています。また、スキーマファイルではユーザーのリクエスト形式を定義しています。
from sqlalchemy import Column, Integer, String
from .database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(50), index=True)
email = Column(String(50), unique=True, index=True)
hashed_password = Column(String(100))
access_tokens = relationship("UserAccessToken", back_populates="user")
refresh_tokens = relationship("UserRefreshToken", back_populates="user")
class UserAccessToken(Base):
__tablename__ = "user_access_tokens"
id: Mapped[str] = mapped_column(
String(36), primary_key=True, default=lambda: str(uuid.uuid4())
)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)
access_key: Mapped[str] = mapped_column(
String(250), nullable=True, index=True, default=None
)
created_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.now()
)
expires_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
user = relationship("User", back_populates="access_tokens")
class UserRefreshToken(Base):
__tablename__ = "user_refresh_tokens"
id: Mapped[str] = mapped_column(
String(36), primary_key=True, default=lambda: str(uuid.uuid4())
)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)
refresh_key: Mapped[str] = mapped_column(
String(250), nullable=True, index=True, default=None
)
created_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.now()
)
expires_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
user = relationship("User", back_populates="refresh_tokens")
スキーマの定義(schemas.py)
Userモデルはユーザーの基本情報(名前、メールアドレス、パスワード)を格納し、スキーマファイルではユーザー登録リクエストを定義します。
from pydantic import BaseModel
# ユーザ登録時
class RegisterUserRequest(BaseModel):
name: str
email: EmailStr
password: str
アカウント作成
ユーザーが新規登録を行う際、サーバーはまずメールアドレスが既に登録されていないかを確認します。
次に、パスワードが十分に強力であるかをチェックします。
新しいユーザーIDを生成し、ユーザーアカウントを作成します。
最後に、確認メールが自動的に送信されます。
@router.post("/verify", status_code=status.HTTP_200_OK, response_model=MessageResponse)
async def verify_user_account(
background_tasks: BackgroundTasks,
data: VerifyUserRequest,
session: Session = Depends(get_session),
):
# メールアドレスの重複チェック
user_exist = session.query(User).filter(User.email == data.email).first()
if user_exist:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="このメールアドレスは既に登録されています。"
)
# パスワード強度のチェック
if not is_password_strong_enough(data.password):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="パスワードは最低8文字で、大文字小文字、数字、特殊文字を含む必要があります。"
)
# ユニークなユーザーIDの生成
while True:
user_id = unique_user_id()
if not user_repository.get_user_by_user_id(session, user_id):
break
# ユーザーの作成
user = user_repository.create_user(session, data, user_id)
# アカウント確認メールの送信(バックグラウンドタスク)
await send_account_verification_email(user, background_tasks=background_tasks)
# 成功レスポンスの返却
return MessageResponse(
message="ユーザー登録が完了しました。確認メールをご確認ください。"
)
ログイン
ログイン時には、ユーザーが入力したメールアドレスとパスワードを確認し、問題がなければJWT(JSON Web Token)を生成してクッキーに設定します。これにより、次回以降のリクエストで認証された状態が維持されます。
@router.post("/login", status_code=status.HTTP_200_OK, response_model=bool)
async def user_login(
response: Response,
data: OAuth2PasswordRequestForm = Depends(),
session: Session = Depends(get_session),
):
# メールアドレスでユーザーを検索
user: User | None = await user_repository.get_user_by_email(session, data.username)
if not user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="このメールアドレスは登録されていません。"
)
# パスワードの検証
if not verify_password(data.password, user.password):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="メールアドレスまたはパスワードが正しくありません。"
)
# アカウントの検証状態チェック
if not user.verified_at:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="アカウントが未検証です。メールをご確認の上、アカウントを検証してください。"
)
# アカウントのアクティブ状態チェック
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="アカウントが無効化されています。サポートにお問い合わせください。"
)
# JWTトークンの生成とクッキーへの設定
create_tokens(user.id, session, is_admin=False, response=response)
return True
JWTの生成とクッキーへの設定
JWTは、ユーザーのセッションを管理するために使われるトークンです。アクセストークンとリフレッシュトークンをそれぞれ生成し、ブラウザにCookieとして設定します。これにより、ユーザーがログインしている間、サーバーはこのトークンを使ってユーザーを認証します。
アクセストークンは短期間の有効期限が設定され、リフレッシュトークンはより長期間有効です。リフレッシュトークンを使用して、アクセストークンの有効期限が切れた場合でも、新しいアクセストークンを取得できます。
def create_tokens(user_id: int, session: Session, response: Response):
now = datetime.now(timezone.utc)
# ユニークなキーを生成
access_key = unique_string(50)
refresh_key = unique_string(100)
# トークンの有効期限を設定
at_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
rt_expires = timedelta(minutes=settings.REFRESH_TOKEN_EXPIRE_MINUTES)
# データベースにトークンを保存
at_token, rt_token = user_repository.create_tokens(
db=session,
user_id=user_id,
access_key=access_key,
refresh_key=refresh_key,
at_expires=at_expires,
rt_expires=rt_expires,
)
# アクセストークンのペイロードを作成
at_payload = {
"sub": str(user_id),
"jti": str(at_token.id),
"type": "access",
"exp": int((now + at_expires).timestamp()),
"iat": int(now.timestamp()),
}
# リフレッシュトークンのペイロードを作成
rt_payload = {
"sub": str(user_id),
"jti": str(rt_token.id),
"type": "refresh",
"exp": int((now + rt_expires).timestamp()),
"iat": int(now.timestamp()),
}
# JWTトークンを生成
at_jwt = generate_token(at_payload, settings.JWT_SECRET)
rt_jwt = generate_token(rt_payload, settings.JWT_REFRESH_SECRET)
# クッキーにアクセストークンを設定
response.set_cookie(
key="access_token",
value=f"Bearer {at_jwt}",
httponly=True,
secure=True, # HTTPS環境では True に設定
samesite="lax", # 要件に応じて "strict" に変更可能
max_age=int(at_expires.total_seconds()),
expires=int((now + at_expires).timestamp()),
)
# クッキーにリフレッシュトークンを設定
response.set_cookie(
key="refresh_token",
value=rt_jwt,
httponly=True,
secure=True, # HTTPS環境では True に設定
samesite="lax", # 要件に応じて "strict" に変更可能
max_age=int(rt_expires.total_seconds()),
expires=int((now + rt_expires).timestamp()),
)
トークンのリフレッシュ
アクセストークンの有効期限が切れた場合、リフレッシュトークンを使用して新しいアクセストークンを取得する様にしています。このプロセスは自動化されており、ユーザーは再度ログインする必要はありません。
リフレッシュされるの流れ
トークンを作る関数
Cokkieを取得する関数
async def get_token_from_cookie(
access_token: Optional[str] = Cookie(None, alias="access_token")
) -> str:
if not access_token:
raise HTTPException(status_code=401, detail="Not authenticated")
# "Bearer " プレフィックスを削除(必要な場合)
if access_token.startswith("Bearer "):
access_token = access_token[7:]
return access_token
async def get_refresh_token_from_cookie(
refresh_token: Optional[str] = Cookie(None, alias="refresh_token")
):
if not refresh_token:
raise HTTPException(status_code=401, detail="Not authenticated")
# "Bearer " プレフィックスを削除(必要な場合)
if refresh_token.startswith("Bearer "):
refresh_token = refresh_token[7:]
return refresh_token
トークンのリフレッシュ
@router.post("/refresh", status_code=status.HTTP_200_OK, response_model=bool)
async def user_refresh_token(
response: Response,
session: Session = Depends(get_session),
access_token: str = Depends(get_token_from_cookie),
refresh_token: str = Depends(get_refresh_token_from_cookie),
) -> bool:
rt_token_payload = get_token_payload(refresh_token, settings.JWT_REFRESH_SECRET)
if not rt_token_payload:
return False
at_token_payload = get_token_payload(access_token, settings.JWT_SECRET)
if not at_token_payload:
return False
rt_id = rt_token_payload.get("jti")
at_id = at_token_payload.get("jti")
if not rt_id or not at_id:
return False
user_token = user_repository.invalidate_token(session, at_id, rt_id)
if not user_token:
return False
user: User = user_token.user
# Generate the JWT token
create_tokens(user.id, session, is_admin=False, response=response)
return True
ローカル環境でsafariだとうまくいかない問題
secure: true
をするとhttpでの通信ができなくなるのでログインしてもトークンが取得できない問題が発生します
create_tokens
時のsecure
をFalse
にしてあげてください
response.set_cookie(
...
secure=False, # HTTP環境では False に設定
...
)
まとめ
この記事では、FastAPIを使ったCookieベースの認証システムの構築方法について解説しました。Cookieを利用することで、ユーザーの認証情報を効率的に管理し、セキュアな認証フローを実現できます。特に、JWTトークンを使ったアクセストークンとリフレッシュトークンの仕組みを導入することで、ユーザー体験を損なわずに認証情報を安全に保持し続けることが可能です。次回は、この認証システムをさらに拡張し、実際の運用に役立つ機能を追加していきます。
参考文献
Discussion