🐏

【NextAuth.js/認可】S3バケットへのアクセスをIdTokenで制限する

2023/09/27に公開

はじめに

ウェブアプリケーションにおいて、ユーザーのAWSリソースへのアクセスを制限したい時に、IAM Roleで制御する方法について簡単にまとめます。

本記事は、以下の記事を参考にAWS CDKで構築したものです。

https://dev.classmethod.jp/articles/cognito-trigger-allow-access-per-identity-id/

なお、本記事のCognitoのIDプールの設定は以下の記事をベースに書いています。
事前に読んでおくとより理解がしやすいはずです。

https://zenn.dev/gsy0911/articles/5f3290ca3a54ce

やりたいこと

基本的なIAM Roleの取得と一時クレデンシャルキーの取得の概念は以下のようになっています。
ユーザーが所属するグループにS3へアクセス可能な権限が付与されており、
そのため、一時クレデンシャルキーを使ってユーザーがS3にアクセスできます。


S3にアクセス可能な一時キーを取得してアクセス

S3のディレクトリごとのアクセス制限をLambdaに設定できます。
IDプールからはIdentityIdというIDが発行され、IAM Roleから参照可能です。
そのため、S3のパスのアクセス制限として設定できます。

例えば、ユーザーAにはap-northeast-1:aaaa0001、ユーザーBにap-northeast-1:bbbb0002というIdentityIdが発行されたとします。
そして、IAM Roleでは、S3のパスのうちs3://zenn-example/cognito-test/{IdentityId}へのアクセスのみ許可する設定にしておきます。
そのIAM Roleを介してLambdaを実行すると、ユーザーAはs3://zenn-example/cognito-test/ap-northeast-1:aaaa0001のパスのみアクセス可能になります。
対して、ユーザーBのパスとしているs3://zenn-example/cognito-test/ap-northeast-1:bbbb0002へのアクセスはできないようになります。


S3にアクセス可能な一時キーを取得してアクセス

対象読者

  • Cognitoを使っている
  • サインインしたユーザーごとに権限を付与したい
    • ユーザーへ特定のS3のパスのみアクセスできるように、IAM Roleで制限したい

デプロイ環境

  • macOS: 13.5
  • AWS CDK: 2.96.2

コード

コードは以下のリポジトリにおいてあります。本記事では紹介しない、API Gatewayの構築などもできます。

https://github.com/gsy0911/zenn-nextjs-authjs-cognito/tree/v3

インフラ

PolicyStatementでIDプールから発行されるIdentityIdを受け取る形で
S3へのアクセス制限を記述しています。
${cognito-identity.amazonaws.com:sub}IdentityIdを受け取っています。

infrastructure/lib/Cognito.ts(一部)
(前略)
    // IdentityPoolからassumeできるIAM Role
    const federatedPrincipal = new aws_iam.FederatedPrincipal(
      "cognito-identity.amazonaws.com",
      {
        StringEquals: {
          "cognito-identity.amazonaws.com:aud": params.idPool.idPoolId,
        },
        "ForAnyValue:StringLike": {
          "cognito-identity.amazonaws.com:amr": "authenticated",
        },
      },
      "sts:AssumeRoleWithWebIdentity",
    );

    const adminRole = new aws_iam.Role(this, "admin-role", {
      roleName: `${prefix}-api-gateway-admin-role`,
      assumedBy: federatedPrincipal,
      inlinePolicies: {
        executeApi: new aws_iam.PolicyDocument({
          statements: [
            new aws_iam.PolicyStatement({
              effect: aws_iam.Effect.ALLOW,
              resources: adminOnlyApiGwResource(
                accountId,
                params.idPool.apigwRestApiId,
              ),
              actions: ["execute-api:Invoke"],
            }),
+           new aws_iam.PolicyStatement({
+             effect: aws_iam.Effect.ALLOW,
+             actions: ["s3:ListBucket"],
+             resources: [`arn:aws:s3:::${params.s3Bucket}`],
+             conditions: { StringLike: { "s3:prefix": ["cognito-test"] } },
+           }),
+           new aws_iam.PolicyStatement({
+             effect: aws_iam.Effect.ALLOW,
+             resources: [
+               `arn:aws:s3:::${params.s3Bucket}/cognito-test/\${cognito-identity.amazonaws.com:sub}`,
+               `arn:aws:s3:::${params.s3Bucket}/cognito-test/\${cognito-identity.amazonaws.com:sub}/*`,
+             ],
+             actions: ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
+           }),
          ],
        }),
      },
    });
