🙌

「Remix v2」で「Clerk」のJWT を検証する

2024/04/13に公開

めっちゃ消耗した。

やりたいのは、ログイン認証状態でユーザーのスキーマをそのユーザーを限定したいんだけど、認証の知識が皆無だったので手こずった。(おかげで知識が増えたけど)

全然情報が落ちてないしずっとHS256 でデコードしてたけどダメだった。

以下JWTの検証方法です。

外部のサーバーに認証状態でアクセスしたい場合に有効です。

事前知識

JWTとは?

JWT(JSON Web Token)は、認証や情報交換に広く使用されるコンパクトなURLセーフな文字列です。JWTは通常、クライアントとサーバー間で情報を安全に送信するために使用され、ユーザーがサーバーにログインした後に生成されることが多いです。

JWTの構造
JWTは三部分から構成されています:ヘッダー(Header)、ペイロード(Payload)、署名(Signature)。

ヘッダー:ヘッダーは、トークンのタイプ(通常はJWT)と使用されているハッシュアルゴリズム(例えば、HS256)を示すJSONオブジェクトです。
ペイロード:ペイロードには、トークンに関する宣言(クレーム)が含まれます。これはトークンの発行者、有効期限、ユーザーのIDなどの情報を持つことができます。
署名:署名は、トークンが改ざんされていないことを保証するために、ヘッダーとペイロードを秘密鍵でハッシュ化して生成されます。
JWTの検証方法
JWTの検証は、その署名を確認するプロセスです。正しい鍵とアルゴリズムを使用してJWTが署名されたときのデータを再度ハッシュ化し、それが送信されたJWTの署名部分と一致するかどうかを検証します。検証プロセスは以下のステップを含みます:

ヘッダーとペイロードのデコード:ベース64デコーディングを使用してヘッダーとペイロードをデコードします。
署名の検証:デコードされたヘッダーから取得したアルゴリズム情報を使用して、送信されたヘッダーとペイロードを再度結合し、署名を生成します。これを元の署名と比較します。
クレームの検証:ペイロード内のクレームを検証します。有効期限、発行者などが期待通りであるかを確認します。

簡単に説明すると、暗号化と解読方法を用意して使う。解読方法は暗号化キーと解読キーを使う。

暗号化キー:秘密鍵
解読キー:公開鍵

暗号化キーで暗号化して、暗号化キーで解読する方法を共通鍵暗号方式という。

共通鍵暗号方式(対称暗号方式)

共通鍵暗号方式では、暗号化と復号化(解読)に同じ鍵(秘密鍵)を使用します。これは非常に効率的な方法で、データの暗号化と復号化が迅速に行えます。ただし、鍵を安全に共有する必要があり、その鍵の配布がセキュリティのリスクとなる可能性があります。

公開鍵暗号方式(非対称暗号方式)

公開鍵暗号方式では、暗号化には公開鍵を、復号化には秘密鍵を使用します。この方法では、誰もが公開鍵を使用してメッセージを暗号化でき、ただ一人(秘密鍵を持つ者)のみがそれを復号化できます。これにより、公開鍵は安全に配布することが可能で、秘密鍵は所有者のみが保持します。

公開鍵と秘密鍵は数学的に関連しており、一方がもう一方と密接に結びついていますが、秘密鍵から公開鍵を導出することは計算上実行不可能です。

この2つの暗号方式は、様々なセキュリティ要件に応じて適切に選択されます。例えば、公開鍵暗号方式はデジタル署名やSSL/TLS通信などに広く使用されています。一方、共通鍵暗号方式はデータの大量暗号化に適しています。

RS256はどっち?

JWTのエンコード

エンコードは、ユーザーの情報やその他のクレームを含むJWTを生成するプロセスです。ここでは、ユーザーIDや役割などのデータをエンコードしてJWTを生成します。

python
import jwt
from datetime import datetime, timedelta

# 秘密鍵
secret_key = 'your_secret_key'

# エンコードするデータ
payload = {
    'user_id': 123,
    'role': 'admin',
    'exp': datetime.utcnow() + timedelta(days=1)  # トークンの有効期限は1日
}

# JWTの生成
encoded_jwt = jwt.encode(payload, secret_key, algorithm='HS256')
print("生成されたJWT:", encoded_jwt)

JWTのデコード

デコードは、生成されたJWTから元の情報を取り出すプロセスです。このプロセスでは、トークンの署名が正しいこと、およびクレームが有効であることが確認されます。

python
import jwt

# トークンと秘密鍵(エンコード時と同じものを使用)
token = encoded_jwt  # 上記のエンコード例で生成されたJWT
secret_key = 'your_secret_key'

# JWTのデコードと検証
try:
    decoded = jwt.decode(token, secret_key, algorithms=["HS256"])
    print("デコードされたデータ:", decoded)
