🤖

Gemini Proと会話できるSlackBotをAWSサーバレス上に作る

2024/01/01に公開

はじめに

2024年あけましておめでとうございます。

AWSサーバレス上にGemini ProバックエンドのSlack Botを作る方法を紹介します。

APIGateway+Lambda+DynamoDB上にWeb APIを構築し、SlackのWebhookを受けるようにします。

先にデモ

下記のようにSlack上でボットに話しかけるとスレッドに返答をしてくれます。

Slack Botでやってみた

また、見ていただくとわかるように、Streamで出てくれるので返事投稿までのタイムラグが少ないです。

インフラ構成

構成図

ごくごくシンプルなAWSサーバレス構成です。

テーブル設計

会話履歴をDynamoDBに保存します。以下がテーブル設計です。

項目 説明 PK,SK,Index Required 値の例 備考
id メッセージID+ロール PK String Yes "xxx-xxxxx-xxx-xxxx#user"
content メッセージ内容 GSI1 String Yes "こんにちは"
threadTs スレッドタイムスタンプ GSI1 String Yes "2022-01-01T00:00:00Z" Slackでは本項目がスレッド識別子になる
saidAt 発言日時 String Yes "2022-03-01T12:34:56Z" ISO8601形式
role ロール String Yes "user", "assistant"

環境

  • node 18.14.2
  • typescript 5.3.3
  • esbuild 0.19.9
  • esbuild-register 3.4.2
  • dotenv 16.3.1
  • dayjs 1.11.10
  • slack/bolt 3.16.0
  • lodash-es 4.17.21
  • @aws-sdk/client-dynamodb 3.470.0
  • @aws-sdk/lib-dynamodb 3.470.0
  • @google/generative-ai 0.1.2

事前準備

Slack App

Slack App設定

https://api.slack.com/apps/new より「Create new app」を押下し新規アプリを作成します。

左メニューから各項目を選択しつつ以下のような設定をします。

Basic Information

Add features and functionality で以下を選択します。

  • Bots
  • Event Subscriptions
  • Permissions

OAuth & Permissions

Scopes に以下を加えます。

  • app_mention:read
  • chat:write
  • Event Subscriptions

Subscribe to bot events に以下を加えます。

  • app_mention

シークレットを控える

  • Basic Information より Signing Secret
  • OAuth & Permissions より Bot User OAuth Token

の値を控えておきます。

Google Cloud

GoogleCloudプロジェクトの作成については割愛します。

GoogleCloudプロジェクトを開き、以下に沿って設定します。

Vertex AI APIの有効化

  1. 「API ライブラリ」を開く
  2. 「vertex」で検索
  3. 「Vertex AI API」を選択
  4. 「有効にする」を押下

APIキーの作成(もしくは権限の変更)

  1. 「APIとサービス」を開く
  2. 「認証情報」を開く
  3. APIキーを作成する(もしくは既存のAPIキーがあれば選択する)
  4. 「APIの制限」の「Generative Language API」にチェックを入れて保存

キー類の準備

環境変数にセットしておきましょう。

以下はdotenvファイル形式での例です。

.env
# Slackの署名検証用のシークレット
SLACK_SIGNING_SECRET=xxxxxxxxxxxxxx

# Slackのボットトークン
SLACK_BOT_TOKEN=xxxxxxxxxxxxxx

# Google CloudのAPIキー
GOOGLE_API_KEY=xxxxxxxxxxxxxx

Geminiと会話する

Geminiを呼ぶコード

import { GoogleGenerativeAI } from "@google/generative-ai";

const main = async () => {
  const googleGenerativeAi = new GoogleGenerativeAI(
    process.env["GOOGLE_API_KEY"] ?? ""
  );

  const model = googleGenerativeAi.getGenerativeModel({
    model: "gemini-pro",
  });

  const chat = model.startChat({
    history: [
      {
        parts: "こんにちは。あなたは誰?",
        role: "user",
      },
      {
        parts: "私はAIです",
        role: "model",
      },
    ],
  });
  const { response } = await chat.sendMessage("こんにちは。あなたは誰?");

  const aiMessage = response.text();
  console.log(aiMessage);
};

