🔥

ChatGPT APIとCloudflareを使って過去の会話を覚えてるLINEボットを構築する

2023/03/05に公開
3

ChatGPT APIのChat Completion APIを用いて、チャットの入力に対してその回答をレスポンスで返してくれます。
このチャットの入力に過去のチャットの内容を含めることで、過去の内容を前提とした回答を行うことができますが、これを実現するには、過去のチャットの内容を永続化しておく必要があります。

ユーザーインターフェースとしてLINE(LINE Messaging API)、LINEからの処理受付とChatGPTへのリクエスト、チャット内容の永続化をCloudflareを使って、過去の会話を覚えてるLINEボットを実現することができました。
本記事では、Cloudflare側の構成について紹介します。

[ChatGPT API][AWSサーバーレス]ChatGPT APIであなたとの会話・文脈を覚えてくれるLINEボットを作る方法まとめのCloudflare版の内容になります。(ただし、この記事内で言及されているLINEプラットフォーム署名検証についてはできていません。)

Cloudflare側の構成では、LINEからの処理受付とChatGPTへのリクエストでCloudflare Workers、チャット内容の永続化でCloudflare D1を利用します。
さらに、この構成ではCloudflare Queuesを用いています。
2023/3/5現在、Cloudflare Queuesの利用にはWorkers Paid Planが必要になりますので、参考にされる方はご注意ください。

なぜ Cloudflare Queues を使用したか

LINE Messaging API のwebhookを利用していますが、ボットサーバーは1秒以内にレスポンスを返す必要がありそうでした。
https://developers.line.biz/ja/docs/messaging-api/receiving-messages/#check-error-reason

webhookのリクエストを受けたWorkerで直接レスポンスを返そうとすると、ChatGPT APIからのレスポンスに時間がかかる場合があるため、クライアント側(webhook側)からのリクエストがキャンセルされる事象を確認しました。
その結果としてworkerの処理が途中で終了してしまい、LINE側に応答メッセージを返せないことがありました。

そこで、webhookからのリクエストを受けるWorkerをChatGPT APIへのリクエスト・LINE側への応答リクエスト・D1への登録に必要なデータをQueueに送りステータスコード200のレスポンスだけ返す処理とし、それとは別のWorkerでQueueからデータを取り出し必要な処理を行う構成としました。

[ChatGPT API][AWSサーバーレス]ChatGPT APIであなたとの会話・文脈を覚えてくれるLINEボットを作る方法まとめ では、Amazon APIGateway + AWS Lambdaの構成でこのような話は出ていなかったですが、おそらくLINE−API Gateway間の接続が終了してもAPI Gateway-Lambda間の接続は終了しておらずLambda側の処理が継続できたからなのではと推測しています。
(AWS Lambda Functions URLを使った構成でどうなるか、確認してみたいです。)

LINEの設定と動作検証

LINEの設定と、その検証のためのCloudflare Workersの構成は、Cloudflare Worker + D1 + Hono + OpenAIでLINE Botを作る を参考にしました。

上記と同じですが、後述するCloudflare WorkerではPOST /api/webhookを受ける構成としており、LINE側に設定するwebhook urlも[ベースURL]/api/webhookを指定しています。

以下は古い記載のため、読み飛ばして問題ないです。

Hono導入前の構成時の本文

ただし、最終的なCloudflare Workersの構成ではhonoを使用しませんでした。これは、Queuesを使用するため、Producer WorkerとConsumer Workerを以下のように構成する必要があると考えたからでした。

export default {
  async fetch(req: Request, env: Environment): Promise<Response> {
    ・・・・・・
  },
  async queue(batch: MessageBatch<Error>, env: Environment): Promise<void> {
    ・・・・・・
  },
};

上記はここからの一部引用。

honoを使ってもQueueを使う構成を取れるか、知見をお持ちの方教えていただきたく。

D1の構成

公式のGet startedが詳しいです。
https://developers.cloudflare.com/d1/get-started/

D1の作成

下記を実行して、Cloudflare上にD1のリソースを作成します。

npx wrangler d1 create cloudflare-linebot-chatgpt-api-db

wrangler.tomlに追加

上記で作成した内容を追加します。bindingは、Cloudflare Workers内で使用するD1のリソースにアクセスするために使用する設定になります。

[[ d1_databases ]]
binding = "DB"
database_name = "cloudflare-linebot-chatgpt-api-db"
database_id = "<UUID>"

テーブル構成と適用

今回は下記のようにDDLを作成しました。

schema.sql
DROP TABLE IF EXISTS messages;
CREATE TABLE messages (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  user_id TEXT NOT NULL,
  role TEXT NOT NULL,
  content TEXT NOT NULL
);

下記でDDLを適用します。

npx wrangler d1 execute cloudflare-linebot-chatgpt-api-db --file=./schema.sql

