🙌

Amazon SNSのHTTP/HTTPSエンドポイントをFastAPIで実装する

2024/12/24に公開

はじめに

Amazon Simple Notification Service (SNS) は、アプリケーション間やアプリケーションとユーザー間のメッセージング機能を提供するAWSのマネージドサービスです。SNSでは、HTTP/HTTPSのエンドポイントを購読先として登録することができ、自作のAPIサーバーでSNSからの情報を受け取ることが可能です。

本記事では、PythonのWebフレームワークであるFastAPIを使用して、Amazon SNSのHTTP/HTTPSエンドポイントを実装する方法を詳しく解説します。実装にあたっては、以下の要素が必要となります:

  1. 購読/受信サーバーの構築
  2. 購読リクエストの処理
  3. Amazon SNSから送られてきたデータの署名検証

また、FastAPIを使用する上でいくつかのハマりポイントがあったので、それらについても触れていきます。

FastAPIについて

FastAPIは、PythonのWebフレームワークで、Pythonの型ヒントの仕組みやPydanticを活用してAPIサーバーを構築することができます。

Amazon SNSと連携する際には、いくつかの注意点があります:

  1. Amazon SNSが送信するJSONのキーはPascalCase(UpperCamelCase)であり、Pythonで一般的に使用されるsnake_caseではありません。
  2. Amazon SNS用の型を定義する際は、Discriminated Unions(Tagged Union)を使用する必要があります。
  3. Amazon SNSが送信するJSONリクエストのヘッダーはContent-Type: text/plainです。
  4. テスタビリティを考慮して、購読処理の実装や署名検証を行いたいです。

これらの点を踏まえて、実装を進めていきます。

最終的なコード

https://github.com/yamitzky/amazon-sns-fastapi-subscription

このコードでは、FastAPI に関して、以下のようなテクニックを使っています(後述)。

  • alias_generator によるフィールド名の変換
  • Discriminated Union の活用
    • Type Adapter による、Union で定義されたデータのパース
  • Depends による副作用のある処理の注入と、テスタビリティの向上
  • pydantic-settings による設定ファイルの読み込み・注入と、テスタビリティの向上
  • FastAPI で作った API のテストと、fixture、mock の活用

リクエストの型の定義

まずは、Amazon SNSから受け取るデータを型安全に扱えるようにするための型をPydanticで定義します。

from typing import Annotated, Literal

from pydantic import BaseModel, ConfigDict, Field, HttpUrl, TypeAdapter
from pydantic.alias_generators import to_pascal


class SNSMessageBase(BaseModel):
    message_id: str
    topic_arn: str
    timestamp: str
    signature_version: str
    signature: str
    signing_cert_url: Annotated[HttpUrl, Field(alias="SigningCertURL")]
    message: str

    model_config = ConfigDict(
        alias_generator=to_pascal,
        extra="allow",
    )


class SNSNotification(SNSMessageBase):
    type: Literal["Notification"]
    subject: str | None = None


class SNSSubscription(SNSMessageBase):
    type: Literal["SubscriptionConfirmation"]
    subscribe_url: Annotated[HttpUrl, Field(alias="SubscribeURL")]
    token: str


class SNSUnsubscribe(SNSMessageBase):
    type: Literal["UnsubscribeConfirmation"]
    subscribe_url: Annotated[HttpUrl, Field(alias="SubscribeURL")]
    token: str


SNSMessage = Annotated[
    SNSNotification | SNSSubscription | SNSUnsubscribe, Field(discriminator="type")
]
sns_message_adapter: TypeAdapter[SNSMessage] = TypeAdapter(SNSMessage)


@app.post("/")
async def sns_receiver(
    request: Request,
):
    try:
        # text/plain で送られてくるため、引数として直接定義せず、自前で json に変換
        message = sns_message_adapter.validate_json(await request.body())
    except ValidationError as e:
        raise RequestValidationError(e.errors())

    return "ok"

この実装について、いくつかのポイントを説明します:

  1. Content-Type: Amazon SNSはデフォルトでtext/plainでデータを送信します。そのため、async def sns_receiver(message: SNSMessage)のように定義すると、FastAPIはInput should be a valid dictionary or object to extract fields fromというエラーを出します。この問題を回避するために、request.body()の結果を手動でバリデーションしています。

  2. TypeAdapter: SNSMessageはUnion型で定義されているため、SNSMessage.model_validate()のように直接呼び出すことはできません。代わりに、PydanticのTypeAdapterを使用しています。

  3. Discriminated Union: SNSのメッセージは"Type"キーに基づいて異なる構造を持ちます。PydanticのDiscriminated Union機能を使用して、この構造を表現しています。

  4. alias_generator: SNSのメッセージはPythonで一般的なsnake_caseではなくPascalCaseのキーを使用します。Pydanticのalias_generatorを使用して、この違いを吸収しています。