main();

getGenerativeModel メソッドで使用したいモデル(今回は gemini-pro )を指定します。

今までの履歴込みで会話する(マルチターンというらしいです)には startChat メソッドを使います。会話の履歴を配列として startChart に渡しています。サンプルとして 「こんにちは。あなたは誰?」「私はAIです」 という会話履歴を持たせています。「私はAIです」はあくまで筆者がセットしたサンプルであり、実際にGeminiから全く同一の返信が来たわけではありません(念のため)。

少し注意点として、 usermodel の会話は必ず交互でないといけないようです。ChatGPTのAPIだとユーザ→ユーザ→AIのような会話履歴でもよしなに取り扱ってくれるのですが、GeminiのAPIでは、ユーザの発言が2連続するとエラーを返ってきました。

そして sendMessage メソッドでGeminiにメッセージを送信することで、Geminiからの応答を受け取れます。

SlackBotを実装

上記処理をBoltフレームワークでラップし、Slack Botとして体裁を整えたのが以下です。あとDynamoDBに会話履歴を保存、取り出す処理なども加わっています。

import {
  App,
  AwsLambdaReceiver,
} from "@slack/bolt";
import dotenv from "dotenv";
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import {
  DeleteCommand,
  DynamoDBDocumentClient,
  PutCommand,
  QueryCommand,
} from "@aws-sdk/lib-dynamodb";
import dayjs from "dayjs";
import utc from "dayjs/plugin/utc";
import advancedFormat from "dayjs/plugin/advancedFormat";
import { orderBy } from "lodash-es";
import type { MessageDdbItem } from "./schema";
import type { APIGatewayProxyHandler } from "aws-lambda";
import {
  EnhancedGenerateContentResponse,
  GoogleGenerativeAI,
} from "@google/generative-ai";

dayjs.extend(utc);
dayjs.extend(advancedFormat);

const googleGenerativeAi = new GoogleGenerativeAI(
  process.env["GOOGLE_API_KEY"] ?? ""
);

const model = googleGenerativeAi.getGenerativeModel({
  model: "gemini-pro",
});

const nanoSecondFormat = "YYYY-MM-DDTHH:mm:ss.SSSSSSSSS[Z]";

const messagesTableName = process.env["MESSAGES_TABLE_NAME"] ?? "";
const threadTsIndexName = "threadTsIndex";

const ddbDocClient = DynamoDBDocumentClient.from(
  new DynamoDBClient({
    region: "ap-northeast-1",
  })
);

// @see https://slack.dev/bolt-js/deployments/aws-lambda
const awsLambdaReceiver = new AwsLambdaReceiver({
  signingSecret: process.env["SLACK_SIGNING_SECRET"] ?? "",
});
const app = new App({
  token: process.env["SLACK_BOT_TOKEN"] ?? "",
  receiver: awsLambdaReceiver,
});

