📑

Cognitoユーザープールを移行してみる(クライアントシークレットあり/なし)

2024/03/19に公開

概要

こんにちは🙌
AWS Cognitoユーザープールを移行するときに考えたことを書き記してみました。
検証としてCognitoユーザープールをクライアントシークレットあり/なしの状態でのLambda移行を試してみます。

同一のAWSアカウント、リージョン内での移行を想定していますので、アカウントが異なる場合、アカウント間でのアクセスを設定するための追加考慮事項も必要となります。

ゴール

Cognitoユーザープール移行方法を試してみる
ユーザープールの設定内容による認証方法の違いを知る

移行方法

既に色々な記事で書かれていますので、詳細は書きませんが、ユーザープールを移行するには2つの手段があります。

  1. CSVでのエクスポートとインポート
  2. LambdaによるCognito APIを利用した移行方法

それぞれの方法について概要は以下のブログの内容が非常に分かりやすくまとめてくださっていたで、置いておきます。

簡潔に言うと、CSVでのエクスポートとインポートを使った場合、ユーザーによって設定されたパスワードの移行ができないという制約があります。そのためユーザーに対して再度パスワードを設定してもらうための手間が発生します。
マネジメントコンソール上にも記載がありますね。

ユーザーパスワードをインポートすることはできません。
Cognito1

CSVのインポートジョブを実行するには、事前にIAMロールが必要になります。
Cognitoへの書き込み、CloudWatchログ(ログ出力が必要であれば)の権限を持ったロールを設定することになります。
Cognito2

これからやること

今回は、Lambdaによる移行を試してみたいので、2つの場面を設定して、それぞれでどのように対応する必要があるのかを考えてみたいと思います。
移行元と先のCognitoは同一の設定が入っていることを前提とします。

  1. ユーザー名、パスワードで認証する(パブリッククライアント)
  2. クライアントシークレットが設定されたユーザープールの移行

リソースはすべて簡易的にTerraformで用意していきます。

前提知識

Black Beltの資料を使わせていただいて、事前にCognitoのAPIと認証フローを理解しておきます。

Cognitoには、Admin APIと非Admin APIがあります。
それぞれ呼び出し元によって使い分けることが出来ます。

認証フローは以下となります。
ADMIN_USER_PASSWORD_AUTHはAdmin APIからの呼び出しで利用します。管理用のフローとなりますので、例えばユーザーがパスワードを忘れた場合等にADMIN_USER_PASSWORD_AUTHフローを使って一時的にアカウントを設定したりする時に使わたり、ユーザーに代わって認証プロセスを完了する時に使用します。

検証のための事前準備

リソースの作成

検証のために使うリソースを先に作成しておきます。以下のリソースを作成します。

  • 移行元Cognito
  • 移行先Cognito
  • IAMロール、ポリシー
  • APIGateway(動作確認のために作成します。)
# Cognito
resource "aws_cognito_user_pool" "src_user_pool" {
  name                = "src-pool"
  username_attributes = ["email"]
  password_policy {
    minimum_length                   = 8
    require_lowercase                = false
    require_numbers                  = false
    require_symbols                  = false
    require_uppercase                = false
    temporary_password_validity_days = 7
  }
}

resource "aws_cognito_user_pool_client" "src_client" {
  name            = "src-client"
  user_pool_id    = aws_cognito_user_pool.src_user_pool.id
  generate_secret = false
  explicit_auth_flows = [
    "ADMIN_NO_SRP_AUTH",
    "USER_PASSWORD_AUTH"
  ]
}

resource "aws_cognito_user_pool" "dst_user_pool" {
  name                = "dst-pool"
  username_attributes = ["email"]
  password_policy {
    minimum_length                   = 8
    require_lowercase                = false
    require_numbers                  = false
    require_symbols                  = false
    require_uppercase                = false
    temporary_password_validity_days = 7
  }
}

resource "aws_cognito_user_pool_client" "dst_client" {
  name         = "dst-client"
  user_pool_id = aws_cognito_user_pool.dst_user_pool.id
  generate_secret = false
  explicit_auth_flows = [
    "ADMIN_NO_SRP_AUTH",
    "USER_PASSWORD_AUTH"
  ]
}

