🔑

AWS IoT Coreのクライアント証明書をOCSPで検証する

2024/10/16に公開

OCSP

IoTでは、クライアント証明書の有効性確認は重要です。クライアントが提示する証明書が不正に取得されたり、期限切れになっている場合、それを迅速かつ確実に検出することが求められます。ここで、従来の証明書失効リスト(CRL: Certificate Revocation List)の問題を解決するために登場したのが、OCSP(Online Certificate Status Protocol)です。

CRLにはいくつかの課題がありました。まず、リストが大きくなりがちで、クライアントが頻繁に大規模なリストをダウンロードする必要があるため、非効率的でした。また、リストの更新に遅れが生じることがあり、失効情報を即時に反映できないという問題もありました。これを解決するために、OCSPはリアルタイムで証明書の失効情報を取得できる仕組みとして導入されました。

今日は、AWS IoT Core で OCSPレスポンスの検証を実施する際に便利そうな Custom client certificate validation 機能について書いてみようと思います。

カスタムクライアント証明書検証

AWS IoT CoreのCustom client certificate validationは、Pre Authとも呼ばれ、クライアントがX.509証明書を使ってTLSセッションを確立する前にLambdaを呼び出す仕組みです。つまり、このLambdaでOCSP responderへ問い合わせを行えば、OCPSレスポンスの検証が簡単にできるという寸法です。

呼び出すLambda関数はカスタムドメインに対してマッピングされるため、カスタムドメインコンフィグを設定する必要があります。

OCSP Responder

今回はOCSPにAWS Private CAでOCSPレスポンダーを有効にして、クライアント証明書を発行します。ここから発行されたクライアント証明書にはAuthority Information AccessとしてOCSP ResponderのURLが含まれます。

Private CAの設定

CAのコンフィグを準備

cat ca-config.json 
{
  "KeyAlgorithm": "EC_prime256v1",
  "SigningAlgorithm": "SHA256WITHECDSA",
  "Subject": {
    "Country": "JP",
    "Organization": "My Corp",
    "OrganizationalUnit": "IT",
    "State": "Tokyo",
    "CommonName": "myc.com"
  }
}

CAの作成

aws acm-pca create-certificate-authority \
    --certificate-authority-configuration file://ca-config.json \
    --certificate-authority-type ROOT \
    --idempotency-token unique-token
{
    "CertificateAuthorityArn": "arn:aws:acm-pca:ap-northeast-1:<ACCOUNT_ID>:certificate-authority/669316fb-d4f5-4f7c-b4d3-0648a8584352"
}

ここではCAのステータスが "Status": "PENDING_CERTIFICATE" になっている。

OCSPを有効にする

revocation-config.json を以下のようにして

{
  "OcspConfiguration": {
    "Enabled": true
  }
}

アップデートする

# 繰り返し使うので環境変数をセットしておく
CA_ARN=arn:aws:acm-pca:ap-northeast-1:<ACCOUNT_ID>:certificate-authority/669316fb-d4f5-4f7c-b4d3-0648a8584352

aws acm-pca update-certificate-authority \
--certificate-authority-arn $CA_ARN \
--revocation-configuration file://revocation-config.json 

OcspConfigurationが有効になっているかを確認

aws acm-pca describe-certificate-authority —certificate-authority-arn $CA_ARN
        "RevocationConfiguration": {
            "CrlConfiguration": {
                "Enabled": false
            },
            "OcspConfiguration": {
                "Enabled": true
            }
        },

Webコンソールから Action > Install CA certificate する。CAのステータスが "Status": "ACTIVE" になる。

クライアント証明書を発行する

openssl ecparam -genkey -name prime256v1 -noout -out device.key
openssl req -new -key device.key -out device.csr -subj "/C=JP/ST=Tokyo/L=Tokyo/O=Example Corp/OU=IT/CN=$(uuidgen)"
aws acm-pca issue-certificate \
    --certificate-authority-arn $CA_ARN \
    --csr fileb://device.csr \
    --signing-algorithm "SHA256WITHECDSA" \
    --validity Value=365,Type="DAYS" \
    --output text
arn:aws:acm-pca:ap-northeast-1:<ACCOUNT_ID>:certificate-authority/669316fb-d4f5-4f7c-b4d3-0648a8584352/certificate/250360f1735d43abe012fgae8da6d105

作成した証明書をダウンロードする

CERTIFICATE_ARN=arn:aws:acm-pca:ap-northeast-1:<ACCOUNT_ID>:certificate-authority/669316fb-d4f5-4f7c-b5d3-0648a8584352/certificate/250360f1735d43abe012fgae8da6d105

aws acm-pca get-certificate \
--certificate-authority-arn $CA_ARN \
--certificate-arn $CERTIFICATE_ARN \
--output text > device.crt

