AWS Amplify Gen 2 & IoT Core PubSubでストリーミングチャット
AWS SummitでAPI Gateway (WebSocket)の代わりに、IoT Core PubSub (MQTT Over WebSocket)を使用していて、興味を持ったのでやってみました!
(ClaudeとChatGPTを同時出力しているのはただの趣味です…w)
マニュアルは以下になりますが、設定はコード、マネジメントコンソール、CLIそれぞれで必要です
マネジメントコンソールは、ぱっと見て、どこで設定すればよいのか分かりませんでした…
そのため、AWS Amplify PubSubの設定を共有します!
AWS IoT エンドポイント
見つけるのが大変でした…
AWS IoT IAM Policy
これも見つけるのが大変でした…
AWS IoT IAM Policyに、Amazon Cognito Identity Idを割り当てる
マネジメントコンソールからはできないようです
CLIやSDKを使用する必要があります
手順自体は難しくないのですが、Amazon Cognito Identity Idが分かりませんでした
CognitoのユーザープールやIDプールのIDなのかと思ったら違いました
どうやらユーザのセッションのようなもので、サインインするたびに作成されています
サインインするたびにCLIで設定するのは面倒なので、サインインした際にCognito IDプールのIdentity IDを取得し、Lambdaを呼び出してSDKでポリシーを割り当てるようにしました
// Cognito Identity ID取得例
import { Amplify } from "aws-amplify";
import { fetchAuthSession } from "aws-amplify/auth";
Amplify.configure(outputs);
const session = await fetchAuthSession({ forceRefresh: true });
const identityId = session.identityId as string;
// Cognito Identity IDに、AWS IoT IAM Policyを割り当てる例
import { IoTClient, AttachPolicyCommand } from "@aws-sdk/client-iot";
import { NodeHttpHandler } from "@aws-sdk/node-http-handler";
import type { Schema } from "../../data/resource";
const config = {
region: "us-west-2",
maxAttempts: 30,
requestHandler: new NodeHttpHandler({
connectionTimeout: 900000,
socketTimeout: 900000,
}),
};
const iotClient = new IoTClient(config);
const iotPolicyName = "amplify-iot-policy";
export const handler: Schema["PubSub"]["functionHandler"] = async (event) => {
try {
const cognitoIdentityId = event.arguments.cognitoIdentityId as string | undefined;
console.log("Cognit Identity ID:", cognitoIdentityId);
const command = new AttachPolicyCommand({
policyName: iotPolicyName,
target: cognitoIdentityId,
});
await iotClient.send(command);
console.log("PubSub設定が成功しました");
return "PubSub設定が成功しました";
} catch (error) {
console.error("An error occurred:", error);
if (error instanceof Error) {
return `エラーが発生しました。詳細: ${error.message}`;
} else {
return "エラーが発生しました。詳細は不明です。";
}
}
};
Amazon Cognitoの認証されたユーザに割り当てるIAM RoleにIoTサービスへのアクセス許可をする
私はコードで行いました
// amplify/backend.ts
// Cognitoの認証されたユーザのIAM RoleにIoTへのアクセス許可を与える
import * as iam from "aws-cdk-lib/aws-iam";
import { defineBackend } from "@aws-amplify/backend";
import { auth } from "./auth/resource.js";
import { data } from "./data/resource.js";
const backend = defineBackend({
auth,
data,
});
backend.auth.resources.authenticatedUserIamRole.addManagedPolicy(iam.ManagedPolicy.fromAwsManagedPolicyName("AWSIoTDataAccess"));
backend.auth.resources.authenticatedUserIamRole.addManagedPolicy(iam.ManagedPolicy.fromAwsManagedPolicyName("AWSIoTConfigAccess"));
マネジメントコンソールから手動でIAM Roleにポリシーを割り当てることも可能です
Subscribe (Websocket) 受信する
これで生成AIのストリーミングメッセージを受信できます
※抜粋してます
import { useEffect, useState } from "react";
import { fetchUserAttributes } from "aws-amplify/auth";
import { PubSub } from "@aws-amplify/pubsub";
import { CONNECTION_STATE_CHANGE, ConnectionState } from "@aws-amplify/pubsub";
import { Hub } from "aws-amplify/utils";
const YourComponent = () => {
const [email, setEmail] = useState<string>("");
const [message, setMessage] = useState<string>("");
useEffect(() => {
const setupPubSub = async () => {
try {
if (!email) {
// Cognitoの認証したユーザからemailを取得
const attributes = await fetchUserAttributes();
setEmail(attributes.email!);
}
const pubsub = new PubSub();
// トピック名をemailでサブスクライブ
const sub = pubsub.subscribe({ topics: email }).subscribe({
// メッセージを受信したらmessageに格納する
next: (data: any) => {
setMessage((prevMessage) => prevMessage + data.message);
},
error: (error) => {
console.error("Error in PubSub subscription:", error);
},
complete: () => {
console.log("PubSub Session Completed");
},
});
// コネクションの状態に変化があった場合のイベントリスナー
const hubListener = Hub.listen("pubsub", (data: any) => {
const { payload } = data;
if (payload.event === CONNECTION_STATE_CHANGE) {
const newState = payload.data.connectionState as ConnectionState;
console.log("PubSub connection state changed:", newState);
}
});
return () => {
sub.unsubscribe();
hubListener();
};
} catch (error) {
console.error("Error setting up PubSub:", error);
}
};
setupPubSub();
}, [email]);
};
Publish 送信する
Bedrockで生成したストリーミングメッセージをemailのトピックに送信する例です
import { BedrockRuntimeClient, InvokeModelWithResponseStreamCommand } from "@aws-sdk/client-bedrock-runtime";
import { NodeHttpHandler } from "@aws-sdk/node-http-handler";
import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane";
import type { Schema } from "../../data/resource";
const config = {
region: "us-west-2",
maxAttempts: 30,
requestHandler: new NodeHttpHandler({
connectionTimeout: 900000,
socketTimeout: 900000,
}),
};
const bedrock_client = new BedrockRuntimeClient(config);
const iot_client = new IoTDataPlaneClient(config);
const model_id = "anthropic.claude-3-sonnet-20240229-v1:0";
interface Message {
role: string;
message: string;
}
export const handler: Schema["ChatClaude"]["functionHandler"] = async (event) => {
try {
const rawContent = event.arguments.content as string[] | undefined;
const topic = event.arguments.email as string | undefined;
console.log("Raw content:", rawContent);
let newContent;
if (rawContent && Array.isArray(rawContent) && rawContent.length > 0) {
const parsedContent = Array.isArray(rawContent[0]) ? rawContent[0] : JSON.parse(rawContent[0]);
newContent = parsedContent.map((item: Message) => ({
role: item.role,
content: [{ type: "text", text: item.message }],
}));
} else {
newContent = [{ role: "user", content: [{ type: "text", text: "こんにちは" }] }];
}
const payload = {
anthropic_version: "bedrock-2023-05-31",
max_tokens: 4000,
messages: newContent,
};
const command = new InvokeModelWithResponseStreamCommand({
modelId: model_id,
contentType: "application/json",
accept: "application/json",
body: JSON.stringify(payload),
});
const response = await bedrock_client.send(command);
if (response.body) {
for await (const chunk of response.body) {
const decodedChunk = new TextDecoder().decode(chunk.chunk?.bytes);
try {
const parsedChunk = JSON.parse(decodedChunk);
if (parsedChunk.type === "content_block_delta" && parsedChunk.delta.text) {
const chunkText = parsedChunk.delta.text;
// チャンクを受信するたびに IoT Core に publish
const publishParams = {
topic: topic,
payload: JSON.stringify({ role: "claude", message: chunkText }), // claude role
};
await iot_client.send(new PublishCommand(publishParams));
console.log("Published chunk successfully");
}
} catch (parseError) {
console.error("Error parsing chunk:", parseError);
}
}
} else {
console.log("No response body received from Bedrock");
}
return "Streaming completed";
} catch (error) {
console.error("An error occurred:", error);
throw new Error(`エラーが発生しました。詳細: ${error instanceof Error ? error.message : "不明なエラー"}`);
}
};
サンプルコード
作成中ですが完全なコードは以下になります
参考にどうぞ!
2024/09/28 追記
IoT CoreのメッセージのPublishは、メッセージの順序が保証されていないようです
そのため、生成AIのストリーミングメッセージを送信する際は、sequence numberをつけて、フロント側でメッセージの順序を並び替えるようにしてください
Discussion