Content Type については、設定をすれば、 application/json で送ることも可能です。この場合は、1と2のテクニックは不要で、コードもシンプルになります。可能であれば Content Type を変更してください。

環境変数による設定の埋め込みとトピック名のフィルター

特定のトピックからのメッセージのみを受け付けたい場合、環境変数で設定した値とメッセージのトピックARNが一致しているかをチェックする必要があります。

pydantic-settingsライブラリを使用すると、環境変数から設定を読み込み、Pythonモデルとして扱うことができます。

from fastapi import Depends
from functools import lru_cache
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    sns_topic_arn: str = "arn:aws:sns:region:123456789012:test-topic"


@lru_cache
def get_settings() -> Settings:
    return Settings()


@app.post("/")
async def sns_receiver(
    request: Request,
    settings: Annotated[Settings, Depends(get_settings)],
):
    # 略

    if message.topic_arn != settings.sns_topic_arn:
        raise HTTPException(status_code=400, detail="Invalid TopicArn")

    return "ok"

この実装では、Depends で設定値(環境変数)の読み込みを注入しています。後述するように、テスト時に設定値を簡単に注入できるようになります。

また、今回の実装は「特定のSNSトピックからの送信のみ」を購読したかったために実装しています。任意のSNSトピックを購読できるようにする場合は、この実装は不要です。

メッセージの署名検証

公開されたHTTPSサーバーは、信頼できない送信元からの通信を受け取る可能性があります。そのため、Amazon SNSによって送信された信頼できるデータかを署名検証する必要があります。

以下は署名検証を行うクラスの実装例です:

import base64
import logging
from typing import cast

import httpx
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa

from app.models import SNSMessage

logger = logging.getLogger(__name__)


class SignatureVerifier:
    async def verify(self, message: SNSMessage) -> bool:
        signable = ""
        if message.type == "Notification":
            signable += f"Message\n{message.message}\n"
            signable += f"MessageId\n{message.message_id}\n"
            if message.subject:
                signable += f"Subject\n{message.subject}\n"
            signable += f"Timestamp\n{message.timestamp}\n"
            signable += f"TopicArn\n{message.topic_arn}\n"
            signable += f"Type\n{message.type}\n"
        else:
            signable += f"Message\n{message.message}\n"
            signable += f"MessageId\n{message.message_id}\n"
            signable += f"SubscribeURL\n{message.subscribe_url}\n"
            signable += f"Timestamp\n{message.timestamp}\n"
            signable += f"Token\n{message.token}\n"
            signable += f"TopicArn\n{message.topic_arn}\n"
            signable += f"Type\n{message.type}\n"

        cert_url = str(message.signing_cert_url)
        if not cert_url or "amazonaws.com" not in cert_url:
            logger.warning(f"Invalid SigningCertURL: {cert_url}")
            return False

        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(cert_url, timeout=5)
                response.raise_for_status()
                cert_pem = response.text
        except Exception as e:
            logger.error(f"Failed to fetch certificate: {str(e)}")
            return False

        decoded_signature = base64.b64decode(message.signature)

        cert = x509.load_pem_x509_certificate(cert_pem.encode("utf-8"))
        public_key = cast(rsa.RSAPublicKey, cert.public_key())
        sig_ver = message.signature_version
        logger.debug(f"Verifying signature with version {sig_ver}")
        try:
            public_key.verify(
                decoded_signature,
                signable.encode("utf-8"),
                padding.PKCS1v15(),
                hashes.SHA256() if sig_ver == "2" else hashes.SHA1(),
            )
            return True
        except Exception as e:
            logger.error(f"Signature verification failed: {str(e)}")
            return False


def get_signature_verifier() -> SignatureVerifier:
    return SignatureVerifier()


@app.post("/")
async def sns_receiver(
    request: Request,
    signature_verifier: Annotated[SignatureVerifier, Depends(get_signature_verifier)],
    settings: Annotated[Settings, Depends(get_settings)],
):
    # 略
    if not await signature_verifier.verify(message):
        raise HTTPException(status_code=400, detail="Invalid signature")

    return "ok"

この実装でも、Dependsを使用して依存性注入を行っています。証明書の検証は、証明書ダウンロードが必要=副作用がありますが、テスト時に処理をモック化してスキップすることができます。

SNSトピックの購読