ダウンロードした証明書は証明書の区切りの部分の改行が-----END CERTIFICATE----- -----BEGIN CERTIFICATE-----のようにおかしくなっているので修正しておきます。

以下のようなコマンドで証明書のAIAを探してみてください。OCSP ResponderのURLが見つかると思います。

openssl x509 -in device.crt -text -noout 

OCSP Responderにアクセスする

AWS CLIでOCSP Responderにアクセスしてみます。

CA証明書が必要なのでダウンロードしておきます。

aws acm-pca get-certificate-authority-certificate \
  --certificate-authority-arn $CA_ARN \
  --region ap-northeast-1 \
  --output text > ca-cert.pem

OCSPに問いあわせ

openssl ocsp -issuer ca-cert.pem -cert ./device-cert-1/device.crt -url http://ocsp.acm-pca.ap-northeast-1.amazonaws.com -text
  
OCSP Request Data:
    Version: 1 (0x0)
    Requestor List:
        Certificate ID:
          Hash Algorithm: sha1
          Issuer Name Hash: E096141C54C0AB86987004F8091EEF32D2125FDC
          Issuer Key Hash: 1BBFFA8882661698FC41E9995D9A89C718D56612
          Serial Number: 250360F1735D43ABE012FEAE8DA6D105
    Request Extensions:
        OCSP Nonce: 
            041080BF5E3734D48B9DCA54BA8BC7475DDB
OCSP Response Data:
    OCSP Response Status: successful (0x0)
    Response Type: Basic OCSP Response
    Version: 1 (0x0)
    Responder Id: C = JP, O = My Corp, OU = IT, ST = Tokyo, CN = myc.com
    Produced At: Oct  6 02:36:44 2024 GMT
    Responses:
    Certificate ID:
      Hash Algorithm: sha1
      Issuer Name Hash: E096141C54C0AB86987004F8091EEF32D2125FDC
      Issuer Key Hash: 1BBFFA8892661698FC41E9995D9A89C718D56612
      Serial Number: 250360F1735D43ABE012FEAE8DA6D105
    Cert Status: good
    This Update: Oct  6 02:36:44 2024 GMT
    Next Update: Oct 16 02:36:44 2024 GMT

    Signature Algorithm: ecdsa-with-SHA256
    Signature Value:
        30:45:02:21:00:91:8e:92:1f:05:8d:59:66:88:a2:87:32:bd:
        9f:70:89:4b:77:4e:19:bd:86:cc:f0:92:d3:f2:3c:06:fe:b1:
        ab:02:20:39:fd:7b:00:58:da:f7:63:67:3a:48:07:a8:aa:29:
        4b:66:2c:ff:e8:f4:d3:e2:c9:02:f8:e4:5e:07:47:f5:e5
WARNING: no nonce in response
Response verify OK
device.crt:good
    This Update: Oct  6 02:36:44 2024 GMT
    Next Update: Oct 16 02:36:44 2024 GMT

Response verify OK なので検証に成功していますね。

OCSP Lambda Function

Lambda関数 pre-auth のサンプルは以下のような感じになると思います。

  1. IoT Coreから渡されるクライアント証明書と中間証明書をevent から受け取る
  2. 証明書のAIAからOCSP ResponderのURLを取得
  3. OCSP Responderに署名書を送り、有効性を確認
  4. Lambdaが{ 'isAuthenticated': True }を返すことで認証は成功
import json
import logging
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.x509.ocsp import OCSPRequestBuilder, OCSPResponseStatus

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def load_certificate_from_string(cert_str):
    """Load certificate from a PEM string."""
    logger.info("Loading certificate from string.")
    cert = x509.load_pem_x509_certificate(cert_str.encode('utf-8'), default_backend())
    logger.info(f"Certificate loaded: {cert.subject}")
    return cert

def get_ocsp_url(cert):
    """Extract OCSP URL from the certificate."""
    logger.info("Extracting OCSP URL from the certificate.")
    aia = cert.extensions.get_extension_for_class(x509.AuthorityInformationAccess)
    for access_description in aia.value:
        if access_description.access_method == x509.AuthorityInformationAccessOID.OCSP:
            ocsp_url = access_description.access_location.value
            logger.info(f"OCSP URL found: {ocsp_url}")
            return ocsp_url
    logger.warning("No OCSP URL found in the certificate.")
    return None

def create_ocsp_request(client_cert, issuer_cert):
    """Create OCSP request for the given client and issuer certificates."""
    logger.info("Creating OCSP request.")
    builder = OCSPRequestBuilder()
    builder = builder.add_certificate(client_cert, issuer_cert, hashes.SHA1())
    ocsp_request = builder.build()
    logger.info("OCSP request created successfully.")
    return ocsp_request

