🐕
S3で静的サイトを公開するときのアクセス制御について。lambda@edgeでjwt検証
以下の記事の続き「2. lambda@edgeを使ったjwt検証」編。
認証の仕組みはAuth0を使う。
アーキテクチャ
cdkバージョン
"aws-cdk": "2.173.2",
CDKコード
import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as path from 'path'
import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha';
export class S3StaticSiteStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
/* 制限をつけたいサイト */
const bucket = new cdk.aws_s3.Bucket(this, 'BucketRestricted', {
removalPolicy: cdk.RemovalPolicy.DESTROY, // テスト用なので削除可能とする
autoDeleteObjects: true, // テスト用なので削除可能とする
cors: [{
allowedMethods: [cdk.aws_s3.HttpMethods.GET],
allowedOrigins: ['*'],
allowedHeaders: ['*']
}]
});
// jwt検証lambda@edge
const codePath = path.join(__dirname, './lambda');
const lambdaExecutionRole = new cdk.aws_iam.Role(this, 'LambdaEdgeExecutionRole', {
roleName: "LambdaEdgeExecutionRole",
assumedBy: new cdk.aws_iam.CompositePrincipal(
new cdk.aws_iam.ServicePrincipal('lambda.amazonaws.com'),
new cdk.aws_iam.ServicePrincipal('edgelambda.amazonaws.com') // edgelambdaの信頼ポリシーが必要
),
managedPolicies: [
cdk.aws_iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSLambdaBasicExecutionRole'),
cdk.aws_iam.ManagedPolicy.fromAwsManagedPolicyName('AWSLambda_ReadOnlyAccess'),
],
});
const lambdaFunction = new PythonFunction(this, 'FuncVerifyJwt2',
{
functionName: "FuncVerifyJwt",
entry: codePath,
index: 'index.py',
handler: 'handler',
runtime: cdk.aws_lambda.Runtime.PYTHON_3_11,
role: lambdaExecutionRole,
timeout: cdk.Duration.seconds(5) // ビューワーリクエストのlambda@edgeは最大5秒.snapStartはlambda@edgeではサポートされない
},
);
// cloud front distributionを追加
new cdk.aws_cloudfront.Distribution(this, 'DistributionRestricted', {
defaultRootObject: 'index.html',
defaultBehavior: {
origin: cdk.aws_cloudfront_origins.S3BucketOrigin.withOriginAccessControl(bucket),
cachePolicy: cdk.aws_cloudfront.CachePolicy.CACHING_OPTIMIZED,
edgeLambdas:[
{
functionVersion: lambdaFunction.currentVersion,
eventType:cdk.aws_cloudfront.LambdaEdgeEventType.VIEWER_REQUEST
}
]
}
});
}
}
lambdaコード
import jwt
import requests
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicNumbers
# Auth0設定
AUTH0_DOMAIN = "auth0のドメイン"
AUTH0_AUDIENCE = "auth0のオーディエンス"
# JWT公開鍵取得関数
def get_jwks():
jwks_url = f"https://{AUTH0_DOMAIN}/.well-known/jwks.json"
response = requests.get(jwks_url)
return response.json()
# 公開鍵をPEM形式に変換
def get_public_key(jwks, kid):
for key in jwks["keys"]:
if key["kid"] == kid:
public_numbers = RSAPublicNumbers(
n=int.from_bytes(jwt.utils.base64url_decode(key["n"]), "big"),
e=int.from_bytes(jwt.utils.base64url_decode(key["e"]), "big"),
)
public_key = public_numbers.public_key()
return public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
raise Exception("Unable to find appropriate key")
# JWT検証関数
def verify_jwt(token):
jwks = get_jwks()
unverified_header = jwt.get_unverified_header(token)
try:
rsa_pem = get_public_key(jwks, unverified_header["kid"])
payload = jwt.decode(
token,
rsa_pem,
algorithms=["RS256"],
audience=AUTH0_AUDIENCE,
issuer=f"https://{AUTH0_DOMAIN}/",
)
return payload
except jwt.ExpiredSignatureError:
raise Exception("Token has expired")
except jwt.InvalidAudienceError:
raise Exception("Invalid audience, check the audience claim")
except jwt.InvalidIssuerError:
raise Exception("Invalid issuer, check the issuer claim")
except jwt.InvalidTokenError as e:
raise Exception(f"Invalid token: {e}")
# Lambda@Edge ハンドラー関数
def handler(event, context):
request = event["Records"][0]["cf"]["request"]
headers = request["headers"]
try:
# Authorizationヘッダーからトークンを取得
auth_header = headers.get("authorization", [])
if not auth_header:
return {
"status": "401",
"statusDescription": "Unauthorized",
"body": "Authorization header missing",
}
token = auth_header[0]["value"].split(" ")[1] # Bearerトークンを抽出
# トークンの検証
payload = verify_jwt(token)
# ここから下はちょい蛇足
# JWTのペイロードからユーザーIDを取得(例: `sub`クレーム)
user_id = payload.get("sub")
user_role = payload.get("role", "guest") # カスタムクレーム例
# ロールベースのアクセス制御(例)
if user_role != "guest":
return {
"status": "403",
"statusDescription": "Forbidden",
"body": f'Access denied: User role "{user_role}" is not authorized',
}
# ペイロード情報をリクエストに追加(例)
request["headers"]["x-user-id"] = [{"key": "X-User-Id", "value": user_id}]
request["headers"]["x-user-role"] = [{"key": "X-User-Role", "value": user_role}]
# 検証成功時はリクエストをそのまま通過させる
return request
except Exception as e:
return {
"status": "403",
"statusDescription": "Forbidden",
"body": f"Access denied: {str(e)}",
}
結果
成功
$ curl 'https://d10nweq0z4cqez.cloudfront.net/' \
-H 'authorization: bearer JWTの文字列'
<!DOCTYPE html>
<html>
<head><title>Hello, World!</title></head>
<body>Hello, World!</body>
</html>%
authorizationなし
$ curl 'https://d10nweq0z4cqez.cloudfront.net/'
Authorization header missing%
Access denied
$ curl 'https://d10nweq0z4cqez.cloudfront.net/' \
-H 'authorization: bearer 間違ったJWT文字列'
Access denied: Invalid crypto padding%
ポイント
- lambda@edgeについて
- us-east-1でしか使えない
- ビューワーリクエストの場合は5秒間しか動作できない
- snapStart非対応
- cdk destroyでエラーになって削除できない
- 検証lambdaに付いて
- auth0で公開している公開鍵を取得
- jwtに含まれるkidを使って公開鍵を特定
- 公開鍵をPEM形式に変更する
- decodeして検証する
lambda@edgeの削除
以下のエラーになります。
"Lambda was unable to delete arn:aws:lambda:us-east-1:xxxxxxxxxxxx:function:FuncVerifyJwt:7 because it is a replicated function.
以下の手順で削除できます。
-
stackを矯正削除する
-
削除後数分待ってコンソールで削除する(5分くらい待つと削除できました)
Discussion