📑

AWS Amplify Gen 2 & IoT Core PubSubでストリーミングチャット

2024/08/12に公開

AWS SummitでAPI Gateway (WebSocket)の代わりに、IoT Core PubSub (MQTT Over WebSocket)を使用していて、興味を持ったのでやってみました!

(ClaudeとChatGPTを同時出力しているのはただの趣味です…w)

マニュアルは以下になりますが、設定はコード、マネジメントコンソール、CLIそれぞれで必要です
マネジメントコンソールは、ぱっと見て、どこで設定すればよいのか分かりませんでした…
https://docs.amplify.aws/nextjs/build-a-backend/add-aws-services/pubsub/set-up-pubsub/

そのため、AWS Amplify PubSubの設定を共有します!

AWS IoT エンドポイント

見つけるのが大変でした…

AWS IoT IAM Policy

これも見つけるのが大変でした…

AWS IoT IAM Policyに、Amazon Cognito Identity Idを割り当てる

マネジメントコンソールからはできないようです
CLIやSDKを使用する必要があります
手順自体は難しくないのですが、Amazon Cognito Identity Idが分かりませんでした
CognitoのユーザープールやIDプールのIDなのかと思ったら違いました

どうやらユーザのセッションのようなもので、サインインするたびに作成されています
サインインするたびにCLIで設定するのは面倒なので、サインインした際にCognito IDプールのIdentity IDを取得し、Lambdaを呼び出してSDKでポリシーを割り当てるようにしました

// Cognito Identity ID取得例
import { Amplify } from "aws-amplify";
import { fetchAuthSession } from "aws-amplify/auth";
Amplify.configure(outputs);
const session = await fetchAuthSession({ forceRefresh: true });
const identityId = session.identityId as string;
// Cognito Identity IDに、AWS IoT IAM Policyを割り当てる例
import { IoTClient, AttachPolicyCommand } from "@aws-sdk/client-iot";
import { NodeHttpHandler } from "@aws-sdk/node-http-handler";
import type { Schema } from "../../data/resource";

const config = {
  region: "us-west-2",
  maxAttempts: 30,
  requestHandler: new NodeHttpHandler({
    connectionTimeout: 900000,
    socketTimeout: 900000,
  }),
};
const iotClient = new IoTClient(config);
const iotPolicyName = "amplify-iot-policy";

export const handler: Schema["PubSub"]["functionHandler"] = async (event) => {
  try {
    const cognitoIdentityId = event.arguments.cognitoIdentityId as string | undefined;
    console.log("Cognit Identity ID:", cognitoIdentityId);
    const command = new AttachPolicyCommand({
      policyName: iotPolicyName,
      target: cognitoIdentityId,
    });
    await iotClient.send(command);
    console.log("PubSub設定が成功しました");
    return "PubSub設定が成功しました";
  } catch (error) {
    console.error("An error occurred:", error);
    if (error instanceof Error) {
      return `エラーが発生しました。詳細: ${error.message}`;
    } else {
      return "エラーが発生しました。詳細は不明です。";
    }
  }
};

Amazon Cognitoの認証されたユーザに割り当てるIAM RoleにIoTサービスへのアクセス許可をする

私はコードで行いました

// amplify/backend.ts
// Cognitoの認証されたユーザのIAM RoleにIoTへのアクセス許可を与える
import * as iam from "aws-cdk-lib/aws-iam";
import { defineBackend } from "@aws-amplify/backend";
import { auth } from "./auth/resource.js";
import { data } from "./data/resource.js";

const backend = defineBackend({
  auth,
  data,
});

backend.auth.resources.authenticatedUserIamRole.addManagedPolicy(iam.ManagedPolicy.fromAwsManagedPolicyName("AWSIoTDataAccess"));
backend.auth.resources.authenticatedUserIamRole.addManagedPolicy(iam.ManagedPolicy.fromAwsManagedPolicyName("AWSIoTConfigAccess"));

マネジメントコンソールから手動でIAM Roleにポリシーを割り当てることも可能です

Subscribe (Websocket) 受信する

これで生成AIのストリーミングメッセージを受信できます
※抜粋してます

import { useEffect, useState } from "react";
import { fetchUserAttributes } from "aws-amplify/auth";
import { PubSub } from "@aws-amplify/pubsub";
import { CONNECTION_STATE_CHANGE, ConnectionState } from "@aws-amplify/pubsub";
import { Hub } from "aws-amplify/utils";

