【NextAuth.js/認可】S3バケットへのアクセスをIdTokenで制限する
はじめに
ウェブアプリケーションにおいて、ユーザーのAWSリソースへのアクセスを制限したい時に、IAM Roleで制御する方法について簡単にまとめます。
本記事は、以下の記事を参考にAWS CDKで構築したものです。
なお、本記事のCognitoのIDプールの設定は以下の記事をベースに書いています。
事前に読んでおくとより理解がしやすいはずです。
やりたいこと
基本的なIAM Roleの取得と一時クレデンシャルキーの取得の概念は以下のようになっています。
ユーザーが所属するグループにS3へアクセス可能な権限が付与されており、
そのため、一時クレデンシャルキーを使ってユーザーがS3にアクセスできます。
S3にアクセス可能な一時キーを取得してアクセス
S3のディレクトリごとのアクセス制限をLambdaに設定できます。
IDプールからはIdentityId
というIDが発行され、IAM Roleから参照可能です。
そのため、S3のパスのアクセス制限として設定できます。
例えば、ユーザーAにはap-northeast-1:aaaa0001
、ユーザーBにap-northeast-1:bbbb0002
というIdentityId
が発行されたとします。
そして、IAM Roleでは、S3のパスのうちs3://zenn-example/cognito-test/{IdentityId}
へのアクセスのみ許可する設定にしておきます。
そのIAM Roleを介してLambdaを実行すると、ユーザーAはs3://zenn-example/cognito-test/ap-northeast-1:aaaa0001
のパスのみアクセス可能になります。
対して、ユーザーBのパスとしているs3://zenn-example/cognito-test/ap-northeast-1:bbbb0002
へのアクセスはできないようになります。
S3にアクセス可能な一時キーを取得してアクセス
対象読者
- Cognitoを使っている
- サインインしたユーザーごとに権限を付与したい
- ユーザーへ特定のS3のパスのみアクセスできるように、IAM Roleで制限したい
デプロイ環境
- macOS: 13.5
- AWS CDK: 2.96.2
コード
コードは以下のリポジトリにおいてあります。本記事では紹介しない、API Gatewayの構築などもできます。
インフラ
PolicyStatement
でIDプールから発行されるIdentityId
を受け取る形で
S3へのアクセス制限を記述しています。
${cognito-identity.amazonaws.com:sub}
でIdentityId
を受け取っています。
(前略)
// IdentityPoolからassumeできるIAM Role
const federatedPrincipal = new aws_iam.FederatedPrincipal(
"cognito-identity.amazonaws.com",
{
StringEquals: {
"cognito-identity.amazonaws.com:aud": params.idPool.idPoolId,
},
"ForAnyValue:StringLike": {
"cognito-identity.amazonaws.com:amr": "authenticated",
},
},
"sts:AssumeRoleWithWebIdentity",
);
const adminRole = new aws_iam.Role(this, "admin-role", {
roleName: `${prefix}-api-gateway-admin-role`,
assumedBy: federatedPrincipal,
inlinePolicies: {
executeApi: new aws_iam.PolicyDocument({
statements: [
new aws_iam.PolicyStatement({
effect: aws_iam.Effect.ALLOW,
resources: adminOnlyApiGwResource(
accountId,
params.idPool.apigwRestApiId,
),
actions: ["execute-api:Invoke"],
}),
+ new aws_iam.PolicyStatement({
+ effect: aws_iam.Effect.ALLOW,
+ actions: ["s3:ListBucket"],
+ resources: [`arn:aws:s3:::${params.s3Bucket}`],
+ conditions: { StringLike: { "s3:prefix": ["cognito-test"] } },
+ }),
+ new aws_iam.PolicyStatement({
+ effect: aws_iam.Effect.ALLOW,
+ resources: [
+ `arn:aws:s3:::${params.s3Bucket}/cognito-test/\${cognito-identity.amazonaws.com:sub}`,
+ `arn:aws:s3:::${params.s3Bucket}/cognito-test/\${cognito-identity.amazonaws.com:sub}/*`,
+ ],
+ actions: ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
+ }),
],
}),
},
});
(後略)
バックエンド
Lambdaで実行する処理の説明をします。
以下の関数でIdTokenからAWSのクレデンシャルを取得します。
import jwt
import boto3
import s3fs
import os
client = boto3.client("cognito-identity", region_name="ap-northeast-1")
idp = boto3.client("cognito-idp", region_name="ap-northeast-1")
REGION = "ap-northeast-1"
ACCOUNT_ID = os.environ["ACCOUNT_ID"]
USER_POOL_ID = os.environ["USER_POOL_ID"]
IDENTITY_POOL_ID = os.environ["IDENTITY_POOL_ID"]
S3_BUCKET = os.environ["S3_BUCKET"]
def _get_credentials_from_id_token(id_token: str) -> dict:
logins = {f"cognito-idp.{REGION}.amazonaws.com/{USER_POOL_ID}": id_token}
decode_jwt = jwt.decode(id_token, options={"verify_signature": False})
# ユーザーが存在するかの確認
# 存在しない場合はエラーになる
user_info = idp.admin_get_user(UserPoolId=USER_POOL_ID, Username=decode_jwt["cognito:username"])
print(f"{user_info=}")
# identity-poolからidを取得する
cognito_identity_id = client.get_id(
AccountId=ACCOUNT_ID, IdentityPoolId=IDENTITY_POOL_ID, Logins=logins
)
# get ACCESS_KEY, SECRET_KEY, etc...
credentials = client.get_credentials_for_identity(
IdentityId=cognito_identity_id["IdentityId"], Logins=logins
)
return credentials
(後略)
以下がGET /read-file
のエンドポイントの実装です。
id_token
から一時クレデンシャルキーを取得して、それを使ってS3へアクセスしています。
(前略)
@api_router.get("/read-file")
def read_file(request: Request):
lambda_event = request.scope["aws.event"]
print(f"{lambda_event=}")
id_token = lambda_event["headers"]["idToken"]
credentials = _get_credentials_from_id_token(id_token=id_token)
fs = s3fs.S3FileSystem(
anon=False,
key=credentials["Credentials"]["AccessKeyId"],
secret=credentials["Credentials"]["SecretKey"],
token=credentials["Credentials"]["SessionToken"],
)
with fs.open(f"s3://{S3_BUCKET}/cognito-test/{credentials['IdentityId']}/data.txt", "r") as f:
data = f.readline()
return {"status": "success", "type": "common", "data": data}
フロントエンド
フロントエンドで変更するところは基本的には無いです。前回の記事のフロントエンドを利用している場合には、リクエストヘッダーにidToken
を付与してください。
import axios from "axios";
const credentials = await getCredentialsFromIdToken(
idToken,
);
const signedHeaders = await getSignedHeaders(
credentials,
new URL(`${process.env.BACKEND_API_ENDPOINT}/v1/user`),
);
const BackendApiClient = axios.create({
baseURL: `${process.env.BACKEND_API_ENDPOINT}/v1`,
});
const options = {
method: "GET",
- headers: signedHeaders,
+ headers: {idToken, ...signedHeaders},
url: "/user",
};
const result = await BackendApiClient(options)
動作確認
動作確認をするために、S3バケットにユーザーのIdentityId
ごとにファイルを設置します。
例としてadmin
とuser
という2つのユーザーを考えます。
各ユーザーのIdentityId
ごとのパスにdata.txt
というファイルを設置しておきます。
admin
側のdata.txt
にはadminという文字列を
user
側のdata.txt
にはuserという文字列が入力されています。
それぞれのアクセス制限が動いているところを確認します。
Jupyterで、admin
とuser
のfs
とpath
をそれぞれ作成します。
admin_fs
を使ってadmin_path
とuser_path
にアクセスします。
admin_path
へのアクセスは成功し、user_path
へは失敗することが確認できました。
ほぼ同じですが、user_fs
を使ってadmin_path
とuser_path
にアクセスします。
user_path
へのアクセスは成功し、admin_path
へは失敗することが確認できました。
以上の結果から正しく動いていることが確認できました。
上記の確認コードはこちらにあります。
import jwt
import boto3
import s3fs
client = boto3.client("cognito-identity", region_name="ap-northeast-1")
idp = boto3.client("cognito-idp", region_name="ap-northeast-1")
REGION = "ap-northeast-1"
ACCOUNT_ID = "****"
USER_POOL_ID = "ap-northeast-1_****"
IDENTITY_POOL_ID = "ap-northeast-1:****"
S3_BUCKET = "****"
user_id_token = "****"
admin_id_token = "****"
def get_credentials_from_id_token(id_token: str) -> dict:
logins = {f"cognito-idp.{REGION}.amazonaws.com/{USER_POOL_ID}": id_token}
# decode_jwt = JwtPayload.of(id_token)
decode_jwt = jwt.decode(id_token, options={"verify_signature": False})
# ユーザーが存在するかの確認
# 存在しない場合はエラーになって、APIを終了させる
user_info = idp.admin_get_user(UserPoolId=USER_POOL_ID, Username=decode_jwt["cognito:username"])
# print(f"{user_info=}")
# identity-poolからidを取得する
cognito_identity_id = client.get_id(
AccountId=ACCOUNT_ID, IdentityPoolId=IDENTITY_POOL_ID, Logins=logins
)
# get sessionToken, etc.
credentials = client.get_credentials_for_identity(
IdentityId=cognito_identity_id["IdentityId"], Logins=logins
)
return credentials
user_credentials = get_credentials_from_id_token(user_id_token)
admin_credentials = get_credentials_from_id_token(admin_id_token)
user_fs = s3fs.S3FileSystem(
anon=False,
key=user_credentials["Credentials"]["AccessKeyId"],
secret=user_credentials["Credentials"]["SecretKey"],
token=user_credentials["Credentials"]["SessionToken"]
)
admin_fs = s3fs.S3FileSystem(
anon=False,
key=admin_credentials["Credentials"]["AccessKeyId"],
secret=admin_credentials["Credentials"]["SecretKey"],
token=admin_credentials["Credentials"]["SessionToken"]
)
user_path = f"s3://{S3_BUCKET}/cognito-test/{user_credentials['IdentityId']}/data.txt"
admin_path = f"s3://{S3_BUCKET}/cognito-test/{admin_credentials['IdentityId']}/data.txt"
with user_fs.open(user_path, "r") as f:
print(f.readline())
try:
with user_fs.open(admin_path, "r") as f:
print(f.readline())
except Exception as e:
print(e)
with admin_fs.open(admin_path, "r") as f:
print(f.readline())
try:
with admin_fs.open(user_path, "r") as f:
print(f.readline())
except Exception as e:
print(e)
おわりに
IdTokenを使って、ユーザーごとにアクセス可能なS3のパスを設定しました。
誰かの参考になれば幸いです。
Discussion