Amazon SNSのHTTP/HTTPSエンドポイントをFastAPIで実装する
はじめに
Amazon Simple Notification Service (SNS) は、アプリケーション間やアプリケーションとユーザー間のメッセージング機能を提供するAWSのマネージドサービスです。SNSでは、HTTP/HTTPSのエンドポイントを購読先として登録することができ、自作のAPIサーバーでSNSからの情報を受け取ることが可能です。
本記事では、PythonのWebフレームワークであるFastAPIを使用して、Amazon SNSのHTTP/HTTPSエンドポイントを実装する方法を詳しく解説します。実装にあたっては、以下の要素が必要となります:
- 購読/受信サーバーの構築
- 購読リクエストの処理
- Amazon SNSから送られてきたデータの署名検証
また、FastAPIを使用する上でいくつかのハマりポイントがあったので、それらについても触れていきます。
FastAPIについて
FastAPIは、PythonのWebフレームワークで、Pythonの型ヒントの仕組みやPydanticを活用してAPIサーバーを構築することができます。
Amazon SNSと連携する際には、いくつかの注意点があります:
- Amazon SNSが送信するJSONのキーはPascalCase(UpperCamelCase)であり、Pythonで一般的に使用されるsnake_caseではありません。
- Amazon SNS用の型を定義する際は、Discriminated Unions(Tagged Union)を使用する必要があります。
- Amazon SNSが送信するJSONリクエストのヘッダーは
Content-Type: text/plain
です。 - テスタビリティを考慮して、購読処理の実装や署名検証を行いたいです。
これらの点を踏まえて、実装を進めていきます。
最終的なコード
このコードでは、FastAPI に関して、以下のようなテクニックを使っています(後述)。
- alias_generator によるフィールド名の変換
- Discriminated Union の活用
- Type Adapter による、Union で定義されたデータのパース
- Depends による副作用のある処理の注入と、テスタビリティの向上
- pydantic-settings による設定ファイルの読み込み・注入と、テスタビリティの向上
- FastAPI で作った API のテストと、fixture、mock の活用
リクエストの型の定義
まずは、Amazon SNSから受け取るデータを型安全に扱えるようにするための型をPydanticで定義します。
from typing import Annotated, Literal
from pydantic import BaseModel, ConfigDict, Field, HttpUrl, TypeAdapter
from pydantic.alias_generators import to_pascal
class SNSMessageBase(BaseModel):
message_id: str
topic_arn: str
timestamp: str
signature_version: str
signature: str
signing_cert_url: Annotated[HttpUrl, Field(alias="SigningCertURL")]
message: str
model_config = ConfigDict(
alias_generator=to_pascal,
extra="allow",
)
class SNSNotification(SNSMessageBase):
type: Literal["Notification"]
subject: str | None = None
class SNSSubscription(SNSMessageBase):
type: Literal["SubscriptionConfirmation"]
subscribe_url: Annotated[HttpUrl, Field(alias="SubscribeURL")]
token: str
class SNSUnsubscribe(SNSMessageBase):
type: Literal["UnsubscribeConfirmation"]
subscribe_url: Annotated[HttpUrl, Field(alias="SubscribeURL")]
token: str
SNSMessage = Annotated[
SNSNotification | SNSSubscription | SNSUnsubscribe, Field(discriminator="type")
]
sns_message_adapter: TypeAdapter[SNSMessage] = TypeAdapter(SNSMessage)
@app.post("/")
async def sns_receiver(
request: Request,
):
try:
# text/plain で送られてくるため、引数として直接定義せず、自前で json に変換
message = sns_message_adapter.validate_json(await request.body())
except ValidationError as e:
raise RequestValidationError(e.errors())
return "ok"
この実装について、いくつかのポイントを説明します:
-
Content-Type: Amazon SNSはデフォルトでtext/plainでデータを送信します。そのため、
async def sns_receiver(message: SNSMessage)
のように定義すると、FastAPIはInput should be a valid dictionary or object to extract fields from
というエラーを出します。この問題を回避するために、request.body()
の結果を手動でバリデーションしています。 -
TypeAdapter:
SNSMessage
はUnion型で定義されているため、SNSMessage.model_validate()
のように直接呼び出すことはできません。代わりに、PydanticのTypeAdapterを使用しています。 -
Discriminated Union: SNSのメッセージは"Type"キーに基づいて異なる構造を持ちます。PydanticのDiscriminated Union機能を使用して、この構造を表現しています。
-
alias_generator: SNSのメッセージはPythonで一般的な
snake_case
ではなくPascalCase
のキーを使用します。Pydanticのalias_generatorを使用して、この違いを吸収しています。
Content Type については、設定をすれば、 application/json
で送ることも可能です。この場合は、1と2のテクニックは不要で、コードもシンプルになります。可能であれば Content Type を変更してください。
環境変数による設定の埋め込みとトピック名のフィルター
特定のトピックからのメッセージのみを受け付けたい場合、環境変数で設定した値とメッセージのトピックARNが一致しているかをチェックする必要があります。
pydantic-settingsライブラリを使用すると、環境変数から設定を読み込み、Pythonモデルとして扱うことができます。
from fastapi import Depends
from functools import lru_cache
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
sns_topic_arn: str = "arn:aws:sns:region:123456789012:test-topic"
@lru_cache
def get_settings() -> Settings:
return Settings()
@app.post("/")
async def sns_receiver(
request: Request,
settings: Annotated[Settings, Depends(get_settings)],
):
# 略
if message.topic_arn != settings.sns_topic_arn:
raise HTTPException(status_code=400, detail="Invalid TopicArn")
return "ok"
この実装では、Depends で設定値(環境変数)の読み込みを注入しています。後述するように、テスト時に設定値を簡単に注入できるようになります。
また、今回の実装は「特定のSNSトピックからの送信のみ」を購読したかったために実装しています。任意のSNSトピックを購読できるようにする場合は、この実装は不要です。
メッセージの署名検証
公開されたHTTPSサーバーは、信頼できない送信元からの通信を受け取る可能性があります。そのため、Amazon SNSによって送信された信頼できるデータかを署名検証する必要があります。
以下は署名検証を行うクラスの実装例です:
import base64
import logging
from typing import cast
import httpx
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from app.models import SNSMessage
logger = logging.getLogger(__name__)
class SignatureVerifier:
async def verify(self, message: SNSMessage) -> bool:
signable = ""
if message.type == "Notification":
signable += f"Message\n{message.message}\n"
signable += f"MessageId\n{message.message_id}\n"
if message.subject:
signable += f"Subject\n{message.subject}\n"
signable += f"Timestamp\n{message.timestamp}\n"
signable += f"TopicArn\n{message.topic_arn}\n"
signable += f"Type\n{message.type}\n"
else:
signable += f"Message\n{message.message}\n"
signable += f"MessageId\n{message.message_id}\n"
signable += f"SubscribeURL\n{message.subscribe_url}\n"
signable += f"Timestamp\n{message.timestamp}\n"
signable += f"Token\n{message.token}\n"
signable += f"TopicArn\n{message.topic_arn}\n"
signable += f"Type\n{message.type}\n"
cert_url = str(message.signing_cert_url)
if not cert_url or "amazonaws.com" not in cert_url:
logger.warning(f"Invalid SigningCertURL: {cert_url}")
return False
try:
async with httpx.AsyncClient() as client:
response = await client.get(cert_url, timeout=5)
response.raise_for_status()
cert_pem = response.text
except Exception as e:
logger.error(f"Failed to fetch certificate: {str(e)}")
return False
decoded_signature = base64.b64decode(message.signature)
cert = x509.load_pem_x509_certificate(cert_pem.encode("utf-8"))
public_key = cast(rsa.RSAPublicKey, cert.public_key())
sig_ver = message.signature_version
logger.debug(f"Verifying signature with version {sig_ver}")
try:
public_key.verify(
decoded_signature,
signable.encode("utf-8"),
padding.PKCS1v15(),
hashes.SHA256() if sig_ver == "2" else hashes.SHA1(),
)
return True
except Exception as e:
logger.error(f"Signature verification failed: {str(e)}")
return False
def get_signature_verifier() -> SignatureVerifier:
return SignatureVerifier()
@app.post("/")
async def sns_receiver(
request: Request,
signature_verifier: Annotated[SignatureVerifier, Depends(get_signature_verifier)],
settings: Annotated[Settings, Depends(get_settings)],
):
# 略
if not await signature_verifier.verify(message):
raise HTTPException(status_code=400, detail="Invalid signature")
return "ok"
この実装でも、Depends
を使用して依存性注入を行っています。証明書の検証は、証明書ダウンロードが必要=副作用がありますが、テスト時に処理をモック化してスキップすることができます。
SNSトピックの購読
Amazon SNSトピックの購読は、"SubscriptionConfirmation"タイプのメッセージに含まれる"SubscribeURL"にGETリクエストを送ることで完了します。以下は購読処理を行うクラスの実装例です:
import httpx
from pydantic import HttpUrl
class URLSubscriber:
"""SNSのsubscribe_urlを検証するクラス"""
async def subscribe(self, url: HttpUrl) -> bool:
"""
subscribe_urlにアクセスして購読確認を行うメソッド
"""
async with httpx.AsyncClient() as client:
response = await client.get(str(url))
return response.is_success
def get_url_subscriber() -> URLSubscriber:
return URLSubscriber()
@app.post("/")
async def sns_receiver(
request: Request,
signature_verifier: Annotated[SignatureVerifier, Depends(get_signature_verifier)],
url_subscriber: Annotated[URLSubscriber, Depends(get_url_subscriber)],
settings: Annotated[Settings, Depends(get_settings)],
):
# 略
if message.type == "SubscriptionConfirmation":
if await url_subscriber.subscribe(message.subscribe_url):
return {"status": "success", "message": "Subscription confirmed"}
raise HTTPException(status_code=400, detail="Invalid SubscribeURL")
elif message.type == "Notification":
logger.info(f"Received SNS message: {message.message}")
return {"status": "success", "message": "Message received"}
elif message.type == "UnsubscribeConfirmation":
return {"status": "success", "message": "Unsubscribe confirmed"}
この実装でもDepends
を使用して依存性注入を行っています。また、型の絞り込み(narrowing)により、if message.type == "SubscriptionConfirmation"
の条件分岐後はmessage
の型がSNSSubscription
に絞り込まれています。 pyright
での型チェックも通りますし、VSCode で書いていても補完が聞きます。
テストコードの実装
最後に、テストコードの実装例を示します。署名検証処理、購読処理、環境変数からの設定値の読み込みについては、テスト時にモックを注入することで、副作用を発生させることなくテストを行うことができます。
長いので全体はGitHubへ。
以下は主要な処理の抜粋です。
@pytest.fixture
def valid_subscription():
return {
"Type": "SubscriptionConfirmation",
"MessageId": "test-message-id",
"TopicArn": "arn:aws:sns:region:123456789012:test-topic",
"Message": "Test message",
"Timestamp": "2023-01-01T00:00:00.000Z",
"SignatureVersion": "1",
"Signature": "test-signature",
"SigningCertURL": "https://sns.us-east-1.amazonaws.com/cert.pem",
"SubscribeURL": "https://sns.us-east-1.amazonaws.com/subscribe",
"Token": "test-token",
}
@pytest.fixture
def mock_get_verifier(mocker):
"""署名検証のモックを提供するフィクスチャ"""
mock_verifier = mocker.Mock()
mock_verifier.verify = mocker.AsyncMock(return_value=True)
return lambda: mock_verifier
@pytest.fixture
def mock_get_subscriber(mocker):
"""subscribe_url検証のモックを提供するフィクスチャ"""
mock_subscriber = mocker.Mock()
mock_subscriber.subscribe = mocker.AsyncMock(return_value=True)
return lambda: mock_subscriber
@pytest.fixture
def mock_get_settings():
return lambda: Settings(sns_topic_arn="arn:aws:sns:region:123456789012:test-topic")
def test_subscription_confirmation_success(valid_subscription, mock_get_verifier, mock_get_subscriber, mock_get_settings):
"""正常なサブスクリプション確認を送信した場合のテスト"""
app.dependency_overrides[get_signature_verifier] = mock_get_verifier
app.dependency_overrides[get_url_subscriber] = mock_get_subscriber
app.dependency_overrides[get_settings] = mock_get_settings
response = client.post("/", json=valid_subscription)
assert response.status_code == 200
assert response.json() == {
"status": "success",
"message": "Subscription confirmed",
}
まとめ
本記事では、FastAPIを使用してAmazon SNSのHTTP/HTTPSエンドポイントを実装する方法を詳しく解説しました。主なポイントは以下の通りです:
- Pydanticを使用したSNSメッセージの型定義
- 環境変数を使用した設定の管理
- 署名検証の実装
- サブスクリプション確認の処理
- テスタビリティを考慮した設計
FastAPIとAmazon SNSを組み合わせることで、スケーラブルで堅牢なメッセージング基盤を簡単に構築できます。
Discussion