// @see https://zenn.dev/yukiueda/articles/ef0f085f2bef8e
app.event(
  "app_mention",
  async ({ event, say, context, logger, body, ...rest }) => {

    try {
      // @see https://dev.classmethod.jp/articles/slack-resend-matome/
      if (context.retryNum != null && context.retryReason === "http_timeout") {
        // Slackからのタイムアウト再送リクエストのため無視
        return;
      }

      const threadTs = event.thread_ts ?? event.ts;
      const mentionRegex = /<@.*?>/g;

      const userMessageContent = event.text.replaceAll(mentionRegex, "").trim();

      // 会話中ユーザのこれまでの発言履歴を取得する
      const { Items: messages = [] } = await ddbDocClient.send(
        new QueryCommand({
          TableName: messagesTableName,
          IndexName: threadTsIndexName,
          KeyConditionExpression: "#threadTs = :threadTs",
          ExpressionAttributeNames: {
            "#threadTs": "threadTs",
          },
          ExpressionAttributeValues: {
            ":threadTs": threadTs,
          },
        })
      );

      // 時系列順にソートする
      const orderedMessages = orderBy(messages, "saidAt", "asc");
      // 直近15件を取得
      // orderedMessagesは .splice の破壊的操作により古い要素のみになる
      const resentMessages = orderedMessages.splice(-15);
      // 15件を超える発言は削除する
      await Promise.all(
        orderedMessages.map((message) =>
          ddbDocClient.send(
            new DeleteCommand({
              TableName: messagesTableName,
              Key: {
                id: message["id"],
              },
            })
          )
        )
      );

      const history = resentMessages.map(
        (message) => ({
          parts: message["content"],
          role: message["role"],
        }),
        ...[{ parts: userMessageContent, role: "user" }]
      );
      const chat = model.startChat({
        history,
      });

      // ユーザとChatGPTの会話履歴をChatGPT APIに投げ、返答を得る
      const aiMessage = await chat.sendMessageStream(userMessageContent);

      const aiMessageContent = await streamSay({
        stream: aiMessage.stream,
        event,
        say,
        client: app.client,
      });

      // ユーザの発言を保存する
      await ddbDocClient.send(
        new PutCommand({
          TableName: messagesTableName,
          Item: {
            // @ts-expect-error
            id: `${event.client_msg_id}#user`,
            content: userMessageContent,
            threadTs,
            saidAt: dayjs().format(nanoSecondFormat),
            role: "user",
          } satisfies MessageDdbItem,
        })
      );

      // ChatGPTの発言を保存する
      await ddbDocClient.send(
        new PutCommand({
          TableName: messagesTableName,
          Item: {
            // @ts-expect-error
            id: `${event.client_msg_id}#model`,
            content: aiMessageContent,
            threadTs,
            saidAt: dayjs().format(nanoSecondFormat),
            role: "model",
          } satisfies MessageDdbItem,
        })
      );
    } catch (error) {
      logger.error(error);
      await say({
        channel: event.channel,
        // @ts-expect-error
        text: `[システム]エラーが発生しました。\nclient_msg_id=${event.client_msg_id}\nerror=${error.message}`,
        thread_ts: event.thread_ts ?? event.ts,
      });
    }
  }
);

// @see https://slack.dev/bolt-js/deployments/aws-lambda
export const handler: APIGatewayProxyHandler = async (
  event,
  context,
  callback
) => {
  const awsLambdaReceiverHandler = await awsLambdaReceiver.start();
  return awsLambdaReceiverHandler(event, context, callback);
};

app.event を使用して、Slackの app_mention イベント(ユーザがボットにメンションで話しかけた時)に反応する処理を設定しています。ユーザのメッセージに対してGeminiが生成した返答を返しています。

ユーザとGeminiの会話履歴はDynamoDBに保存します。これにより、ボットが以前の会話を参照して文脈を保ちながら返事をしてくれます。

CDKを書く

AWSにリソースをデプロイするCDKコードを書きます。

import type { Construct } from "constructs";
import * as cdk from "aws-cdk-lib";

export class SlackGeminiBotNodeStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // DynamoDBテーブル
    const messagesTable = new cdk.aws_dynamodb.Table(this, "messagesTable", {
      tableName: "slackGeminiBotNode-messages",
      partitionKey: {
        name: "id",
        type: cdk.aws_dynamodb.AttributeType.STRING,
      },
      billingMode: cdk.aws_dynamodb.BillingMode.PAY_PER_REQUEST,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    });
    messagesTable.addGlobalSecondaryIndex({
      indexName: "threadTsIndex",
      partitionKey: {
        name: "threadTs",
        type: cdk.aws_dynamodb.AttributeType.STRING,
      },
    });

    // SlackとGoogle Cloudの各種シークレット・APIキーをSSMパラメータストアから取得
    const slackSigningSecret =
      cdk.aws_ssm.StringParameter.valueForStringParameter(
        this,
        "slackGeminiBotNode-slackSigningSecret"
      );
    const slackBotToken = cdk.aws_ssm.StringParameter.valueForStringParameter(
      this,
      "slackGeminiBotNode-slackBotToken"
    );
    const googleApiKey = cdk.aws_ssm.StringParameter.valueForStringParameter(
      this,
      "slackGeminiBotNode-googleApiKey"
    );

    // APIGW Lambda関数
    const apiFn = new cdk.aws_lambda_nodejs.NodejsFunction(this, "apiFn", {
      functionName: "slackGeminiBotNodeApiFn",
      runtime: cdk.aws_lambda.Runtime.NODEJS_18_X,
      entry: "../server/src/handler.ts",
      environment: {
        // 環境変数にシークレットとAPIキーをセット
        SLACK_SIGNING_SECRET: slackSigningSecret,
        SLACK_BOT_TOKEN: slackBotToken,
        GOOGLE_API_KEY: googleApiKey,
        MESSAGES_TABLE_NAME: messagesTable.tableName,
      },
      bundling: {
        sourceMap: true,
      },
      timeout: cdk.Duration.minutes(5),
    });
    messagesTable.grantReadWriteData(apiFn);

    // APIGW
    const api = new cdk.aws_apigateway.RestApi(this, "api", {
      restApiName: "slackGeminiBotNodeApi",
      deployOptions: {
        tracingEnabled: true,
        stageName: "api",
      },
    });
    api.root.addProxy({
      defaultIntegration: new cdk.aws_apigateway.LambdaIntegration(apiFn),
    });
  }
}

環境変数はAWS Systems Manager (SSM) パラメータストアを通してセットしています。Slackの署名シークレット、Slackボットトークン、Google APIキーをセットしています。

DynamoDBは、Slackボットのメッセージを保存するための message テーブルを定義しています。パーティションキーはid(文字列型)を設定します。固定費がかからないよう、PAY_PER_REQUEST でオンデマンドモードにしています。Slackのスレッドを1連の会話の単位としており、これをキーに会話を取得するため threadTs にGSIを貼っています。

Lambda関数は、Node.js 18.xをランタイム、../server/src/handler.ts のハンドラファイルをエントリポイントとし、環境変数には前述のSSMパラメータストアの値をセットしています。DynamoDBテーブルへの読み書き権限を付与しています。

APIGatewayは、Boltフレームワークのルーティングをそのまま使えるよう、URLパスとHTTPメソッドを限定せずデフォルト統合としてLambda関数と繋げています。

GeminiをStreamで応答させる

StreamでGeminiを呼ぶコード

ここまでのコードでもSlack上でボットと会話するという要件は満たせて問題ないですが、Streamで応答させるとユーザ話しかけてからの硬直が短縮され、体験が向上します。

Slackボットに組み込む前に generative-ai パッケージ単体で動作を確認します。

import { GoogleGenerativeAI } from "@google/generative-ai";

const main = async () => {
  const googleGenerativeAi = new GoogleGenerativeAI(
    process.env["GOOGLE_API_KEY"] ?? ""
  );

  const model = googleGenerativeAi.getGenerativeModel({
    model: "gemini-pro",
  });

  const chat = model.startChat({
    history: [
      {
        parts: "こんにちは。あなたは誰?",
        role: "user",
      },
      {
        parts: "どうも",
        role: "model",
      },
    ],
  });
  const { stream } = await chat.sendMessageStream("こんにちは。あなたは誰?");

  for await (const chunk of stream) {
    console.log(chunk.text());
  }
};

main();

chat.sendMessageStream を使用してメッセージを送信し、AIからの応答をストリームで取得するようにします。これにより、AIの応答がチャンクごとに受信され、受け取ったそばから処理できます。

Streamのデータは for await...of ループで扱います。個々のチャンクを取り出して chunk.text() を使用してテキスト応答を取得できます。

