AppSync Eventsで作るチャットアプリ(Amplify SDKを使わずに)
以前、AppSync Eventsを使ったサンプルのアプリケーションをAmplifyでデプロイしたり、Postmanを使ってPubSubを試してみました。
当時のAppSync Eventsの用途としては何かしらの通知用途だけで、チャットアプリを実装しようとするとLambdaなどを使って工夫する必要がありました。
しかし、1ヶ月ほど前にデータ統合機能がリリースされました!
これがリリースされたことにより、Lambda以外にもDynamoDB、RDS、Bedrockなどのサービスと連携してリアルタイムなアプリケーションを構築できるようになります。
というわけで、今回はこのデータ統合機能を使ったアプリケーションを作成してみます。
今回作成するもの
今回は、リアルタイムアプリケーションのサンプルとして、AppSync Eventsを使ったチャットアプリを作成します。
DynamoDBと連携して送信したメッセージを保存してWebSocketで送信されたメッセージをリアルタイムに受信できるようにします。あとからチャットに入ったユーザーも過去のメッセージを取得できるようにするため、DynamoDBからチャットのメッセージを取得するためのAPIはLambdaで作成します。
なお、アプリケーションのフロントエンドはNext.jsを使用し、AppSync Eventsの通信ではあえてAmplifyのSDKを使用しません。

今回作成したアプリケーションは、以下のGithubリポジトリで公開しています。
事前準備
- AWSアカウント
- Node.js(v22以上)
構築手順
DynamoDBのテーブル作成
まずは、DynamoDBのテーブルを作成します。
AWSマネジメントコンソールからDynamoDBのサービスを開き、以下の内容でテーブルを作成します。
- テーブル名:
appsync_events_chat - プライマリキー:
id(文字列型) - ソートキー:
timestamp(文字列型)
その他の設定はデフォルトのままで問題ありません。
AppSync Eventsの作成
続いて、AppSync EventsのAPIを作成します。
AWSマネジメントコンソールからAppSyncのサービスを開き、 APIを作成 のプルダウンをクリックし、 Event API を選択します。

API名は chat-app とし、他の設定は特に変更せず 作成 をクリックします。

APIが作成されて、APIの詳細画面が表示されます。
ここで、 View data sources をクリックします。

任意のデータソース名を定義し、データソースタイプの中から Amazon DynamoDB を選択します。

データソースタイプを選択すると追加の設定が表示されます。
ここで、先程作成したDynamoDBのテーブルを作成したリージョンを選択し、テーブル名から appsync_events_chat を選択します。

最後にサービスロールの設定があります。ここは、 Create and use a new service-linked role が選択されていることを確認し、下の 作成 ボタンをクリックします。

これでデータソースの設定ができたので、次は実際にAPIでリクエストが来た時のハンドラーを設定します。
再びAPIのダッシュボードに戻り、 名前空間を編集 をクリックします。

名前空間を作成 をクリックして、名前空間名を default とし、名前空間の認証は特に設定しません。

ハンドラーの設定では、 データソースを含むコード を選択します。

パブリッシュ設定では、 データソース名に先ほど作成したデータソースを選択し、動作は CODE のままにします。

コードエディターにあるコードを以下の内容に書き換えます。
import * as ddb from '@aws-appsync/utils/dynamodb'
import { util } from '@aws-appsync/utils'
const TABLE = 'appsync_events_chat'
export const onPublish = {
request(ctx) {
return ddb.batchPut({
tables: {
[TABLE]: ctx.events.map(({id, payload}) => ({
id,
timestamp: util.time.nowISO8601(),
...payload
})),
},
})
},
response(ctx) {
return ctx.result.data[TABLE].map(({ id, ...payload }) => ({ id, payload }))
},
}
ここでの実装のポイントは、 request 関数と response 関数を用意することです。
request 関数では、受け取ったイベントをDynamoDBに保存するための処理を行います。
response 関数では、DynamoDBに保存したデータをクライアントに返すための処理を行います。
今回であれば、このハンドラーを入れたAPIをチャットアプリのバックエンドとして利用することで、AppSync EventsだけでDynamoDBにデータがたまり、メッセージを永続化できるようになります。
試しにメッセージをPublishしてメッセージがDynamoDBに保存されるか確認してみましょう。
PubSubエディタを開き、エディタにあるJsonを以下の内容に書き換えます。
[
{
"sender": "John",
"message": "Hello World"
}
]
パブリッシュのプルダウンから HTTP を選択するとHTTPリクエストでメッセージが送信されます。