(後略)

バックエンド

Lambdaで実行する処理の説明をします。
以下の関数でIdTokenからAWSのクレデンシャルを取得します。

backend/src/lambda_handlers.py(一部)
import jwt
import boto3
import s3fs
import os


client = boto3.client("cognito-identity", region_name="ap-northeast-1")
idp = boto3.client("cognito-idp", region_name="ap-northeast-1")

REGION = "ap-northeast-1"
ACCOUNT_ID = os.environ["ACCOUNT_ID"]
USER_POOL_ID = os.environ["USER_POOL_ID"]
IDENTITY_POOL_ID = os.environ["IDENTITY_POOL_ID"]
S3_BUCKET = os.environ["S3_BUCKET"]


def _get_credentials_from_id_token(id_token: str) -> dict:
    logins = {f"cognito-idp.{REGION}.amazonaws.com/{USER_POOL_ID}": id_token}
    decode_jwt = jwt.decode(id_token, options={"verify_signature": False})
    # ユーザーが存在するかの確認
    # 存在しない場合はエラーになる
    user_info = idp.admin_get_user(UserPoolId=USER_POOL_ID, Username=decode_jwt["cognito:username"])
    print(f"{user_info=}")

    # identity-poolからidを取得する
    cognito_identity_id = client.get_id(
        AccountId=ACCOUNT_ID, IdentityPoolId=IDENTITY_POOL_ID, Logins=logins
    )
    # get ACCESS_KEY, SECRET_KEY, etc...
    credentials = client.get_credentials_for_identity(
        IdentityId=cognito_identity_id["IdentityId"], Logins=logins
    )
    return credentials

(後略)

以下がGET /read-fileのエンドポイントの実装です。
id_tokenから一時クレデンシャルキーを取得して、それを使ってS3へアクセスしています。

backend/src/lambda_handlers.py(一部)
(前略)

@api_router.get("/read-file")
def read_file(request: Request):
    lambda_event = request.scope["aws.event"]
    print(f"{lambda_event=}")
    id_token = lambda_event["headers"]["idToken"]
    credentials = _get_credentials_from_id_token(id_token=id_token)
    fs = s3fs.S3FileSystem(
        anon=False,
        key=credentials["Credentials"]["AccessKeyId"],
        secret=credentials["Credentials"]["SecretKey"],
        token=credentials["Credentials"]["SessionToken"],
    )
    with fs.open(f"s3://{S3_BUCKET}/cognito-test/{credentials['IdentityId']}/data.txt", "r") as f:
        data = f.readline()
    return {"status": "success", "type": "common", "data": data}

フロントエンド

フロントエンドで変更するところは基本的には無いです。前回の記事のフロントエンドを利用している場合には、リクエストヘッダーにidTokenを付与してください。

サンプルコード
import axios from "axios";

const credentials = await getCredentialsFromIdToken(
  idToken,
);
const signedHeaders = await getSignedHeaders(
  credentials,
  new URL(`${process.env.BACKEND_API_ENDPOINT}/v1/user`),
);

const BackendApiClient = axios.create({
  baseURL: `${process.env.BACKEND_API_ENDPOINT}/v1`,
});

const options = {
  method: "GET",
- headers: signedHeaders,
+ headers: {idToken, ...signedHeaders},
  url: "/user",
};
const result = await BackendApiClient(options)