Streamで応答するSlackBotを実装

そしてSteam処理をSlackボットに組み込んだコードが以下です。

import {
  AllMiddlewareArgs,
  App,
  AppMentionEvent,
  AwsLambdaReceiver,
  SayFn,
} from "@slack/bolt";
import dotenv from "dotenv";
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import {
  DeleteCommand,
  DynamoDBDocumentClient,
  PutCommand,
  QueryCommand,
} from "@aws-sdk/lib-dynamodb";
import dayjs from "dayjs";
import utc from "dayjs/plugin/utc";
import advancedFormat from "dayjs/plugin/advancedFormat";
import { orderBy } from "lodash-es";
import type { MessageDdbItem } from "./schema";
import type { APIGatewayProxyHandler } from "aws-lambda";
import {
  EnhancedGenerateContentResponse,
  GoogleGenerativeAI,
} from "@google/generative-ai";

dayjs.extend(utc);
dayjs.extend(advancedFormat);

const googleGenerativeAi = new GoogleGenerativeAI(
  process.env["GOOGLE_API_KEY"] ?? ""
);

const model = googleGenerativeAi.getGenerativeModel({
  model: "gemini-pro",
});

const nanoSecondFormat = "YYYY-MM-DDTHH:mm:ss.SSSSSSSSS[Z]";

const messagesTableName = process.env["MESSAGES_TABLE_NAME"] ?? "";
const threadTsIndexName = "threadTsIndex";

const ddbDocClient = DynamoDBDocumentClient.from(
  new DynamoDBClient({
    region: "ap-northeast-1",
  })
);

// @see https://slack.dev/bolt-js/deployments/aws-lambda
const awsLambdaReceiver = new AwsLambdaReceiver({
  signingSecret: process.env["SLACK_SIGNING_SECRET"] ?? "",
});
const app = new App({
  token: process.env["SLACK_BOT_TOKEN"] ?? "",
  receiver: awsLambdaReceiver,
});

// @see https://zenn.dev/yukiueda/articles/ef0f085f2bef8e
app.event(
  "app_mention",
  async ({ event, say, context, logger, body, ...rest }) => {
    try {
      // @see https://dev.classmethod.jp/articles/slack-resend-matome/
      if (context.retryNum != null && context.retryReason === "http_timeout") {
        // Slackからのタイムアウト再送リクエストのため無視
        return;
      }

      const threadTs = event.thread_ts ?? event.ts;
      const mentionRegex = /<@.*?>/g;

      const userMessageContent = event.text.replaceAll(mentionRegex, "").trim();

      // 会話中ユーザのこれまでの発言履歴を取得する
      const { Items: messages = [] } = await ddbDocClient.send(
        new QueryCommand({
          TableName: messagesTableName,
          IndexName: threadTsIndexName,
          KeyConditionExpression: "#threadTs = :threadTs",
          ExpressionAttributeNames: {
            "#threadTs": "threadTs",
          },
          ExpressionAttributeValues: {
            ":threadTs": threadTs,
          },
        })
      );

      // 時系列順にソートする
      const orderedMessages = orderBy(messages, "saidAt", "asc");
      // 直近15件を取得
      // orderedMessagesは .splice の破壊的操作により古い要素のみになる
      const resentMessages = orderedMessages.splice(-15);
      // 15件を超える発言は削除する
      await Promise.all(
        orderedMessages.map((message) =>
          ddbDocClient.send(
            new DeleteCommand({
              TableName: messagesTableName,
              Key: {
                id: message["id"],
              },
            })
          )
        )
      );

      const history = resentMessages.map(
        (message) => ({
          parts: message["content"],
          role: message["role"],
        }),
        ...[{ parts: userMessageContent, role: "user" }]
      );
      logger.info({ history });
      const chat = model.startChat({
        history,
      });

      // ユーザとChatGPTの会話履歴をChatGPT APIに投げ、返答を得る
      const aiMessage = await chat.sendMessageStream(userMessageContent);

      const aiMessageContent = await streamSay({
        stream: aiMessage.stream,
        event,
        say,
        client: app.client,
      });

      // ユーザの発言を保存する
      await ddbDocClient.send(
        new PutCommand({
          TableName: messagesTableName,
          Item: {
            // @ts-expect-error
            id: `${event.client_msg_id}#user`,
            content: userMessageContent,
            threadTs,
            saidAt: dayjs().format(nanoSecondFormat),
            role: "user",
          } satisfies MessageDdbItem,
        })
      );

      // ChatGPTの発言を保存する
      await ddbDocClient.send(
        new PutCommand({
          TableName: messagesTableName,
          Item: {
            // @ts-expect-error
            id: `${event.client_msg_id}#model`,
            content: aiMessageContent,
            threadTs,
            saidAt: dayjs().format(nanoSecondFormat),
            role: "model",
          } satisfies MessageDdbItem,
        })
      );
    } catch (error) {
      logger.error(error);
      await say({
        channel: event.channel,
        // @ts-expect-error
        text: `[システム]エラーが発生しました。\nclient_msg_id=${event.client_msg_id}\nerror=${error.message}`,
        thread_ts: event.thread_ts ?? event.ts,
      });
    }
  }
);