左側のログで、以下のようなレスポンスが返ってくれば成功です。

DynamoDBのテーブルを確認すると、先程Publishしたメッセージが保存されていることが確認できます。

ここまで確認できたら、フロントエンドで動作確認するために、必要な値をコピーしておきます。
APIのエンドポイントを取得するために「設定」タブをクリックします。

設定画面の中にあるDNSエンドポイントに2種類のエンドポイントが表示されていますが、今回はフロントエンドで両方のエンドポイントを使うので、2つともコピーしておきます。

認可モードにあるAPIキーを使って認証をするためこれもコピーしておきます。

チャットのデータを取得するLambdaの作成
次に画面を開いた時のチャット画面表示用にDynamoDBからチャットのデータを取得するLambda関数を作成します。
まずは任意の作業ディレクトリ上で、 package.json を作成します。
npm init -y
次に、必要なパッケージをインストールします。
npm install aws-sdk
次に、 index.js というファイルを作成し、以下の内容を記述します。
index.js
const AWS = require('aws-sdk');
// DynamoDBクライアントの初期化
const dynamoDB = new AWS.DynamoDB.DocumentClient();
// テーブル名
const TABLE_NAME = 'appsync_events_chat';
/**
* appsync_events_chatテーブルから全メッセージを取得するLambda関数
*/
exports.handler = async (event) => {
console.log('Received event:', JSON.stringify(event, null, 2));
try {
// DynamoDBからすべてのデータを取得
const params = {
TableName: TABLE_NAME
};
// DynamoDBのscanオペレーションを使用して全データを取得
const result = await dynamoDB.scan(params).promise();
// 成功レスポンスを返す
return {
statusCode: 200,
headers: {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*', // CORS対応
'Access-Control-Allow-Credentials': true
},
body: JSON.stringify({
messages: result.Items,
count: result.Count,
scannedCount: result.ScannedCount
})
};
} catch (error) {
console.error('Error fetching messages:', error);
// エラーレスポンスを返す
return {
statusCode: 500,
headers: {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*', // CORS対応
'Access-Control-Allow-Credentials': true
},
body: JSON.stringify({
message: 'Failed to fetch messages',
error: error.message
})
};
}
};
以下のコマンドでデプロイする関数を圧縮します。
zip -r function.zip .
後は、CLIやAWSマネジメントコンソールからLambda関数を作成し、先ほど作成した function.zip をアップロードします。
Lambda関数は以下の設定を行います。
- ランタイム: Node.js 22
- ハンドラー:
index.handler - IAMロール: DynamoDBへのアクセス権限(
dynamodb:Scan)を持つロールを作成
API Gatewayの作成
次に、Lambda関数を呼び出すためのAPI Gatewayを作成します。
AWSマネジメントコンソールからAPI Gatewayのサービスを開き、 APIを作成 をクリックします。
APIのタイプは REST API を選択し、API名は chat-data とします。

APIが作成されたら、 リソースの作成 をクリックし、リソース名を messages とします。この時、必ず CORS(クロスオリジンリソース共有) にチェックを入れておいて、フロントのリクエストが通るようにしておきます。

リソースが作成されたら、 メソッドの作成 をクリックし、 GET メソッドを選択します。
メソッドでは以下の設定を行います。
- メソッドタイプ:GET
- Lambdaプロキシ統合を有効にする
- Lambda関数:デプロイしたLambda関数のARN(関数名から検索して選択できます)

メソッドを作成したら、作成したメソッドが動作するか確認するために、 テスト タブを開き、 テスト ボタンをクリックします。
スクショの通り、DynamoDBに保存したデータが取得できれば成功です。

テストが成功したら、APIをデプロイします。
アクション メニューから APIのデプロイ を選択し、ステージ名を prod としてデプロイします。