const YourComponent = () => {
  const [email, setEmail] = useState<string>("");
  const [message, setMessage] = useState<string>("");

  useEffect(() => {
    const setupPubSub = async () => {
      try {
        if (!email) {
          // Cognitoの認証したユーザからemailを取得
          const attributes = await fetchUserAttributes();
          setEmail(attributes.email!);
        }

        const pubsub = new PubSub();
        // トピック名をemailでサブスクライブ
        const sub = pubsub.subscribe({ topics: email }).subscribe({
          // メッセージを受信したらmessageに格納する
          next: (data: any) => {
            setMessage((prevMessage) => prevMessage + data.message);
          },
          error: (error) => {
            console.error("Error in PubSub subscription:", error);
          },
          complete: () => {
            console.log("PubSub Session Completed");
          },
        });

        // コネクションの状態に変化があった場合のイベントリスナー
        const hubListener = Hub.listen("pubsub", (data: any) => {
          const { payload } = data;
          if (payload.event === CONNECTION_STATE_CHANGE) {
            const newState = payload.data.connectionState as ConnectionState;
            console.log("PubSub connection state changed:", newState);
          }
        });

        return () => {
          sub.unsubscribe();
          hubListener();
        };
      } catch (error) {
        console.error("Error setting up PubSub:", error);
      }
    };
    setupPubSub();
  }, [email]);
};

Publish 送信する

Bedrockで生成したストリーミングメッセージをemailのトピックに送信する例です

import { BedrockRuntimeClient, InvokeModelWithResponseStreamCommand } from "@aws-sdk/client-bedrock-runtime";
import { NodeHttpHandler } from "@aws-sdk/node-http-handler";
import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane";
import type { Schema } from "../../data/resource";

const config = {
  region: "us-west-2",
  maxAttempts: 30,
  requestHandler: new NodeHttpHandler({
    connectionTimeout: 900000,
    socketTimeout: 900000,
  }),
};
const bedrock_client = new BedrockRuntimeClient(config);
const iot_client = new IoTDataPlaneClient(config);
const model_id = "anthropic.claude-3-sonnet-20240229-v1:0";

interface Message {
  role: string;
  message: string;
}

export const handler: Schema["ChatClaude"]["functionHandler"] = async (event) => {
  try {
    const rawContent = event.arguments.content as string[] | undefined;
    const topic = event.arguments.email as string | undefined;
    console.log("Raw content:", rawContent);
    let newContent;
    if (rawContent && Array.isArray(rawContent) && rawContent.length > 0) {
      const parsedContent = Array.isArray(rawContent[0]) ? rawContent[0] : JSON.parse(rawContent[0]);
      newContent = parsedContent.map((item: Message) => ({
        role: item.role,
        content: [{ type: "text", text: item.message }],
      }));
    } else {
      newContent = [{ role: "user", content: [{ type: "text", text: "こんにちは" }] }];
    }
    const payload = {
      anthropic_version: "bedrock-2023-05-31",
      max_tokens: 4000,
      messages: newContent,
    };
    const command = new InvokeModelWithResponseStreamCommand({
      modelId: model_id,
      contentType: "application/json",
      accept: "application/json",
      body: JSON.stringify(payload),
    });
    const response = await bedrock_client.send(command);

    if (response.body) {
      for await (const chunk of response.body) {
        const decodedChunk = new TextDecoder().decode(chunk.chunk?.bytes);
        try {
          const parsedChunk = JSON.parse(decodedChunk);
          if (parsedChunk.type === "content_block_delta" && parsedChunk.delta.text) {
            const chunkText = parsedChunk.delta.text;

            // チャンクを受信するたびに IoT Core に publish
            const publishParams = {
              topic: topic,
              payload: JSON.stringify({ role: "claude", message: chunkText }), // claude role
            };
            await iot_client.send(new PublishCommand(publishParams));
            console.log("Published chunk successfully");
          }
        } catch (parseError) {
          console.error("Error parsing chunk:", parseError);
        }
      }
    } else {
      console.log("No response body received from Bedrock");
    }

    return "Streaming completed";
  } catch (error) {
    console.error("An error occurred:", error);
    throw new Error(`エラーが発生しました。詳細: ${error instanceof Error ? error.message : "不明なエラー"}`);
  }
};

サンプルコード

作成中ですが完全なコードは以下になります
参考にどうぞ!
https://github.com/Flupinochan/amplify-next-template/
https://www.chatbot.metalmental.net/

2024/09/28 追記

IoT CoreのメッセージのPublishは、メッセージの順序が保証されていないようです

そのため、生成AIのストリーミングメッセージを送信する際は、sequence numberをつけて、フロント側でメッセージの順序を並び替えるようにしてください

https://docs.aws.amazon.com/ja_jp/iot/latest/developerguide/mqtt.html#mqtt-retain

GitHubで編集を提案

Discussion