🦁

AppSync Eventsの実用スニペット

に公開

AWS AppSync Eventsは、リアルタイムメッセージング機能を提供するサーバーレスWebSocket APIサービスです。接続管理やスケーリングといったインフラ運用の負担なしに、何百万ものクライアントへ高パフォーマンスかつセキュアなイベント配信を実現できます。AppSync Eventsの概要やメリットは、AppSync Eventsの何がいいのか の記事で紹介されていますので、この記事ではAppSync Eventsの実用的なスニペットを紹介します。

Subscribeできるチャンネルに制限をかける

デフォルトの設定では、受信側は対象のNamespace内の全てのチャンネルをSubscribeすることができます。個別のチャットのようなシステムでは、特定のユーザーにだけSubscribeして欲しいことがあります。たとえば、/default/${Cognito sub} を対象のユーザー専用のチャンネルにしたい場合は以下のように実装します。

CDK の実装

onSubscribe ハンドラーを使用して、ユーザーが自分専用のチャンネルのみにアクセスできるように制限します。

new appsync.CfnChannelNamespace(this, 'Namespace', {
  apiId: eventApi.attrApiId,
  name: 'default',
  codeHandlers: `
    export function onSubscribe(ctx) {
      const requested = ctx.info.channel.path
      const allowed = \`/default/\${ctx.identity.sub}\`

      if (requested !== allowed) {
        util.unauthorized()
      }
    }
  `
});

フロントエンドの実装

ユーザー自身のIDを取得して、専用チャンネルに接続します。

// 現在のユーザーIDを取得
const session = await fetchAuthSession();
const sub = session.tokens?.idToken?.payload.sub as string;
setUserId(sub);
const userChannel = `default/${sub}`;

// ユーザー専用チャンネルに接続
const channel = await events.connect(userChannel);

動作確認

AWS ConsoleのAppSyncのページからPub/Subエディタを開き、サブスクライブを押すとエラーが出ることを確認できます。

Python (IAM認証+WebSocket) で Publishする

PythonからIAM認証でAppSync Eventsに接続する際は、AWS Signature Version4(SigV4)による署名が必要です。boto3で取得した認証情報を使い、リクエストのメソッド・URL・ヘッダー・ボディから暗号署名を計算します。この署名をAuthorizationヘッダーに含めることで認証できるのですが、WebSocket接続の場合は署名済みヘッダーをJSON化してBase64エンコードし、WebSocketのsubprotocolとして送信する必要があります。以下に実装例を示します。PayloadをJSONにするときにseparatorsを指定するのもポイントです。

import json
import uuid
import base64
from urllib.parse import urlparse

import boto3
import websocket
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest

APPSYNC_HTTP_ENDPOINT = "https://****.appsync-api.us-west-2.amazonaws.com/event"
APPSYNC_WS_ENDPOINT = "wss://****.appsync-realtime-api.us-west-2.amazonaws.com/event/realtime"
CHANNEL = "default/channel"
REGION = "us-west-2"

def get_iam_headers(http_endpoint: str, body: str = "{}") -> dict:
    session = boto3.Session()
    credentials = session.get_credentials()
    url = urlparse(http_endpoint)

    headers = {
        "accept": "application/json, text/javascript",
        "content-encoding": "amz-1.0",
        "content-type": "application/json; charset=UTF-8",
        "host": url.netloc
    }

    request = AWSRequest(
        method="POST",
        url=http_endpoint,
        data=body,
        headers=headers
    )

    auth = SigV4Auth(credentials, 'appsync', REGION)
    auth.add_auth(request)

    return dict(request.headers)

def base64url_encode(data: str) -> str:
    encoded = base64.b64encode(data.encode('utf-8')).decode('utf-8')
    return encoded.replace('+', '-').replace('/', '_').replace('=', '')


def get_auth_subprotocol() -> str:
    headers = get_iam_headers(APPSYNC_HTTP_ENDPOINT, "{}")
    print("Connection headers:", headers)

    auth_header = json.dumps(headers)
    encoded_header = base64url_encode(auth_header)

    return f"header-{encoded_header}"

def on_open(ws):
    print("WebSocket connection opened")
    ws.send(json.dumps({"type": "connection_init"}))


def on_message(ws, message):
    print(f"Received message: {message}")
    msg = json.loads(message)

    if msg.get("type") == "connection_ack":
        print("Connection established! Publishing events...")
        publish_event(ws)


def on_error(ws, error):
    print(f"Error occurred: {error}")


def on_close(ws, close_status_code, close_msg):
    print(f"Connection closed: {close_status_code} - {close_msg}")

def publish_event(ws):
    event_data = {"message": "Hello from Python!"}
    event_json = json.dumps(event_data)
    events = [event_json]

    request_body = {
        "channel": CHANNEL,
        "events": events
    }
    # separators=(',', ':')が期待する署名と完全に一致させるために必要
    stringified_payload = json.dumps(request_body, separators=(',', ':'))

    print(f"Publishing to channel: {CHANNEL}")
    print(f"Request body: {stringified_payload}")

    auth_headers = get_iam_headers(APPSYNC_HTTP_ENDPOINT, stringified_payload)
    request_id = str(uuid.uuid4())

    publish_msg = {
        "type": "publish",
        "id": request_id,
        "channel": CHANNEL,
        "events": events,
        "authorization": auth_headers
    }

    ws.send(json.dumps(publish_msg))
    print(f"Event published with request ID: {request_id}")

def main():
    print(f"AppSync WebSocket endpoint: {APPSYNC_WS_ENDPOINT}")
    print(f"Channel: {CHANNEL}")
    print(f"Region: {REGION}")

    subprotocols = ["aws-appsync-event-ws", get_auth_subprotocol()]

    ws = websocket.WebSocketApp(
        APPSYNC_WS_ENDPOINT,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
        subprotocols=subprotocols
    )

    ws.run_forever()


if __name__ == "__main__":
    main()

まとめ

この記事では、AppSync Eventsの実用的なスニペットを2つ紹介しました。AppSync Eventsを使うことで、WebSocket接続の管理やスケーリングといったインフラ運用から解放され、ビジネスロジックの実装に集中できます。リアルタイム機能が必要なアプリケーション開発の際は、ぜひ検討してみてください。

アマゾン ウェブ サービス ジャパン (有志)

Discussion