🕰️

IAMの力を借りて、簡易TOTPを実装したい

2023/12/29に公開

発端

『社内アプリで全員が実行できるようにしたくない』
『でも、パスワード管理したくない……せやOTP持ってたら実行できるようにしよ』
『OTP周りは自分で実装・管理したくない!』
そんなお気持ちからはIAM Userに登録される仮想デバイスを使ってOTPの認証をAPI Gateway+AWS LambdaでTOTPを検証するエンドポイントを実装したお話

前提

この辺りのセキュリティに関しては……突き詰めれば青天井なので『以下ができればいいかな。』という前提

  • one region
    • 今回全部 ap-northeast-1 にデプロイ
  • カスタムドメインは使わない
  • OTPの仮想デバイスの共有方法は考えない
    • 大抵の場合は個人に紐づくものであるが、1passwordとかの機能を使うと他人と共有することは可能なため
    • このツールの目的はあらかじめ複数人が共有できる方法を持っているものとする
  • 利用可能なMFAデバイスは1つに絞る、複数に対応する場合は実装を変更する必要あり
  • OTPに使うためには、IAM Userからアクセスキー・シークレットアクセスキーは発行するのは仕方ない
    • 但しこのIAM Userには権限を一切つけない(=シークレットアクセスキーが漏れた場合被害最小限に抑える)
    • もうちょい被害最小限にしたい場合は、AWS Secrets Managerの値をローテションするAWS Lambdaを作成するのを検討するとよい、かも

準備

IAM User作成

IAM>ユーザー>「ユーザーの作成」

何も持たないIAM Userを作成し、アクセスキーを発行。マネージドコンソールにはアクセスさせないので、「AWS マネジメントコンソールへのユーザーアクセスを提供する - オプション」にはチェックを入れない

「ポリシーを直接アタッチ」を選択、許可ポリシーは何も一つ選択せず「次へ」

「ユーザーの作成」をし、IAM Userを作成

多要素認証(MFA)を設定

MFAの識別子(画像例だと arn:aws:iam::000000000000:mfa/test-otp の部分)をメモしておく

アクセスキー・シークレットアクセスキーを発行

作成後に表示される「アクセスキー」と「シークレットアクセスキー」をメモしておく

AWS Secrets Managerにメモした値を登録

コードに埋め込みたくないので、AWS Secrets Managerに登録して、そこから引っ張って来るようにします。
シークレット名はコードでも使うので、 OTP_USER_SECRET で登録しておきます。

serverless framework のプロジェクト作成

serverless framework のプロジェクト作成して諸々設定していきます。

プロジェクト作成

$ mkdir -p /path/to
$ cd /path/to

$ serverless create --template aws-python3 --name verify_otp

✔ Project successfully created in "./" from "aws-python3" template (2s)

Need a faster logging experience than CloudWatch? Try our Dev Mode in Console: run "serverless dev"

$ tree .
.
├── handler.py
└── serverless.yml

1 directory, 2 files

plugin serverless-python-requirements を導入

serverless.ymlを下記のように編集。

serverless.yml
service: verify-otp
frameworkVersion: "3"

provider:
  name: aws
  runtime: python3.9
  stage: dev
  region: ap-northeast-1

  iam:
    role:
      statements:
        - Effect: "Allow"
          Action:
            - "secretsmanager:GetSecretValue"
            - "sts:GetTokenSession"
          Resource:
            - "*"
  apiGateway:
    apiKeys:
      - name: ${self:service}-${self:provider.stage}
        value: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

plugins:
  - serverless-python-requirements

package:
  patterns:
    - "!./**"
    - handler.py

functions:
  verify_otp:
    handler: handler.handler
    timeout: 29
    environment:
      OTP_USER_SECRET: OTP_USER_SECRET
    events:
      - http:
          path: verify-otp
          method: post
          private: true

pluginを入れていく

$ serverless plugin install -n serverless-python-requirements

✔ Plugin "serverless-python-requirements" installed  (31s)

仮想環境作成、必要パッケージインストール

仮想環境を作成して、パッケージをインストールします。
boto3-stubsはvscodeで補完出力で重宝します。
pydanticはjson->Classの変換サボるのに使用、1系を指定してるのはCPUアーキテクチャの問題でM1MacからDeployしてAWSLambda(x86_64)で動かそうとするとimportでコケた。
pydanticを2系で使うことに対してこだわりなかったので、今回は1系で実装しました。
もちろん、解決できるなら2系のほうが望ましいと思います。

$ python -mvenv .venv
$ . .venv/bin/activate
(.venv) $ pip install -U pip
(.venv) $ pip install "boto3-stubs[secretsmanager,sts]" "pydantic<2"

# Deployに必要なので、requirements.txtを作成しておく
(.venv) $ pip freeze > requirements.txt

実装

AWS Secrets Manager に保存した情報から、AWS STSのboto3クライアントを作成し、 get_session_token の成功有無で判定