except jwt.ExpiredSignatureError:
    print("トークンが期限切れです")
except jwt.InvalidTokenError:
    print("無効なトークンです")

上記が秘密鍵でエンコード、秘密鍵でデコードするパターンです。(HS256)

Clerkは公開鍵でデコードするパターンでデコードします。

ではやっていきますぜ。

ClerkのJWT検証に必要な情報

必要なのは

  1. JWT template
  2. token
  3. public_key
  4. [RS256]

1. JWT template


Claimsペイロードは好きなの作れるぞ

{
	"id": "{{user.id}}",
	"primary_email_address": "{{user.primary_email_address}}"
}

2. token

remixの場合はこれです

ts
export const loader: LoaderFunction = async (args) => {
  const { getToken } = await getAuth(args);
  // トークンを取得
  const token = await getToken({ template: 'GCP' });
  if (!token) {
    throw new Response("Token is missing", { status: 401 });
  }

3. public_key

4. [RS256]

デコードは公開鍵でデコードするRS256です。ローカルでテストしてみるといいかも

JWT検証をローカルでテストする方法

  1. tokenの文字列をコンソールに出力する
  2. tokenをPythonでデコードする

1.tokenの文字列をコンソールに出力する

  const token = await getToken({ template: 'GCP' });
  console.log(token);

2.tokenをPythonでデコードする

python
import jwt

# ここにコンソールからコピーしたトークンを貼り付けてください
token = 'eyJhbGciOiJSUzI1NiIsImNhdCI6ImNsX0I3ZDRQRDIyMkFBQSIsImtpZCI6Imluc18yZXgzTVRCSVlONk5MeFlCU2JOb1U3NTRJdlEiLCJ0eXAiOiJKV1QifQ.eyJleHAiOjE3MTI5OTIxNjQsImlhdCI6MTcxMjk5MjEwNCwiaWQiOiJ1c2VyXzJleDhUdmtmWHZFemZPN2ZnRk56bE5IMjNkMSIsImlzcyI6Imh0dHBzOi8vdXAtcXVhZ2dhLTYuY2xlcmsuYWNjb3VudHMuZGV2IiwianRpIjoiOWIwMDNkMTM3ZDg5NzgzM2U1NTQiLCJuYmYiOjE3MTI5OTIwOTksInByaW1hcnlfZW1haWxfYWRkcmVzcyI6Imx1a2VoYXJ1QGdtYWlsLmNvbSIsInN1YiI6InVzZXJfMmV4OFR2a2ZYdkV6Zk83ZmdGTnpsTkgyM2QxIn0.Iu106XggLk-IXLAf2aj68oDpY8Scsgn7BNYvwuIiY_ZSJ15khR6TSaCDUxGmvltef1sT8IsHzs654RlGwfpWFxOjRaggZOoqE3uFN9Q6UneO4Q2ag0oyx48m-MCrQWte51aW_GfLooTq7lQSOKydLKQVqhASCif8VJift3Hdw09b1s8ucG0eJGcveOaP8TYOun3KCUmUUrrCLOlDM1XrJ5QGtoXxZ9_DwuABA1Mr8BIcLF7C5lt3WLw_T03J2Qav4abxrzUBMOb5k0jQEQajeE2-HdmZaQ3uxKKE7US76DXtNQpPz6l_5bVNNHd3vmIkUBHS0yNWJ3H9Yi2jezCbKA'
public_key = '''
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3QhTWOJZ79uJqjS1Yq7Y
fpq1JhyNyz4Pwt7gTzVsq/fr/bRvWFObrW0sSWdkMaosXVvrZ0RDxCZK+ELTzqc5
z5+nbazMlp7fnVAehcnQ+M0BflgR7jBI+BRxv7IiMxjVAa44toBz953p0qmeh5Nu
M3a6KFIjzBqqL8jkxOVwMj9iF5KsXDvLstl2iLIt9wYuzHdyKMzCiFfwc/cdGspx
JlGHP7hFN4OOeufvjhTm1wO1vss7FNw/VlSrVnU6XVU+Z2J/cc0w8u05v+BKGHdg
hJtHfrkAs5g3dkGgnHIO8Un3FRJAPw1OK8xj5VwJeJ5Re4asfobE7gSQR0yLDFVJ
GwIDAQAB
-----END PUBLIC KEY-----

'''  # 実際の公開鍵をここに貼り付ける

try:
    decoded = jwt.decode(token, public_key, algorithms=["RS256"])
    print("デコードされたトークン:", decoded)
except jwt.ExpiredSignatureError:
    print("トークンの有効期限が切れています。")
except jwt.InvalidTokenError:
    print("無効なトークンです。")
except Exception as e:
    print("デコード中にエラーが発生しました:", e)

hoge.tsx

remixのページで外部APIにJWTのtokenをつけて送る方法です。

routes/hoge.tsx
// hoge.tsx
import { json,LoaderFunction } from '@remix-run/node';
import { useLoaderData } from '@remix-run/react';
import { getAuth } from "@clerk/remix/ssr.server";

const your_api_endpoint = "";

export const loader: LoaderFunction = async (args) => {
  const { getToken } = await getAuth(args);
  // トークンを取得
  const token = await getToken({ template: 'GCP' });
  if (!token) {
    throw new Response("Token is missing", { status: 401 });
  }

  
  // URLにトークンをクエリパラメータとして追加
  const url = new URL(your_api_endpoint);
  url.searchParams.append('token', token);


  // APIリクエストを送信
  try {
    const response = await fetch(url.toString(), {
      headers: { accept: 'application/json' }
    });

    if (!response.ok) {
      throw new Error('Network response was not ok');
    }


    const result = await response.json();
    console.log(result);
    
    return json(result);  // jsonヘルパーを使用して適切に返す
  } catch (error) {
    console.error("Failed to fetch data:", error);
    throw new Response('Failed to fetch data', { status: 500 });
  }
};

export default function Hoge() {
  const data = useLoaderData();

  return (
    <div>
      <h1>Resource Data</h1>
      <pre>{JSON.stringify(data, null, 2)}</pre>
    </div>
  );
}

TestAPI:Cloud Functions

以下は外部APIを作ってテストしてみるサンプルコードです。混同しないようにAPIはPythonで作りました。

ランタイム Python3.12

環境変数

enviroment
JWT_PUBLIC_KEY = "-----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3QhTWOJZ79uJqjS1Yq7Y fpq1JhyNyz4Pwt7gTzVsq/fr/bRvWFObrW0sSWdkMaosXVvrZ0RDxCZK+ELTzqc5 z5+nbazMlp7fnVAehcnQ+M0BflgR7jBI+BRxv7IiMxjVAa44toBz953p0qmeh5Nu M3a6KFIjzBqqL8jkxOVwMj9iF5KsXDvLstl2iLIt9wYuzHdyKMzCiFfwc/cdGspx JlGHP7hFN4OOeufvjhTm1wO1vss7FNw/VlSrVnU6XVU+Z2J/cc0w8u05v+BKGHdg hJtHfrkAs5g3dkGgnHIO8Un3FRJAPw1OK8xj5VwJeJ5Re4asfobE7gSQR0yLDFVJ GwIDAQAB -----END PUBLIC KEY-----
"
main.py
import os
import functions_framework
import jwt
from flask import jsonify, request
import logging

logging.basicConfig(level=logging.INFO)

@functions_framework.http
def hello_http(request):
    request_json = request.get_json(silent=True)
    request_args = request.args
    headers = {"Access-Control-Allow-Origin": "*"}

    token = request_json.get('token') if request_json else request_args.get('token')
    if not token:
        error_response = jsonify({"error": "Token is missing"})
        logging.info(f"Response: {error_response.get_data(as_text=True)}")
        return error_response, 200, headers

    public_key = os.getenv('JWT_PUBLIC_KEY')
    if not public_key:
        logging.error("JWT_PUBLIC_KEY is not set.")
        error_response = jsonify({"error": "Internal server error"})
        logging.info(f"Response: {error_response.get_data(as_text=True)}")
        return error_response, 500, headers

    try:
        decoded = jwt.decode(token, public_key, algorithms=['RS256'])
        user_id = decoded.get('id', 'Unknown')
        primary_email_address = decoded.get('primary_email_address', 'Unknown')

        success_response = jsonify({
            "message": "JWT is valid",
            "id": user_id,
            "primary_email_address": primary_email_address
        })
        logging.info(f"Response: {success_response.get_data(as_text=True)}")

        return success_response, 200, headers
    except jwt.ExpiredSignatureError:
        error_response = jsonify({"error": "Token has expired"})
        logging.info(f"Response: {error_response.get_data(as_text=True)}")
        return error_response, 200, headers
    except jwt.InvalidTokenError as e:
        error_response = jsonify({"error": "Invalid token", "details": str(e)})
        logging.info(f"Response: {error_response.get_data(as_text=True)}")
        return error_response, 200, headers
    except jwt.DecodeError:
        error_response = jsonify({"error": "Error decoding token"})
        logging.info(f"Response: {error_response.get_data(as_text=True)}")
        return error_response, 200, headers
    except Exception as e:
        logging.error(f"Unexpected error: {e}")
        error_response = jsonify({"error": str(e)})
        logging.info(f"Response: {error_response.get_data(as_text=True)}")
        return error_response, 200, headers
requirements.txt
functions-framework==3.*
pyjwt
cryptography
flask

Discussion