Amazon SNSトピックの購読は、"SubscriptionConfirmation"タイプのメッセージに含まれる"SubscribeURL"にGETリクエストを送ることで完了します。以下は購読処理を行うクラスの実装例です:

import httpx
from pydantic import HttpUrl


class URLSubscriber:
    """SNSのsubscribe_urlを検証するクラス"""

    async def subscribe(self, url: HttpUrl) -> bool:
        """
        subscribe_urlにアクセスして購読確認を行うメソッド
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(str(url))
            return response.is_success


def get_url_subscriber() -> URLSubscriber:
    return URLSubscriber()


@app.post("/")
async def sns_receiver(
    request: Request,
    signature_verifier: Annotated[SignatureVerifier, Depends(get_signature_verifier)],
    url_subscriber: Annotated[URLSubscriber, Depends(get_url_subscriber)],
    settings: Annotated[Settings, Depends(get_settings)],
):
    # 略
    if message.type == "SubscriptionConfirmation":
        if await url_subscriber.subscribe(message.subscribe_url):
            return {"status": "success", "message": "Subscription confirmed"}
        raise HTTPException(status_code=400, detail="Invalid SubscribeURL")

    elif message.type == "Notification":
        logger.info(f"Received SNS message: {message.message}")
        return {"status": "success", "message": "Message received"}

    elif message.type == "UnsubscribeConfirmation":
        return {"status": "success", "message": "Unsubscribe confirmed"}

この実装でもDependsを使用して依存性注入を行っています。また、型の絞り込み(narrowing)により、if message.type == "SubscriptionConfirmation"の条件分岐後はmessageの型がSNSSubscriptionに絞り込まれています。 pyright での型チェックも通りますし、VSCode で書いていても補完が聞きます。

テストコードの実装

最後に、テストコードの実装例を示します。署名検証処理、購読処理、環境変数からの設定値の読み込みについては、テスト時にモックを注入することで、副作用を発生させることなくテストを行うことができます。

長いので全体はGitHubへ。

https://github.com/yamitzky/amazon-sns-fastapi-subscription/blob/main/tests/test_main.py

以下は主要な処理の抜粋です。

@pytest.fixture
def valid_subscription():
    return {
        "Type": "SubscriptionConfirmation",
        "MessageId": "test-message-id",
        "TopicArn": "arn:aws:sns:region:123456789012:test-topic",
        "Message": "Test message",
        "Timestamp": "2023-01-01T00:00:00.000Z",
        "SignatureVersion": "1",
        "Signature": "test-signature",
        "SigningCertURL": "https://sns.us-east-1.amazonaws.com/cert.pem",
        "SubscribeURL": "https://sns.us-east-1.amazonaws.com/subscribe",
        "Token": "test-token",
    }

@pytest.fixture
def mock_get_verifier(mocker):
    """署名検証のモックを提供するフィクスチャ"""
    mock_verifier = mocker.Mock()
    mock_verifier.verify = mocker.AsyncMock(return_value=True)
    return lambda: mock_verifier

@pytest.fixture
def mock_get_subscriber(mocker):
    """subscribe_url検証のモックを提供するフィクスチャ"""
    mock_subscriber = mocker.Mock()
    mock_subscriber.subscribe = mocker.AsyncMock(return_value=True)
    return lambda: mock_subscriber

@pytest.fixture
def mock_get_settings():
    return lambda: Settings(sns_topic_arn="arn:aws:sns:region:123456789012:test-topic")

def test_subscription_confirmation_success(valid_subscription, mock_get_verifier, mock_get_subscriber, mock_get_settings):
    """正常なサブスクリプション確認を送信した場合のテスト"""
    app.dependency_overrides[get_signature_verifier] = mock_get_verifier
    app.dependency_overrides[get_url_subscriber] = mock_get_subscriber
    app.dependency_overrides[get_settings] = mock_get_settings

    response = client.post("/", json=valid_subscription)
    assert response.status_code == 200
    assert response.json() == {
        "status": "success",
        "message": "Subscription confirmed",
    }

まとめ

本記事では、FastAPIを使用してAmazon SNSのHTTP/HTTPSエンドポイントを実装する方法を詳しく解説しました。主なポイントは以下の通りです:

  1. Pydanticを使用したSNSメッセージの型定義
  2. 環境変数を使用した設定の管理
  3. 署名検証の実装
  4. サブスクリプション確認の処理
  5. テスタビリティを考慮した設計

FastAPIとAmazon SNSを組み合わせることで、スケーラブルで堅牢なメッセージング基盤を簡単に構築できます。

Discussion