handler.py
import json
import logging
import os
import re
from typing import Any

import boto3
from botocore.exceptions import ClientError
from mypy_boto3_sts import STSClient
from pydantic import BaseModel

logger = logging.getLogger()


class OtpUserSecret(BaseModel):
    access_key_id: str
    secret_access_key: str
    mfa_arn: str


def fetch_otp_secret() -> OtpUserSecret:
    sm = boto3.client("secretsmanager")
    resp = sm.get_secret_value(SecretId=os.environ["OTP_USER_SECRET"])
    return OtpUserSecret.parse_raw(resp["SecretString"])


def build_sts_client(opt_user_secret: OtpUserSecret) -> STSClient:
    """
    OTPを検証するためのSTSクライアントを作成する
    """
    return boto3.client(
        "sts",
        aws_access_key_id=opt_user_secret.access_key_id,
        aws_secret_access_key=opt_user_secret.secret_access_key,
    )


def verify_otp(otp: str) -> bool:
    """
    OTPが有効かどうかを確認する
    """
    otp_user_secret = fetch_otp_secret()
    sts_client = build_sts_client(otp_user_secret)

    try:
        # 数字6桁の文字列ではない場合は失敗扱いにする
        if not re.match(r"^\d{6}$", otp):
            logger.warning("invalid otp format: %s", otp)
            return False

        # 取得したSession情報を使わないので、最小値の900秒で取得
        sts_client.get_session_token(
            DurationSeconds=900,
            SerialNumber=otp_user_secret.mfa_arn,
            TokenCode=otp,
        )
        return True
    except ClientError as e:
        # トークンが無効な場合や、その他のエラーが発生した場合は、
        # ClientErrorが発生する
	
        # トークンが無効な場合は、AccessDeniedが発生する
        if e.response["Error"]["Code"] == "AccessDenied":
            # トークンが無効な場合は、Message内に"invalid MFA one time pass code."が含まれる
            if "invalid MFA one time pass code." in e.response["Error"]["Message"]:
                # トークンが無効な場合はFalseを返す
                logger.warning("invalid OTP code.", exc_info=True)
                return False

        # それ以外のエラーはそのままはraiseする
        raise


def handler(event: dict, context: Any) -> dict:
    logger.info("event: %s", event)
    try:
        body = json.loads(event["body"])
        logger.info("body: %s", body)
        if "otp" not in body:
            return {
                "statusCode": 400,
                "body": json.dumps(
                    {
                        "message": "otp is required",
                    }
                ),
            }

        response = verify_otp(body["otp"])
        return {
            "statusCode": 200,
            "body": json.dumps(
                {
                    "verify": response,
                }
            ),
        }
    except Exception:
        logger.exception("Failed to verify_otp")
        return {
            "statusCode": 500,
            "body": json.dumps(
                {
                    "error_message": "Internal Server Error",
                }
            ),
        }

Deploy、動作検証

AWSへdeployしたら、apiKeyとAPI GatewayのEndpointがわかる。

$ serverless deploy

Deploying verify-otp to stage dev (ap-northeast-1)

✔ Service deployed to stack verify-otp-dev (52s)

api keys:
  verify-otp-dev: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
endpoint: POST - https://xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/dev/verify-otp
functions:
  verify_otp: verify-otp-dev-verify_otp (18 MB)

Improve API performance – monitor it with the Serverless Console: run "serverless --console"

responseで

  • 成功なら {"verify": true}
  • 失敗なら {"verify": false}

が返ってくる

$ curl -XPOST \
  -H x-api-key:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx \
  https://xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/dev/verify-otp \ 
  -d '{"otp": "231896"}'
{"verify": true}

$ curl -XPOST \
  -H x-api-key:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx \
  https://xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/dev/verify-otp \ 
  -d '{"otp": "123456"}'
{"verify": false}

request bodyがないと500エラーが返ってくる

$ curl -i -XPOST \
  -H x-api-key:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx \
  https://xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/dev/verify-otp
HTTP/2 500 
content-type: application/json
content-length: 42
date: Thu, 28 Dec 2023 20:33:07 GMT
x-amzn-requestid: 07afcab1-8719-4d7c-b469-72f84c9ee2de
x-amz-apigw-id: xxxxxxxxxxxxxxxx
x-amzn-trace-id: Root=1-xxxxxxxx-yyyyyyyyyyyyyyyyyyyyyyyy;Sampled=0;lineage=zzzzzzzz:0
x-cache: Error from cloudfront
via: 1.1 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.cloudfront.net (CloudFront)
x-amz-cf-pop: KIX56-P1
x-amz-cf-id: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx_yyyyy_zzzzzzzzzzzzz==

{"error_message": "Internal Server Error"} 

まとめ

正直作ったはいいけどこれが採用される日が来るのかわかりません(年末謎のノリ)
チーム内NGが出たら日の目は浴びないツールなので、ここで供養しときます。

実装の参考にはなると思いますが、自己責任で運用してください。

Discussion