🔐

FastAPIでrefresh tokenを実装してみた

2024/07/07に公開

概要

前回fastapiでログイン機能を実装しました。

https://zenn.dev/tnakano/articles/a2245ec1b55c63

def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=30)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

作成したアクセストークンにexpires_delta で有効期限を設定しました。万が一漏洩しても、期限が設定されているため、リスクを軽減できる一方、アプリを操作している間に有効期限が来ると、認証無効が返却されます。毎回再ログインするのは、ユーザからすると面倒臭い。。

そこで今回はRefresh Tokenを設定してみます。

Refresh Token

リフレッシュトークンとは、アクセストークンを再発行するために使用されるトークンです。

  • ユーザはログイン時にアクセストークンとリフレッシュトークンを取得します。
  • リフレッシュトークンはアクセストークンより長い有効期限を保持します。以下に例を示します。
    • アクセストークン:30分
    • リフレッシュトークン:1週間
  • アクセストークンの有効期限が切れた場合、リフレッシュトークンを使用して認証を行い、アクセストークンとリフレッシュトークンを再発行します。

ログイン処理

ログイン時にアクセストークンとリフレッシュトークンを取得します。
アクセストークン、リフレッシュトークンはuser.id と有効期限時刻から生成されるため、毎回異なる文字列が生成されます。

ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

@router.post("/login")
def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()], db: SessionLocal = Depends(get_db)):
    user = authenticate_user(form_data.username, form_data.password, db)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    refresh_token_expires = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    
    access_token = create_token(
        data={"sub": user.id}, expires_delta=access_token_expires
    )
    refresh_token = create_token(
        data={"sub": user.id}, expires_delta=refresh_token_expires
    )

    return {"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"}

リフレッシュトークン処理

JWTトークンは署名が含まれるため、user_idが復元できそのユーザがDBに存在すれば認証とします。
認証後、アクセストークンとリフレッシュトークンを再生成します。

@router.post('/refresh')
def refresh(token: str, db: SessionLocal = Depends(get_db)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id: str = payload.get("sub")
        if user_id is None:
            raise credentials_exception
    except InvalidTokenError:
        raise credentials_exception

    user = user_db.find_by_id(user_id=user_id, db=db)
    if user is None:
        raise credentials_exception

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_token(
        data={"sub": user.id}, expires_delta=access_token_expires
    )

    refresh_token_expires = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    refresh_token = create_token(
        data={"sub": user.id}, expires_delta=refresh_token_expires
    )

まとめ

リフレッシュトークンは有効期限が長く、アクセストークンを再発行できるため、セキュリティリスクが存在します。ただし、アクセストークンが無効な場合のみリクエストされるため、使用頻度が低く、盗難リスクを抑えることができます。

ユーザがアクセストークンが切れる度に、ユーザが再ログインする必要なくなりました。セキュリティトークンを適切に失効するなど、もっと堅牢な仕組みはありそうですが、今回はここまでとします。

Discussion