🐍

Python の Web Framework FastAPI でトークン認証!

2021/07/19に公開

はじめに

Web サイトにおいて、アカウント認証が必要となる機会はよくあることかと思います。

先日、FastAPI について調べたので、今回は FastAPI を使ったトークン認証について、調べてみました。

python では JWT(Json Web Token)を使う際に jose パッケージを使っていきます。

他のフレームワークを使っても同じような仕組みでトークン認証は可能となりますので、参考になれば幸いです。

トークン認証とは

一般的なトークン認証&それを使ったリクエストの流れは以下になります。

トークンを作る際は、日付情報など一意になるような値を混ぜてハッシュ化する必要があります。

トークンの中身は、ヘッダー.ペイロード.署名 という形になっています。

また、JWT の中身をチェックするには、https://jwt.io/ を使ってみてください。

今回は、なるべく簡潔にトークン発行、検証、更新、削除のコードを示していきます。

実際のコード

準備

  1. requirements.txt に以下を記述
uvicorn==0.13.3
fastapi==0.63.0
python-multipart==0.0.5
peewee==3.14.3
python-jose==3.2.0
  1. 必要なパッケージをインストール
$ pip install -r requirements.txt

DBのモデルを作成

User モデルを作成します。 今回は、アカウント作成はスコープ外なので、実行時に tester アカウントを作るようにしています。

from peewee import SqliteDatabase, Model, AutoField, CharField, TextField

db = SqliteDatabase('db.sqlite3')


class User(Model):
    id: int = AutoField(primary_key=True)
    name: str = CharField(100)
    password: str = CharField(100)
    refresh_token: str = TextField(null=True)

    class Meta:
        database = db


db.create_tables([User])
User.create(name='tester', password='tester')

トークン発行

from datetime import datetime, timedelta

from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from jose import jwt
from peewee import SqliteDatabase, Model, AutoField, CharField, TextField

SECRET_KEY = "4qbqcd_iqxk-y6(gr8l^9elsr1acj+t7zohf7v8reqp&^e7%6p"

db = SqliteDatabase('db.sqlite3')


class User(Model):
    id: int = AutoField(primary_key=True)
    name: str = CharField(100)
    password: str = CharField(100)
    refresh_token: str = TextField(null=True)

    class Meta:
        database = db


db.create_tables([User])
User.create(name='tester', password='tester')

app = FastAPI()


def check_valid_user(username: str, password: str) -> bool:
    user = User.get(name=username)
    return user.password == password


def create_new_token(user_id: int) -> str:
    payload = {
        'exp': datetime.utcnow() + timedelta(days=90),
        'user_id': user_id,
    }
    token = jwt.encode(payload, SECRET_KEY, algorithm='HS256')
    User.update(refresh_token=token).where(User.id == user_id).execute()
    return token


@app.post("/token/")
async def create(form: OAuth2PasswordRequestForm = Depends()):
    if not check_valid_user(form.username, form.password):
        raise HTTPException(status_code=401, detail='Invalid password')

    user = User.get(name=form.username)
    token = create_new_token(user.id)

    return {"detail": "Success", "token": token}

トークン検証

from fastapi import FastAPI, HTTPException
from peewee import SqliteDatabase, Model, AutoField, CharField, TextField

SECRET_KEY = "4qbqcd_iqxk-y6(gr8l^9elsr1acj+t7zohf7v8reqp&^e7%6p"

db = SqliteDatabase('db.sqlite3')


class User(Model):
    id: int = AutoField(primary_key=True)
    name: str = CharField(100)
    password: str = CharField(100)
    refresh_token: str = TextField(null=True)

    class Meta:
        database = db


db.create_tables([User])
User.create(name='tester', password='tester')

app = FastAPI()


def check_valid_token(token: str) -> bool:
    return len(User.select().where(User.refresh_token == token)) != 0


@app.put("/token/validate/{token}")
async def validate(token: str):
    if not check_valid_token(token):
        raise HTTPException(status_code=401, detail='Invalid token')

    return {"detail": "Valid"}

