「Remix v2」で「Clerk」のJWT を検証する
めっちゃ消耗した。
やりたいのは、ログイン認証状態でユーザーのスキーマをそのユーザーを限定したいんだけど、認証の知識が皆無だったので手こずった。(おかげで知識が増えたけど)
全然情報が落ちてないしずっとHS256 でデコードしてたけどダメだった。
以下JWTの検証方法です。
外部のサーバーに認証状態でアクセスしたい場合に有効です。
事前知識
JWTとは?
JWT(JSON Web Token)は、認証や情報交換に広く使用されるコンパクトなURLセーフな文字列です。JWTは通常、クライアントとサーバー間で情報を安全に送信するために使用され、ユーザーがサーバーにログインした後に生成されることが多いです。
JWTの構造
JWTは三部分から構成されています:ヘッダー(Header)、ペイロード(Payload)、署名(Signature)。
ヘッダー:ヘッダーは、トークンのタイプ(通常はJWT)と使用されているハッシュアルゴリズム(例えば、HS256)を示すJSONオブジェクトです。
ペイロード:ペイロードには、トークンに関する宣言(クレーム)が含まれます。これはトークンの発行者、有効期限、ユーザーのIDなどの情報を持つことができます。
署名:署名は、トークンが改ざんされていないことを保証するために、ヘッダーとペイロードを秘密鍵でハッシュ化して生成されます。
JWTの検証方法
JWTの検証は、その署名を確認するプロセスです。正しい鍵とアルゴリズムを使用してJWTが署名されたときのデータを再度ハッシュ化し、それが送信されたJWTの署名部分と一致するかどうかを検証します。検証プロセスは以下のステップを含みます:
ヘッダーとペイロードのデコード:ベース64デコーディングを使用してヘッダーとペイロードをデコードします。
署名の検証:デコードされたヘッダーから取得したアルゴリズム情報を使用して、送信されたヘッダーとペイロードを再度結合し、署名を生成します。これを元の署名と比較します。
クレームの検証:ペイロード内のクレームを検証します。有効期限、発行者などが期待通りであるかを確認します。
簡単に説明すると、暗号化と解読方法を用意して使う。解読方法は暗号化キーと解読キーを使う。
暗号化キー:秘密鍵
解読キー:公開鍵
暗号化キーで暗号化して、暗号化キーで解読する方法を共通鍵暗号方式という。
共通鍵暗号方式(対称暗号方式)
共通鍵暗号方式では、暗号化と復号化(解読)に同じ鍵(秘密鍵)を使用します。これは非常に効率的な方法で、データの暗号化と復号化が迅速に行えます。ただし、鍵を安全に共有する必要があり、その鍵の配布がセキュリティのリスクとなる可能性があります。
公開鍵暗号方式(非対称暗号方式)
公開鍵暗号方式では、暗号化には公開鍵を、復号化には秘密鍵を使用します。この方法では、誰もが公開鍵を使用してメッセージを暗号化でき、ただ一人(秘密鍵を持つ者)のみがそれを復号化できます。これにより、公開鍵は安全に配布することが可能で、秘密鍵は所有者のみが保持します。
公開鍵と秘密鍵は数学的に関連しており、一方がもう一方と密接に結びついていますが、秘密鍵から公開鍵を導出することは計算上実行不可能です。
この2つの暗号方式は、様々なセキュリティ要件に応じて適切に選択されます。例えば、公開鍵暗号方式はデジタル署名やSSL/TLS通信などに広く使用されています。一方、共通鍵暗号方式はデータの大量暗号化に適しています。
RS256はどっち?
JWTのエンコード
エンコードは、ユーザーの情報やその他のクレームを含むJWTを生成するプロセスです。ここでは、ユーザーIDや役割などのデータをエンコードしてJWTを生成します。
import jwt
from datetime import datetime, timedelta
# 秘密鍵
secret_key = 'your_secret_key'
# エンコードするデータ
payload = {
'user_id': 123,
'role': 'admin',
'exp': datetime.utcnow() + timedelta(days=1) # トークンの有効期限は1日
}
# JWTの生成
encoded_jwt = jwt.encode(payload, secret_key, algorithm='HS256')
print("生成されたJWT:", encoded_jwt)
JWTのデコード
デコードは、生成されたJWTから元の情報を取り出すプロセスです。このプロセスでは、トークンの署名が正しいこと、およびクレームが有効であることが確認されます。
import jwt
# トークンと秘密鍵(エンコード時と同じものを使用)
token = encoded_jwt # 上記のエンコード例で生成されたJWT
secret_key = 'your_secret_key'
# JWTのデコードと検証
try:
decoded = jwt.decode(token, secret_key, algorithms=["HS256"])
print("デコードされたデータ:", decoded)
except jwt.ExpiredSignatureError:
print("トークンが期限切れです")
except jwt.InvalidTokenError:
print("無効なトークンです")
上記が秘密鍵でエンコード、秘密鍵でデコードするパターンです。(HS256)
Clerkは公開鍵でデコードするパターンでデコードします。
ではやっていきますぜ。
ClerkのJWT検証に必要な情報
必要なのは
- JWT template
- token
- public_key
- [RS256]
1. JWT template
Claimsペイロードは好きなの作れるぞ
{
"id": "{{user.id}}",
"primary_email_address": "{{user.primary_email_address}}"
}
2. token
remixの場合はこれです
export const loader: LoaderFunction = async (args) => {
const { getToken } = await getAuth(args);
// トークンを取得
const token = await getToken({ template: 'GCP' });
if (!token) {
throw new Response("Token is missing", { status: 401 });
}
3. public_key
4. [RS256]
デコードは公開鍵でデコードするRS256です。ローカルでテストしてみるといいかも
JWT検証をローカルでテストする方法
- tokenの文字列をコンソールに出力する
- tokenをPythonでデコードする
1.tokenの文字列をコンソールに出力する
const token = await getToken({ template: 'GCP' });
console.log(token);
2.tokenをPythonでデコードする
import jwt
# ここにコンソールからコピーしたトークンを貼り付けてください
token = 'eyJhbGciOiJSUzI1NiIsImNhdCI6ImNsX0I3ZDRQRDIyMkFBQSIsImtpZCI6Imluc18yZXgzTVRCSVlONk5MeFlCU2JOb1U3NTRJdlEiLCJ0eXAiOiJKV1QifQ.eyJleHAiOjE3MTI5OTIxNjQsImlhdCI6MTcxMjk5MjEwNCwiaWQiOiJ1c2VyXzJleDhUdmtmWHZFemZPN2ZnRk56bE5IMjNkMSIsImlzcyI6Imh0dHBzOi8vdXAtcXVhZ2dhLTYuY2xlcmsuYWNjb3VudHMuZGV2IiwianRpIjoiOWIwMDNkMTM3ZDg5NzgzM2U1NTQiLCJuYmYiOjE3MTI5OTIwOTksInByaW1hcnlfZW1haWxfYWRkcmVzcyI6Imx1a2VoYXJ1QGdtYWlsLmNvbSIsInN1YiI6InVzZXJfMmV4OFR2a2ZYdkV6Zk83ZmdGTnpsTkgyM2QxIn0.Iu106XggLk-IXLAf2aj68oDpY8Scsgn7BNYvwuIiY_ZSJ15khR6TSaCDUxGmvltef1sT8IsHzs654RlGwfpWFxOjRaggZOoqE3uFN9Q6UneO4Q2ag0oyx48m-MCrQWte51aW_GfLooTq7lQSOKydLKQVqhASCif8VJift3Hdw09b1s8ucG0eJGcveOaP8TYOun3KCUmUUrrCLOlDM1XrJ5QGtoXxZ9_DwuABA1Mr8BIcLF7C5lt3WLw_T03J2Qav4abxrzUBMOb5k0jQEQajeE2-HdmZaQ3uxKKE7US76DXtNQpPz6l_5bVNNHd3vmIkUBHS0yNWJ3H9Yi2jezCbKA'
public_key = '''
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3QhTWOJZ79uJqjS1Yq7Y
fpq1JhyNyz4Pwt7gTzVsq/fr/bRvWFObrW0sSWdkMaosXVvrZ0RDxCZK+ELTzqc5
z5+nbazMlp7fnVAehcnQ+M0BflgR7jBI+BRxv7IiMxjVAa44toBz953p0qmeh5Nu
M3a6KFIjzBqqL8jkxOVwMj9iF5KsXDvLstl2iLIt9wYuzHdyKMzCiFfwc/cdGspx
JlGHP7hFN4OOeufvjhTm1wO1vss7FNw/VlSrVnU6XVU+Z2J/cc0w8u05v+BKGHdg
hJtHfrkAs5g3dkGgnHIO8Un3FRJAPw1OK8xj5VwJeJ5Re4asfobE7gSQR0yLDFVJ
GwIDAQAB
-----END PUBLIC KEY-----
''' # 実際の公開鍵をここに貼り付ける
try:
decoded = jwt.decode(token, public_key, algorithms=["RS256"])
print("デコードされたトークン:", decoded)
except jwt.ExpiredSignatureError:
print("トークンの有効期限が切れています。")
except jwt.InvalidTokenError:
print("無効なトークンです。")
except Exception as e:
print("デコード中にエラーが発生しました:", e)
hoge.tsx
remixのページで外部APIにJWTのtokenをつけて送る方法です。
// hoge.tsx
import { json,LoaderFunction } from '@remix-run/node';
import { useLoaderData } from '@remix-run/react';
import { getAuth } from "@clerk/remix/ssr.server";
const your_api_endpoint = "";
export const loader: LoaderFunction = async (args) => {
const { getToken } = await getAuth(args);
// トークンを取得
const token = await getToken({ template: 'GCP' });
if (!token) {
throw new Response("Token is missing", { status: 401 });
}
// URLにトークンをクエリパラメータとして追加
const url = new URL(your_api_endpoint);
url.searchParams.append('token', token);
// APIリクエストを送信
try {
const response = await fetch(url.toString(), {
headers: { accept: 'application/json' }
});
if (!response.ok) {
throw new Error('Network response was not ok');
}
const result = await response.json();
console.log(result);
return json(result); // jsonヘルパーを使用して適切に返す
} catch (error) {
console.error("Failed to fetch data:", error);
throw new Response('Failed to fetch data', { status: 500 });
}
};
export default function Hoge() {
const data = useLoaderData();
return (
<div>
<h1>Resource Data</h1>
<pre>{JSON.stringify(data, null, 2)}</pre>
</div>
);
}
TestAPI:Cloud Functions
以下は外部APIを作ってテストしてみるサンプルコードです。混同しないようにAPIはPythonで作りました。
ランタイム Python3.12
環境変数
JWT_PUBLIC_KEY = "-----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3QhTWOJZ79uJqjS1Yq7Y fpq1JhyNyz4Pwt7gTzVsq/fr/bRvWFObrW0sSWdkMaosXVvrZ0RDxCZK+ELTzqc5 z5+nbazMlp7fnVAehcnQ+M0BflgR7jBI+BRxv7IiMxjVAa44toBz953p0qmeh5Nu M3a6KFIjzBqqL8jkxOVwMj9iF5KsXDvLstl2iLIt9wYuzHdyKMzCiFfwc/cdGspx JlGHP7hFN4OOeufvjhTm1wO1vss7FNw/VlSrVnU6XVU+Z2J/cc0w8u05v+BKGHdg hJtHfrkAs5g3dkGgnHIO8Un3FRJAPw1OK8xj5VwJeJ5Re4asfobE7gSQR0yLDFVJ GwIDAQAB -----END PUBLIC KEY-----
"
import os
import functions_framework
import jwt
from flask import jsonify, request
import logging
logging.basicConfig(level=logging.INFO)
@functions_framework.http
def hello_http(request):
request_json = request.get_json(silent=True)
request_args = request.args
headers = {"Access-Control-Allow-Origin": "*"}
token = request_json.get('token') if request_json else request_args.get('token')
if not token:
error_response = jsonify({"error": "Token is missing"})
logging.info(f"Response: {error_response.get_data(as_text=True)}")
return error_response, 200, headers
public_key = os.getenv('JWT_PUBLIC_KEY')
if not public_key:
logging.error("JWT_PUBLIC_KEY is not set.")
error_response = jsonify({"error": "Internal server error"})
logging.info(f"Response: {error_response.get_data(as_text=True)}")
return error_response, 500, headers
try:
decoded = jwt.decode(token, public_key, algorithms=['RS256'])
user_id = decoded.get('id', 'Unknown')
primary_email_address = decoded.get('primary_email_address', 'Unknown')
success_response = jsonify({
"message": "JWT is valid",
"id": user_id,
"primary_email_address": primary_email_address
})
logging.info(f"Response: {success_response.get_data(as_text=True)}")
return success_response, 200, headers
except jwt.ExpiredSignatureError:
error_response = jsonify({"error": "Token has expired"})
logging.info(f"Response: {error_response.get_data(as_text=True)}")
return error_response, 200, headers
except jwt.InvalidTokenError as e:
error_response = jsonify({"error": "Invalid token", "details": str(e)})
logging.info(f"Response: {error_response.get_data(as_text=True)}")
return error_response, 200, headers
except jwt.DecodeError:
error_response = jsonify({"error": "Error decoding token"})
logging.info(f"Response: {error_response.get_data(as_text=True)}")
return error_response, 200, headers
except Exception as e:
logging.error(f"Unexpected error: {e}")
error_response = jsonify({"error": str(e)})
logging.info(f"Response: {error_response.get_data(as_text=True)}")
return error_response, 200, headers
functions-framework==3.*
pyjwt
cryptography
flask
Discussion