def send_ocsp_request(ocsp_url, ocsp_request):
    """Send OCSP request and return the response."""
    logger.info(f"Sending OCSP request to {ocsp_url}")
    headers = {
        'Content-Type': 'application/ocsp-request',
        'Accept': 'application/ocsp-response',
    }
    response = requests.post(ocsp_url, data=ocsp_request.public_bytes(Encoding.DER), headers=headers)
    logger.info(f"OCSP request sent, received response of length {len(response.content)}")
    return response.content

def check_ocsp_status(ocsp_response, client_cert):
    """Check the OCSP response status."""
    logger.info("Checking OCSP response status.")
    ocsp_resp = x509.ocsp.load_der_ocsp_response(ocsp_response)

    if ocsp_resp.response_status == OCSPResponseStatus.SUCCESSFUL:
        for single_response in ocsp_resp.responses:
            if single_response.certificate_status == x509.ocsp.OCSPCertStatus.GOOD:
                logger.info("Certificate is valid (OCSP status: good).")
                return True
            elif single_response.certificate_status == x509.ocsp.OCSPCertStatus.REVOKED:
                logger.info("Certificate is revoked (OCSP status: revoked).")
                return False
            else:
                logger.info("Certificate status is unknown.")
                return False
    else:
        logger.error(f"OCSP request failed with status: {ocsp_resp.response_status}")
        return False

def lambda_handler(event, context):
    """AWS Lambda handler function."""
    try:
        # Extract certificates from event
        client_cert_pem = event['clientCertificateChain'][0]
        issuer_cert_pem = event['clientCertificateChain'][1]

        # Log event data for debugging
        logger.info(f"Received event: {json.dumps(event, indent=2)}")

        # Load certificates
        client_cert = load_certificate_from_string(client_cert_pem)
        issuer_cert = load_certificate_from_string(issuer_cert_pem)

        # Get OCSP URL from client certificate
        ocsp_url = get_ocsp_url(client_cert)
        if not ocsp_url:
            logger.error("OCSP URL not found in client certificate.")
            return { 'isAuthenticated': False }

        # Create OCSP request
        ocsp_request = create_ocsp_request(client_cert, issuer_cert)

        # Send OCSP request and get response
        ocsp_response = send_ocsp_request(ocsp_url, ocsp_request)

        # Check OCSP response status
        is_authenticated = check_ocsp_status(ocsp_response, client_cert)

        logger.info(f"Authentication result: {is_authenticated}")
        return { 'isAuthenticated': is_authenticated }

    except Exception as e:
        logger.error(f"Error: {str(e)}")
        return { 'isAuthenticated': False }

Lambda関数をIoT Coreから呼び出せるように以下のようなResource-based policyを設定することを忘れないようにしましょう。またCloudWatchでログを確認できるようなRoleを設定しておくとデバッグがやりやすいです。

{
  "ArnLike": {
    "AWS:SourceArn": "arn:aws:iot:ap-northeast-1:<ACCOUNT_ID>:domainconfiguration/myCustomDomainConfig/xmncz"
  }
}

ドメインコンフィグを作成する

今回は以前に作成していたドメインコンフィグを使います。作成当時は以下のようにしていました。

aws iot create-domain-configuration \
--domain-configuration-name "myCustomDomainConfig" \
--service-type "DATA" \
--domain-name "iot.company-x.com" \
--server-certificate-arns $SERVER_CERT_ARN

これを、Custom client certificate validation つかうためにアップデートします。CLIのバージョンは aws-cli/2.18.0 を使用しています。

aws iot update-domain-configuration \
    --domain-configuration-name myCustomDomainConfig \
    --authentication-type AWS_X509 \
    --application-protocol SECURE_MQTT \
    --client-certificate-config '{"clientCertificateCallbackArn":"arn:aws:lambda:ap-northeast-1:<ACCOUNT_ID>:function:pre-auth"}'

これにより、このドメインへのMQTTアクセスがLambdaをトリガーすることになります。

クライアントから接続

mosquitto_pub で接続してみましょう。以下のような出力になればOCSPのチェックは成功しています。

mosquitto_pub --cafile AmazonRootCA1.pem \
  --cert device.pem.crt \
  --key private.pem.key \
  -h iot.company-x.com \
  -p 8883 \
  -t t1 \
  -i pub_bc233a43 \
  -m {\"time\":\"2024-10-05T10:07:16+0900\"} \
  -d
  
Client pub_bc233a43 sending CONNECT
Client pub_bc233a43 received CONNACK (0)
Client pub_bc233a43 sending PUBLISH (d0, q0, r0, m1, 't1', ... (35 bytes))
Client pub_bc233a43 sending DISCONNECT

おつかれさま

AWS IoT CoreでOCSPによるクライアント証明書の検証を行う方法を試してみました。この仕組みに乗っかることで比較的簡単にOCSPの証明書検証ができました。Lambdaを使うのでいろいろ柔軟に処理できそうですね。それでは、また次回。

Cheers, 🍺

Discussion