FastAPI
pipx
Python
├── sample
│ ├── __init__.py
│ └── sample.py
└── sample2
├── __init__.py
└── sample3
├── __init__.py
└── test_call.py
上記のようなファイル構造で、下記のようなファイル内容があります。
def call():
print('sample')
import sys
from pprint import pprint
from sample import sample
path = sys.path
pprint(path)
sample.call()
rootディレクトリ(/app)にいる状態で、ファイルを2通りのパターンで実行しようとしました。
$ python -m sample2.sample3.test_call
['/app',
'/usr/local/lib/python311.zip',
'/usr/local/lib/python3.11',
'/usr/local/lib/python3.11/lib-dynload',
'/usr/local/lib/python3.11/site-packages']
sample
$ python sample2/sample3/test_call.py
Traceback (most recent call last):
File "/app/sample2/sample3/test_call.py", line 1, in <module>
from sample import sample
ModuleNotFoundError: No module named 'sample'
上記の通りスラッシュでファイル指定をした場合にモジュールのインポートでエラーとなります。
python -m module.submodule の形式で指定した場合、Pythonは sys.path の各ディレクトリを検索して指定したモジュールを探します。sys.path の最初のエントリは通常、スクリプトが実行されているディレクトリ(ここでは /app)です。
python path/to/module.py の形式で指定した場合、Pythonは指定したファイルを直接実行しますが、そのファイルのあるディレクトリ(ここでは /app/sample2/sample3)が sys.path の最初のエントリになります。
alembic
.
├── Dockerfile
├── alembic.ini
├── app
│ ├── __init__.py
│ ├── models
│ │ ├── admin.py
│ │ ├── __init__.py
│ │ ├── mixins.py
│ │ └── user.py
チュートリアルにあるmodels.pyに全部打っ込む形式であれば、alembicのenv.pyでメタデータを指定する部分で直接db定義部分のdeclarative_bae()の変数を参照し、metadataを取れば良い。
上記のようにパッケージ形式でmodelsディレクトリにモデルファイルを複数入れる場合には、愚直にtarget_metadata変数にリストで各モデルを記載するか、modelsディレクトリをパッケージとして扱い、そこでモデルを各種インポートしてあげれば良い。
from database import Base
from .admin import Admin
from .user import User
ここでBaseをインポートしないとalembicのrevision --autogenerate
でどのモデルも検知されず、またmodels以下にあるmodelもインポートしないと同様に検知されない。
多分__init__.pyをmodels.pyと等価になるような構造にしてあげる必要があるのだと思う。
migration
Generate migrations from models
$ alembic revision --autogenerate -m "YOUR_COMMENT_HERE"
All Up
alembic upgrade head
+1 Up
alembic upgrade +1
All Down
alembic downgrade base
-1 Down
alembic downgrade -1
Dockerfile
PYTHONUNBUFFERED=1: Pythonが標準出力のバッファリングを無効にします。つまり、Pythonが標準出力に何かを書き込むと、その情報は即時にフラッシュされます。これは特にログをリアルタイムで見たい場合に有用です。
PYTHONDONTWRITEBYTECODE=1: Pythonが.pycバイトコードファイルを生成しないようにします。Dockerイメージ内でこのファイルを生成する必要は通常なく、イメージのサイズを大きくするだけです。
PIP_NO_CACHE_DIR=off: pipがパッケージのダウンロードキャッシュを無効にします。これにより、Dockerイメージのサイズを小さく保つことができます。
PIP_DISABLE_PIP_VERSION_CHECK=on: pipが起動時に最新バージョンをチェックするのを無効にします。これにより、不必要なネットワークリクエストを避け、ビルド時間を短縮できます。
PIP_DEFAULT_TIMEOUT=100: pipのネットワーク操作のタイムアウトを設定します。
POETRY_VERSION=1.5.1: Poetryのバージョンを指定します。
POETRY_HOME="/opt/poetry": Poetryのインストール先ディレクトリを指定します。
POETRY_VIRTUALENVS_CREATE=false: PoetryがPythonの仮想環境を生成しないようにします。Dockerコンテナ内では通常、全体が仮想環境とみなされるため、別途仮想環境を作成する必要はありません。
PYSETUP_PATH="/opt/pysetup": このパスにあるPythonセットアップスクリプト(またはファイル)を使用します。これはPythonアプリケーションの設定や起動に関するコードが含まれている場合に使用します。
dependency
APIRouterのdependenciesで関数または、クラスを注入できる。
router = APIRouter(
prefix="/samples",
tags=['samples'],
dependencies=[Depends(print_sample_text), Depends(TestClassService)]
)
- 関数の場合には、このルータのいずれのメソッドが呼ばれた際も必ず呼び出される(実行される)
- クラスの場合はコンストラクタが実行されインスタンスが生成されるっぽい
@router.get("/")
async def index(test_class_service: TestClassService = Depends(TestClassService)):
print(test_class_service.test_method())
return [{"name": "sample1"}, {"name": "sample2"}]
ルーターで生成したクラスインスタンスのメソッドを使いたい場合は、使用するメソッドで変数として使用したいクラスを割り当てる必要がある。
エンドポイント共通
クラスベースではないため、エンドポイント共通の依存関係を定義したからといってself.classのような形で各メソッドで使えるわけではない。
多分、認証とかミドルウェア関数とかを共通依存関係として入れておいて、それ以外はメソッド単位で依存関係入れる想定なのかも
FastAPIのAPIRouterのdependenciesパラメータは、そのルーター内のすべてのエンドポイントで共有する依存関係を一元管理するために使われます。そのため、共通の前処理や後処理、または各エンドポイントで共通して使われるサービスなどを指定するのに便利です。
例えば、すべてのエンドポイントでユーザー認証を行いたい場合、ユーザー認証を行う依存関係(例えばDepends(authenticate_user))をdependenciesパラメータに追加することで、そのルーターのすべてのエンドポイントで自動的にユーザー認証が行われます。エンドポイントごとに認証の依存関係を追加するよりも、一元管理された場所で依存関係を管理できるため、コードの重複を減らし、メンテナンス性を向上させることができます。
しかし、この場合でも、それぞれのエンドポイントでその依存関係から返される値を使用する場合(例えば、認証されたユーザーの情報を使う場合など)は、エンドポイントの関数のパラメータとしてその依存関係を明示的に注入する必要があります。これは、FastAPIがリクエストスコープ(リクエストごと)で依存関係を解決するためです。
したがって、dependenciesパラメータで依存関係を設定する主なメリットは、共通の依存関係を一元管理できること、そしてそのルーターのすべてのエンドポイントでその依存関係が自動的に適用されることです。ただし、それぞれのエンドポイントでその依存関係から返される値を使用する場合は、その依存関係を明示的に注入する必要があります。
FastAPIの依存関係システムでは、依存関係が解決される場所(つまり依存関係を明示的に使用する場所)に依存関係を明示的に指定する必要があります。この理由は、FastAPIがリクエストごとに新たな依存関係のインスタンスを生成するため、依存関係が解決されるタイミングを制御するためです。
したがって、APIRouterのdependenciesパラメータで依存関係を指定した場合でも、その依存関係から返される値(例えば、サービスクラスのインスタンスなど)をエンドポイントの関数内部で使用するためには、その関数のパラメータとして依存関係を再度指定する必要があります。これにより、FastAPIはその関数が呼び出される際に適切なタイミングで依存関係を解決できます。
ただし、依存関係の値自体を関数内部で使用しない場合(例えば、依存関係が何らかの初期化処理や認証処理を行い、その結果を関数内部で直接使用しない場合など)は、関数のパラメータとして依存関係を指定する必要はありません。この場合、APIRouterのdependenciesパラメータで依存関係を指定するだけで、その依存関係は各関数が呼び出される前に自動的に解決されます。
routerのdependenciesはリストの順番で直列に処理される
router = APIRouter(
prefix="/sample",
tags=["sample"],
dependencies=[Depends(get_first_user), Dpends(get_second_user)],
)
まずget_first_user
の依存関係が解決され、次にget_second_userの依存関係が解決される。
認証系とかを最初に入れておけばまずそこがチェックされるため、テストの時や本番でも即時にレスを返せる。
メソッドの解決は引数の順序によらず並列
async def update(
body: SampleRequest,
sample_id: PydanticObjectId = Path(...),
sample_service: SampleService = Depends(SampleService),
)
SampleRequestでバリデーションエラーが起きようがパスパラメータでエラーになろうが、Dependsはエンドポイント到達時点で解決される。
バリデーションエラーのテストを作成している時になんか遅いなと思ったら依存関係を見直すこと。
Class dependency test
依存性注入を探すときに辞書から探すが辞書のキーはハッシュ可能でないといけないので普通のクラス定義だけではFastapiが探せないならしい。
python組み込みのdataclassを使ってfrozenでインスタンス生成後の属性が不変であることを保証しeqで比較メソッドを自動生成する。
dataclassを使うと__init__なども自動生成されるので自作の__init__は削除してdataclassのinitを使用する。
dataclassはクラスフィールドとして定義された値を引数とする__init__を自動生成する。
FastAPIで依存関係をオーバーライドする際に、dependency_overridesのキーが一致するかどうかは、以下のプロセスで判定されます。
1. 関数のオーバーライド:
関数が依存関係として指定されている場合、FastAPIはその関数オブジェクト自体をキーとして使用します。つまり、関数オブジェクトのIDが一致するかどうかでオーバーライドが判定されます。
def get_user():
# ユーザーを取得する処理
pass
def get_admin():
# 管理者を取得する処理
pass
app.dependency_overrides[get_user] = lambda: "mocked user"
この場合、get_user関数を依存関係として使用している部分はオーバーライドされますが、get_admin関数には影響しません。これは、get_userとget_adminが異なる関数オブジェクトであり、それぞれ異なるIDを持つためです。
2. クラスのオーバーライド:
クラスが依存関係として指定されている場合、そのクラスの型自体がキーとして使用されます。クラスのオブジェクトではなく、**クラスの型(クラスオブジェクトそのもの)**がキーとして使用されます。
class UserService:
pass
app.dependency_overrides[UserService] = lambda: "mocked service"
ここでは、UserServiceというクラス型がキーになります。そのため、UserServiceを依存関係として使用している箇所がオーバーライドされます。
3. 引数付き関数のオーバーライド:
引数付き関数(例えば、can_perform(permission: str)のようなもの)をオーバーライドする場合、can_performが返す依存関係関数そのものがキーとして使用されます。
def can_perform(permission: str):
async def verify_permission():
pass
return verify_permission
app.dependency_overrides[can_perform('read')] = lambda: "mocked permission check"
can_perform('read')が返す関数が依存関係として指定されている場合、その関数オブジェクトがオーバーライドのキーになります。つまり、can_perform('read')の呼び出しごとに異なる関数オブジェクトが返されるため、同じ引数を渡した場合にのみオーバーライドが機能します。
4. 内部的な判定方法:
Pythonのid()関数により、オブジェクトのメモリアドレス(ID)で一致判定が行われます。dependency_overridesのキーとして使用された関数やクラスオブジェクトのIDが一致する場合にのみオーバーライドが適用されます。
関数オブジェクトの場合: 関数オブジェクトのIDが一致するかどうか。
クラスオブジェクトの場合: クラスの型が一致するかどうか。
これにより、FastAPIは依存関係を適切にオーバーライドするかどうかを決定します。
例外:
もし、関数が毎回異なるオブジェクトを返す場合、dependency_overridesでその関数をオーバーライドするのは困難です。例えば、can_performのように引数に基づいて新しい関数を返す場合、適切なオーバーライドができないことがあります。この場合、依存関係全体をオーバーライドするのではなく、関数内で特定の依存関係をモックする必要があるかもしれません。
SqlAlchemy
transaction(要動作確認)
トランザクション処理がどの単位でまとまるのかいまいちわかってないので調査。
FastAPIとSQLAlchemyを使用してアプリケーションを構築するとき、一般的なDBセッションのライフサイクルは次のようになります:
HTTPリクエストがFastAPIアプリケーションに到着します。ルーティングレイヤーは適切なエンドポイントハンドラを決定します。
FastAPIの依存性注入(DI)システムは、そのエンドポイントハンドラに必要なすべての依存関係を解決します。このプロセスは依存関係の階層をたどります。たとえば、あるサービスクラスがリポジトリクラスを必要とし、リポジトリクラスがAsyncSessionを必要とする場合、FastAPIはそれぞれのDepends()関数を呼び出して各オブジェクトを作成します。
このケースでは、get_db関数はDIシステムによって呼び出され、新しいDBセッション(AsyncSessionオブジェクト)を作成します。このセッションはリポジトリクラスのコンストラクタに注入され、リポジトリクラスのインスタンスが作成されます。そして、それがサービスクラスのコンストラクタに注入され、サービスクラスのインスタンスが作成されます。
その後、FastAPIはエンドポイントハンドラを呼び出し、その引数として作成したサービスクラスのインスタンスを渡します。ハンドラはそのサービスクラスを通じてデータベース操作を実行します。
エンドポイントハンドラの処理が終了し、HTTPレスポンスがクライアントに送信されます。
get_db関数でasync with async_session() as session:を使用しているため、FastAPIのレスポンス送信後にセッションが自動的に閉じられます。この時点で、未コミットのトランザクションがあれば自動的にロールバックされます。
このライフサイクルは、リクエストごとに繰り返されます。そして、各リクエストは独自のDBセッションを持つため、リクエスト間でのデータ競合を防ぐことができます。
なお、Depends(get_db)という記述により、そのリポジトリがFastAPIのDIシステムから同じリクエストの中で作成されたDBセッションを使用するように指定しています。そのため、同一リクエスト内で作成されるすべてのリポジトリ(およびそれらを依存とするサービス)は同じDBセッションを共有します。これにより、複数のリポジトリが協調して一連のデータベース操作を行い、それらを一つのトランザクションで管理することが可能になります。
session
from dotenv import dotenv_values
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker, declarative_base
DB_URL = "mysql+pymysql://%s:%s@%s/%s?charset=utf8" % (
dotenv_values(".env")["MYSQL_USER"],
dotenv_values(".env")["MYSQL_PASSWORD"],
dotenv_values(".env")["MYSQL_HOST"],
dotenv_values(".env")["MYSQL_DATABASE"],
)
ASYNC_DB_URL = "mysql+aiomysql://%s:%s@%s/%s?charset=utf8" % (
dotenv_values(".env")["MYSQL_USER"],
dotenv_values(".env")["MYSQL_PASSWORD"],
dotenv_values(".env")["MYSQL_HOST"],
dotenv_values(".env")["MYSQL_DATABASE"],
)
engine = create_engine(DB_URL, echo=True)
async_engine = create_async_engine(ASYNC_DB_URL, echo=True)
session = sessionmaker(
autocommit=False,
autoflush=False,
bind=engine
)
async_session = sessionmaker(
autocommit=False,
autoflush=False,
bind=async_engine,
class_=AsyncSession
)
Base = declarative_base()
async def get_db():
async with async_session() as s:
yield s
def get_sync_db():
with session() as s:
yield s
なんでジェネレータ?
実際には、returnを使用すると、関数は最初の呼び出しで完全に終了し、新しいセッションを作成することはありません。次にその関数が呼び出されると、新たに実行が始まります。
一方で、ジェネレータは一度に1つの結果をyieldしますが、その後、状態が保存され、次にそのジェネレータが呼び出されるときには前回のyieldの直後から処理が再開します。
このため、ジェネレータは必要に応じて新しいセッションを生成することができます。具体的には、ジェネレータのyieldキーワードは一時的に処理を停止し、再開する際には同じ関数内で新たなデータベースセッションを作成します。そのため、ジェネレータを使うことで、必要に応じて新しいデータベースセッションを動的に生成することが可能になります。
ただし、get_dbやget_sync_dbのような関数が新しいセッションを提供するかどうかは、それらがどのように実装されているかによります。この具体的なコードでは、get_dbやget_sync_dbはデータベース接続を管理し、新しいセッションを提供するためのものであり、それぞれの呼び出しで新しいセッションが提供されます。
sqlalchemy.exc.InvalidRequestError: When initializing mapper Mapper
循環参照系のエラーでハマった時、TYPE_CHECKING
を使ってるのに変だなーなどと思ったときはそもそも、ちゃんとDBをセットアップしているか確認しましょう。
特にテストなどでカスタムマークでDBのイニシャライズなどをしている場合には上記のエラーが出てしまったりするので無駄な時間を過ごさないようにチェックすること。
Pydantic
discriminated union list
GenericModel
from typing import TypeVar, Generic
from pydantic.generics import GenericModel
T = TypeVar("T")
class LanguageDict(GenericModel, Generic[T]):
en: T
ja: T
Query params
クエリパラメータの場合は変数としてクエリパラメータのキーを宣言すれば後はよしなにやってくれる
method variable
PydanticObjectIdを引数に取るメソッドについて。
Pythonの通常のtype hintingだとクラスを指定しても強制的にインスタンス化したりしないが、pydanticの場合はインスタンス化させる?要検証
エディタのエラー出るがテスト通る
@pytest.mark.asyncio
async def test_should_return_none_if_document_not_found(self):
result = await self.repo.get_by_id("123456789012345678901234") #型なし、バリデーション満たす文字列
assert result is None
エディタのエラー出るし、バリデーションエラー
@pytest.mark.asyncio
async def test_should_return_none_if_document_not_found(self):
result = await self.repo.get_by_id("1234567890")
assert result is None
定義時点でエディタエラーでるし、メソッド呼び出しでバリデーションエラー
@pytest.mark.asyncio
async def test_should_return_none_if_document_not_found(self):
dummy_object_id: PydanticObjectId = "1234567890"
result = await self.repo.get_by_id(dummy_object_id)
assert result is None
エディタ上エラーは見えないが、インスタンス化でバリデーションエラー
@pytest.mark.asyncio
async def test_should_return_none_if_document_not_found(self):
dummy_object_id = PydanticObjectId("1234567890")
result = await self.repo.get_by_id(dummy_object_id)
assert result is None
query params
同名引数で取得もできるし、全部まとめてpydanticmodelとして処理もできる
関数パスオペレーション(例えばルート)のパスパラメータ。例えば、@app.get('/items/{item_id}')のitem_id。
パスオペレーション関数の単一引数からなる依存性。これはDepends()を使用した依存性を指しますが、Pydanticモデルまたは他の型宣言を含む一つの引数だけを受け付けます。この引数はクエリパラメータとして認識されます。
パスオペレーション関数の複数引数からなる依存性。これもDepends()を使用した依存性ですが、複数の引数(Pydanticモデルや他の型宣言を含む)を受け付けます。これらの引数はそれぞれ個別のクエリパラメータとして認識されます。
リクエストボディのパラメータ。これらは通常、Pydanticモデルによって定義され、JSONとして送信されるデータを表します。
もしクエリパラメータが複数の場所(たとえば、パスパラメータと依存性)で同じ名前を持っていた場合、FastAPIは優先順位に従って解析します。つまり、まずパスパラメータが評価され、次に単一引数の依存性、そして複数引数の依存性が評価されます。最後にボディパラメータが評価されます。
@router.get('/', response_model=list[Stimulus])
async def index(
stimulus_service: StimulusService = Depends(StimulusService),
query: BasePaginateQuery = Depends()) -> list[Stimulus]:
return await stimulus_service.get(paginate=query.dict())
class BasePaginateQuery(BaseModel):
page: int = Query(1, gt=0)
limit: int = Query(10, gt=0, le=100)
parse_obj
定義してないキーを渡してもバリデーションエラーにならないと思ったら
PydanticのBaseModelはparse_objメソッドで与えられた不明なキーを黙って無視します。これは設計上の選択であり、厳密な検証を望む場合には、モデル定義でConfigクラスを利用し、extra設定を"forbid"に設定することで、不明なキーが与えられたときにエラーを引き起こすようにすることが可能です。
from pydantic import BaseModel
class Sample(BaseModel):
name: str
class Config:
extra = "forbid"
# Now using an unknown key will cause a ValidationError
Sample.parse_obj({'name': 'Test', 'unknown_key': 'value'})
競合
path: constr(regex=r"^\w+(\.\w+)*$") = Field(..., min_length=1)
上記のようにすると正規表現はword, word.wordしか許可しないのにFieldの方では最小1文字も許可するみたいになってしまうとValidationErrorが発生しない
Serialize
Model.dict(), Model.json()のように変換できるが、内部にEnumの値が含まれているとそれはEnumのまま。
普段使いで問題になることはほぼないけれど、ルーターのテストにおいてリクエストボディにPydanticModelをシリアライズしたものを利用する場合には、内部に含まれている値がJsonシリアライズ可能な状態になっているか注意しないとハマる
パスパラメータ
Fastapiにおいてパスパラメータは必須なので省略した場合該当のエンドポイントでバリデーションエラーを出すわけではなく、パス自体にマッチしない
@router.get('/sample/{param}')
@router.get('/{param}')
curl http://localhost/sample
上記のリクエストは/{param}
の方で処理される。
Query param in PydanticModel
class Sample(BaseModel):
query1: Optional[str] = None
query2: List[str] = [] # Queryで書いても同じ
上記のように定義してもquery1は機能するけれど、query2は機能しない。
@router.get('/')
async def get(
tags: List[str] = Query([], alias="tags")
):
ルーターの方に定義すれば機能する。
private(_)
レスポンスモデルとかで_idフィールドを定義しても、OpenAPIのドキュメントに含まれない
field_validator
他のフィールドを参照したいときやFieldが使えないタイプの時
class SignupRequestProviderEmail(BaseModel):
type: Literal['email'] = 'email'
email: EmailStr
password: SecretStr
@field_validator('password')
def has_min_length(cls, password: SecretStr) -> SecretStr:
min_length = 8
if len(password) < min_length:
raise ValueError(f"ensure this value has at least {min_length} characters")
return password
@field_validator('password')
def has_max_length(cls, password: SecretStr) -> SecretStr:
max_length = 32
if len(password) > max_length:
raise ValueError(f"ensure this value has at most {max_length} characters")
return password
@field_validator('password')
def matches_regex(cls, password: SecretStr) -> SecretStr:
regex = r"^(?=.*[a-zA-Z])(?=.*\d)(?=.*[!@#$%^&*]).+$"
if not re.match(regex, password.get_secret_value()):
raise ValueError("Password must have at least one letter, one number and one special character.")
return password
python
Python の import 機構では、最初にモジュールがインポートされるときにそのモジュールの内容が実行され、その結果がキャッシュされます。それ以降、同じモジュールをどこかからインポートしようとすると、キャッシュされた結果が再利用されます。そのため、current_env_config モジュールの get() 関数の結果が最初にキャッシュされた時点での状態(つまり ENV 環境変数の設定が反映される前)がその後も再利用されてしまっているのかもしれません。
import os
env = os.environ.get('ENV') # インポート時点の内容でcacheされちゃうよ
def sample_logic():
# some logics
Generate client
You got to edit some pieces.
override_dependencies
ダメなパターン
class TestStimulusCategoryRouter:
def setup_method(self):
self.app = FastAPI()
self.client = TestClient(self.app)
self.app.include_router(stimulus_category_router.router)
class TestDebug(TestStimulusCategoryRouter):
def test_should_return_200(self):
get_current_admin_user_mock = AsyncMock()
get_current_admin_user_mock.return_value = MagicMock(spec=AdminUser)
self.app.dependency_overrides[get_current_admin_user] = get_current_admin_user_mock
response = self.client.get('/stimulus_categories/sample')
assert response.status_code == 200
テストケースで使用しているunittest.mock.AsyncMockは、非同期関数を模擬しますが、戻り値を設定するときはunittest.mock.MagicMockのインスタンスを直接返すのではなく、フューチャー(Future)としてラップするため、実際の関数呼び出しの際には、awaitキーワードを使用して結果を取得する必要があります。
FastAPIの依存関係注入(dependency injection)機構は、関数が同期関数か非同期関数かによって異なる方法で結果を取得します。非同期関数の場合、FastAPIは自動的にその結果を待つ(await)ことができます。しかし、テストケース内でunittest.mock.AsyncMockを使用して依存関係を模擬すると、戻り値はフューチャーとしてラップされ、FastAPIがその結果を待つことができないため、422エラーが発生します。
いけるパターン
async def override_get_current_admin_user():
return MagicMock(spec=AdminUser)
class TestDebug(TestStimulusCategoryRouter):
def test_should_return_200(self):
self.app.dependency_overrides[get_current_admin_user] = override_get_current_admin_user
response = self.client.get('/stimulus_categories/sample')
assert response.status_code == 200
モック関数は非同期、同期どちらでも動いたのでFutureでラップされなければ良いということなのかと思う。
response_model
response_model=SomeModel
のようにBeanieOdmのクラスを指定するとインスタンス生成時にDBへの接続が発生してしまう。
純粋なPydanticオブジェクトを生成し返却することでDBへの接続を回避できる。
trailing slash
@router.get('/')
のようにするとトレイルスラッシュがつくのでよきせぬ挙動になったりする。
/entities
のようにしたいのであれば、@router.get('')
で良い。
UploadFIle
FastAPIの UploadFile クラスは、Pythonの SpooledTemporaryFile をベースに作られています。UploadFile.read() を呼び出すと、ファイルの内容を読み込み、その"カーソル"(すなわち、ファイル内の現在位置を示すポインタ、または 'seek')はファイルの最後に移動します。そのため、同じ UploadFile インスタンスから再度 read() を呼び出すと、カーソルがすでにファイルの最後にあるため、結果として何も読み込まれず、空のバイト列(つまり、サイズ0のデータ)が返されます。
Exception Handler
カスタムハンドラーの登録はアプリケーションの初期化時にすべきで、スタートアップのタイミングではない。
app = FastAPI()
if config['ENV'] == 'production':
app.add_exception_handler(DuplicateKeyError, prod_duplicate_key_exception_handler)
elif config['ENV'] == 'development':
app.add_exception_handler(DuplicateKeyError, dev_duplicate_key_exception_handler)
上記は適切に動くが、下記は動かない(登録できない)
@app.on_event('startup')
async def startup_event():
if config['ENV'] == 'production':
app.add_exception_handler(DuplicateKeyError, prod_duplicate_key_exception_handler)
elif config['ENV'] == 'development':
app.add_exception_handler(DuplicateKeyError, dev_duplicate_key_exception_handler)
Generic Error Handler
個々の例外をいちいち定義するのがだるい時は、例外クラスの親クラスでまとめて定義もできる
DuplicateKeyErro -> WriteError -> OperationFailure -> PyMongoError -> Exception -> BaseException
上記のように継承しているのでグルーピングすべきクラスを対象にカスタムエラーハンドラーを定義する。
全部突っ込みたい時はpythonだと例外クラスはException、BaseExceptionを基本的に必ず継承しているらしいのでそこで補足すれば良い。
ここのクラスでカスタムハンドラーを定義するのとのトレードオフは個別のメッセージ定義とかステータスコードのカスタマイズが大雑把になる点
Clean Architecture / Depends example
logger cloud watch
SQLModel Alembic
同期
import importlib
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
from sqlmodel import SQLModel
from dotenv import dotenv_values
import os
# Load environment variables from .env file
config = dotenv_values(".env")
# Retrieve database connection info from environment variables
DB_USER = config.get("MYSQL_USER")
DB_PASSWORD = config.get("MYSQL_PASSWORD")
DB_HOST = config.get("MYSQL_HOST")
DB_NAME = config.get("MYSQL_DATABASE")
# Construct the database URL
DATABASE_URL = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}?charset=utf8"
# Update the Alembic config with the database URL
config = context.config
config.set_main_option('sqlalchemy.url', DATABASE_URL)
# Interpret the config file for Python logging
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# Function to dynamically import all models from the specified root directory
# This function retrieves all Python files in the specified directory and files which end with '_model.py'
def import_all_models(root_directory):
for dirpath, _, filenames in os.walk(root_directory):
for filename in [f for f in filenames if f.endswith('.py') and f != '__init__.py']:
if filename.endswith('_model.py'):
module_name = filename[:-3] # Remove .py extension
module_path = os.path.join(dirpath, module_name).replace(os.sep, '.')
importlib.import_module(module_path)
# Import all models in the specified root directory
import_all_models('src/module')
# Add your model's MetaData object here
# for 'autogenerate' support
target_metadata = SQLModel.metadata
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url, target_metadata=target_metadata, literal_binds=True
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
非同期
import asyncio
import importlib
import os
from logging.config import fileConfig
from dotenv import dotenv_values
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
from sqlmodel import SQLModel
# Load environment variables from .env file
config = dotenv_values(".env")
# Retrieve database connection info from environment variables
DB_USER = config.get("MYSQL_USER")
DB_PASSWORD = config.get("MYSQL_PASSWORD")
DB_HOST = config.get("MYSQL_HOST")
DB_NAME = config.get("MYSQL_DATABASE")
# Construct the database URL
DATABASE_URL = f"mysql+aiomysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}?charset=utf8"
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Update the Alembic config with the database URL
config.set_main_option('sqlalchemy.url', DATABASE_URL)
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# Function to dynamically import all models from the specified root directory
# This function retrieves all Python files in the specified directory and files which end with '_model.py'
def import_all_models(root_directory):
for dirpath, _, filenames in os.walk(root_directory):
for filename in [f for f in filenames if f.endswith('.py') and f != '__init__.py']:
if filename.endswith('_model.py'):
module_name = filename[:-3] # Remove .py extension
module_path = os.path.join(dirpath, module_name).replace(os.sep, '.')
importlib.import_module(module_path)
# Import all models in the specified root directory
import_all_models('src/module')
# add your model's MetaData object here
# for 'autogenerate' support
target_metadata = SQLModel.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
"""In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode."""
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
script.py.mako
マイグレーションファイルのフォーマットをテンプレートして設定できるので、sqlmodelをインポートするようにしておくこと。
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# NOTE: This import is required for the Alembic autogeneration to work when use SQLModel
import sqlmodel
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}
Specified Type
sa_typeでSQLAlchemyかSQLModelのタイプを指定できる。EnumでPython上は型付したいけどDBでは文字列としておいてほしいケース。Enumが変わっても自動生成時に変更検知してくれないので
language: AvailableLanguageEnum | None = Field(sa_type=AutoString)
Cascade
ORM Level
SQLModelが持ってないのでsa_relationship_kwargs
利用
class UserModel(TimestampModel, SQLModel, table=True):
__tablename__ = "user"
id: int | None = Field(primary_key=True, default=None)
name: str
language: AvailableLanguageEnum | None = Field(sa_type=AutoString)
timezone: TimeZoneEnum | None = Field(sa_type=AutoString)
icon_url: Optional[str] | None
auth_provider_email: Optional["UserAuthProviderEmailModel"] = Relationship(
back_populates="user",
sa_relationship_kwargs={"cascade": "all, delete-orphan"}
)
DB Level
SQLModelが持ってないのでsa_column
を利用
class UserAuthProviderEmailModel(SQLModel, table=True):
__tablename__ = "user_auth_provider_email"
user_id: int = Field(
sa_column=Column(
"user_id",
ForeignKey("user.id", ondelete="CASCADE"),
primary_key=True,
),
)
email: str = Field(index=True, unique=True)
password: str
verified: bool = False
user: "UserModel" = Relationship(back_populates="auth_provider_email")
Multiple field Unique
class Example(SQLModel, table=True):
__table_args__ = (UniqueConstraint('workspace_id', 'user_id', name='unique_workspace_member'),)
workspace_id: int
user_id: int
FUllTEXT Index in mysql
sqlalchemyの機能を使えば以下の定義でalmebicのマイグレーションファイルをインデックスを作成するように生成できる。
class ContactModel(TimestampModel, SQLModel, table=True):
__tablename__ = "contact"
__table_args__ = (
Index(
'idx_contact_fulltext', # index name
'first_name', # colmuns
'last_name',
mysql_prefix='FULLTEXT',
mysql_with_parser='ngram'
),
)
Dynamic multi sorts
for sort in query.sorts:
sort_col = getattr(ContactModel, sort.field, None)
if sort_col is None:
raise RuntimeError(f"Invalid sort field: {sort.field} does not exist in ContactModel")
statement = statement.order_by(sort_col.desc() if sort.order == 'desc' else sort_col.asc())
文字列でフィールドや順序を指定したモデルが定義されているとし、.desc
や.asc
などのSqlAlchemyのカラムオブジェクトのメソッドを利用したい時には上記のようにすれば利用可能
Subquery and count
statement.subquery()
複雑な検索条件に合致するレコードが何件あるのか?などを調べるときにpostgresqlは一つのクエリでFindAndCountのようなことができるがmysqlなどはできないので、複雑な検索条件のクエリを定義した後にサブクエリ化して集計クエリを作成するなどが可能
count_statement = select(func.count()).select_from(statement.subquery())
contact_count = await self.db_session.exec(count_statement)
total_count = contact_count.first() # カウント結果
Flush & Refresh
DBへの反映(コミットなし)ならFlushだけ
autoincrement系のフィールドはflushのみでも反映される
DBへ反映した上でDBの最新の状態をモデルに反映させたいならRefreshが必要(created_at系など)
UUID
MySQLの CHAR(32) カラムが UUID を保存できるのは、UUIDが一般的にはハイフンを含む36文字の形式で表現されますが、実際の保存方法に違いがあるためです。
仕組み
UUIDの形式:
uuid.uuid4() が生成する UUID は、以下のような36文字の形式です: 550e8400-e29b-41d4-a716-446655440000。この形式にはハイフンが含まれます。
しかし、データベースに保存する際、ハイフンは省略されることが多く、UUIDは32文字の16進数文字列 (550e8400e29b41d4a716446655440000) に変換されます。
MySQLでの保存:
CHAR(32) はハイフンを除いた32文字の文字列を保存するためのフィールドです。UUIDを保存するときに、SQLAlchemy やアプリケーションロジックが自動的にハイフンを取り除いて保存します。
これにより、MySQLの CHAR(32) カラムにはハイフンが除かれた32文字のUUIDが保存されるため、保存や取得時に問題が生じません。
UUIDの保存方法の管理:
SQLAlchemyや他のORMは、UUIDを扱う際に、ハイフンを含む形式から32文字の形式に変換して保存し、必要に応じてハイフン付きの形式に戻すことができます。つまり、保存時には内部的にハイフンを取り除き、取得時にハイフンを付ける処理を行います。
結論
CHAR(32) のカラムに保存できるのは、UUIDが保存されるときに自動的にハイフンが除かれた32文字の文字列に変換されているためです。
発行されたSQL見ると確かにハイフン除去文字列が代入されている
SQLModel avoid Circular
E sqlalchemy.exc.InvalidRequestError: When initializing mapper Mapper[WorkspaceModel(workspace)], expression 'WorkspaceMemberModel' failed to locate a name ('WorkspaceMemberModel'). If this is a class name, consider adding this relationship() to the <class 'src.module.workspace.infrastructure.orm.workspace_model.WorkspaceModel'> class after both dependent classes have been defined.
上記のようなエラーで循環参照形のエラーかと思いきやテーブルが適切に作れてなかったなど別なことが原因のことがあるので注意
SQLAlchemy Exception
async def test_should_raise_duplicate_error_when_user_id_duplicate(
self,
auth_repo,
user_model_factory,
user_auth_provider_google_factory,
db_session
):
[user] = await user_model_factory.create()
[provider] = user_auth_provider_google_factory.make(user_id=user.id)
auth_repo.create_google_provider(provider)
await db_session.commit()
with pytest.raises(IntegrityError) as exc:
auth_repo.create_google_provider(provider)
await db_session.commit()
print(type(exc.value.code))
print(exc.value.code)
print(type(exc.value.detail))
print(exc.value.detail)
print(type(exc.value.orig.args))
print(exc.value.orig.args)
print(type(exc.value.params))
print(exc.value.params)
print(type(exc.value.statement))
print(exc.value.statement)
<class 'str'>
gkpj
<class 'list'>
[]
<class 'tuple'>
(1062, "Duplicate entry '1' for key 'user_auth_provider_google.PRIMARY'")
<class 'tuple'>
(1, 'jhampton@example.com', 'c7be21a3-fdce-438a-b5e9-85135890ce85')
<class 'str'>
INSERT INTO user_auth_provider_google (user_id, email, client_id) VALUES (%s, %s, %s)
SQLModel implicit IO
上記はリレーションがプロパティアクセスされるまでロードされない仕様なので、プロパティにアクセスするときにIOが発生してAsync/Awaitじゃないとダメだよエラーの解決方法っぽいが、今回発生したのは、テストファクトリでモデルを作成し、テスト用のメソッドで同じモデルを取得したらそれによってファクトリで取得していたモデルのプロパティアクセスの際にIOが発生するというケース
どのような回避が望ましいか検討
sqlmodelで`.refresh(model)を実行するとselectでIOがそもそも発生してる。
sqlmodelでinsertしてrefreshせずにプロパティにアクセスするとIO発生
特に何も設定していない場合にはプロパティアクセス自体をawaitにしても効果がない(当然
sessionからデタッチされた判定になるかもしくは初回アクセスの時にIOが発生するっぽい
debugger
def check_object_state(obj):
# InstanceStateオブジェクトを取得
state = obj._sa_instance_state
# セッションが存在するかを確認
session = state.session
# デタッチされているかを確認
detached = state.detached
print(f"Session: {session}")
print(f"Detached: {detached}")
セッションからデタッチされていなくてもIOが発生するケースがある
table_a.commit -> table_b.commitの後にtable_aにインサートしてリフレッシュしたオブジェクトプロパティにアクセスするとIOが発生
追加コミットが発生した時にexpireマークされ自動的に最新の値をとりに行こうとしているかもしれない?
async def get_db_session():
async with AsyncSession(engine, expire_on_commit=False) as session:
yield session
AsyncSessionの親クラスで任意のキーワード引数(多分SQLAlchemyの)を取れるようになってるので上記のようにしてcommit時にexpire扱いにさせないようにすることで修正完了。
pytest caplog empty
唯一ワークしたのは、クラスのコンストラクタ引数でロガーをインスタンス化した時と、例外のキャッチの中でインスタンス化した時は機能したがそれ以外のケースだとなぜかインスタンス化はされるがログが出力されない
SQLModel Relationship
SQLAlchemy/SQLModelのリレーションシップ取得方法の4つのパターン
1. 遅延ロード(Lazy Loading)
- 説明: リレーションにアクセスした瞬間にクエリが発行され、データベースからフェッチされる。
- クエリの実行タイミング: 親エンティティ取得後、リレーションプロパティにアクセスしたタイミングでクエリが発行される。
-
最大クエリ数: N+1クエリ(親エンティティに対して1クエリ、各リレーションに対して1クエリずつ)。
- 例: 親エンティティに対して1クエリ、関連する10のリレーションがある場合、最大で11クエリ発行される。
クエリの例:
- 親エンティティの取得
SELECT parent.id, parent.name FROM parent
- リレーションプロパティにアクセスしたタイミングでクエリが発行される
SELECT child.id, child.name, child.parent_id FROM child WHERE child.parent_id = :parent_id
joinedload
2. - 説明: 親エンティティを取得する際に、関連するリレーションのデータもJOINして1つのクエリで取得する。
- クエリの実行タイミング: 親エンティティを取得するときにリレーションも同時に取得。
- 最大クエリ数: 1クエリ(全ての親エンティティとリレーションをJOINして1つのクエリで取得)。
クエリの例:
- 親エンティティとリレーションの取得
SELECT parent.id, parent.name, child.id, child.name FROM parent LEFT JOIN child ON parent.id = child.parent_id
subqueryload
3. - 説明: 親エンティティの取得とは別に、関連するリレーションのデータをサブクエリで取得する。
- クエリの実行タイミング: 親エンティティを取得した後で、リレーションをサブクエリで一括取得。
- 最大クエリ数: 2クエリ(1クエリで親エンティティを取得し、もう1クエリでリレーションをサブクエリで取得)。
クエリの例:
- 親エンティティの取得
SELECT parent.id, parent.name FROM parent
- リレーションのサブクエリでの取得
SELECT child.id, child.name, child.parent_id FROM child WHERE child.parent_id IN (SELECT parent.id FROM parent)
selectinload
4. - 説明: 親エンティティの取得後、関連するリレーションのデータをIN句で一括取得する。
- クエリの実行タイミング: 親エンティティを取得した後で、リレーションをIN句で取得。
- 最大クエリ数: 2クエリ(1クエリで親エンティティを取得し、もう1クエリでリレーションをIN句で取得)。
クエリの例:
- 親エンティティの取得
SELECT parent.id, parent.name FROM parent
- リレーションのIN句による取得
SELECT child.id, child.name, child.parent_id FROM child WHERE child.parent_id IN (:parent_ids)
比較表
ロード方法 | クエリの発行タイミング | 最大クエリ数 | クエリの特徴 |
---|---|---|---|
遅延ロード | リレーションプロパティにアクセス時 | N+1クエリ | 各リレーションにアクセス時に個別にクエリ発行 |
joinedload |
親エンティティ取得時に同時にリレーション取得 | 1クエリ | 親とリレーションを1クエリでJOINして取得 |
subqueryload |
親エンティティ取得後、サブクエリでリレーション取得 | 2クエリ | サブクエリで一括取得 |
selectinload |
親エンティティ取得後、IN句でリレーション取得 | 2クエリ | IN句で一括取得 |
Create with relationship
本当はflushやcommitしないとID参照できないけど、リレーションが定義されていればSQLModelが適切な順序での作成と、外部キーの設定をやってくれる
SQLModel Update
model.field = new_value
のようにして更新したいモデルのフィールドに新しい値をセットした後で該当のモデルをセッションに追加しflushかけるとちゃんとマルチフィールドアップデートのクエリが発行される。
賢い
flushの影響
flushするとsessionステートのdirty, newが消える。多分DBに反映したからcommitしてなくてもdirtyじゃないよね判定?
Background Task
バックグラウンドタスクで、DBセッションなどを扱うのであれば通常のリクエストフローで開始、終了が管理されているセッションを使うと閉じなかったりコミットしなかったりして問題になるので、バックグラウンドタスク用の独立した開始、終了を明示的に宣言できるセッションを利用しましょう
Blocking trouble shoot
thread