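FastAPI Application: From Environment Setup to Login Processing
Setting up the environment with poetry
We use poetry to set up the development environment.
The article below was a helpful reference for setting up the development environment.
Starting FastAPI
The startup command shown in the article above differs from the one FastAPI currently uses.
At present, the application is started with the command below.
Note that the command is run through poetry.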
poetry run fastapi dev main.py
Installing the required libraries with poetry
Next, install the libraries needed to develop with FastAPI.
poetry add [library to install]
[Libraries to install]
- fastapi[standard]
- sqlmodel
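- uuid (the uuid module is part of the Python standard library, so installing it separately is normally unnecessary)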
- alembic
- psycopg2-binary
- passlib
- python-dotenv
- python-jose
- bcrypt
- pgcli
- pandas
- openpyxl
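Once each library is installed, it is listed in pyproject.toml.
Initial FastAPI setup
Create the file that holds the main processing.
Here, a v1 directory is created inside the backend directory, and the code is written there.
The code is here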
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
import time
from v1.core.config import settings
from v1.router import user, auth
def get_application():
    app = FastAPI(
        title=settings.PROJECT_NAME,
        version=settings.PROJECT_VERSION,
        description=settings.PROJECT_DESCRIPTION,
    )
    origins = [
        'http://localhost:3000',
    ]
    app.add_middleware(
        CORSMiddleware,
        allow_origins=origins,
        allow_credentials=True,
        allow_methods=['*'],
        allow_headers=['*'],
    )
    return app

app = get_application()

@app.middleware('http')
async def add_process_time_header(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers['X-Process-Time'] = str(process_time)
    return response
The article I referred to is here
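To make the role of call_next in the timing middleware easier to see, here is a minimal sketch that reproduces the same pattern with plain asyncio and no FastAPI. FakeResponse and handler are hypothetical stand-ins for the real response object and route handler, and time.perf_counter() is used here as an assumption because it is intended for measuring intervals.

```python
import asyncio
import time


class FakeResponse:
    """Hypothetical stand-in for a response object; it only carries headers."""

    def __init__(self):
        self.headers = {}


async def handler(request):
    # Stand-in for the route handler that call_next eventually reaches.
    await asyncio.sleep(0.01)
    return FakeResponse()


async def add_process_time_header(request, call_next):
    # Same shape as the middleware in main.py: time the downstream call,
    # then attach the elapsed seconds to the response as a string header.
    start_time = time.perf_counter()
    response = await call_next(request)
    response.headers["X-Process-Time"] = str(time.perf_counter() - start_time)
    return response


response = asyncio.run(add_process_time_header({"path": "/"}, handler))
elapsed = float(response.headers["X-Process-Time"])
```

The middleware never needs to know which route ran; it only wraps whatever call_next returns.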
Application settings
Now we build the FastAPI application.
First, write the settings shared across the whole application and the database settings.
The code is here
import os
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer
from dotenv import load_dotenv

load_dotenv()  # pass the path to the .env file here if needed

class Settings:
    # Application metadata (main.py reads these as settings.PROJECT_NAME and so on)
    PROJECT_NAME: str = "Application"
    PROJECT_VERSION: str = "0.1.0"
    PROJECT_DESCRIPTION: str = """
A description of the application (Markdown written here is rendered in the docs)
"""
    # Database settings
    POSTGRES_USER: str = os.environ['POSTGRES_USER']
    POSTGRES_PASSWORD: str = os.environ['POSTGRES_PASSWORD']
    # The host name comes from the POSTGRES_DOCKER environment variable
    POSTGRES_SERVER: str = os.environ['POSTGRES_DOCKER']
    POSTGRES_PORT: str = os.environ['POSTGRES_PORT']
    POSTGRES_DB: str = os.environ['POSTGRES_DB']
    DATABASE_URL = f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_SERVER}:{POSTGRES_PORT}/{POSTGRES_DB}"
    pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
    oauth2_schema = OAuth2PasswordBearer(tokenUrl='/user/token')

settings = Settings()
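Environment variables for PostgreSQL
- os.environ['key'] reads the environment variable named key (it raises KeyError if the variable is not set)
- os.environ.get('key', default) takes a default value as a second argument in addition to key,
and returns that default when the environment variable does not exist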
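The difference between the two lookups can be checked with a short sketch. The variable names prefixed with DEMO_ are hypothetical and exist only for this demonstration.

```python
import os

# Hypothetical variable names used only for this demonstration.
os.environ["DEMO_POSTGRES_USER"] = "app_user"
os.environ.pop("DEMO_POSTGRES_PORT", None)

# The variable is set, so subscript access succeeds.
user = os.environ["DEMO_POSTGRES_USER"]

# The variable is not set, so .get() returns the supplied default.
port = os.environ.get("DEMO_POSTGRES_PORT", "5432")

# Subscript access on a missing variable raises KeyError.
try:
    os.environ["DEMO_POSTGRES_PORT"]
    missing_raises = False
except KeyError:
    missing_raises = True
```

Subscript access suits values the application cannot run without, since a missing variable fails at startup, while .get() suits values that have a sensible default.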
Setting the PostgreSQL database URL
[Synchronous processing]
DATABASE_URL = f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_SERVER}:{POSTGRES_PORT}/{POSTGRES_DB}"
[Asynchronous processing]
DATABASE_URL = f"postgresql+asyncpg://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_SERVER}:{POSTGRES_PORT}/{POSTGRES_DB}"
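The two URL forms differ only in the scheme. As a minimal sketch, the helper below (build_database_url is a hypothetical name, not part of the article's code) assembles either form and also percent-encodes the credentials, which is an added assumption so that characters such as "@" in a password do not break the URL. Note that the asynchronous form assumes the asyncpg driver is installed, which is not in the library list above.

```python
from urllib.parse import quote_plus


def build_database_url(user, password, server, port, db, use_async=False):
    """Assemble a PostgreSQL URL (hypothetical helper for illustration)."""
    # "postgresql" selects the default synchronous driver,
    # "postgresql+asyncpg" selects the asyncpg driver for async engines.
    scheme = "postgresql+asyncpg" if use_async else "postgresql"
    # Percent-encode the credentials so "@" or "/" cannot be read as URL syntax.
    return f"{scheme}://{quote_plus(user)}:{quote_plus(password)}@{server}:{port}/{db}"


sync_url = build_database_url("app", "p@ss/word", "db", "5432", "appdb")
async_url = build_database_url("app", "p@ss/word", "db", "5432", "appdb", use_async=True)
```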
Database settings
Using the values written in settings, configure the database connection.
The code is here
from sqlmodel import SQLModel, create_engine, Session
from sqlmodel.sql.expression import Select, SelectOfScalar
from ..core.config import settings
DATABASE_URL = settings.DATABASE_URL
SelectOfScalar.inherit_cache = True
Select.inherit_cache = True
# Create the engine
engine = create_engine(DATABASE_URL, echo=True)

# Create the tables
def init_db():
    SQLModel.metadata.create_all(engine)

# Dependency for FastAPI
def get_session():
    with Session(engine) as session:
        yield session
FastAPI schema settings
Next, write the schemas, which also define the structure of the database.
SQLModel is used here.
The code is here
from sqlmodel import SQLModel, Field, AutoString
from typing import Optional
from pydantic import EmailStr
import uuid as uuid_pkg
from datetime import datetime

class BaseUser(SQLModel):
    username: str = Field(nullable=False, unique=True)
    email: EmailStr = Field(unique=True, index=True, sa_type=AutoString)
    is_active: bool = Field(default=True)
    is_superuser: bool = Field(default=False)

class User(BaseUser, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    # uuid4() returns a UUID object, so convert it to str to match the annotation
    uuid: str = Field(default_factory=lambda: str(uuid_pkg.uuid4()), nullable=False)
    hashed_password: str = Field(nullable=False)
    # default_factory runs per row; default=datetime.now() would be evaluated only once at import
    created_at: datetime = Field(default_factory=datetime.now, nullable=False)
    updated_at: datetime = Field(default_factory=datetime.now, nullable=False, sa_column_kwargs={'onupdate': datetime.now})
    refresh_token: Optional[str] = Field(default=None, nullable=True)

class UserOut(SQLModel):
    id: int
    uuid: str
    username: str
    email: str
    is_active: bool
    is_superuser: bool
    created_at: datetime
    updated_at: datetime
    refresh_token: Optional[str] = None

class UserLogin(SQLModel):
    email: str
    password: str

class UserCreate(SQLModel):
    username: str
    email: EmailStr
    password: str

class UserRead(BaseUser):
    id: int
    uuid: str
    created_at: datetime
    updated_at: datetime

class UserUpdate(BaseUser):
    username: Optional[str] = None
    email: Optional[EmailStr] = None

class Token(SQLModel):
    access_token: str
    refresh_token: str
    token_type: str

class TokenData(SQLModel):
    uuid: str | None = None
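For timestamp fields such as created_at, it matters whether the default is given as a value or as a factory. The sketch below illustrates the difference with the standard library's dataclasses rather than SQLModel, which is a substitution made so the example runs without a database; the Record class is hypothetical.

```python
import time
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class Record:
    # Evaluated once, when the class body runs: every instance shares this value.
    created_fixed: datetime = datetime.now()
    # Called each time an instance is created, so each instance gets its own value.
    created_fresh: datetime = field(default_factory=datetime.now)


first = Record()
time.sleep(0.05)
second = Record()

same_fixed = first.created_fixed == second.created_fixed
fresh_advanced = second.created_fresh > first.created_fresh
```

A default written as datetime.now() with parentheses behaves like created_fixed, which is rarely what a creation timestamp should do.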
Writing the code that retrieves user information in FastAPI
Write the functions the API uses to read and modify user data.
The code is here
import os
from fastapi import HTTPException, status
from typing import List
# schema
from ..schema.user import User, UserOut
# sqlmodel
from sqlmodel import select
# uuid
import uuid as uuid_pkg
# settings
from ..core.config import settings
# pandas
import pandas as pd

def check_user(session, user):
    # Return a user that already has this username or this email, if any
    checked_user = session.exec(
        select(User).where(
            (User.username == user.username) | (User.email == user.email)
        )
    ).first()
    return checked_user

def get_hashed_password(password):
    return settings.pwd_context.hash(password)

def verfy_password(password, hashed_password):
    return settings.pwd_context.verify(password, hashed_password)

def get_all_user(session):
    user_list: List[User] = session.exec(select(User)).all()
    return user_list

def get_user_by_uuid(session, uuid: str):
    return session.exec(select(User).where(User.uuid == uuid)).first()

def get_user_by_email(session, email: str):
    return session.exec(select(User).where(User.email == email)).first()

def update_user(session, uuid, user):
    db_user = get_user_by_uuid(session, uuid)
    if not db_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found."
        )
    # Only the fields that were actually sent are updated
    user_data = user.model_dump(exclude_unset=True)
    for key, value in user_data.items():
        setattr(db_user, key, value)
    session.add(db_user)
    session.commit()
    session.refresh(db_user)
    return db_user

def delete_user(session, uuid):
    target_user = get_user_by_uuid(session, uuid)
    if not target_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found."
        )
    session.delete(target_user)
    session.commit()
    return {"message": "User was deleted."}

def users_register(session, file_path: str):
    """Bulk registration from a CSV or Excel file"""
    failed_users = []
    if file_path.endswith(".csv"):
        df = pd.read_csv(file_path)
    elif file_path.endswith(".xlsx"):
        df = pd.read_excel(file_path)
    else:
        raise ValueError(
            "Invalid file format. Please provide a CSV or EXCEL file."
        )
    for index, row in df.iterrows():
        # A: username, B: email, C: password, D: is_active, E: is_superuser
        username = row["username"]
        email = row["email"]
        password = row["password"]
        is_active = row["is_active"]
        is_superuser = row["is_superuser"]
        if pd.isna(username) or pd.isna(email) or pd.isna(password):
            failed_users.append({
                "username": username,
                "error": "Username, email, password are required fields."
            })
            continue
        try:
            hashed_password = get_hashed_password(str(password))
            # Look for an existing user
            existing_user = session.exec(
                select(User).where((User.username == username) | (User.email == email))
            ).first()
            if existing_user:
                # The user exists: update it (blank optional cells are skipped)
                existing_user.username = username
                existing_user.email = email
                existing_user.hashed_password = hashed_password
                if not pd.isna(is_active):
                    existing_user.is_active = bool(is_active)
                if not pd.isna(is_superuser):
                    existing_user.is_superuser = bool(is_superuser)
            else:
                # The user does not exist: create it (blank flags fall back to the model defaults)
                new_user = User(
                    uuid=str(uuid_pkg.uuid4()),
                    username=username,
                    email=email,
                    hashed_password=hashed_password,
                    is_active=True if pd.isna(is_active) else bool(is_active),
                    is_superuser=False if pd.isna(is_superuser) else bool(is_superuser),
                )
                session.add(new_user)
        except Exception as e:
            failed_users.append({
                "username": username,
                "error": str(e)
            })
    session.commit()
    return failed_users
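The bulk registration collects rows that lack required values instead of stopping at the first bad row. As a minimal sketch of that idea, the example below uses the standard csv module in place of pandas (a substitution made so it runs without extra packages); split_rows and the sample data are hypothetical.

```python
import csv
import io

REQUIRED = ("username", "email", "password")


def split_rows(csv_text):
    """Separate rows that have every required column from rows that do not."""
    valid, failed = [], []
    for row in csv.DictReader(io.StringIO(csv_text)):
        # A column counts as missing when it is absent or blank.
        missing = [name for name in REQUIRED if not (row.get(name) or "").strip()]
        if missing:
            failed.append({
                "username": row.get("username"),
                "error": f"Missing: {', '.join(missing)}",
            })
        else:
            valid.append(row)
    return valid, failed


sample = (
    "username,email,password,is_active,is_superuser\n"
    "alice,alice@example.com,secret1,True,False\n"
    "bob,,secret2,True,False\n"
)
valid, failed = split_rows(sample)
```

Returning the failed rows lets the caller report exactly which entries need fixing while the valid ones are still registered.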
Exposing the API with FastAPI
Next, write the router code, from which the API documentation (output as OpenAPI) is generated.
The code is here
from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File
from sqlmodel import Session, select
from typing import List
import os
import shutil
import tempfile
# functions
from ..crud.user import (
    check_user,
    update_user,
    delete_user,
    get_hashed_password,
    get_user_by_uuid,
    users_register,
)
# schemas
from ..schema.user import (
    User,
    UserCreate,
    UserOut,
    UserRead,
    UserUpdate,
)
import uuid as uuid_pkg
# db
from ..db.db import get_session

router = APIRouter(
    prefix="/user",
    tags=["User"]
)

# sign up
@router.post("/signup", response_model=UserOut)
def create_user(userIn: UserCreate, session: Session = Depends(get_session)):
    user = check_user(session, userIn)
    if user:
        raise HTTPException(
            status_code=409,
            detail="Username or email has been used. Please change to other username or email."
        )
    new_user = User(
        username=userIn.username,
        email=userIn.email,
        is_active=True,
        is_superuser=False,
        hashed_password=get_hashed_password(userIn.password),
        refresh_token=""
    )
    session.add(new_user)
    session.commit()
    session.refresh(new_user)
    return new_user

@router.post("/upload-users")
def upload_users(
    session: Session = Depends(get_session),
    file: UploadFile = File(...)
):
    # Save the upload temporarily, keeping only the extension of the client-supplied name
    suffix = os.path.splitext(file.filename or "")[1]
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as file_object:
        shutil.copyfileobj(file.file, file_object)
        file_location = file_object.name
    try:
        # Registration
        result = users_register(session, file_location)
    finally:
        # Remove the temporary file whether or not registration succeeded
        os.remove(file_location)
    if result:
        return {
            "message": "Some users failed to register. Please check each item.",
            "failed_users": result
        }
    return {"message": "Users registered successfully."}

@router.get("/all_user", response_model=List[UserOut])
def read_all_user(session: Session = Depends(get_session)):
    all_user = session.exec(
        select(User)
    ).all()
    return all_user

@router.get("/{user_uuid}", response_model=UserOut)
def get_user(user_uuid: str, session: Session = Depends(get_session)):
    # session.get() looks up by primary key (id), so search by the uuid column instead
    user = get_user_by_uuid(session, user_uuid)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found."
        )
    return user

@router.patch("/edit_user/{user_uuid}", response_model=UserOut)
def edit_user(
    *,
    session: Session = Depends(get_session),
    user: UserUpdate,
    user_uuid: str
):
    return update_user(session, user_uuid, user)

@router.delete("/delete_user/{user_uuid}")
def remove_user(
    *,
    session: Session = Depends(get_session),
    user_uuid: str
):
    # Named remove_user so that it does not shadow the imported crud function delete_user,
    # which raises 404 itself when the user does not exist
    return delete_user(session, user_uuid)
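When an uploaded file has to be written to disk before it is processed, as in upload_users, one option is the standard tempfile module, so that the client-supplied filename contributes nothing but its extension. The following is a minimal sketch under that assumption; save_to_temp is a hypothetical helper and io.BytesIO stands in for UploadFile.file.

```python
import io
import os
import shutil
import tempfile


def save_to_temp(stream, client_filename):
    """Copy an uploaded stream to a temp file and return its path (hypothetical helper)."""
    # Keep only the extension; the rest of the client-supplied name is discarded.
    suffix = os.path.splitext(client_filename)[1]
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        shutil.copyfileobj(stream, tmp)
        return tmp.name


upload = io.BytesIO(b"username,email,password\n")  # stand-in for UploadFile.file
path = save_to_temp(upload, "../../etc/users.csv")
try:
    saved_suffix = os.path.splitext(path)[1]
    with open(path, "rb") as f:
        saved_bytes = f.read()
finally:
    os.remove(path)  # always clean up the temporary file
```

Because the extension survives, a function that dispatches on ".csv" or ".xlsx" can still decide how to read the file.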
Loading the router into main.py
Register the router defined above in main.py.
The code is here
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
import time
from v1.core.config import settings
from v1.router import user, auth
def get_application():
    app = FastAPI(
        title=settings.PROJECT_NAME,
        version=settings.PROJECT_VERSION,
        description=settings.PROJECT_DESCRIPTION,
    )
    origins = [
        'http://localhost:3000',
    ]
    app.add_middleware(
        CORSMiddleware,
        allow_origins=origins,
        allow_credentials=True,
        allow_methods=['*'],
        allow_headers=['*'],
    )
    return app
app = get_application()
# include router
+ app.include_router(user.router)
@app.middleware('http')
async def add_process_time_header(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers['X-Process-Time'] = str(process_time)
    return response
Database setup
The database is set up using alembic.
First, run the following command.
poetry run alembic init migrations
This creates a migrations directory containing several files, which we then add to.
The code for env.py is here
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# add
+ import os
+ from sqlmodel import SQLModel
+ from v1.schema.user import User
# add
+ from dotenv import load_dotenv
+ load_dotenv()
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
- target_metadata = None
+ target_metadata = SQLModel.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )
    with context.begin_transaction():
        context.run_migrations()
def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
+     # set_section_option(section: str, name: str, value: str)
+     config.set_section_option('alembic', 'POSTGRES_USER', os.environ['POSTGRES_USER'])  # add
+     config.set_section_option('alembic', 'POSTGRES_PASSWORD', os.environ['POSTGRES_PASSWORD'])  # add
+     # alembic.ini refers to %(POSTGRES_SERVER)s, so the option is set under that name
+     config.set_section_option('alembic', 'POSTGRES_SERVER', os.environ['POSTGRES_DOCKER'])  # add
+     config.set_section_option('alembic', 'POSTGRES_PORT', os.environ['POSTGRES_PORT'])  # add
+     config.set_section_option('alembic', 'POSTGRES_DB', os.environ['POSTGRES_DB'])  # add
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(
            connection=connection, target_metadata=target_metadata
        )
        with context.begin_transaction():
            context.run_migrations()

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
The code for script.py.mako is here
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
+ import sqlmodel
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
    ${upgrades if upgrades else "pass"}

def downgrade() -> None:
    ${downgrades if downgrades else "pass"}
The code for alembic.ini is here
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrations
prepend_sys_path = .
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
+ sqlalchemy.url = postgresql://%(POSTGRES_USER)s:%(POSTGRES_PASSWORD)s@%(POSTGRES_SERVER)s:%(POSTGRES_PORT)s/%(POSTGRES_DB)s
[post_write_hooks]
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
After making the additions above, run the command below to generate the migration.
poetry run alembic revision --autogenerate -m "first migration"
# When using poetry, prefix the command with "poetry run"
# The "first migration" part is written into the migration file name
To apply the migration, run the following command
poetry run alembic upgrade head
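# When using poetry, prefix the command with "poetry run"
# Applies the migration
[Reference articles]
(About alembic)
(About the % notation)
That is the overall flow.
With the steps above in place, later changes to the API (essentially, schema changes that affect tables) can generally be applied by generating a new revision and running the upgrade again.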
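The %(NAME)s placeholders in sqlalchemy.url are filled in from options set in the same section. The sketch below reproduces that interpolation with the standard configparser module alone, on the assumption that Alembic's configuration behaves like a regular ConfigParser here; the INI text is a cut-down stand-in for alembic.ini and all values are placeholders.

```python
import configparser

# A cut-down stand-in for alembic.ini; only the interpolated URL is kept.
INI_TEXT = """
[alembic]
sqlalchemy.url = postgresql://%(POSTGRES_USER)s:%(POSTGRES_PASSWORD)s@%(POSTGRES_SERVER)s:%(POSTGRES_PORT)s/%(POSTGRES_DB)s
"""

parser = configparser.ConfigParser()
parser.read_string(INI_TEXT)

# Options set in the same section supply the values for each %(NAME)s.
# A literal "%" inside a value has to be written as "%%".
placeholder_values = {
    "POSTGRES_USER": "app",
    "POSTGRES_PASSWORD": "secret",
    "POSTGRES_SERVER": "db",
    "POSTGRES_PORT": "5432",
    "POSTGRES_DB": "appdb",
}
for name, value in placeholder_values.items():
    parser.set("alembic", name, value)

# Reading the option triggers the interpolation.
url = parser.get("alembic", "sqlalchemy.url")
```

If a placeholder name has no matching option in the section, reading the URL raises an interpolation error, so the names in the INI file and the names set from code need to match exactly.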
Setting up authentication
Next, write the code related to authentication.
The code is here
import os
from fastapi import Depends, HTTPException, status
from datetime import datetime, timedelta, timezone
# auth
from jose import jwt, JWTError
# sqlmodel
from sqlmodel import Session
# db
from ..db.db import get_session
# settings
from ..core.config import settings
# functions
from .user import verfy_password, get_user_by_uuid, get_user_by_email, get_all_user
# schemas
from ..schema.user import TokenData
# .env
from dotenv import load_dotenv

load_dotenv()

# email authentication
def authenticate(session, email, password):
    user = get_user_by_email(session, email)
    # An unknown email and a wrong password are rejected with the same error
    if user is None or not verfy_password(password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password."
        )
    return user

def create_tokens(uuid: str):
    access_token_payload = {
        "token_type": "access_token",
        "exp": datetime.now(timezone.utc) + timedelta(days=int(os.environ["ACCESS_TOKEN_EXPIRE_DAYS"])),
        "uuid": uuid,
    }
    refresh_token_payload = {
        "token_type": "refresh_token",
        "exp": datetime.now(timezone.utc) + timedelta(days=int(os.environ["REFRESH_TOKEN_EXPIRE_DAYS"])),
        "uuid": uuid,
    }
    # create token
    access_token = jwt.encode(
        access_token_payload,
        os.environ["SECRET_KEY"],
        algorithm=os.environ["ALGORITHM"]
    )
    refresh_token = jwt.encode(
        refresh_token_payload,
        os.environ["SECRET_KEY"],
        algorithm=os.environ["ALGORITHM"]
    )
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer"
    }

def get_current_user_from_token(session, token: str, token_type: str):
    try:
        payload = jwt.decode(
            token,
            os.environ["SECRET_KEY"],
            algorithms=[os.environ["ALGORITHM"]]
        )
    except JWTError:
        # Covers malformed, tampered, and expired tokens
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token."
        )
    if payload.get("token_type") != token_type:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="token_type does not match."
        )
    current_user = get_user_by_uuid(session, payload.get("uuid"))
    if current_user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found."
        )
    if token_type == "refresh_token" and current_user.refresh_token != token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="refresh_token does not match."
        )
    return current_user

def get_current_user(token: str = Depends(settings.oauth2_schema), session: Session = Depends(get_session)):
    return get_current_user_from_token(session, token, "access_token")

def get_user_from_refresh_token(token: str = Depends(settings.oauth2_schema), session: Session = Depends(get_session)):
    return get_current_user_from_token(session, token, "refresh_token")

def verify_token(token: str):
    try:
        payload = jwt.decode(
            token,
            os.environ["SECRET_KEY"],
            algorithms=[os.environ["ALGORITHM"]]
        )
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token."
        )
    uuid: str = payload.get("uuid")
    if uuid is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Unauthorized."
        )
    return TokenData(uuid=uuid)
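To show what the payload with token_type, exp, and uuid turns into, here is a simplified sketch of HS256 signing and verification written with the standard library only. It is an illustration of the token structure, not a replacement for python-jose (for example, it does not inspect the header); encode_hs256, decode_hs256, and the secret are hypothetical.

```python
import base64
import hashlib
import hmac
import json
import time

SECRET_KEY = "demo-secret"  # placeholder; a real key would come from the environment


def _b64url(data: bytes) -> str:
    # JWTs use URL-safe base64 without the trailing "=" padding.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that was stripped during encoding.
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def encode_hs256(payload: dict, key: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(key.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def decode_hs256(token: str, key: str) -> dict:
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(key.encode(), signing_input.encode(), hashlib.sha256).digest()
    # Constant-time comparison of the recomputed and the received signature.
    if not hmac.compare_digest(expected, _b64url_decode(signature)):
        raise ValueError("signature mismatch")
    payload = json.loads(_b64url_decode(signing_input.split(".")[1]))
    if payload["exp"] < time.time():
        raise ValueError("token expired")
    return payload


now = int(time.time())
access = encode_hs256(
    {"token_type": "access_token", "exp": now + 3600, "uuid": "u-1"}, SECRET_KEY
)
claims = decode_hs256(access, SECRET_KEY)
```

The token_type claim is what lets the server refuse a refresh token presented where an access token is expected, even though both are signed with the same key.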
Setting up the authentication API
The code is here
from fastapi import APIRouter, Depends, HTTPException, status, Body
from fastapi.security import OAuth2PasswordRequestForm
# functions
from ..crud.auth import (
    get_current_user,
    get_user_from_refresh_token,
    create_tokens,
    authenticate,
    verify_token
)
# schema
from ..schema.user import Token, User, UserLogin, UserRead
# db
from ..db.db import get_session
# SQLModel
from sqlmodel import Session
# settings
from ..core.config import settings

router = APIRouter(
    prefix="/auth",
    tags=["Authentication"]
)

@router.post("/auth_token", response_model=Token)
def login(login_user_info: UserLogin, session: Session = Depends(get_session)):
    user = authenticate(session, login_user_info.email, login_user_info.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password."
        )
    token = create_tokens(user.uuid)
    # Store the issued refresh token so it can be compared on refresh
    user.refresh_token = token["refresh_token"]
    session.add(user)
    session.commit()
    session.refresh(user)
    return token

@router.get("/refresh_token", response_model=Token)
def refresh_token(
    session: Session = Depends(get_session),
    current_user: User = Depends(get_user_from_refresh_token)
):
    token = create_tokens(current_user.uuid)
    # token is a dict, so it is indexed with [] rather than called
    current_user.refresh_token = token["refresh_token"]
    session.add(current_user)
    session.commit()
    session.refresh(current_user)
    return token

# UserRead is used as the response model so that hashed_password and refresh_token are not returned
@router.get("/me", response_model=UserRead)
def get_me(current_user: User = Depends(get_current_user)):
    return current_user

@router.post("/verify-token")
async def verify_token_route(token: str = Depends(settings.oauth2_schema)):
    token_data = verify_token(token)
    return {
        "isAuthenticated": True,
        "uuid": token_data.uuid,
    }
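Note that the login route shown above receives a JSON body (UserLogin), whereas OAuth2PasswordRequestForm, which is also imported, is for building the login as a form, in which case the data is not sent as JSON.
When OAuth2PasswordRequestForm is used, the form field is named username by specification, so the data received differs from what the authenticate method defined here expects.
In that case, the value received in the username field of OAuth2PasswordRequestForm is used as the email argument of the authenticate method.
The field keeps its specified name, username, but be aware that the value entered in it is the email used for authentication.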
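The mapping from the form's username field to the email argument can be sketched without FastAPI by parsing a form-encoded body directly. credentials_from_form is a hypothetical helper and the body is sample data; with FastAPI, OAuth2PasswordRequestForm would perform the parsing instead.

```python
from urllib.parse import parse_qs


def credentials_from_form(body: str):
    """Map an OAuth2 password-form body to (email, password); hypothetical helper."""
    fields = parse_qs(body)
    # The form field is called "username", but its value is treated as the email.
    email = fields["username"][0]
    password = fields["password"][0]
    return email, password


form_body = "grant_type=password&username=alice%40example.com&password=secret1"
email, password = credentials_from_form(form_body)
```

The pair returned here is what would be passed on as authenticate(session, email, password).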
Register this router in main.py as well.
The code is here
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
import time
from v1.core.config import settings
from v1.router import user, auth
def get_application():
    app = FastAPI(
        title=settings.PROJECT_NAME,
        version=settings.PROJECT_VERSION,
        description=settings.PROJECT_DESCRIPTION,
    )
    origins = [
        'http://localhost:3000',
    ]
    app.add_middleware(
        CORSMiddleware,
        allow_origins=origins,
        allow_credentials=True,
        allow_methods=['*'],
        allow_headers=['*'],
    )
    return app
app = get_application()
# include router
app.include_router(user.router)
+ app.include_router(auth.router)
@app.middleware('http')
async def add_process_time_header(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers['X-Process-Time'] = str(process_time)
    return response
That covers the flow from building an API with FastAPI through to user authentication.