FastAPI Application: From Environment Setup to Login Handling

Published on 2024/01/28

Setting up the environment with Poetry

Use Poetry to set up the development environment.
The following article was a helpful reference for this setup.
https://zenn.dev/takuty/articles/b83c70c32820bb

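Starting FastAPI

The startup command shown in the article above differs from the current FastAPI command.
Currently, the development server is started with the command below.
Note that the command is run through Poetry.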

bash
poetry run fastapi dev main.py

Installing the required libraries with Poetry

Install the libraries needed for developing with FastAPI.

bash
poetry add [library to install]

[Libraries to install]

  • fastapi[standard]
  • sqlmodel
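  • uuid (the uuid module ships with the Python standard library, so installing it separately is usually unnecessary)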
  • alembic
  • psycopg2-binary
  • passlib
  • python-dotenv
  • python-jose
  • bcrypt
  • pgcli
  • pandas
  • openpyxl

Once each library is installed, it is listed in pyproject.toml.

Initial FastAPI setup

Create the file that holds the main application logic.
Here, a v1 directory is created inside the backend directory, and the code goes in there.

Here is the code
backend/v1/main.py
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware

import time

from v1.core.config import settings
from v1.router import user, auth

def get_application():
  
  app = FastAPI(
    title = settings.AUTH_API_PROJECT_NAME,
    version = settings.AUTH_API_PROJECT_VERSION,
    description = settings.AUTH_API_DESCRIPTION,
  )
  
  origins = [
    'http://localhost:3000',
  ]
  
  app.add_middleware (
    CORSMiddleware,
    allow_origins = origins,
    allow_credentials = True,
    allow_methods = ['*'],
    allow_headers = ['*'],
  )
  
  return app

app = get_application()

@app.middleware('http')
async def add_process_time_header(request: Request, call_next):
  start_time = time.time()
  response = await call_next(request)
  process_time = time.time() - start_time
  response.headers['X-Process-Time'] = str(process_time)
  return response

Reference articles
https://fastapi.tiangolo.com/ja/tutorial/middleware/
https://nmomos.com/tips/2021/01/23/fastapi-docker-1/
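
The add_process_time_header middleware wraps every request and records how long the handler took. The same wrap-and-measure idea can be sketched without a web server. In this minimal sketch, `fake_handler` and the dict-based response are hypothetical stand-ins for `call_next` and a real response object, and `time.perf_counter()` is used instead of `time.time()` because it is generally better suited to measuring short durations.

```python
import asyncio
import time


async def add_process_time(handler, request):
    # Measure the time spent inside the wrapped handler
    start = time.perf_counter()
    response = await handler(request)
    elapsed = time.perf_counter() - start
    response["headers"]["X-Process-Time"] = str(elapsed)
    return response


async def fake_handler(request):
    # Hypothetical endpoint that simulates a little work
    await asyncio.sleep(0.01)
    return {"body": "ok", "headers": {}}


response = asyncio.run(add_process_time(fake_handler, {"path": "/"}))
print(response["headers"]["X-Process-Time"])
```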

Application settings

Next, build out the FastAPI application.
Start with the shared application settings and the database settings.

Here is the code
backend/v1/core/config.py
import os

from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer

from dotenv import load_dotenv

load_dotenv() # pass a file path to load_dotenv() if needed

class Settings:
  # Application metadata
  AUTH_API_PROJECT_NAME: str = "Application"
  AUTH_API_PROJECT_VERSION: str = "0.1.0"
  AUTH_API_DESCRIPTION: str = """
  Describe the application here (Markdown is rendered in the docs).
  """

  # Database settings
  POSTGRES_USER: str = os.environ['POSTGRES_USER']
  POSTGRES_PASSWORD: str = os.environ['POSTGRES_PASSWORD']
  # The host is read from the POSTGRES_DOCKER variable and referenced as POSTGRES_SERVER in the URL
  POSTGRES_SERVER: str = os.environ['POSTGRES_DOCKER']
  POSTGRES_PORT: str = os.environ['POSTGRES_PORT']
  POSTGRES_DB: str = os.environ['POSTGRES_DB']

  DATABASE_URL = f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_SERVER}:{POSTGRES_PORT}/{POSTGRES_DB}"
  
  pwd_context = CryptContext(schemes = ['bcrypt'], deprecated = 'auto')
  # The login route is served by the auth router (prefix /auth)
  oauth2_schema = OAuth2PasswordBearer(tokenUrl = '/auth/token')

settings = Settings()

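Environment variables for PostgreSQL

  • os.environ['key'] reads the environment variable named key (raises KeyError if it is not set)
  • os.environ.get('key', default) takes a default value as its second argument
    and returns that default when the variable does not exist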
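
The difference between the two access styles can be checked with a short script. This is a minimal sketch; the `DEMO_` variable names are hypothetical and exist only for this illustration.

```python
import os

# Hypothetical variable names used only for this illustration
os.environ["DEMO_POSTGRES_USER"] = "app_user"
os.environ.pop("DEMO_POSTGRES_PORT", None)  # make sure this one is unset

# Indexing returns the value, and raises KeyError when the variable is missing
user = os.environ["DEMO_POSTGRES_USER"]
try:
    os.environ["DEMO_POSTGRES_PORT"]
    missing_raises = False
except KeyError:
    missing_raises = True

# .get() returns the fallback instead of raising
port = os.environ.get("DEMO_POSTGRES_PORT", "5432")

print(user, missing_raises, port)
```

Indexing is a reasonable choice for values the application cannot run without, since a missing variable then fails loudly at startup.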

Setting the PostgreSQL database URL

[Synchronous]
DATABASE_URL = f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_SERVER}:{POSTGRES_PORT}/{POSTGRES_DB}"

[Asynchronous]
DATABASE_URL = f"postgresql+asyncpg://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_SERVER}:{POSTGRES_PORT}/{POSTGRES_DB}"
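
One thing to watch with URLs like these is that a password containing characters such as `@` or `/` breaks the URL unless it is percent-encoded. Below is a minimal sketch of a helper that builds both forms; `build_database_url` and the sample values are hypothetical and not part of the project code.

```python
from urllib.parse import quote_plus


def build_database_url(user, password, server, port, db, driver=None):
    # driver=None gives the synchronous form; driver="asyncpg" the asynchronous one
    scheme = "postgresql" if driver is None else f"postgresql+{driver}"
    # Percent-encode the credentials so special characters cannot break the URL
    return f"{scheme}://{quote_plus(user)}:{quote_plus(password)}@{server}:{port}/{db}"


sync_url = build_database_url("app", "p@ss/word", "localhost", "5432", "appdb")
async_url = build_database_url("app", "p@ss/word", "localhost", "5432", "appdb", driver="asyncpg")

print(sync_url)
print(async_url)
```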

Database settings

Use the values defined in settings to configure the database.

Here is the code
backend/v1/db/db.py
from sqlmodel import SQLModel, create_engine, Session
from sqlmodel.sql.expression import Select, SelectOfScalar

from ..core.config import settings

DATABASE_URL = settings.DATABASE_URL

SelectOfScalar.inherit_cache = True
Select.inherit_cache = True

# create_engine init
engine = create_engine(DATABASE_URL, echo=True)

# create table
def init_db():
    SQLModel.metadata.create_all(engine)


# Dependency for FastAPI
def get_session():
    with Session(engine) as session:
        yield session
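
get_session is a generator, which is what lets FastAPI hand the session to the endpoint and clean up afterwards. The sketch below imitates that lifecycle by hand, assuming the framework advances the generator once before the request and once after it. `FakeSession` is a hypothetical stand-in for `sqlmodel.Session` that only tracks whether it was closed.

```python
class FakeSession:
    """Stand-in for sqlmodel.Session; only records whether it was closed."""

    def __init__(self):
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.closed = True
        return False


def get_session():
    with FakeSession() as session:
        yield session


gen = get_session()
session = next(gen)  # roughly what happens before the endpoint runs
open_during_request = not session.closed

try:
    next(gen)  # roughly what happens after the response is produced
except StopIteration:
    pass
closed_after_request = session.closed

print(open_during_request, closed_after_request)
```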

FastAPI schema settings

Write the schemas, which also define the structure of the database.
This project uses SQLModel.

Here is the code
backend/v1/schema/user.py
from sqlmodel import SQLModel, Field, AutoString
from typing import Optional
from pydantic import EmailStr
import uuid
from datetime import datetime


class BaseUser(SQLModel):
    username: str = Field(nullable = False, default = None, unique = True)
    email: EmailStr =Field(unique = True, index = True, sa_type=AutoString)
    is_active: bool = Field(default = True)
    is_superuser: bool = Field(default = False)


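class User(BaseUser, table = True):
    id: Optional[int] = Field(default = None, primary_key = True)
    # Store the UUID as a string; inside the lambda, `uuid` resolves to the imported module
    uuid: str = Field(default_factory = lambda: str(uuid.uuid4()), nullable = False)
    hashed_password: str = Field(nullable = False)
    # default_factory runs per row; default = datetime.now() would be evaluated only once at import
    created_at: datetime = Field(default_factory = datetime.now, nullable = False)
    updated_at: datetime = Field(default_factory = datetime.now, nullable = False, sa_column_kwargs = {'onupdate': datetime.now})
    refresh_token: Optional[str] = Field(default = None, nullable = True)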


class UserOut(SQLModel):
    id: int
    uuid: str
    username: str
    email: str
    is_active: bool
    is_superuser: bool
    created_at: datetime
    updated_at: datetime
    refresh_token: Optional[str] = None


class UserLogin(SQLModel):
    email: str
    password: str

class UserCreate(SQLModel):
    username: str
    email: EmailStr
    password: str


class UserRead(BaseUser):
    id: int
    uuid: str
    created_at: datetime
    updated_at: datetime


class UserUpdate(BaseUser):
    username: Optional[str] = None
    email: Optional[EmailStr] = None


class Token(SQLModel):
    access_token: str
    refresh_token: str
    token_type: str

class TokenData(SQLModel):
    uuid: str | None = None
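
For timestamp fields such as created_at, the difference between `default` and `default_factory` matters: a plain default is evaluated once when the class is defined, while a factory is called for every new instance. The following minimal sketch shows the effect with standard-library dataclasses rather than SQLModel; the class names are hypothetical.

```python
import time
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class WithDefault:
    # datetime.now() runs once, when the class body is executed
    created_at: datetime = datetime.now()


@dataclass
class WithFactory:
    # datetime.now is called again for every new instance
    created_at: datetime = field(default_factory=datetime.now)


d1, f1 = WithDefault(), WithFactory()
time.sleep(0.05)
d2, f2 = WithDefault(), WithFactory()

shared_timestamp = d1.created_at == d2.created_at
fresh_timestamp = f2.created_at > f1.created_at

print(shared_timestamp, fresh_timestamp)
```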

Writing the code to fetch user information with FastAPI

Write the functions the API uses to read and modify user data.

Here is the code
backend/v1/crud/user.py
import os
from fastapi import HTTPException, status

from typing import List

# schema
from ..schema.user import User, UserOut

# sqlmodel
from sqlmodel import select

# uuid
import uuid as uuid_pkg

# settings
from ..core.config import settings

# pandas
import pandas as pd


def check_user(session, user):
    # Match on either column so that a duplicate username or a duplicate email is detected
    checked_user = session.exec(
        select(User)
        .where((User.username == user.username) | (User.email == user.email))
    ).first()
    return checked_user


def get_hashed_password(password):
    return settings.pwd_context.hash(password)


def verfy_password(password, hashed_password):
    return settings.pwd_context.verify(password, hashed_password)


def get_all_user(session):
    all_user = select(User)
    user_list: List[User] = session.exec(all_user).all()
    return user_list


def update_user(session, uuid, user):
    db_user = session.exec(select(User).where(User.uuid == uuid)).first()
    
    if not db_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail = "User not found."
        )
    
    user_data = user.model_dump(exclude_unset = True)

    for key, value in user_data.items():
        setattr(db_user, key, value)
    
    session.add(db_user)
    session.commit()
    session.refresh(db_user)
    return db_user


def delete_user(session, uuid):
    target_user = get_user_by_uuid(session, uuid)
    
    if not target_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail = "User not found."
        )
    
    session.delete(target_user)
    session.commit()
    return {"message": "User was deleted."}


def get_user_by_uuid(session, uuid: str):
    return session.exec(select(User).where(User.uuid == uuid)).first()


def get_user_by_email(session, email: str):
    return session.exec(select(User).where(User.email == email)).first()


def users_register(session, file_path: str):
    """Bulk-register users from a CSV or Excel file."""

    failed_users = []

    if file_path.endswith(".csv"):
        df = pd.read_csv(file_path)
    elif file_path.endswith(".xlsx"):
        df = pd.read_excel(file_path)
    else:
        raise ValueError(
            "Invalid file format. Please provide a CSV or EXCEL file."
        )
    
    for index, row in df.iterrows():
        # Expected columns: username, email, password, is_active, is_superuser
        username = row["username"]
        email = row["email"]
        password = row["password"]
        is_active = row["is_active"]
        is_superuser = row["is_superuser"]

        if pd.isna(username) or pd.isna(email) or pd.isna(password):
            failed_users.append({
                "username": username,
                "error": "Username, email, and password are required fields."
            })
            continue

        try:
            # Cast to str in case the cell was parsed as a number
            hashed_password = get_hashed_password(str(password))

            # Look up an existing user
            existing_user = session.query(User).filter((User.username == username) | (User.email == email)).first()

            if existing_user:
                # The user exists: update it (blank cells are skipped)
                if not pd.isna(username):
                    existing_user.username = username
                if not pd.isna(email):
                    existing_user.email = email
                if not pd.isna(password):
                    existing_user.hashed_password = hashed_password
                if not pd.isna(is_active):
                    existing_user.is_active = bool(is_active)
                if not pd.isna(is_superuser):
                    existing_user.is_superuser = bool(is_superuser)
            
            else:
                # The user does not exist: create it (blank flags fall back to the schema defaults)
                new_user = User(
                    uuid = str(uuid_pkg.uuid4()),
                    username = username,
                    email = email,
                    hashed_password=hashed_password,
                    is_active=True if pd.isna(is_active) else bool(is_active),
                    is_superuser=False if pd.isna(is_superuser) else bool(is_superuser),
                )

                session.add(new_user)

        except Exception as e:
            failed_users.append({
                "username": username,
                "error": str(e)
            })
    
    session.commit()

    return failed_users
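
The required-field check at the start of the bulk registration loop can be tried on its own. The sketch below uses the standard-library csv module as a stand-in for pandas so that it runs without extra dependencies. `split_rows` and the sample data are hypothetical, and no database access or password hashing is involved.

```python
import csv
import io

REQUIRED = ("username", "email", "password")


def split_rows(csv_text):
    """Separate rows that have every required field from rows that do not."""
    valid, failed = [], []
    for row in csv.DictReader(io.StringIO(csv_text)):
        missing = [name for name in REQUIRED if not (row.get(name) or "").strip()]
        if missing:
            failed.append({
                "username": row.get("username"),
                "error": f"Missing required fields: {', '.join(missing)}",
            })
        else:
            valid.append(row)
    return valid, failed


sample = (
    "username,email,password,is_active,is_superuser\n"
    "alice,alice@example.com,secret,True,False\n"
    "bob,,secret,True,False\n"
)
valid, failed = split_rows(sample)

print(valid)
print(failed)
```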

Exposing the API with FastAPI

Next, write the routes that appear in the API documentation (generated via OpenAPI).

Here is the code
backend/v1/router/user.py
from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File
from sqlmodel import Session, select
from typing import List

import shutil

# functions
from ..crud.user import (
    check_user,
    update_user,
    delete_user,
    get_hashed_password,
    users_register,
)

# schemas
from ..schema.user import (
    User,
    UserCreate,
    UserOut,
    UserRead,
    UserUpdate,
)

import uuid as uuid_pkg

# db
from ..db.db import get_session


router = APIRouter(
    prefix="/user",
    tags=["User"]
)

# sign up
@router.post("/signup", response_model=UserOut)
def create_user(userIn: UserCreate, session: Session = Depends(get_session)):
    user = check_user(session, userIn)

    if user:
        raise HTTPException(
            status_code=409,
            detail = "Username or email has been used. Please change to other username or email."
        )
    
    else:
        new_user = User(
            username=userIn.username,
            email = userIn.email,
            is_active=True,
            is_superuser=False,
            hashed_password=get_hashed_password(userIn.password),
            refresh_token=""
        )

        session.add(new_user)
        session.commit()
        session.refresh(new_user)
        return new_user


@router.post("/upload-users")
def upload_users(
    session: Session = Depends(get_session),
    file: UploadFile = File(...)
):
    file_location = f"temp_{file.filename}" # save the upload to a temporary file
    
    with open(file_location, "wb+") as file_object:
        shutil.copyfileobj(file.file, file_object)

    # run the registration
    result = users_register(session, file_location)

    if result:
        return {
            "message": "Some users failed to register. Please check each item.",
            "failed_users": result
        }
    else:
        return {"message": "Users registered successfully."}


@router.get("/all_user", response_model=List[UserOut])
def read_all_user(session: Session = Depends(get_session)):
    all_user = session.exec(
        select(User)
    ).all()
    return all_user

@router.get("/{user_uuid}", response_model=UserOut)
def get_user(user_uuid: str, session: Session = Depends(get_session)):
    # session.get() looks up by primary key (id), so query the uuid column instead
    user = session.exec(select(User).where(User.uuid == user_uuid)).first()
    
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found."
        )
    
    return user


@router.patch("/edit_user/{user_uuid}", response_model=UserOut)
def edit_user(
    *,
    session: Session=Depends(get_session),
    user: UserUpdate,
    user_uuid: str
):
    return update_user(session, user_uuid, user)

# Named remove_user so that it does not shadow the imported crud function delete_user
@router.delete("/delete_user/{user_uuid}")
def remove_user(
    *,
    session: Session=Depends(get_session),
    user_uuid: str
):
    # The crud function raises 404 when the user does not exist
    return delete_user(session, user_uuid)

Loading the router in main.py

Register the router in main.py.

Here is the code
backend/v1/app/main.py
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware

import time

from v1.core.config import settings
from v1.router import user, auth

def get_application():
  
  app = FastAPI(
    title = settings.AUTH_API_PROJECT_NAME,
    version = settings.AUTH_API_PROJECT_VERSION,
    description = settings.AUTH_API_DESCRIPTION,
  )
  
  origins = [
    'http://localhost:3000',
  ]
  
  app.add_middleware (
    CORSMiddleware,
    allow_origins = origins,
    allow_credentials = True,
    allow_methods = ['*'],
    allow_headers = ['*'],
  )
  
  return app

app = get_application()

# include router
+ app.include_router(user.router)

@app.middleware('http')
async def add_process_time_header(request: Request, call_next):
  start_time = time.time()
  response = await call_next(request)
  process_time = time.time() - start_time
  response.headers['X-Process-Time'] = str(process_time)
  return response

https://fastapi.tiangolo.com/tutorial/bigger-applications/
https://fastapi.tiangolo.com/ja/reference/apirouter/

Database setup

The database is set up with alembic.

First, run the following command.

bash
poetry run alembic init migrations

This creates a migrations directory containing several files. The additions below go into those files.

Here is the code for env.py
backend/migrations/env.py
from logging.config import fileConfig

from sqlalchemy import engine_from_config
from sqlalchemy import pool

from alembic import context

# add
+ import os
+ from sqlmodel import SQLModel
+ from v1.schema.user import User

# add
+ from dotenv import load_dotenv
+ load_dotenv()

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
- target_metadata = None
+ target_metadata = SQLModel.metadata

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """

+    # set_section_option(section: str, name: str, value: str)
+    config.set_section_option('alembic', 'POSTGRES_USER', os.environ['POSTGRES_USER']) # add
+    config.set_section_option('alembic', 'POSTGRES_PASSWORD', os.environ['POSTGRES_PASSWORD']) # add
+    # alembic.ini refers to %(POSTGRES_SERVER)s, so the host is registered under that name
+    config.set_section_option('alembic', 'POSTGRES_SERVER', os.environ['POSTGRES_DOCKER']) # add
+    config.set_section_option('alembic', 'POSTGRES_PORT', os.environ['POSTGRES_PORT']) # add
+    config.set_section_option('alembic', 'POSTGRES_DB', os.environ['POSTGRES_DB']) # add
    
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection, target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
Here is the code for script.py.mako
script.py.mako
"""${message}

Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
+ import sqlmodel
${imports if imports else ""}

# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}


def upgrade() -> None:
    ${upgrades if upgrades else "pass"}


def downgrade() -> None:
    ${downgrades if downgrades else "pass"}
backend/alembic.ini
# A generic, single database configuration.

[alembic]
# path to migration scripts
script_location = migrations

prepend_sys_path = .

version_path_separator = os  # Use os.pathsep. Default configuration used for new projects.

+ sqlalchemy.url = postgresql://%(POSTGRES_USER)s:%(POSTGRES_PASSWORD)s@%(POSTGRES_SERVER)s:%(POSTGRES_PORT)s/%(POSTGRES_DB)s


[post_write_hooks]

# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

After adding the lines above, run the following commands to perform the migration.

bash
poetry run alembic revision --autogenerate -m "init"
# When using Poetry, prefix the command with "poetry run"
# "init" becomes part of the migration file name (it is the revision message)
bash
poetry run alembic upgrade head
# When using Poetry, prefix the command with "poetry run"
# Apply the migration

[Reference articles]
(About alembic)
https://zenn.dev/shimakaze_soft/articles/4c0784d9a87751
(About the % notation)
https://magazine.techacademy.jp/magazine/46444
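
The `%(NAME)s` placeholders in the sqlalchemy.url line of alembic.ini are filled in from options in the same section. As far as I understand, alembic reads the file through Python's configparser, so the substitution can be tried with the standard library alone. This is a minimal sketch with placeholder values, not alembic's own code.

```python
from configparser import ConfigParser

ini_text = """
[alembic]
sqlalchemy.url = postgresql://%(POSTGRES_USER)s:%(POSTGRES_PASSWORD)s@%(POSTGRES_SERVER)s:%(POSTGRES_PORT)s/%(POSTGRES_DB)s
"""

parser = ConfigParser()
parser.read_string(ini_text)

# Placeholder values; in env.py these come from environment variables
values = {
    "POSTGRES_USER": "app",
    "POSTGRES_PASSWORD": "secret",
    "POSTGRES_SERVER": "localhost",
    "POSTGRES_PORT": "5432",
    "POSTGRES_DB": "appdb",
}
for name, value in values.items():
    parser.set("alembic", name, value)

# Reading the option triggers the %(NAME)s substitution
url = parser.get("alembic", "sqlalchemy.url")
print(url)
```

A value containing a literal `%` would need to be written as `%%` for this kind of interpolation, which is worth keeping in mind for passwords.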

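The steps so far cover the whole flow from writing code to applying it.
Once this flow is in place, new API definitions can generally be added and applied in the same way.

Authentication settings

Now write the authentication code.

Here is the code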
backend/v1/crud/auth.py
import os
from fastapi import Depends, HTTPException, status
from datetime import datetime, timedelta, timezone

# auth
from jose import JWTError, jwt

# sqlmodel
from sqlmodel import Session

# db
from ..db.db import get_session

# settings
from ..core.config import settings

# functions
from .user import verfy_password, get_user_by_uuid, get_user_by_email, get_all_user

# uuid
import uuid as uuid_pkg

# .env
from dotenv import load_dotenv
load_dotenv()


# email authentication
def authenticate(session, email, password):
    user = get_user_by_email(session, email)

    # Reject unknown emails and wrong passwords with the same error
    if user is None or not verfy_password(password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password."
        )
    return user

def create_tokens(uuid: uuid_pkg.UUID | str):
    # Use timezone-aware UTC datetimes for the "exp" claim
    now = datetime.now(timezone.utc)

    access_token_payload = {
        "token_type": "access_token",
        "exp": now + timedelta(days=int(os.environ["ACCESS_TOKEN_EXPIRE_DAYS"])),
        "uuid": str(uuid)  # the payload must be JSON serializable
    }

    refresh_token_payload = {
        "token_type": "refresh_token",
        "exp": now + timedelta(days=int(os.environ["REFRESH_TOKEN_EXPIRE_DAYS"])),
        "uuid": str(uuid)
    }

    # create token
    access_token = jwt.encode(
        access_token_payload,
        os.environ["SECRET_KEY"],
        algorithm=os.environ["ALGORITHM"]
    )

    refresh_token = jwt.encode(
        refresh_token_payload,
        os.environ["SECRET_KEY"],
        algorithm=os.environ["ALGORITHM"]
    )

    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer"
    }


def get_current_user_from_token(session, token: str, token_type: str):
    try:
        payload = jwt.decode(
            token,
            os.environ["SECRET_KEY"],
            algorithms=[os.environ["ALGORITHM"]]
        )
    except JWTError:
        # Covers malformed, tampered, and expired tokens
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate the token."
        )

    if payload.get("token_type") != token_type:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="token_type does not match."
        )

    current_user = get_user_by_uuid(session, payload.get("uuid"))

    if current_user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found."
        )
    if token_type == "refresh_token" and current_user.refresh_token != token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="refresh_token does not match."
        )
    return current_user


def get_current_user(session: Session = Depends(get_session), token: str = Depends(settings.oauth2_schema)):
    return get_current_user_from_token(session, token, "access_token")


def get_user_from_refresh_token(session: Session = Depends(get_session), token: str = Depends(settings.oauth2_schema)):
    return get_current_user_from_token(session, token, "refresh_token")
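
To make the token checks above more concrete, here is a minimal sketch of what signing and verifying an HS256 token involves, written with the standard library only. It is a stand-in for illustration, not python-jose's implementation, and `encode_token` and `decode_token` are hypothetical names. Real code should keep using a maintained JWT library.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that JWTs strip
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def encode_token(payload: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def decode_token(token: str, secret: str, expected_type: str) -> dict:
    header_b64, payload_b64, signature_b64 = token.split(".")
    signing_input = f"{header_b64}.{payload_b64}"
    # Always verify with HS256, regardless of what the header claims
    expected = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    payload = json.loads(_b64url_decode(payload_b64))
    if payload["exp"] < time.time():
        raise ValueError("token expired")
    if payload["token_type"] != expected_type:
        raise ValueError("token_type does not match")
    return payload


secret = "demo-secret"  # placeholder value
access = encode_token(
    {"token_type": "access_token", "exp": int(time.time()) + 60, "uuid": "1234"},
    secret,
)
claims = decode_token(access, secret, "access_token")
print(claims["uuid"])
```

Passing the access token where a refresh token is expected raises ValueError here, which mirrors the token_type check in get_current_user_from_token.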

Setting up the authentication API

Here is the code
backend/v1/router/auth.py
from fastapi import APIRouter, Depends, Form, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm

# functions
from ..crud.auth import (
    get_current_user,
    get_user_from_refresh_token,
    create_tokens,
    authenticate
)

# schema
from ..schema.user import Token, User, UserLogin, UserRead

# db
from ..db.db import get_session

# SQLModel
from sqlmodel import Session

router = APIRouter(
    prefix="/auth",
    tags=["Authentication"]
)

@router.post("/token", response_model=Token)
def login(login_user_info: UserLogin, session: Session = Depends(get_session)):
    user = authenticate(session, login_user_info.email, login_user_info.password)

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password."
        )
    
    token = create_tokens(user.uuid)
    user.refresh_token = token["refresh_token"]
    session.add(user)
    session.commit()
    session.refresh(user)
    return token

@router.get("/refresh_token", response_model=Token)
def refresh_token(
    session: Session = Depends(get_session),
    current_user: User = Depends(get_user_from_refresh_token)
):
    token = create_tokens(current_user.uuid)
    # token is a dict, so index it rather than calling it
    current_user.refresh_token = token["refresh_token"]
    session.add(current_user)
    session.commit()
    session.refresh(current_user)
    return token


# Respond with UserRead so that hashed_password and refresh_token are not exposed
@router.get("/me", response_model=UserRead)
def get_me(current_user: User = Depends(get_current_user)):
    return current_user
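
The login and refresh routes both overwrite the stored refresh token, so only the most recently issued one stays valid. That rotation can be sketched in isolation as follows. The in-memory dict is a hypothetical stand-in for the refresh_token column, and random strings stand in for real JWTs.

```python
import secrets

# Hypothetical stand-in for the refresh_token column: user uuid -> latest token
stored_refresh_tokens = {}


def issue_tokens(user_uuid):
    tokens = {
        "access_token": secrets.token_urlsafe(16),
        "refresh_token": secrets.token_urlsafe(16),
        "token_type": "bearer",
    }
    # Remember only the newest refresh token for this user
    stored_refresh_tokens[user_uuid] = tokens["refresh_token"]
    return tokens


def refresh(user_uuid, presented_token):
    stored = stored_refresh_tokens.get(user_uuid, "")
    if not secrets.compare_digest(stored, presented_token):
        raise PermissionError("refresh_token does not match.")
    return issue_tokens(user_uuid)


first = issue_tokens("user-1")
second = refresh("user-1", first["refresh_token"])

try:
    refresh("user-1", first["refresh_token"])  # the old token was rotated out
    old_token_rejected = False
except PermissionError:
    old_token_rejected = True

print(old_token_rejected)
```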

Register this router in main.py as well.

Here is the code
backend/v1/app/main.py
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware

import time

from v1.core.config import settings
from v1.router import user, auth

def get_application():
  
  app = FastAPI(
    title = settings.AUTH_API_PROJECT_NAME,
    version = settings.AUTH_API_PROJECT_VERSION,
    description = settings.AUTH_API_DESCRIPTION,
  )
  
  origins = [
    'http://localhost:3000',
  ]
  
  app.add_middleware (
    CORSMiddleware,
    allow_origins = origins,
    allow_credentials = True,
    allow_methods = ['*'],
    allow_headers = ['*'],
  )
  
  return app

app = get_application()

# include router
app.include_router(user.router)
+ app.include_router(auth.router)

@app.middleware('http')
async def add_process_time_header(request: Request, call_next):
  start_time = time.time()
  response = await call_next(request)
  process_time = time.time() - start_time
  response.headers['X-Process-Time'] = str(process_time)
  return response

Trying out login with jinja2

Use the API created so far to check whether login actually works.
First, write the HTML and JavaScript for the page.

Here is the code
backend/templates/index.html
<!DOCTYPE html>
<html lang="ja">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Login Page</title>
</head>
<body>
    <h2>Login</h2>
    <form id="loginForm">
        <label for="email">Email:</label>
        <input type="email" id="email" name="email" required>
        <br>
        <label for="password">Password:</label>
        <input type="password" id="password" name="password" required>
        <br>
        <button type="button" onclick="login()">Login</button>
    </form>

    <script>
        async function login() {
            const email = document.getElementById("email").value;
            const password = document.getElementById("password").value;

            // The /auth/token route expects a JSON body with email and password
            const response = await fetch('/auth/token', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                },
                body: JSON.stringify({ email, password }),
            });

            const data = await response.json();

            if (response.ok) {
                // Login succeeded
                console.log("Access Token:", data.access_token);
                console.log("Refresh Token:", data.refresh_token);

                // Store the tokens in cookies
                document.cookie = `access_token=${data.access_token}; path=/`;
                document.cookie = `refresh_token=${data.refresh_token}; path=/`;
                // Move to the post-login page
                window.location.href = "/userpage.html";
            } else {
                // Login failed
                console.error("Login failed:", data.detail);
            }
        }
    </script>
</body>
</html>

Prepare the post-login page in the same way.

Here is the code
backend/templates/userpage.html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>User Info Page</title>
</head>
<body>
    <h1>User Info Page</h1>
    <h2 id="username"></h2>
    <h2 id="email"></h2>

    <script>
        // Return the value of the cookie with the given name
        function getCookie(name) {
            const cookies = document.cookie.split(';');
            for (const cookie of cookies) {
                const [cookieName, cookieValue] = cookie.trim().split('=');
                if (cookieName === name) {
                    return decodeURIComponent(cookieValue);
                }
            }
            return null;
        }

        // When the page loads, read the access token from the cookie and fetch the user info
        document.addEventListener("DOMContentLoaded", async function() {
            const accessToken = getCookie("access_token");

            if (accessToken) {
                const response = await fetch('/auth/me', {
                method: 'GET',
                headers: {
                    'Content-Type': 'application/json',
                    'Authorization': `Bearer ${accessToken}`,
                },
                });

                if (!response.ok) {
                    throw new Error("Failed to fetch user info; the access token may be invalid or expired.");
                }
    
                const data = await response.json();

                if (data) {
                    // Show the user info
                    document.getElementById("username").innerText = `User name: ${data.username} is logged in`;
                    document.getElementById("email").innerText = `Email: ${data.email}`;
                } else {
                    console.error("User info not found in the response.");
                }
            }else {
                console.error("Access token not found in the cookie.");
            }
        });
    </script>
</body>
</html>

Next, configure the application to serve the templates.
FastAPI provides a way to do this with jinja2, so follow that approach.
https://fastapi.tiangolo.com/ja/advanced/templates/
The code is as follows.

Here is the code
backend/v1/app/main.py
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.templating import Jinja2Templates

import time

from ..core.config import settings
from ..router import auth, user

def get_application():
  
  app = FastAPI(
    title = settings.AUTH_API_PROJECT_NAME,
    version = settings.AUTH_API_PROJECT_VERSION,
    description = settings.AUTH_API_DESCRIPTION,
  )
  
  origins = [
    'http://localhost:3000',
  ]
  
  app.add_middleware (
    CORSMiddleware,
    allow_origins = origins,
    allow_credentials = True,
    allow_methods = ['*'],
    allow_headers = ['*'],
  )
  
  return app

app = get_application()

# jinja
+ templates = Jinja2Templates(directory = 'templates')

# router settings
app.include_router(auth.router)
app.include_router(user.router)

@app.middleware('http')
async def add_process_time_header(request: Request, call_next):
  start_time = time.time()
  response = await call_next(request)
  process_time = time.time() - start_time
  response.headers['X-Process-Time'] = str(process_time)
  return response


+@app.get("/", tags = ['template example'])
+async def index(name: str, request: Request):
+    return templates.TemplateResponse("index.html", {
+        "name": name,
+        "request": request
+    })

+@app.get("/index.html", tags = ['login form'])
+async def login_page(request: Request):
+    return templates.TemplateResponse("index.html", {
+        "request": request
+    })

+@app.get("/userpage.html", tags = ['login form'])
+async def user_page(request: Request):
+    return templates.TemplateResponse("userpage.html", {
+        "request": request
+    })

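Start the development server and open the configured URL (in this example, http://0.0.0.0:8000/index.html when running locally) to see the template.

If you can actually log in, it works.

That covers the flow up to user authentication with FastAPI.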