動作確認

動作確認をするために、S3バケットにユーザーのIdentityIdごとにファイルを設置します。
例としてadminuserという2つのユーザーを考えます。
各ユーザーのIdentityIdごとのパスにdata.txtというファイルを設置しておきます。

admin側のdata.txtにはadminという文字列を
user側のdata.txtにはuserという文字列が入力されています。

それぞれのアクセス制限が動いているところを確認します。
Jupyterで、adminuserfspathをそれぞれ作成します。

admin_fsを使ってadmin_pathuser_pathにアクセスします。
admin_pathへのアクセスは成功し、user_pathへは失敗することが確認できました。

ほぼ同じですが、user_fsを使ってadmin_pathuser_pathにアクセスします。
user_pathへのアクセスは成功し、admin_pathへは失敗することが確認できました。

以上の結果から正しく動いていることが確認できました。

上記の確認コードはこちらにあります。

import jwt
import boto3
import s3fs
client = boto3.client("cognito-identity", region_name="ap-northeast-1")
idp = boto3.client("cognito-idp", region_name="ap-northeast-1")

REGION = "ap-northeast-1"
ACCOUNT_ID = "****"
USER_POOL_ID = "ap-northeast-1_****"
IDENTITY_POOL_ID = "ap-northeast-1:****"
S3_BUCKET = "****"
user_id_token = "****"
admin_id_token = "****"

def get_credentials_from_id_token(id_token: str) -> dict:
    logins = {f"cognito-idp.{REGION}.amazonaws.com/{USER_POOL_ID}": id_token}
    # decode_jwt = JwtPayload.of(id_token)
    decode_jwt = jwt.decode(id_token, options={"verify_signature": False})
    # ユーザーが存在するかの確認
    # 存在しない場合はエラーになって、APIを終了させる
    user_info = idp.admin_get_user(UserPoolId=USER_POOL_ID, Username=decode_jwt["cognito:username"])
    # print(f"{user_info=}")

    # identity-poolからidを取得する
    cognito_identity_id = client.get_id(
        AccountId=ACCOUNT_ID, IdentityPoolId=IDENTITY_POOL_ID, Logins=logins
    )
    # get sessionToken, etc.
    credentials = client.get_credentials_for_identity(
        IdentityId=cognito_identity_id["IdentityId"], Logins=logins
    )
    return credentials


user_credentials = get_credentials_from_id_token(user_id_token)
admin_credentials = get_credentials_from_id_token(admin_id_token)

user_fs = s3fs.S3FileSystem(
    anon=False, 
    key=user_credentials["Credentials"]["AccessKeyId"], 
    secret=user_credentials["Credentials"]["SecretKey"], 
    token=user_credentials["Credentials"]["SessionToken"]
)
admin_fs = s3fs.S3FileSystem(
    anon=False, 
    key=admin_credentials["Credentials"]["AccessKeyId"], 
    secret=admin_credentials["Credentials"]["SecretKey"], 
    token=admin_credentials["Credentials"]["SessionToken"]
)

user_path = f"s3://{S3_BUCKET}/cognito-test/{user_credentials['IdentityId']}/data.txt"
admin_path = f"s3://{S3_BUCKET}/cognito-test/{admin_credentials['IdentityId']}/data.txt"

with user_fs.open(user_path, "r") as f:
    print(f.readline())
try:
    with user_fs.open(admin_path, "r") as f:
        print(f.readline())
except Exception as e:
    print(e)
with admin_fs.open(admin_path, "r") as f:
    print(f.readline())
try:
    with admin_fs.open(user_path, "r") as f:
        print(f.readline())
except Exception as e:
    print(e)

おわりに

IdTokenを使って、ユーザーごとにアクセス可能なS3のパスを設定しました。
誰かの参考になれば幸いです。

GitHubで編集を提案

Discussion