⛓️

Certbot用ACMEサーバーをFlaskで偽装し、DNS-01で自己証明書を取得

2024/01/28に公開

概要

  • レート制限のあるステージングhttps://acme-staging-v02.api.letsencrypt.orgや本番https://acme-v02.api.letsencrypt.org/directoryを実際に使わずに無制限でLet's Encryptの証明書を疑似的に自己証明書で取得するローカル検証環境
  • hosts修正とFlaskサーバー起動の簡易操作のみで、step-ca、step-cli不使用

TL;DR

  • ダミーのレスポンス処理をFlask(SSL)で実装

事前準備

前提:Docker版Certbot(certbot/certbot:amd64-v2.6.0)使用

  • Flask用サーバー証明書作成
    command
    export KEY_NAME=server.key && \
        export CSR_NAME=server.csr && \
        export CRT_NAME=server.crt && \
        export SUBJ_NAME=subject.txt && \
        echo "subjectAltName=DNS:*.api.letsencrypt.org,DNS:localhost" > $SUBJ_NAME && \
        openssl genrsa 2048 > $KEY_NAME && \
        openssl req -new -key $KEY_NAME -out $CSR_NAME -subj "/CN=letsencrypt.org" && \
        openssl x509 -req -days 36500 -in $CSR_NAME -signkey $KEY_NAME -out $CRT_NAME -extfile $SUBJ_NAME
    
  • hostsファイル修正(--serverでlocalhost指定する場合は不要)
    Dockerで起動するときの引数に--add-host=acme-v02.api.letsencrypt.org:127.0.0.1 --add-host=acme-staging-v02.api.letsencrypt.org:127.0.0.1を追加
    /etc/hosts
    ・・・
    127.0.0.1       acme-v02.api.letsencrypt.org
    127.0.0.1       acme-staging-v02.api.letsencrypt.org
    
    ※localhost指定と--test-certオプションを併用すると--server value conflicts with --stagingエラーとなる。
  • pythonライブラリインストール
    command
    pip install Flask==3.0.1
    
  • ダミーのFlaskサーバー作成
    server.py
    import json
    import os
    from base64 import urlsafe_b64decode
    from datetime import datetime, timedelta
    from pathlib import Path
    from uuid import uuid4
    
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.asymmetric.rsa import (
        RSAPrivateKey,
        generate_private_key,
    )
    from cryptography.hazmat.primitives.asymmetric.types import CertificatePublicKeyTypes
    from cryptography.hazmat.primitives.hashes import SHA256
    from cryptography.hazmat.primitives.serialization import Encoding
    from cryptography.x509 import (
        BasicConstraints,
        Certificate,
        CertificateBuilder,
        DNSName,
        Name,
        NameAttribute,
        NameOID,
        SubjectAlternativeName,
        load_pem_x509_csr,
        random_serial_number,
    )
    from flask import Flask, Response, jsonify, make_response, request
    
    
    def acme_server() -> None:
        app: Flask = Flask(__name__)
    
        PRIV_KEY_ROOT_DIR: Path = Path(
            os.environ.get("TMP_PRIV_KEY_DIR", os.path.expanduser("~"))
        )
        FIELD: str = "dummyField"
        TOKEN: str = "dummyToken" * 16
        NEW_ACCOUNT_PATH: str = "/account"
        CERT_PATH: str = "/certificate"
    
        domain_table: dict[str, list[str]] = {}
        pub_key_table: dict[str, CertificatePublicKeyTypes] = {}
    
        @app.route("/directory")
        def directory() -> Response:
            root_url: str = f"https://{request.host}"
            return jsonify(
                dict(
                    newAccount=f"{root_url}{NEW_ACCOUNT_PATH}",
                    newNonce=root_url,
                    newOrder=root_url,
                )
            )
    
        @app.route(NEW_ACCOUNT_PATH, methods=["HEAD", "POST"])
        def account() -> Response:
            return make_acme_response()
    
        @app.route("/", methods=["HEAD", "POST"])
        def root() -> Response:
            res: Response = make_acme_response()
            res.headers["Location"] = f"https://{request.host}"
            return res
    
        def make_acme_response() -> Response:
            if request.method == "HEAD":
                res: Response = Response()
                res.headers["Replay-Nonce"] = str(uuid4())
                return res
    
            protected: dict = json.loads(
                urlsafe_b64decode(f'{request.json["protected"]}==='.encode()).decode()
            )
            nonce: str = protected["nonce"]
            domains: list[str] = []
            if request.json["payload"] != "":
                payload: dict = json.loads(
                    urlsafe_b64decode(f'{request.json["payload"]}==='.encode()).decode()
                )
                if "identifiers" in payload.keys():
                    domains.extend(
                        [identifier["value"] for identifier in payload["identifiers"]]
                    )
                    domain_table.update({nonce: domains})
                if "csr" in payload.keys():
                    pub_key_table.update(
                        {
                            nonce: load_pem_x509_csr(
                                f"-----BEGIN CERTIFICATE REQUEST-----\n{payload['csr'].replace('_', '/').replace('-', '+')}\n-----END CERTIFICATE REQUEST-----\n".encode(),
                                backend=default_backend(),
                            ).public_key()
                        }
                    )
    
            url: str = f"https://{request.host}"
            data: dict[str, str | list[str | dict[str, str | list[str]]]] = dict(
                authorizations=[url],
                identifier=dict(type="dns", value=domains),
                challenges=[
                    dict(
                        type="dns-01",
                        field=FIELD,
                        token=TOKEN,
                        url=url,
                    )
                ],
                type="dns-01",
                field=FIELD,
                token=TOKEN,
                url=url,
                finalize=url,
                certificate=f"https://{request.host}{CERT_PATH}",
                status="valid",
            )
    
            res: Response = make_response(jsonify(data))
            res.headers["Replay-Nonce"] = nonce
            res.headers["Link"] = f'<{url}>;rel="up"'
    
            return res
    
        @app.post(CERT_PATH)
        def certificate() -> Response:
            protected: dict = json.loads(
                urlsafe_b64decode(f'{request.json["protected"]}==='.encode()).decode()
            )
            nonce: str = protected["nonce"]
            domains: list[str] = domain_table.pop(nonce, [])
    
            current_date: datetime = datetime.now()
            not_valid_before: datetime = current_date
            not_valid_after: datetime = current_date + timedelta(days=365)
    
            priv_key: RSAPrivateKey = generate_private_key(65537, 2048)
            pub_key: CertificatePublicKeyTypes = pub_key_table.pop(nonce, None)
            subject = issuer = Name(
                [
                    NameAttribute(NameOID.COMMON_NAME, "localhost"),
                    NameAttribute(NameOID.ORGANIZATION_NAME, "Let's Encrypt"),
                ]
            )
            root_cert: Certificate = (
                CertificateBuilder()
                .subject_name(subject)
                .issuer_name(issuer)
                .public_key(pub_key)
                .serial_number(random_serial_number())
                .not_valid_before(not_valid_before)
                .not_valid_after(not_valid_after)
                .add_extension(
                    BasicConstraints(True, None),
                    True,
                )
                .add_extension(
                    SubjectAlternativeName([DNSName(domain) for domain in domains]),
                    False,
                )
                .sign(priv_key, SHA256(), default_backend())
            )
            intermediate_cert: Certificate = (
                CertificateBuilder()
                .subject_name(subject)
                .issuer_name(root_cert.subject)
                .public_key(pub_key)
                .serial_number(random_serial_number())
                .not_valid_before(not_valid_before)
                .not_valid_after(not_valid_after)
                .sign(priv_key, SHA256(), default_backend())
            )
    
            res: Response = make_response(
                (
                    root_cert.public_bytes(Encoding.PEM)
                    + intermediate_cert.public_bytes(Encoding.PEM)
                ).decode()
            )
            res.headers["Replay-Nonce"] = nonce
    
            return res
    
        app.run(host="0.0.0.0", port=443, ssl_context=("server.crt", "server.key"))
    
    
    if __name__ == "__main__":
        acme_server()
    