# IAMロール、ポリシー
resource "aws_iam_role" "iam_for_lambda" {
  name = "migration-cognito-lambda-role"
  assume_role_policy = jsonencode({
    "Version" = "2012-10-17",
    "Statement" = [
      {
        "Action" = "sts:AssumeRole",
        "Effect" = "Allow",
        "Principal" = {
          "Service" = "lambda.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_policy" "iam_for_lambda" {
  name = "migration-cognito-lambda-policy"
  policy = jsonencode({
    "Version" = "2012-10-17",
    "Statement" = [
      {
        "Action"   = "cognito-idp:*",
        "Effect"   = "Allow",
        "Resource" = "*"
      },
      {
        "Action"   = "logs:*",
        "Effect"   = "Allow",
        "Resource" = "*"
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "iam_for_lambda" {
  role       = aws_iam_role.iam_for_lambda.name
  policy_arn = aws_iam_policy.iam_for_lambda.arn
}

# APIGateway
resource "aws_api_gateway_rest_api" "example_api" {
  name        = "example-API"
  description = "This is my API for demonstration purposes"
}

resource "aws_api_gateway_resource" "example_resource" {
  rest_api_id = aws_api_gateway_rest_api.example_api.id
  parent_id   = aws_api_gateway_rest_api.example_api.root_resource_id
  path_part   = "exampleresource"
}

resource "aws_api_gateway_method" "example_method" {
  rest_api_id   = aws_api_gateway_rest_api.example_api.id
  resource_id   = aws_api_gateway_resource.example_resource.id
  http_method   = "GET"
  authorization = "COGNITO_USER_POOLS"
  authorizer_id = aws_api_gateway_authorizer.example_authorizer.id
}

resource "aws_api_gateway_method_response" "respose200" {
  rest_api_id = aws_api_gateway_rest_api.example_api.id
  resource_id = aws_api_gateway_resource.example_resource.id
  http_method = aws_api_gateway_method.example_method.http_method
  status_code = "200"
}

resource "aws_api_gateway_integration" "mock" {
  rest_api_id = aws_api_gateway_rest_api.example_api.id
  resource_id = aws_api_gateway_resource.example_resource.id
  http_method = aws_api_gateway_method.example_method.http_method
  type                 = "MOCK"
  passthrough_behavior = "WHEN_NO_MATCH"
  request_templates = {
    "application/json" = "{\"statusCode\": 200}"
  }
}

resource "aws_api_gateway_integration_response" "response200" {
  depends_on  = [aws_api_gateway_integration.mock]
  rest_api_id = aws_api_gateway_rest_api.example_api.id
  resource_id = aws_api_gateway_resource.example_resource.id
  http_method = aws_api_gateway_integration.mock.http_method
  status_code = aws_api_gateway_method_response.respose200.status_code
  response_templates = {
    "application/json" = "{\"message\": \"Hello, World!\"}"
  }
}

resource "aws_api_gateway_authorizer" "example_authorizer" {
  name            = "example-authorizer"
  rest_api_id     = aws_api_gateway_rest_api.example_api.id
  type            = "COGNITO_USER_POOLS"
  identity_source = "method.request.header.Authorization"
  provider_arns   = [aws_cognito_user_pool.dst_user_pool.arn]
}

resource "aws_api_gateway_deployment" "example_deployment" {
  depends_on = [
    aws_api_gateway_integration.mock,
    aws_api_gateway_integration_response.response200
  ]

  rest_api_id = aws_api_gateway_rest_api.example_api.id
  stage_name  = "test"
}

output "invoke_url" {
  value = "${aws_api_gateway_deployment.example_deployment.invoke_url}/${aws_api_gateway_resource.example_resource.path_part}"
}

リクエストを試す

新規ユーザーのステータスを検証済みにする。

ユーザーを作成した直後の状態だと確認ステータスがパスワードを強制的に変更となっています。GUIであればパスワード変更画面が出てきますが、今回はCurlで試してたいので、aws cliを使って検証済みの状態に変更します。
※事前にユーザーを作成してください。

$ aws cognito-idp admin-set-user-password \
--user-pool-id XXXXX --username XXXXX --password XXXXX --permanent

リクエストを送る

検証済みとなりましたので、認証情報を使ってリクエストが通るか確認してみます。
リクエストで利用する認証情報を含んだJSONファイルを作っておきます。(各環境で作成したリソース・ユーザーに合わせて変更します。)

{
  "ClientId" : "XXXXX", # COGNITO_CLIENT_IDを入れます。
  "AuthFlow" : "USER_PASSWORD_AUTH",
  "AuthParameters": {
    "USERNAME" : "XXXXX",
    "PASSWORD" : "XXXXX"
  }
}

リクエストをCurlで送るには、IdTokenを取得してから、その情報をAuthorizationヘッダーに設定してリクエストを送ります。
cognitoの作成リージョンに合わせてURLは変えるようにします。

# Cognitoへ認証を行い、トークンを取得します。
$ IdToken=$(curl -s -X POST \
  -H 'X-Amz-Target: AWSCognitoIdentityProviderService.InitiateAuth' \
  -H 'Content-Type: application/x-amz-json-1.1' \
  --data @auth.json \ # 先ほど作成したJSONファイルです。
  https://cognito-idp.<リージョン>.amazonaws.com/ | jq -r .AuthenticationResult.IdToken)

# APIGatewayへのアクセスとなります。
curl \
  -H "Authorization: $IdToken" \
  https://XXXXX.execute-api.ap-northeast-1.amazonaws.com/test/exampleresource

{"message": "Hello, World!"} が返却されたら成功です!

1.ユーザー名、パスワードのみで認証するユーザープールの移行(パブリッククライアント)

環境と動作確認するための準備が出来たので、Lambdaを作成して実際にユーザーが移行されるかどうか、まずはパブリッククライントのユーザープール移行を確認してみようと思います。
パブリッククライアントというのは、クライアントシークレットを使わない認証方式となります。ブラウザやモバイルデバイスで実行されるアプリケーションやSPAで認証を行う時のようなユーザー側のデバイスで認証が行われる場合に取られるパターンとなります。

まずは、Lambdaで使うPythonを作成します。(例外処理やログ出力は入れていない状態なので、検証用とご理解ください。)
ここでの処理は大きく2点です。

  1. ユーザーが新しいユーザープールで認証を試みるときUserMigration_Authentication、既存のユーザープールでユーザーを認証し、認証が成功したらそのユーザーの属性を新しいユーザープールにコピーします。
  2. ユーザーがパスワードを忘れた場合UserMigration_ForgotPassword、元のユーザープールでユーザーが存在するか確認し、存在すればそのユーザーの属性を新しいユーザープールにコピーします。
import boto3
import os

client = boto3.client("cognito-idp")

SRC_USER_POOL_ID = os.environ["SRC_USER_POOL_ID"]
SRC_CLIENT_ID = os.environ["SRC_CLIENT_ID"]


def lambda_handler(event, context):
    # 新規Cognitoにユーザーを追加
    if event["triggerSource"] == "UserMigration_Authentication":
        user = client.initiate_auth(
            ClientId=SRC_CLIENT_ID,
            AuthFlow="USER_PASSWORD_AUTH",
            AuthParameters={
                "USERNAME": event["userName"],
                "PASSWORD": event["request"]["password"],
            },
        )
        if user:
            # 元のユーザープールからユーザー属性を取得して、新しいユーザープールのユーザー属性に追加
            user_attributes = client.get_user(
                AccessToken=user["AuthenticationResult"]["AccessToken"]
            )
            for user_attribute in user_attributes["UserAttributes"]:
                if user_attribute["Name"] == "email":
                    user_email = user_attribute["Value"]
                    event["response"]["userAttributes"] = {
                        "email": user_email,
                        "email_verified": "true",
                    }
                    event["response"]["messageAction"] = "SUPPRESS"
            return event
        else:
            return "Bad Password"
    # パスワードリセットの場合、ユーザー有無のみを確認して、新しいユーザープールにユーザーを追加
    elif event["triggerSource"] == "UserMigration_ForgotPassword":
        try:
            user = client.admin_get_user(
                UserPoolId=SRC_USER_POOL_ID, Username=event["userName"]
            )
            if "UserAttributes" in user:
                email_attribute = next(
                    (
                        attr
                        for attr in user["UserAttributes"]
                        if attr["Name"] == "email"
                    ),
                    None,
                )
                if email_attribute:
                    user_email = email_attribute["Value"]
                    event["response"]["userAttributes"] = {
                        "email": user_email,
                        "email_verified": "true",
                    }
                    event["response"]["messageAction"] = "SUPPRESS"
            return event
        except client.exceptions.UserNotFoundException:
            return {"status": "User does not exist in the source user pool."}

TerraformでLambdaの作成とCognitoをトリガーとして設定するため変更を加えます。
先ほど作成したコードをZIPで固めてLambda関数として利用します。

resource "aws_cognito_user_pool" "dst_user_pool" {
  name                = "dst-pool"
  username_attributes = ["email"]
  password_policy {
    minimum_length                   = 8
    require_lowercase                = false
    require_numbers                  = false
    require_symbols                  = false
    require_uppercase                = false
    temporary_password_validity_days = 7
  }
  # 以下を追加して、Lambdaトリガーとする。
  lambda_config {
    user_migration = aws_lambda_function.migration_cognito.arn
  }
}

# Lambdaの作成
resource "aws_lambda_function" "migration_cognito" {
  function_name    = "migration-cognito"
  handler          = "cognito_migration.lambda_handler"
  runtime          = "python3.9"
  role             = aws_iam_role.iam_for_lambda.arn
  filename         = "lambda.zip"
  source_code_hash = filebase64sha256("lambda.zip")

  environment {
    variables = {
      SRC_USER_POOL_ID = aws_cognito_user_pool.src_user_pool.id
      SRC_CLIENT_ID    = aws_cognito_user_pool_client.src_client.id
    }
  }
}

resource "aws_lambda_permission" "cognito_migration" {
  statement_id  = "AllowExecutionFromCognito"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.migration_cognito.function_name
  principal     = "cognito-idp.amazonaws.com"
  source_arn    = aws_cognito_user_pool.dst_user_pool.arn
}

これで環境が整いました!
APIGatewayへのリクエストを送ってみると、ユーザーが移行先のCognitoにも出来上がっていることが確認出来ます。実行とレスポンスは以下のようになります。

$ IdToken=$(curl -s -X POST \
  -H 'X-Amz-Target: AWSCognitoIdentityProviderService.InitiateAuth' \
  -H 'Content-Type: application/x-amz-json-1.1' \
  --data @auth.json \
  https://cognito-idp.ap-northeast-1.amazonaws.com/ | jq -r .AuthenticationResult.IdToken)

$ curl \
  -H "Authorization: $IdToken" \
  https://XXXXXX.execute-api.ap-northeast-1.amazonaws.com/test/exampleresource
{"message": "Hello, World!"}%   

2.クライアントシークレットが設定されたユーザープールの移行

続いて、クライアントシークレットが設定されているユーザープールの移行を試してみます。

クライアントシークレットとは、Cognito ユーザープールにおけるアプリクライアントを識別するための秘密鍵です。これは、アプリケーションがサーバーサイドで実行されている場合や、クライアントシークレットを安全に保存できる環境で使用されます。クライアントシークレットを使用することで、アプリケーションがCognitoと通信する際のセキュリティが強化されます。

実装にあたっては次の情報を参考にしています。

処理内容を見てもらえば分かりますが、違いとしてはリクエストにシークレットハッシュを含めているかどうかというところだけです。このシークレットの値については安全に保管できる場所である必要があります。今回はLambdaの環境変数に入れることにしましたが、扱いには注意する必要があります。

import json
import boto3
import os
import hmac
import hashlib
import base64

client = boto3.client('cognito-idp')

SRC_USER_POOL_ID = os.environ['SRC_USER_POOL_ID']
SRC_CLIENT_ID = os.environ['SRC_CLIENT_ID']
SRC_CLIENT_SECRET = os.environ['SRC_CLIENT_SECRET']

# シークレットハッシュの取得
def get_secret_hash(username, client_id, client_secret):
    message = username + client_id
    dig = hmac.new(str(client_secret).encode('utf-8'),
                   msg=str(message).encode('utf-8'),
                   digestmod=hashlib.sha256).digest()
    return base64.b64encode(dig).decode()


def lambda_handler(event, context):
    # 新規Cognitoにユーザーを追加
    src_secret_hash = get_secret_hash(event['userName'], SRC_CLIENT_ID, SRC_CLIENT_SECRET)
    if event['triggerSource'] == 'UserMigration_Authentication':
        user = client.initiate_auth(
            ClientId=SRC_CLIENT_ID,
            AuthFlow='USER_PASSWORD_AUTH',
            AuthParameters={
                'USERNAME': event['userName'],
                'PASSWORD': event['request']['password'],
                'SECRET_HASH': src_secret_hash
            }
        )
        if user:
            # 元のユーザープールからユーザー属性を取得して、新しいユーザープールのユーザー属性に追加
            user_attributes = client.get_user(
                AccessToken=user['AuthenticationResult']['AccessToken']
            )
            for user_attribute in user_attributes['UserAttributes']:
                if user_attribute['Name'] == 'email':
                    user_email = user_attribute['Value']
                    event['response']['userAttributes'] = {
                        "email": user_email,
                        "email_verified": "true"
                    }
                    event['response']['messageAction'] = "SUPPRESS"
            return event
        else:
            return 'Bad Password'
    # パスワードリセットの場合、ユーザー有無のみを確認して、新しいユーザープールにユーザーを追加
    elif event["triggerSource"] == "UserMigration_ForgotPassword":
        try:
            user = client.admin_get_user(
                UserPoolId=SRC_USER_POOL_ID,
                Username=event['userName']
            )
            if 'UserAttributes' in user:
                email_attribute = next((attr for attr in user_attributes['UserAttributes'] if attr['Name'] == 'email'), None)
                if email_attribute:
                    user_email = email_attribute['Value']
                    event['response']['userAttributes'] = {
                        "email": user_email,
                        "email_verified": "true"
                    }
                    event['respose']['messageAction'] = "SUPPRESS"
            return event
        except client.exceptions.UserNotFoundException:
            return {'status': 'User does not exist in the source user pool.'}

TerraformでCognitoのクライアントシークレット有効化と、
Lambdaの環境変数SRC_CLIENT_SECRETを追加します。

# クライアントシークレットの有効化
resource "aws_cognito_user_pool_client" "src_client" {
  name            = "src-client"
  user_pool_id    = aws_cognito_user_pool.src_user_pool.id
  generate_secret = true # 有効化する
  explicit_auth_flows = [
    "ADMIN_NO_SRP_AUTH",
    "USER_PASSWORD_AUTH"
  ]
}

resource "aws_cognito_user_pool_client" "dst_client" {
  name         = "dst-client"
  user_pool_id = aws_cognito_user_pool.dst_user_pool.id
  generate_secret = true # 有効化する
  explicit_auth_flows = [
    "ADMIN_NO_SRP_AUTH",
    "USER_PASSWORD_AUTH"
  ]
}

# Lambdaの環境変数を追加しました。
resource "aws_lambda_function" "migration_cognito" {
  function_name    = "migration-cognito"
  handler          = "cognito_migration.lambda_handler"
  runtime          = "python3.9"
  role             = aws_iam_role.iam_for_lambda.arn
  filename         = "lambda.zip"
  source_code_hash = filebase64sha256("lambda.zip")

  environment {
    variables = {
      SRC_USER_POOL_ID  = aws_cognito_user_pool.src_user_pool.id
      SRC_CLIENT_ID     = aws_cognito_user_pool_client.src_client.id
      SRC_CLIENT_SECRET =  aws_cognito_user_pool_client.src_client.client_secret
    }
  }
}

クライアントシークレットは作り直しになるので、リクエストで使っているauth.jsonファイルのClientIdを書き換える必要があります。
あとは、SECRET_HASHを追加します。(作り方は後述)

{
  "ClientId": "XXXXXXX",
  "AuthFlow": "USER_PASSWORD_AUTH",
  "AuthParameters": {
    "USERNAME": "XXXXXX@XXXX.XXXX",
    "PASSWORD": "XXXXXXX",
    "SECRET_HASH": "XXXXXXXXXX"
  }
}

非常に面倒ではありますが、シークレットハッシュは、Lambdaコードの中にあるシークレット取得の部分と同じように取得する必要があります。
Pythonで以下を実行すると値を取得することが出来ます。

import hmac
import hashlib
import base64

client_id = "XXXXXXX" # 移行先CognitoクライントID
client_secret = "XXXXXXXX" # 移行先Cognitoクライアントシークレット
username = "XXXXXXXXXX"

message = bytes(username + client_id, 'utf-8')
key = bytes(client_secret, 'utf-8')

secret_hash = base64.b64encode(hmac.new(key, message, digestmod=hashlib.sha256).digest()).decode()
print(secret_hash)

これでJSONファイルを使ってAPIGatewayにリクエストを送ってみると、無事にユーザーが移行されていることを確認できるはずです!

おわりに

動作確認で使うためのサービスをいくつか悩んだ末にAPIGatewayにしたのですが、どれが最適だったのだろうかと思います。
ALBだとリスナーをHTTPSとする必要があり、準備が大変になりそうで敬遠をして、
CloudFront + S3についても、Lambda@Edgeを使ってCognitoを呼び出す方式を取る必要があって、検証するためにそこまで用意するのが大変だと思い、モックが使えて1つのサービスで完結できるAPIGatewayを選択してみました。

Lambdaでの方式を取るにしてもユーザーに対して移行期間を設けてアクション(ログインをしてもらう)必要はあるので、そういった何も意識する必要がない移行方法があったらいいなと思いました。
ありがとうございました🙌

Discussion