ChatGPT APIとCloudflareを使って過去の会話を覚えてるLINEボットを構築する
ChatGPT APIのChat Completion APIを用いて、チャットの入力に対してその回答をレスポンスで返してくれます。
このチャットの入力に過去のチャットの内容を含めることで、過去の内容を前提とした回答を行うことができますが、これを実現するには、過去のチャットの内容を永続化しておく必要があります。
ユーザーインターフェースとしてLINE(LINE Messaging API)、LINEからの処理受付とChatGPTへのリクエスト、チャット内容の永続化をCloudflareを使って、過去の会話を覚えてるLINEボットを実現することができました。
本記事では、Cloudflare側の構成について紹介します。
[ChatGPT API][AWSサーバーレス]ChatGPT APIであなたとの会話・文脈を覚えてくれるLINEボットを作る方法まとめのCloudflare版の内容になります。(ただし、この記事内で言及されているLINEプラットフォーム署名検証についてはできていません。)
Cloudflare側の構成では、LINEからの処理受付とChatGPTへのリクエストでCloudflare Workers、チャット内容の永続化でCloudflare D1を利用します。
さらに、この構成ではCloudflare Queuesを用いています。
2023/3/5現在、Cloudflare Queuesの利用にはWorkers Paid Planが必要になりますので、参考にされる方はご注意ください。
なぜ Cloudflare Queues を使用したか
LINE Messaging API のwebhookを利用していますが、ボットサーバーは1秒以内にレスポンスを返す必要がありそうでした。
webhookのリクエストを受けたWorkerで直接レスポンスを返そうとすると、ChatGPT APIからのレスポンスに時間がかかる場合があるため、クライアント側(webhook側)からのリクエストがキャンセルされる事象を確認しました。
その結果としてworkerの処理が途中で終了してしまい、LINE側に応答メッセージを返せないことがありました。
そこで、webhookからのリクエストを受けるWorkerをChatGPT APIへのリクエスト・LINE側への応答リクエスト・D1への登録に必要なデータをQueueに送りステータスコード200のレスポンスだけ返す処理とし、それとは別のWorkerでQueueからデータを取り出し必要な処理を行う構成としました。
[ChatGPT API][AWSサーバーレス]ChatGPT APIであなたとの会話・文脈を覚えてくれるLINEボットを作る方法まとめ では、Amazon APIGateway + AWS Lambdaの構成でこのような話は出ていなかったですが、おそらくLINE−API Gateway間の接続が終了してもAPI Gateway-Lambda間の接続は終了しておらずLambda側の処理が継続できたからなのではと推測しています。
(AWS Lambda Functions URLを使った構成でどうなるか、確認してみたいです。)
LINEの設定と動作検証
LINEの設定と、その検証のためのCloudflare Workersの構成は、Cloudflare Worker + D1 + Hono + OpenAIでLINE Botを作る を参考にしました。
上記と同じですが、後述するCloudflare WorkerではPOST /api/webhook
を受ける構成としており、LINE側に設定するwebhook urlも[ベースURL]/api/webhook
を指定しています。
以下は古い記載のため、読み飛ばして問題ないです。
Hono導入前の構成時の本文
ただし、最終的なCloudflare Workersの構成ではhonoを使用しませんでした。これは、Queuesを使用するため、Producer WorkerとConsumer Workerを以下のように構成する必要があると考えたからでした。
export default {
async fetch(req: Request, env: Environment): Promise<Response> {
・・・・・・
},
async queue(batch: MessageBatch<Error>, env: Environment): Promise<void> {
・・・・・・
},
};
上記はここからの一部引用。
honoを使ってもQueueを使う構成を取れるか、知見をお持ちの方教えていただきたく。
D1の構成
公式のGet startedが詳しいです。
D1の作成
下記を実行して、Cloudflare上にD1のリソースを作成します。
npx wrangler d1 create cloudflare-linebot-chatgpt-api-db
wrangler.tomlに追加
上記で作成した内容を追加します。bindingは、Cloudflare Workers内で使用するD1のリソースにアクセスするために使用する設定になります。
[[ d1_databases ]]
binding = "DB"
database_name = "cloudflare-linebot-chatgpt-api-db"
database_id = "<UUID>"
テーブル構成と適用
今回は下記のようにDDLを作成しました。
DROP TABLE IF EXISTS messages;
CREATE TABLE messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL
);
下記でDDLを適用します。
npx wrangler d1 execute cloudflare-linebot-chatgpt-api-db --file=./schema.sql
Queueの構成
公式のGet started guideが詳しいです。
キューの作成
wranglerを使ってキューを作成します。
npx wrangler queues create cloudflare-linebot-chatgpt-api-queue
wrangler.tomlに追加
キューにメッセージを送るProducer Workerにキューをバインドする設定と、キューからメッセージを取り出すConsumer Workerの設定を追加します。
[[queues.producers]]
queue = "cloudflare-linebot-chatgpt-api"
binding = "QUEUE"
[[queues.consumers]]
queue = "cloudflare-linebot-chatgpt-api"
max_batch_size = 10 // キュー内のメッセージの数が10になったらメッセージを取り出す
max_batch_timeout = 1 // キュー内のメッセージの数がmax_batch_sizeになっていなくても、1秒おきにキュー内のメッセージを確認するようにする
Workersの構成
Workersで参照している秘匿情報の設定
LINEに投稿するために必要なトークンと、ChatGPT APIにリクエストを投げるために必要なシークレットキーを登録します。
npx wrangler secret put CHANNEL_ACCESS_TOKEN
npx wrangler secret put OPENAI_API_KEY
Workersのソースコード
全体
Queueを使うときの構成でfetchという名前のProducer Workerとqueueという名前のConsumer Workerの関数を実装する必要があります。
Producer WorkerでHonoを使う場合は、以下のようにすればよいとyusukebeさんよりコメント頂きました。ありがとうございます。
import { Hono } from "hono";
type Bindings = {
DB: D1Database;
QUEUE: Queue;
CHANNEL_ACCESS_TOKEN: string;
・・・・
};
const app = new Hono<{ Bindings: Bindings }>();
app.post("/api/webhook", async (c) => {
・・・
}
export default {
fetch: app.fetch,
async queue(batch: MessageBatch<Error>, env: Bindings): Promise<void> {
・・・・・・
},
};
Producer Worker
Honoを使って、簡潔なルーティングの記載をしています。
const app = new Hono<{ Bindings: Bindings }>();
app.post("/api/webhook", async (c) => {
・・・
}
Queueへのメッセージ追加も以下で簡単にできます。lineのユーザーID、ユーザーの投稿した内容、LINE側に応答メッセージを送るために必要なreplyTokenをメッセージに含めています。
const queueData = {
userId,
content: text,
replyToken,
};
await c.env.QUEUE.send(queueData);
Consumer Worker
Consumer Worker(async queue(batch: MessageBatch<Error>, env: Environment): Promise<void> {...}
)の処理は以下のようになっています。
-
キューのメッセージ(ユーザーの入力内容)を取り出す
複数のメッセージを処理する可能性があるので、for文でぐるぐる回して以下の処理をする -
ユーザーの入力内容をD1に登録
await env.DB.prepare( `insert into messages(user_id, role, content) values (?, "user", ?)` ) .bind(userId, content) .run();
-
lineのユーザーIDでテーブルからこれまでのチャットの内容を抽出
const { results } = await env.DB.prepare( `select role, content from messages where user_id = ?1 order by id` ) .bind(userId) .all<ChatGPTRequestMessage>();
-
ChatGPT APIにリクエスト
const res = await fetch("https://api.openai.com/v1/chat/completions", { method: "post", headers: { "Content-Type": "application/json", Authorization: `Bearer ${env.OPENAI_API_KEY}`, }, body: JSON.stringify({ model: "gpt-3.5-turbo", messages: chatGPTcontents, }), }); const body = await res.json<ChatGPTResponse>();
-
ChatGPT APIのレスポンスをDBに登録
await env.DB.prepare( `insert into messages(user_id, role, content) values (?, "assistant", ?)` ) .bind(userId, body.choices[0].message.content) .run();
-
ChatGPT APIの回答をLINE側に登録する
await fetch("https://api.line.me/v2/bot/message/reply", { body: JSON.stringify({ replyToken: replyToken, messages: [response], }), method: "POST", headers: { Authorization: `Bearer ${accessToken}`, "Content-Type": "application/json", }, });
Workersの処理全体は以下に記載しました。
Workersのソースコード
import { TextMessage, WebhookEvent } from "@line/bot-sdk";
import { Hono } from "hono";
type Bindings = {
DB: D1Database;
QUEUE: Queue;
CHANNEL_ACCESS_TOKEN: string;
CHANNEL_SECRET: string;
OPENAI_API_KEY: string;
};
type Role = "user" | "system" | "assistant";
type RequestBody = {
events: WebhookEvent[];
};
type QueueData = {
userId: string;
content: string;
replyToken: string;
};
type QueueMessage = {
body: QueueData;
timestamp: string;
id: string;
};
type ChatGPTRequestMessage = {
role: Role;
content: string;
};
type ChatGPTResponse = {
id: string;
object: "chat.completion";
created: number;
model: string;
usage: {
prompt_token: number;
completion_token: number;
total_tokens: number;
};
choices: {
message: {
role: "assistant";
content: string;
};
finish_reason: string;
index: number;
}[];
};
const app = new Hono<{ Bindings: Bindings }>();
app.post("/api/webhook", async (c) => {
// Extract From Request Body
const data = await c.req.json<RequestBody>();
const event = data.events[0];
if (event.type !== "message" || event.message.type !== "text") {
return new Response("body error", { status: 400 });
}
const { source, replyToken } = event;
if (source.type !== "user") {
return new Response("body error", { status: 400 });
}
const { userId } = source;
const { text } = event.message;
const queueData = {
userId,
content: text,
replyToken,
};
await c.env.QUEUE.send(queueData);
return c.json({ message: "ok" });
});
export default {
fetch: app.fetch,
async queue(batch: MessageBatch<Error>, env: Bindings): Promise<void> {
let messages = JSON.stringify(batch.messages);
const queueMessages = JSON.parse(messages) as QueueMessage[];
for await (const message of queueMessages) {
const { userId, content, replyToken } = message.body;
// DBに登録する
await env.DB.prepare(
`insert into messages(user_id, role, content) values (?, "user", ?)`
)
.bind(userId, content)
.run();
// DBを参照する
const { results } = await env.DB.prepare(
`select role, content from messages where user_id = ?1 order by id`
)
.bind(userId)
.all<ChatGPTRequestMessage>();
const chatGPTcontents = results ?? [];
try {
const res = await fetch("https://api.openai.com/v1/chat/completions", {
method: "post",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${env.OPENAI_API_KEY}`,
},
body: JSON.stringify({
model: "gpt-3.5-turbo",
messages: chatGPTcontents,
}),
});
const body = await res.json<ChatGPTResponse>();
// DBに登録する
await env.DB.prepare(
`insert into messages(user_id, role, content) values (?, "assistant", ?)`
)
.bind(userId, body.choices[0].message.content)
.run();
const accessToken: string = env.CHANNEL_ACCESS_TOKEN;
const response: TextMessage = {
type: "text",
text: body.choices[0].message.content,
};
await fetch("https://api.line.me/v2/bot/message/reply", {
body: JSON.stringify({
replyToken: replyToken,
messages: [response],
}),
method: "POST",
headers: {
Authorization: `Bearer ${accessToken}`,
"Content-Type": "application/json",
},
});
} catch (error) {
if (error instanceof Error) {
console.error(error);
}
}
}
},
};
デプロイ
以下のようにpackage.jsonに設定しておき
{
"scripts": {
・・・・
"deploy": "wrangler publish src/index.ts",
・・・・
},
・・・・
}
以下のコマンドで実施
npm run deploy
リポジトリ
まとめ
Cloudflare Workers/D1/QueuesとChatGPT APIのChat Completion APIを用いて、過去の会話を覚えてくれるLINEボットを作りました。
Discussion
こんにちわ。
これでいけるかと!
こんにちは!教えて頂きありがとうございます。
本文もHonoを使うように修正します。
こちらのエントリーを参考に、いくつか機能(処理)を追加したりして、自分なりのアプリの実装ができました!
せっかくでき上がったプログラムですから、皆さんに倣ってGitHubで公開してみました とても参考になりました、ありがとうございました