取得実行

  • Flaskサーバー起動
    command
    python server.py
    
  • Certbot実行
    別ターミナル起動もしくはpythonコマンド実行時に最後に&をつけてバックグラウンドジョブに回しておく。
    環境変数REQUESTS_CA_BUNDLEにFlask用サーバー証明書を指定
    • 新規取得の場合
      command
      export REQUESTS_CA_BUNDLE=server.crt && \
      certbot certonly \
          --manual \
          --preferred-challenges=dns \
          --server https://acme-v02.api.letsencrypt.org/directory \
          --agree-tos -m メールアドレス \
          --manual-auth-hook "echo" \
          --manual-cleanup-hook "echo" \
          --expand -n -d *.メインドメイン
      
    • 更新のみの場合
      command
      export REQUESTS_CA_BUNDLE=server.crt && \
      certbot renew \
          --manual \
          --preferred-challenges=dns \
          --server https://acme-v02.api.letsencrypt.org/directory \
          --agree-tos -m メールアドレス \
          --manual-auth-hook "echo" \
          --manual-cleanup-hook "echo" \
          --cert-name メインドメイン
      

※hostsファイルを修正しなかった場合は--server https://localhost/directory

参考: Dockerfile

Dockerfile
FROM certbot/certbot:amd64-v2.6.0