デプロイが完了すると、APIのエンドポイントURLが表示されます。このURLは後でフロントエンドからAPIをリクエストする際に使用するので、どこかにメモしておきます。

フロントエンドの実装
次に、フロントエンドの実装を行います。
フロントエンドはNext.jsを使用して実装します。
まずは、Next.jsのプロジェクトを作成します。
npx create-next-app@latest appsync-chat-app
cd appsync-chat-app
次に、必要なパッケージをインストールします。
npm install axios uuid @types/uuid reconnecting-websocket
次に、APIのインターフェースのコードを用意します。
app/utils/rest.ts というファイルを作成し、以下の内容を記述します。
app/utils/rest.ts
import axios from "axios";
interface Event {
sender: string;
message: string;
}
interface ChatMessage {
id: string;
sender: string;
message: string;
timestamp: string;
}
// AppSync API client
const appsyncPublish = axios.create({
baseURL: `https://${process.env.NEXT_PUBLIC_APPSYNC_ENDPOINT_HTTP as string}/event`,
headers: {
"x-api-key": process.env.NEXT_PUBLIC_APPSYNC_API_KEY as string,
}
});
// Chat Data API client
const chatDataApi = axios.create({
baseURL: process.env.NEXT_PUBLIC_CHAT_DATA_API_URL as string,
headers: {
"Content-Type": "application/json",
}
});
// AppSync publish event function
export const publishEvent = async (event: Event) => {
try {
const response = await appsyncPublish.post("", {
"channel": "default/channel",
"events": [
JSON.stringify({
sender: event.sender,
message: event.message,
})
]
});
return response.data;
} catch (error) {
console.error("Error publishing event:", error);
// ネットワークエラーの場合はより詳細な情報を表示
if (axios.isAxiosError(error) && !error.response) {
console.error("ネットワークエラー: AppSyncエンドポイントに接続できません。");
}
throw error;
}
};
// Chat Data API functions
export const fetchMessages = async (): Promise<ChatMessage[]> => {
try {
const response = await chatDataApi.get("/messages");
const messages: ChatMessage[] = response.data.messages;
return messages;
} catch (error) {
console.error("Error fetching messages:", error);
if (axios.isAxiosError(error) && !error.response) {
console.error("Network error: Cannot connect to Chat Data API.");
}
throw error;
}
};
このコードでは、AppSync Eventsにメッセージを送信するための publishEvent 関数と、画面を開いた時にチャットのメッセージを取得するための fetchMessages 関数を定義しています。
続けて、チャットのメッセージをSubscribeするためのコードを用意します。
app/utils/websocket.ts というファイルを作成し、以下の内容を記述します。
app/utils/websocket.ts
import ReconnetWebSocket from "reconnecting-websocket";
import { v4 as uuidv4 } from "uuid";
export class SubscribeEvent {
private ws: ReconnetWebSocket;
constructor() {
const headerInfo = {
host: process.env.NEXT_PUBLIC_APPSYNC_ENDPOINT_HTTP as string,
"x-api-key": process.env.NEXT_PUBLIC_APPSYNC_API_KEY as string,
};
const encodedHeaderInfo = btoa(JSON.stringify(headerInfo))
.replace(/\+/g, "-")
.replace(/\//g, "_")
.replace(/=+$/, "");
this.ws = new ReconnetWebSocket(
`wss://${
process.env.NEXT_PUBLIC_APPSYNC_ENDPOINT_REALTIME as string
}/event/realtime`,
["aws-appsync-event-ws", `header-${encodedHeaderInfo}`]
);
this.ws.onopen = () => {
console.log("WebSocket connection opened");
};
this.ws.send(JSON.stringify({
"type": "subscribe",
"id": uuidv4(),
"channel": "/default/channel",
"authorization": {
"host": process.env.NEXT_PUBLIC_APPSYNC_ENDPOINT_HTTP as string,
"x-api-key": process.env.NEXT_PUBLIC_APPSYNC_API_KEY as string,
}
}));
}
onMessage(callback: (data: any) => void) {
this.ws.addEventListener("message", (event) => {
callback(JSON.parse(event.data));
});
}
onErrors(callback: (error: any) => void) {
this.ws.addEventListener("error", (event) => {
callback(event);
});
}
close() {
this.ws.close();
}
removeEventListener() {
this.ws.removeEventListener("message", () => {});
this.ws.removeEventListener("error", () => {});
}
/* eslint @typescript-eslint/no-explicit-any: 0 */
}
このコードでは、WebSocketを使ってAppSync Eventsからメッセージを受信するための SubscribeEvent クラスを定義しています。Websocket自体は標準ライブラリでも実装できますが、Websocketがクローズしたときの再接続を自動的に行える reconnecting-websocket を使っています。
イベントをSubscribeするためには、以下の手順を行う必要があります。
- 認証情報をSubprotocolで設定するために認証情報をBase64エンコードしてヘッダーに設定。
const encodedHeaderInfo = btoa(JSON.stringify(headerInfo))
.replace(/\+/g, "-")
.replace(/\//g, "_")
.replace(/=+$/, "");
- 任意のUUIDを生成して、以下のメッセージを送信する。
{
"type": "subscribe",
"id": "<任意のUUID>",
"channel": "/default/channel",
"authorization": {
"host": "<AppSync EventsでコピーしたHTTPエンドポイント>",
"x-api-key": "<AppSync EventsでコピーしたAPIキー>"
}
}
上記の送信処理を行わないといつまで経ってもメッセージがSubscribeが行われないのをすっかり忘れていて、結構ハマりましたw。
ちなみにこのことは、以下の記事でも触れています。
ここからUIを実装していきます。
まずは、チャットを開いたときにユーザー名を入力するためのコンポーネントを作成します。
app/components/input-user-name.tsx というファイルを作成し、以下の内容を記述します。
app/components/input-user-name.tsx
import React from "react";
const InputUserName: React.FC<{
username: string;
setUsername: (username: string) => void;
handleUserNameKeyPress: (event: React.KeyboardEvent<HTMLInputElement>) => void;
handleSetUserName: () => void;
}> = ({ username, setUsername, handleUserNameKeyPress, handleSetUserName }) => {
return (
<div className="fixed inset-0 bg-black bg-opacity-50 flex items-center justify-center p-4">
<div className="bg-white rounded-lg shadow-xl max-w-md w-full p-6">
<div className="text-center mb-6">
<h2 className="text-2xl font-bold text-gray-800 mb-2">チャットへようこそ</h2>
<p className="text-gray-600">ユーザー名を入力してください</p>
</div>
<div className="space-y-4">
<input
type="text"
value={username}
onChange={(e) => setUsername(e.target.value)}
onKeyDown={handleUserNameKeyPress}
placeholder="ユーザー名を入力..."
className="w-full border border-gray-300 rounded-lg px-4 py-3 text-gray-900 placeholder-gray-500 focus:outline-none focus:ring-2 focus:ring-blue-500 focus:border-transparent"
autoFocus
/>
<button
onClick={handleSetUserName}
disabled={!username.trim()}
className="w-full bg-blue-500 hover:bg-blue-600 disabled:bg-gray-300 disabled:cursor-not-allowed text-white font-medium py-3 px-4 rounded-lg transition-colors duration-200"
>
チャットを開始
</button>
</div>
</div>
</div>
);
}
export default InputUserName;
ここまで実装したら、アプリケーションのメイン画面を作成します。
app/page.tsx を以下の内容に書き換えます。
app/page.tsx
"use client";
import React, { useState, useEffect, useRef } from "react";
import { FaUser, FaPaperPlane } from "react-icons/fa";
import InputUserName from "@/app/components/input-user-name";
import { publishEvent, fetchMessages } from "@/app/utils/rest";
import { SubscribeEvent } from "@/app/utils/websocket";
interface Message {
id: string;
message: string;
sender: string;
timestamp: Date;
}
const ChatApp: React.FC = () => {
const [userName, setUserName] = useState<string>("");
const [showNameDialog, setShowNameDialog] = useState<boolean>(true);
const [tempUserName, setTempUserName] = useState<string>("");
const [newMessage, setNewMessage] = useState("");
const messagesEndRef = useRef<HTMLDivElement>(null);
const websocketRef = useRef<SubscribeEvent>(null);
const [messages, setMessages] = useState<Message[]>([]);
// 初期メッセージのフェッチ
useEffect(() => {
const fetchInitialMessages = async () => {
try {
const initialMessages = await fetchMessages();
initialMessages.sort((a, b) => {
return new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime();
});
const formattedMessages = initialMessages.map((msg) => ({
id: msg.id,
message: msg.message,
sender: msg.sender,
timestamp: new Date(msg.timestamp),
}));
setMessages(formattedMessages);
} catch (error) {
console.error("メッセージの取得中にエラーが発生しました:", error);
}
};
fetchInitialMessages();
}, []);
// WebSocketの初期化
useEffect(() => {
const subscriber = new SubscribeEvent();
websocketRef.current = subscriber;
subscriber.onMessage((data: any) => {
console.log("受信したデータ:", data);
switch (data.type) {
case "connection_ack":
console.log("WebSocket接続が確立されました");
break;
case "data":
const eventData = JSON.parse(data.event);
const newMessage: Message = {
id: data.id,
message: eventData.message,
sender: eventData.sender,
timestamp: new Date(eventData.timestamp),
};
setMessages((prevMessages) => [...prevMessages, newMessage]);
break;
default:
break;
}
/* eslint @typescript-eslint/no-explicit-any: 0 */
});
// クリーンアップ関数:コンポーネントがアンマウントされたときに実行
return () => {
if (websocketRef.current) {
websocketRef.current.removeEventListener();
websocketRef.current.close();
}
};
}, []);
// メッセージが更新されたら自動的に最下部にスクロール
useEffect(() => {
messagesEndRef.current?.scrollIntoView({ behavior: "smooth" });
}, [messages]);
const formatTime = (date: Date): string => {
return date.toLocaleTimeString("ja-JP", {
hour: "2-digit",
minute: "2-digit",
});
};
const handleSendMessage = async () => {
if (newMessage.trim()) {
try {
// メッセージを送信
await publishEvent({
sender: userName,
message: newMessage,
});
console.log("メッセージが正常に送信されました");
} catch (error) {
console.error("メッセージの送信中にエラーが発生しました:", error);
}
setNewMessage("");
}
};
const handleKeyPress = async (e: React.KeyboardEvent) => {
if (e.key === "Enter" && !e.shiftKey) {
e.preventDefault();
await handleSendMessage();
}
};
const handleSetUserName = () => {
if (tempUserName.trim()) {
setUserName(tempUserName.trim());
setShowNameDialog(false);
}
};
const handleUserNameKeyPress = (e: React.KeyboardEvent) => {
if (e.key === "Enter") {
e.preventDefault();
handleSetUserName();
}
};
if (showNameDialog) {
return (
<InputUserName
username={tempUserName}
setUsername={setTempUserName}
handleUserNameKeyPress={handleUserNameKeyPress}
handleSetUserName={handleSetUserName}
/>
);
}
return (
<div className="flex flex-col h-screen bg-white">
{/* Header */}
<div className="bg-blue-600 text-white p-4 shadow-md">
<h1 className="text-xl font-bold">チャット</h1>
</div>
{/* Messages Container */}
<div className="flex-1 overflow-y-auto p-4 space-y-4"> {messages.map((message) => (
<div
key={`${message.id}-${message.timestamp.getTime()}-${message.sender}`}
className={`flex items-start space-x-3 ${
message.sender === userName
? "flex-row-reverse space-x-reverse"
: ""
}`}
>
{/* User Icon */}
<div className="flex-shrink-0">
<div
className={`w-10 h-10 rounded-full flex items-center justify-center ${
message.sender === userName ? "bg-blue-500" : "bg-gray-400"
}`}
>
<FaUser className="text-white text-sm" />
</div>
</div>
{/* Message Content */}
<div
className={`flex flex-col max-w-xs lg:max-w-md ${
message.sender === userName ? "items-end" : "items-start"
}`}
>
{/* Sender Name */}
<div
className={`text-sm text-gray-600 mb-1 ${
message.sender === userName ? "text-right" : "text-left"
}`}
>
{message.sender}
</div>
{/* Message Bubble */}
<div
className={`relative px-4 py-2 rounded-2xl shadow-sm ${
message.sender === userName
? "bg-blue-500 text-white rounded-br-sm"
: "bg-gray-100 text-gray-800 rounded-bl-sm"
}`}
>
<p className="text-sm leading-relaxed">{message.message}</p>
{/* Speech Bubble Tail */}
<div
className={`absolute top-4 w-0 h-0 ${
message.sender === userName
? "right-0 translate-x-full border-l-8 border-l-blue-500 border-t-4 border-t-transparent border-b-4 border-b-transparent"
: "left-0 -translate-x-full border-r-8 border-r-gray-100 border-t-4 border-t-transparent border-b-4 border-b-transparent"
}`}
/>
</div>
{/* Timestamp */}
<div
className={`text-xs text-gray-500 mt-1 ${
message.sender === userName ? "text-right" : "text-left"
}`}
>
{formatTime(message.timestamp)}
</div>
</div>
</div>
))}
{/* 自動スクロール用の参照要素 */}
<div ref={messagesEndRef} />
</div>
{/* Input Area */}
<div className="border-t bg-gray-50 p-4">
<div className="flex items-center space-x-3">
<input
type="text"
value={newMessage}
onChange={(e) => setNewMessage(e.target.value)}
onKeyDown={handleKeyPress}
placeholder="メッセージを入力..."
className="flex-1 border border-gray-300 rounded-full px-4 py-2 text-gray-900 placeholder-gray-500 focus:outline-none focus:ring-2 focus:ring-blue-500 focus:border-transparent"
/>
<button
onClick={handleSendMessage}
disabled={!newMessage.trim()}
className="bg-blue-500 hover:bg-blue-600 disabled:bg-gray-300 text-white rounded-full p-2 transition-colors duration-200"
>
<FaPaperPlane className="w-5 h-5" />
</button>
</div>
</div>
</div>
);
};
export default ChatApp;
API周りのロジック以外の大枠のUIの実装はAIで頼りましたが、かなりそれっぽいデザインになっているかと思います。
最後に、環境変数を設定します。
.env を作成し、以下の内容を記述します。
NEXT_PUBLIC_APPSYNC_ENDPOINT_HTTP=<AppSync EventsでコピーしたHTTPエンドポイント>
NEXT_PUBLIC_APPSYNC_ENDPOINT_REALTIME=<AppSync EventsでコピーしたWebSocketエンドポイント>
NEXT_PUBLIC_APPSYNC_API_KEY=<AppSync EventsでコピーしたAPIキー>
NEXT_PUBLIC_CHAT_DATA_API_URL=<API GatewayでコピーしたエンドポイントURL>
これで、アプリケーションの実装は完了です。
動作確認
それでは実際にアプリケーションを起動して動作確認を行います。
以下のコマンドでアプリケーションを起動します。
npm run dev
ブラウザで http://localhost:3000 を開くと、チャットアプリが表示されます。
最初にユーザー名を入力するダイアログが表示させるので、任意のユーザー名を入力してチャットを開始します。
以下のように複数人のユーザーが同時に入ってチャットのやり取りができるようになったら、アプリケーションは正常に動作しています。
あとはAPIキーを使ってAppSync EventsのAPIを呼び出しており、APIキーを環境変数に設定すればVecelなどのAWS以外のホスティングサービスでも動作するようになっています。(認証機能が入っておらず、誰でもメッセージ送信しまくれるのでそこはご注意を!)
まとめ
今回は、AppSync Eventsのデータ統合機能を使ってチャットアプリを作成してみました。
今までは、AppSync EventsはただのPubSubをAPIとして使うだけでしたが、データ統合機能が追加されたことで、DynamoDBやRDSなどのデータソースと連携してリアルタイムなアプリケーションを構築できるようになりました。
AWSでWebSocketを使おうとすると、API Gateway、 Lambdaを使ってDynamoDBで接続を管理するという組み合わせて実装する必要がありましたが、AppSync Eventsを使うことで、よりシンプルにリアルタイムなアプリケーションを構築できるようになって接続管理のためにDynamoDBを使う必要もなくなりました。
何よりLambdaを使わないことで、応答速度もかなり速くなっているのでリアルタイムアプリケーションを手軽に始めるための最高の選択肢になるんじゃないかと思います!
Discussion