Queueの構成

公式のGet started guideが詳しいです。
https://developers.cloudflare.com/queues/get-started/

キューの作成

wranglerを使ってキューを作成します。

npx wrangler queues create cloudflare-linebot-chatgpt-api-queue

wrangler.tomlに追加

キューにメッセージを送るProducer Workerにキューをバインドする設定と、キューからメッセージを取り出すConsumer Workerの設定を追加します。

https://developers.cloudflare.com/queues/platform/configuration/

[[queues.producers]]
 queue = "cloudflare-linebot-chatgpt-api"
 binding = "QUEUE"

[[queues.consumers]]
 queue = "cloudflare-linebot-chatgpt-api"
 max_batch_size = 10   // キュー内のメッセージの数が10になったらメッセージを取り出す
 max_batch_timeout = 1 // キュー内のメッセージの数がmax_batch_sizeになっていなくても、1秒おきにキュー内のメッセージを確認するようにする

Workersの構成

Workersで参照している秘匿情報の設定

LINEに投稿するために必要なトークンと、ChatGPT APIにリクエストを投げるために必要なシークレットキーを登録します。
https://developers.cloudflare.com/workers/wrangler/commands/#secret

npx wrangler secret put CHANNEL_ACCESS_TOKEN

npx wrangler secret put OPENAI_API_KEY

Workersのソースコード

全体

Queueを使うときの構成でfetchという名前のProducer Workerとqueueという名前のConsumer Workerの関数を実装する必要があります。
Producer WorkerでHonoを使う場合は、以下のようにすればよいとyusukebeさんよりコメント頂きました。ありがとうございます。