ENV SERVER_DIR /acme_server

COPY ./server.py ${SERVER_DIR}/server.py

RUN pip install Flask==3.0.1 && \
    export KEY_NAME=${SERVER_DIR}/server.key && \
    export CSR_NAME=${SERVER_DIR}/server.csr && \
    export CRT_NAME=${SERVER_DIR}/server.crt && \
    export SUBJ_NAME=${SERVER_DIR}/subject.txt && \
    echo "subjectAltName=DNS:*.api.letsencrypt.org,DNS:localhost" > ${SUBJ_NAME} && \
    openssl genrsa 2048 > $KEY_NAME && \
    openssl req -new -key $KEY_NAME -out $CSR_NAME -subj "/CN=letsencrypt.org" && \
    openssl x509 -req -days 36500 -in $CSR_NAME -signkey $KEY_NAME -out $CRT_NAME -extfile $SUBJ_NAME && \
    echo -e "#!/bin/sh\ncd ${SERVER_DIR} && python server.py & certbot \"\$@\"" > ${SERVER_DIR}/start.sh && \
    chmod 775 ${SERVER_DIR}/start.sh

ARG REQUESTS_CA_BUNDLE
ENV REQUESTS_CA_BUNDLE ${SERVER_DIR}/server.crt

ENTRYPOINT [ "/acme_server/start.sh" ]

説明

Flaskサーバー側

acme-v02.api.letsencrypt.orgへのアクセス偽装は以下の記事の通り。

https://acme-v02.api.letsencrypt.org/directoryのアクセスを@app.route("/directory")にルーティングし、再度自分自身のサーバーURLを渡す。

make_acme_response()でサーバー証明書返却以外の共通処理を集約している。
この辺りはRFC 8555の仕様に従ってレスポンスを構築しているが、詳細は未調査。certbot側の動きをトレースして処理が通るようにしているだけ。
ただし、newAccountの時にヘッダにLocationが含まれていると弾かれるので、その部分は共通化せずルーティングも分けている。

make_acme_response()でリクエストのpayloadにidentifiersがあった場合、その中に要求のDNSリストがあるのでnonceと紐づける形で一旦保持する。

make_acme_response()
if "identifiers" in payload.keys():
    domains.extend(
        [identifier["value"] for identifier in payload["identifiers"]]
    )
    domain_table.update({nonce: domains})

CSR(署名要求)があった場合、公開鍵をnonceと紐づける形で一旦保持する。

make_acme_response()
if "csr" in payload.keys():
    pub_key_table.update(
        {
            nonce: load_pem_x509_csr(
                f"-----BEGIN CERTIFICATE REQUEST-----\n{payload['csr'].replace('_', '/').replace('-', '+')}\n-----END CERTIFICATE REQUEST-----\n".encode(),
                backend=default_backend(),
            ).public_key()
        }
    )

@app.post(CERT_PATH)で自己証明書を作成する。
上記で保持したドメインと公開鍵を取り出し、中間証明書付きで証明書を発行する。
ただし、証明書作成周りの仕組みはあまりよくわからないので未精査。

取得実行時

certbotコマンドのmanual-auth-hookとmanual-cleanup-hookは、初回のPOSTでstatus="valid"を返した場合には実行スキップされるようなので、PATHが通っているコマンドであればなんでも良い。
また、--no-verify-sslをつければFlaskの証明書のためにREQUESTS_CA_BUNDLEを設定する必要はなくなるが、本運用同様のコマンドとは異なることになるので今回は不採用。

Discussion