トークン更新

from datetime import datetime, timedelta

from fastapi import FastAPI, HTTPException
from jose import jwt
from peewee import SqliteDatabase, Model, AutoField, CharField, TextField

SECRET_KEY = "4qbqcd_iqxk-y6(gr8l^9elsr1acj+t7zohf7v8reqp&^e7%6p"

db = SqliteDatabase('db.sqlite3')


class User(Model):
    id: int = AutoField(primary_key=True)
    name: str = CharField(100)
    password: str = CharField(100)
    refresh_token: str = TextField(null=True)

    class Meta:
        database = db


db.create_tables([User])
User.create(name='tester', password='tester')

app = FastAPI()


def check_valid_token(token: str) -> bool:
    return len(User.select().where(User.refresh_token == token)) != 0


def create_new_token(user_id: int) -> str:
    payload = {
        'exp': datetime.utcnow() + timedelta(days=90),
        'user_id': user_id,
    }
    token = jwt.encode(payload, SECRET_KEY, algorithm='HS256')
    User.update(refresh_token=token).where(User.id == user_id).execute()
    return token


def get_user_from_token(token: str) -> User:
    users = User.select().where(User.refresh_token == token)

    return None if len(users) == 0 else users[0]


@app.put("/token/refresh/{token}")
async def refresh(token: str):
    if not check_valid_token(token):
        raise HTTPException(status_code=401, detail='Invalid token')

    user = get_user_from_token(token)
    token = create_new_token(user.id)

    return {"detail": "Success", "token": token}

トークン削除

from datetime import datetime, timedelta

from fastapi import FastAPI, HTTPException
from jose import jwt
from peewee import SqliteDatabase, Model, AutoField, CharField, TextField

SECRET_KEY = "4qbqcd_iqxk-y6(gr8l^9elsr1acj+t7zohf7v8reqp&^e7%6p"

db = SqliteDatabase('db.sqlite3')


class User(Model):
    id: int = AutoField(primary_key=True)
    name: str = CharField(100)
    password: str = CharField(100)
    refresh_token: str = TextField(null=True)

    class Meta:
        database = db


db.create_tables([User])
User.create(name='tester', password='tester')

app = FastAPI()


def check_valid_token(token: str) -> bool:
    return len(User.select().where(User.refresh_token == token)) != 0


def create_new_token(user_id: int) -> str:
    payload = {
        'exp': datetime.utcnow() + timedelta(days=90),
        'user_id': user_id,
    }
    token = jwt.encode(payload, SECRET_KEY, algorithm='HS256')
    User.update(refresh_token=token).where(User.id == user_id).execute()
    return token


def get_user_from_token(token: str) -> User:
    users = User.select().where(User.refresh_token == token)

    return None if len(users) == 0 else users[0]


def delete_token(user_id: int):
    User.update(refresh_token=None).where(User.id == user_id).execute()


@app.delete("/token/{token}")
async def delete(token: str):
    if not check_valid_token(token):
        raise HTTPException(status_code=401, detail='Invalid token')

    user = get_user_from_token(token)
    delete_token(user.id)

    return {"detail": "Success"}

実行

$ uvicorn main:app

おわりに

いかがでしたか。

そんなに詳しくない方でも比較的簡単に理解出来たかと思います。

Django などのフルスタックフレームワークを使うとデフォルトのアカウント管理機能がありますが、

バックエンドのフレームワークでは、自分でアカウント管理機能を実装しないといけないこともあるかと思います。

今回示した例では、出来る限り簡単に、理解できるように記述しましたので、リファクタリングしたい点はいくつもあるかと思います・・

参考程度に見ていただけると良いかと思います。

お知らせ

Webサイト・ツール・LP作成のご依頼は、

https://iteek.jp/contact/

こちらからお問い合わせいただけます。お気軽にご相談ください。

参考

https://fabeee.co.jp/blog/mattsun01/

Discussion