import { Hono } from "hono";
type Bindings = {
  DB: D1Database;
  QUEUE: Queue;
  CHANNEL_ACCESS_TOKEN: string;
  ・・・・
};
const app = new Hono<{ Bindings: Bindings }>();
app.post("/api/webhook", async (c) => {
    ・・・
}
export default {
  fetch: app.fetch,
  async queue(batch: MessageBatch<Error>, env: Bindings): Promise<void> {
    ・・・・・・
  },
};

Producer Worker

Honoを使って、簡潔なルーティングの記載をしています。

const app = new Hono<{ Bindings: Bindings }>();

app.post("/api/webhook", async (c) => {
    ・・・
}

Queueへのメッセージ追加も以下で簡単にできます。lineのユーザーID、ユーザーの投稿した内容、LINE側に応答メッセージを送るために必要なreplyTokenをメッセージに含めています。

  const queueData = {
    userId,
    content: text,
    replyToken,
  };
  await c.env.QUEUE.send(queueData);

Consumer Worker

Consumer Worker(async queue(batch: MessageBatch<Error>, env: Environment): Promise<void> {...})の処理は以下のようになっています。

  1. キューのメッセージ(ユーザーの入力内容)を取り出す
    複数のメッセージを処理する可能性があるので、for文でぐるぐる回して以下の処理をする

  2. ユーザーの入力内容をD1に登録

    await env.DB.prepare(
        `insert into messages(user_id, role, content) values (?, "user", ?)`
      )
        .bind(userId, content)
        .run();
    
  3. lineのユーザーIDでテーブルからこれまでのチャットの内容を抽出

    const { results } = await env.DB.prepare(
        `select role, content from messages where user_id = ?1 order by id`
      )
        .bind(userId)
        .all<ChatGPTRequestMessage>();
    
  4. ChatGPT APIにリクエスト

    const res = await fetch("https://api.openai.com/v1/chat/completions", {
          method: "post",
          headers: {
            "Content-Type": "application/json",
            Authorization: `Bearer ${env.OPENAI_API_KEY}`,
          },
          body: JSON.stringify({
            model: "gpt-3.5-turbo",
            messages: chatGPTcontents,
          }),
        });
    const body = await res.json<ChatGPTResponse>();
    
  5. ChatGPT APIのレスポンスをDBに登録

    await env.DB.prepare(
          `insert into messages(user_id, role, content) values (?, "assistant", ?)`
        )
          .bind(userId, body.choices[0].message.content)
          .run();
    
  6. ChatGPT APIの回答をLINE側に登録する

    await fetch("https://api.line.me/v2/bot/message/reply", {
          body: JSON.stringify({
            replyToken: replyToken,
            messages: [response],
          }),
          method: "POST",
          headers: {
            Authorization: `Bearer ${accessToken}`,
            "Content-Type": "application/json",
          },
        });
    

Workersの処理全体は以下に記載しました。

Workersのソースコード
import { TextMessage, WebhookEvent } from "@line/bot-sdk";
import { Hono } from "hono";

type Bindings = {
  DB: D1Database;
  QUEUE: Queue;
  CHANNEL_ACCESS_TOKEN: string;
  CHANNEL_SECRET: string;
  OPENAI_API_KEY: string;
};

type Role = "user" | "system" | "assistant";

type RequestBody = {
  events: WebhookEvent[];
};

type QueueData = {
  userId: string;
  content: string;
  replyToken: string;
};

type QueueMessage = {
  body: QueueData;
  timestamp: string;
  id: string;
};

type ChatGPTRequestMessage = {
  role: Role;
  content: string;
};

type ChatGPTResponse = {
  id: string;
  object: "chat.completion";
  created: number;
  model: string;
  usage: {
    prompt_token: number;
    completion_token: number;
    total_tokens: number;
  };
  choices: {
    message: {
      role: "assistant";
      content: string;
    };
    finish_reason: string;
    index: number;
  }[];
};

const app = new Hono<{ Bindings: Bindings }>();

app.post("/api/webhook", async (c) => {
  // Extract From Request Body
  const data = await c.req.json<RequestBody>();
  const event = data.events[0];
  if (event.type !== "message" || event.message.type !== "text") {
    return new Response("body error", { status: 400 });
  }
  const { source, replyToken } = event;
  if (source.type !== "user") {
    return new Response("body error", { status: 400 });
  }
  const { userId } = source;
  const { text } = event.message;
  const queueData = {
    userId,
    content: text,
    replyToken,
  };
  await c.env.QUEUE.send(queueData);
  return c.json({ message: "ok" });
});

export default {
  fetch: app.fetch,
  async queue(batch: MessageBatch<Error>, env: Bindings): Promise<void> {
    let messages = JSON.stringify(batch.messages);
    const queueMessages = JSON.parse(messages) as QueueMessage[];
    for await (const message of queueMessages) {
      const { userId, content, replyToken } = message.body;
      // DBに登録する
      await env.DB.prepare(
        `insert into messages(user_id, role, content) values (?, "user", ?)`
      )
        .bind(userId, content)
        .run();
      // DBを参照する
      const { results } = await env.DB.prepare(
        `select role, content from messages where user_id = ?1 order by id`
      )
        .bind(userId)
        .all<ChatGPTRequestMessage>();
      const chatGPTcontents = results ?? [];
      try {
        const res = await fetch("https://api.openai.com/v1/chat/completions", {
          method: "post",
          headers: {
            "Content-Type": "application/json",
            Authorization: `Bearer ${env.OPENAI_API_KEY}`,
          },
          body: JSON.stringify({
            model: "gpt-3.5-turbo",
            messages: chatGPTcontents,
          }),
        });
        const body = await res.json<ChatGPTResponse>();
        // DBに登録する
        await env.DB.prepare(
          `insert into messages(user_id, role, content) values (?, "assistant", ?)`
        )
          .bind(userId, body.choices[0].message.content)
          .run();
        const accessToken: string = env.CHANNEL_ACCESS_TOKEN;
        const response: TextMessage = {
          type: "text",
          text: body.choices[0].message.content,
        };
        await fetch("https://api.line.me/v2/bot/message/reply", {
          body: JSON.stringify({
            replyToken: replyToken,
            messages: [response],
          }),
          method: "POST",
          headers: {
            Authorization: `Bearer ${accessToken}`,
            "Content-Type": "application/json",
          },
        });
      } catch (error) {
        if (error instanceof Error) {
          console.error(error);
        }
      }
    }
  },
};

デプロイ

以下のようにpackage.jsonに設定しておき

{
  "scripts": {
    ・・・・
    "deploy": "wrangler publish src/index.ts",
    ・・・・
  },
  ・・・・
}

以下のコマンドで実施

npm run deploy

リポジトリ

https://github.com/nmemoto/chatgpt-linebot-with-cloudflare

まとめ

Cloudflare Workers/D1/QueuesとChatGPT APIのChat Completion APIを用いて、過去の会話を覚えてくれるLINEボットを作りました。

参考

GitHubで編集を提案

Discussion

yusukebeyusukebe

こんにちわ。

honoを使ってもQueueを使う構成を取れるか、知見をお持ちの方教えていただきたく。

const app = new Hono()
app.get('/', (c) => c.text('Hono!'))

export default {
  fetch: app.fetch,
  async queue(batch: MessageBatch<Error>, env: Environment): Promise<void> {},
}

これでいけるかと!

nmemotonmemoto

こんにちは!教えて頂きありがとうございます。
本文もHonoを使うように修正します。

YamazakiYamazaki

こちらのエントリーを参考に、いくつか機能(処理)を追加したりして、自分なりのアプリの実装ができました!
せっかくでき上がったプログラムですから、皆さんに倣ってGitHubで公開してみました
https://github.com/yamazaki/Multilingual-Convo-Bot-in-LINE
とても参考になりました、ありがとうございました