// @see https://slack.dev/bolt-js/deployments/aws-lambda
export const handler: APIGatewayProxyHandler = async (
  event,
  context,
  callback
) => {
  const awsLambdaReceiverHandler = await awsLambdaReceiver.start();
  return awsLambdaReceiverHandler(event, context, callback);
};

const streamSay = async ({
  stream,
  event,
  say,
  client,
}: {
  stream: AsyncGenerator<EnhancedGenerateContentResponse>;
  event: AppMentionEvent;
  say: SayFn;
  client: AllMiddlewareArgs["client"];
}): Promise<string> => {
  console.info(JSON.stringify({ message: "streamSayを開始します。", event }));
  let saidTs = "";
  let text = "";
  let prevLen = 0;

  for await (const chunk of stream) {
    const textChunk = chunk.text();
    if (textChunk === "") {
      continue;
    }

    text += textChunk;
    if (saidTs === "") {
      const res = await say({
        channel: event.channel,
        text,
        thread_ts: event.thread_ts ?? event.ts,
      });
      saidTs = res.ts!;
      continue;
    }

    // 15文字ごと
    if (text.length - prevLen > 15) {
      await client.chat.update({
        channel: event.channel,
        ts: saidTs,
        text,
      });
      prevLen = text.length;
    }
  }

  await client.chat.update({
    channel: event.channel,
    ts: saidTs,
    text,
  });

  console.info(JSON.stringify({ message: "streamSayを終了します。", text }));
  return text;
};

主な違いは streamSay 関数で、StreamのハンドリングとSlackの新規投稿/上書き更新する処理を streamSay として抽象化しています。

Geminiからの応答がStream生成されるため、応答のチャンクが利用可能になるとすぐにユーザに送信されます。ユーザは応答をリアルタイムで受け取ることができ、よりインタラクティブな体験になります。さらにテキストメッセージを逐次Slackで更新することで、長い応答を動的に表示します。

上記のコードではStreamで15文字以上受け取るごとに更新するようにしています。極端な話、1文字ごとにするとよりインタラクティブになる可能性がありますが、細かく更新をかけ過ぎると通信回数が増えてすべての応答内容を反映しきるまでの時間が長くなってしまうのと、あまりに高頻度にAPIを叩きすぎるとSlackプラットフォームへの不要な負荷に繋がりそうなので、筆者のさじ加減でこの値にしています。

おわりに

以上、AWSサーバレス上にGemini ProバックエンドのSlack Botを構築する方法でした。

Streamを活用することでAIによる1次返信までの時間が非常に短くなり、会話していてよりストレスがない体験を実現できたかと思います。

参考になれば幸いです。

参考

Discussion