AppSync Eventsの実用スニペット
AWS AppSync Eventsは、リアルタイムメッセージング機能を提供するサーバーレスWebSocket APIサービスです。接続管理やスケーリングといったインフラ運用の負担なしに、何百万ものクライアントへ高パフォーマンスかつセキュアなイベント配信を実現できます。AppSync Eventsの概要やメリットは、AppSync Eventsの何がいいのか の記事で紹介されていますので、この記事ではAppSync Eventsの実用的なスニペットを紹介します。
Subscribeできるチャンネルに制限をかける
デフォルトの設定では、受信側は対象のNamespace内の全てのチャンネルをSubscribeすることができます。個別のチャットのようなシステムでは、特定のユーザーにだけSubscribeして欲しいことがあります。たとえば、/default/${Cognito sub} を対象のユーザー専用のチャンネルにしたい場合は以下のように実装します。
CDK の実装
onSubscribe ハンドラーを使用して、ユーザーが自分専用のチャンネルのみにアクセスできるように制限します。
new appsync.CfnChannelNamespace(this, 'Namespace', {
apiId: eventApi.attrApiId,
name: 'default',
codeHandlers: `
export function onSubscribe(ctx) {
const requested = ctx.info.channel.path
const allowed = \`/default/\${ctx.identity.sub}\`
if (requested !== allowed) {
util.unauthorized()
}
}
`
});
フロントエンドの実装
ユーザー自身のIDを取得して、専用チャンネルに接続します。
// 現在のユーザーIDを取得
const session = await fetchAuthSession();
const sub = session.tokens?.idToken?.payload.sub as string;
setUserId(sub);
const userChannel = `default/${sub}`;
// ユーザー専用チャンネルに接続
const channel = await events.connect(userChannel);
動作確認
AWS ConsoleのAppSyncのページからPub/Subエディタを開き、サブスクライブを押すとエラーが出ることを確認できます。

Python (IAM認証+WebSocket) で Publishする
PythonからIAM認証でAppSync Eventsに接続する際は、AWS Signature Version4(SigV4)による署名が必要です。boto3で取得した認証情報を使い、リクエストのメソッド・URL・ヘッダー・ボディから暗号署名を計算します。この署名をAuthorizationヘッダーに含めることで認証できるのですが、WebSocket接続の場合は署名済みヘッダーをJSON化してBase64エンコードし、WebSocketのsubprotocolとして送信する必要があります。以下に実装例を示します。PayloadをJSONにするときにseparatorsを指定するのもポイントです。
import json
import uuid
import base64
from urllib.parse import urlparse
import boto3
import websocket
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
APPSYNC_HTTP_ENDPOINT = "https://****.appsync-api.us-west-2.amazonaws.com/event"
APPSYNC_WS_ENDPOINT = "wss://****.appsync-realtime-api.us-west-2.amazonaws.com/event/realtime"
CHANNEL = "default/channel"
REGION = "us-west-2"
def get_iam_headers(http_endpoint: str, body: str = "{}") -> dict:
session = boto3.Session()
credentials = session.get_credentials()
url = urlparse(http_endpoint)
headers = {
"accept": "application/json, text/javascript",
"content-encoding": "amz-1.0",
"content-type": "application/json; charset=UTF-8",
"host": url.netloc
}
request = AWSRequest(
method="POST",
url=http_endpoint,
data=body,
headers=headers
)
auth = SigV4Auth(credentials, 'appsync', REGION)
auth.add_auth(request)
return dict(request.headers)
def base64url_encode(data: str) -> str:
encoded = base64.b64encode(data.encode('utf-8')).decode('utf-8')
return encoded.replace('+', '-').replace('/', '_').replace('=', '')
def get_auth_subprotocol() -> str:
headers = get_iam_headers(APPSYNC_HTTP_ENDPOINT, "{}")
print("Connection headers:", headers)
auth_header = json.dumps(headers)
encoded_header = base64url_encode(auth_header)
return f"header-{encoded_header}"
def on_open(ws):
print("WebSocket connection opened")
ws.send(json.dumps({"type": "connection_init"}))
def on_message(ws, message):
print(f"Received message: {message}")
msg = json.loads(message)
if msg.get("type") == "connection_ack":
print("Connection established! Publishing events...")
publish_event(ws)
def on_error(ws, error):
print(f"Error occurred: {error}")
def on_close(ws, close_status_code, close_msg):
print(f"Connection closed: {close_status_code} - {close_msg}")
def publish_event(ws):
event_data = {"message": "Hello from Python!"}
event_json = json.dumps(event_data)
events = [event_json]
request_body = {
"channel": CHANNEL,
"events": events
}
# separators=(',', ':')が期待する署名と完全に一致させるために必要
stringified_payload = json.dumps(request_body, separators=(',', ':'))
print(f"Publishing to channel: {CHANNEL}")
print(f"Request body: {stringified_payload}")
auth_headers = get_iam_headers(APPSYNC_HTTP_ENDPOINT, stringified_payload)
request_id = str(uuid.uuid4())
publish_msg = {
"type": "publish",
"id": request_id,
"channel": CHANNEL,
"events": events,
"authorization": auth_headers
}
ws.send(json.dumps(publish_msg))
print(f"Event published with request ID: {request_id}")
def main():
print(f"AppSync WebSocket endpoint: {APPSYNC_WS_ENDPOINT}")
print(f"Channel: {CHANNEL}")
print(f"Region: {REGION}")
subprotocols = ["aws-appsync-event-ws", get_auth_subprotocol()]
ws = websocket.WebSocketApp(
APPSYNC_WS_ENDPOINT,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close,
subprotocols=subprotocols
)
ws.run_forever()
if __name__ == "__main__":
main()
まとめ
この記事では、AppSync Eventsの実用的なスニペットを2つ紹介しました。AppSync Eventsを使うことで、WebSocket接続の管理やスケーリングといったインフラ運用から解放され、ビジネスロジックの実装に集中できます。リアルタイム機能が必要なアプリケーション開発の際は、ぜひ検討してみてください。
Discussion