🐕

S3で静的サイトを公開するときのアクセス制御について。lambda@edgeでjwt検証

2024/12/28に公開

以下の記事の続き「2. lambda@edgeを使ったjwt検証」編。
認証の仕組みはAuth0を使う。

アーキテクチャ

cdkバージョン

"aws-cdk": "2.173.2",

CDKコード

import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as path from 'path'
import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha';

export class S3StaticSiteStack extends cdk.Stack {
    constructor(scope: Construct, id: string, props?: cdk.StackProps) {
        super(scope, id, props);

        /* 制限をつけたいサイト */
        const bucket = new cdk.aws_s3.Bucket(this, 'BucketRestricted', {
            removalPolicy: cdk.RemovalPolicy.DESTROY, // テスト用なので削除可能とする
            autoDeleteObjects: true, // テスト用なので削除可能とする
            cors: [{
                allowedMethods: [cdk.aws_s3.HttpMethods.GET],
                allowedOrigins: ['*'],
                allowedHeaders: ['*']
            }]
        });
        // jwt検証lambda@edge
        const codePath = path.join(__dirname, './lambda');
        const lambdaExecutionRole = new cdk.aws_iam.Role(this, 'LambdaEdgeExecutionRole', {
            roleName: "LambdaEdgeExecutionRole",
            assumedBy: new cdk.aws_iam.CompositePrincipal(
                new cdk.aws_iam.ServicePrincipal('lambda.amazonaws.com'),
                new cdk.aws_iam.ServicePrincipal('edgelambda.amazonaws.com') // edgelambdaの信頼ポリシーが必要
            ),
            managedPolicies: [
                cdk.aws_iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSLambdaBasicExecutionRole'),
                cdk.aws_iam.ManagedPolicy.fromAwsManagedPolicyName('AWSLambda_ReadOnlyAccess'),
            ],
        });
        const lambdaFunction = new PythonFunction(this, 'FuncVerifyJwt2',
            {
                functionName: "FuncVerifyJwt",
                entry: codePath,
                index: 'index.py',
                handler: 'handler',
                runtime: cdk.aws_lambda.Runtime.PYTHON_3_11,
                role: lambdaExecutionRole,
                timeout: cdk.Duration.seconds(5) // ビューワーリクエストのlambda@edgeは最大5秒.snapStartはlambda@edgeではサポートされない
            },
        );

        // cloud front distributionを追加
        new cdk.aws_cloudfront.Distribution(this, 'DistributionRestricted', {
            defaultRootObject: 'index.html',
            defaultBehavior: {
                origin: cdk.aws_cloudfront_origins.S3BucketOrigin.withOriginAccessControl(bucket),
                cachePolicy: cdk.aws_cloudfront.CachePolicy.CACHING_OPTIMIZED,
                edgeLambdas:[
                    {
                        functionVersion: lambdaFunction.currentVersion,
                        eventType:cdk.aws_cloudfront.LambdaEdgeEventType.VIEWER_REQUEST
                    }
                ]
            }
        });
    }
}

lambdaコード

import jwt
import requests
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicNumbers

# Auth0設定
AUTH0_DOMAIN = "auth0のドメイン"
AUTH0_AUDIENCE = "auth0のオーディエンス"


# JWT公開鍵取得関数
def get_jwks():
    jwks_url = f"https://{AUTH0_DOMAIN}/.well-known/jwks.json"
    response = requests.get(jwks_url)
    return response.json()


# 公開鍵をPEM形式に変換
def get_public_key(jwks, kid):
    for key in jwks["keys"]:
        if key["kid"] == kid:
            public_numbers = RSAPublicNumbers(
                n=int.from_bytes(jwt.utils.base64url_decode(key["n"]), "big"),
                e=int.from_bytes(jwt.utils.base64url_decode(key["e"]), "big"),
            )
            public_key = public_numbers.public_key()
            return public_key.public_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PublicFormat.SubjectPublicKeyInfo,
            )
    raise Exception("Unable to find appropriate key")


# JWT検証関数
def verify_jwt(token):
    jwks = get_jwks()
    unverified_header = jwt.get_unverified_header(token)

    try:
        rsa_pem = get_public_key(jwks, unverified_header["kid"])
        payload = jwt.decode(
            token,
            rsa_pem,
            algorithms=["RS256"],
            audience=AUTH0_AUDIENCE,
            issuer=f"https://{AUTH0_DOMAIN}/",
        )
        return payload
    except jwt.ExpiredSignatureError:
        raise Exception("Token has expired")
    except jwt.InvalidAudienceError:
        raise Exception("Invalid audience, check the audience claim")
    except jwt.InvalidIssuerError:
        raise Exception("Invalid issuer, check the issuer claim")
    except jwt.InvalidTokenError as e:
        raise Exception(f"Invalid token: {e}")


# Lambda@Edge ハンドラー関数
def handler(event, context):
    request = event["Records"][0]["cf"]["request"]
    headers = request["headers"]

    try:
        # Authorizationヘッダーからトークンを取得
        auth_header = headers.get("authorization", [])
        if not auth_header:
            return {
                "status": "401",
                "statusDescription": "Unauthorized",
                "body": "Authorization header missing",
            }

        token = auth_header[0]["value"].split(" ")[1]  # Bearerトークンを抽出

        # トークンの検証
        payload = verify_jwt(token)

        # ここから下はちょい蛇足
        # JWTのペイロードからユーザーIDを取得(例: `sub`クレーム)
        user_id = payload.get("sub")
        user_role = payload.get("role", "guest")  # カスタムクレーム例

        # ロールベースのアクセス制御(例)
        if user_role != "guest":
            return {
                "status": "403",
                "statusDescription": "Forbidden",
                "body": f'Access denied: User role "{user_role}" is not authorized',
            }

        # ペイロード情報をリクエストに追加(例)
        request["headers"]["x-user-id"] = [{"key": "X-User-Id", "value": user_id}]
        request["headers"]["x-user-role"] = [{"key": "X-User-Role", "value": user_role}]

        # 検証成功時はリクエストをそのまま通過させる
        return request

    except Exception as e:
        return {
            "status": "403",
            "statusDescription": "Forbidden",
            "body": f"Access denied: {str(e)}",
        }

結果

成功

$ curl 'https://d10nweq0z4cqez.cloudfront.net/' \
  -H 'authorization: bearer JWTの文字列'
<!DOCTYPE html>
<html>
    <head><title>Hello, World!</title></head>
    <body>Hello, World!</body>
</html>% 

authorizationなし

$ curl 'https://d10nweq0z4cqez.cloudfront.net/'  
Authorization header missing% 

Access denied

$ curl 'https://d10nweq0z4cqez.cloudfront.net/' \
  -H 'authorization: bearer 間違ったJWT文字列' 
Access denied: Invalid crypto padding%  

ポイント

  • lambda@edgeについて
    • us-east-1でしか使えない
    • ビューワーリクエストの場合は5秒間しか動作できない
    • snapStart非対応
    • cdk destroyでエラーになって削除できない
  • 検証lambdaに付いて
    • auth0で公開している公開鍵を取得
    • jwtに含まれるkidを使って公開鍵を特定
    • 公開鍵をPEM形式に変更する
    • decodeして検証する

lambda@edgeの削除

以下のエラーになります。

"Lambda was unable to delete arn:aws:lambda:us-east-1:xxxxxxxxxxxx:function:FuncVerifyJwt:7 because it is a replicated function.

以下の手順で削除できます。

  1. stackを矯正削除する

  2. 削除後数分待ってコンソールで削除する(5分くらい待つと削除できました)

Discussion