😃

LangChain(JS)にて DynamoDB-Backed Chat Memoryを試す(1)

2023/08/24に公開

はじめに

もともとタイトルとは別の作業だったのですが、久々にLangChainのドキュメントを読んでいたら
DynamoDB-Backed Chat Memory

というのが追加されていたので試してみました。
今回も軽い気持ちで記事を書いたのですが、長くなりそうです。。。

前提

Node.js は v18系。

主なパッケージのバージョンは以下になります。

"@aws-sdk/client-dynamodb": "^3.395.0",
"langchain": "^0.0.131",
"dotenv": "^16.3.1",
"openai": "^4.0.1",
"dayjs": "^1.11.9",

既にLangChain、OpenAI APIを実行する環境は整っているものとします。

追加パッケージのインストール

npm install @aws-sdk/client-dynamodb

上記の他に、公式ドキュメントのサンプルコードで利用するものとは別に、dotenv、dayjsを独自に入れています。

npm install dayjs dotenv

OpenAI APIなしで試してみる

Usage(サンプルコード)を修正し、ConversationChain無し(OpenAI API無し)で履歴の追加、取得を行い、DynamoDBにどのようなレコードが追加されるか見てみます。

テーブルはidという名前でパーティションキーを指定しただけのものです。

memory.js
import { BufferMemory } from "langchain/memory";
import { DynamoDBChatMessageHistory } from "langchain/stores/message/dynamodb";

const memory = new BufferMemory({
  chatHistory: new DynamoDBChatMessageHistory({  
    tableName: "langchain",
    partitionKey: "id",
    sessionId: new Date().toISOString(), // Or some other unique identifier for the conversation
    config: {
      //region: "us-east-2",
      region: "ap-northeast-1",
      // プロファイルで別途指定済み
      //credentials: {
        //accessKeyId: "<your AWS access key id>",
        //secretAccessKey: "<your AWS secret access key>",
      //},
    },
  }),
});

// chatHistoryに直接履歴を追加
await memory.chatHistory.addUserMessage("Hi!");
await memory.chatHistory.addAIChatMessage("What's up?");
// chatHistoryから直接履歴を取得
const messages = await memory.chatHistory.getMessages();
console.log(messages, 'messages')

//const model = new ChatOpenAI();
//const chain = new ConversationChain({ llm: model, memory });

//const res1 = await chain.call({ input: "Hi! I'm Jim." });
//console.log({ res1 });
//const res2 = await chain.call({ input: "What did I just say my name was?" });

2回実行します。

node memory.js
node memory.js

追加されたレコードを確認します。

aws dynamodb scan --table-name langchain

2レコード追加されています。
Itemsの中身だけ抜粋します。
※messagesの中身は同一なので省略しています。

{
  "Items": [
    {
      "id": {
        "S": "2023-08-23T12:13:08.925Z"
      },
      "messages": {
        "L": [
          {
            "M": {
              "type": {
                "S": "human"
              },
              "text": {
                "S": "Hi!"
              }
            }
          },
          {
            "M": {
              "type": {
                "S": "ai"
              },
              "text": {
                "S": "What's up?"
              }
            }
          }
        ]
      }
    },
    {
      "id": {
        "S": "2023-08-23T12:11:33.381Z"
      },
      "messages": {
        //略
      }
    }
  ]
}

partitionKeyで指定したカラムにsessionIdが保存され、その値がnew Date().toISOString()の値になっています。
そして、セッションごとにレコードが作成されます。今回は実行するたびにキー(時刻)が変わったので2レコードとなります。セッション内の会話は上記の結果のようにmessages内に連想配列で格納されます。

キーを変更する

キーが変わらなければ実行するたびに同じsessionIdのレコードに会話が追加されていくことになります。既存のsessionIdを直接指定します。

//sessionId: new Date().toISOString(),
sessionId: "2023-08-23T12:13:08.925Z",

に変更して実行します。

2回実行し、レコードを確認します。

{
  "M": {
      "type": {
          "S": "human"
      },
      "text": {
          "S": "Hi!"
      }
  }
},
{
  "M": {
      "type": {
          "S": "ai"
      },
      "text": {
          "S": "What's up?"
      }
  }
}

上記が3組、messagesの中に含まれていました。
初めの1組に追加して、今回追加の2組で3組になっています。

clear()

せっかくなので、ついでに不要なレコードを消してみます。

memory.js
import { BufferMemory } from "langchain/memory";
import { DynamoDBChatMessageHistory } from "langchain/stores/message/dynamodb";

const memory = new BufferMemory({
    // 略
    sessionId: "2023-08-23T12:13:08.925Z",
    // 略
});

await memory.chatHistory.clear();

上記を実行します。

node memory.js

テーブルの中を確認すると指定したsessionIdのレコードが削除されていました。

OpenAI APIを組み込む

ConversationChainを戻して(OpenAI APIを利用して)
Usage(サンプルコード)
をベースに作成したコード実行してみます。

sessionIdは一旦シンプルにYYYY-MM-DD形式で1日単位で作成することにします。
(日付が変わるとリセットされる)

APIキー(OPENAI_API_KEY)は .envに定義してください。

以下が修正したコードになります。
この後の検証でも使うため、エラーハンドリングも加え、Chainのverboseもtrueにしておきます。

memory.js
import { BufferMemory } from "langchain/memory";
import { DynamoDBChatMessageHistory } from "langchain/stores/message/dynamodb";

import { ChatOpenAI } from "langchain/chat_models/openai";
import { ConversationChain } from "langchain/chains";

import dayjs from "dayjs";
import utc from "dayjs/plugin/utc.js";
import timezone from "dayjs/plugin/timezone.js";

import dotenv from "dotenv";
// OPEN AI のAPIキーなどを .envに定義しておく
dotenv.config();

// 現在の日付を取得
dayjs.extend(utc);
dayjs.extend(timezone);
dayjs.tz.setDefault("Asia/Tokyo"); // 日本のタイムゾーン
const currentDate = dayjs.tz().format("YYYY-MM-DD");
//const currentDate = dayjs.tz().format("YYYY-MM-DD HH:mm:ss");

const chatHistory = new DynamoDBChatMessageHistory({
  tableName: "langchain",
  partitionKey: "id",
  sessionId: currentDate,
  config: {
    region: "ap-northeast-1",
  },
});

const memory = new BufferMemory({
  chatHistory: chatHistory,
});

const model = new ChatOpenAI();
const chain = new ConversationChain({ llm: model, memory, verbose: true });

try {
  const res1 = await chain.call({ input: "おすすめの映画を教えてください。" });
  console.log({ res1 });
} catch (error) {
  if (error.response) {
    console.log("Status:", error.response.status);
    console.log("Data:", error.response.data);
  } else if (error.request) {
    console.log("Request:", error.request);
  } else {
    console.log("Error:", error.message);
  }
}

const messages1 = await memory.chatHistory.getMessages();
console.log(messages1)

初回は例えばこんな返事をしてくれます。

おすすめの映画ですか?それは私の得意分野の一つです!あなたの好みに合わせた映画を見つけるのが得意ですよ。一体何のジャンルがお好きですか?

面倒ですが、結果を確認しながら、質問を書き換えてしばらく会話を続けていきます。

文字数制限対策

会話を続けていくと、出力されるログからも分かるように、履歴はどんどん長くなっていきます。
当然ですが上限を超えるとエラーになります。

[llm/error] [1:chain:ConversationChain > 2:llm:ChatOpenAI] [1.05s] LLM run errored with error: "Request failed with status code 400"
[chain/error] [1:chain:ConversationChain] [1.05s] Chain run errored with error: "Request failed with status code 400"
Status: 400
Data: {
  error: {
    message: "This model's maximum context length is 4097 tokens. However, your messages resulted in 4119 tokens. Please reduce the length of the messages.",
    type: 'invalid_request_error',
    param: 'messages',
    code: 'context_length_exceeded'
  }
}

DynamoDB-Backed Chat Memoryや、BufferMemoryConversationChainでは特にトークンの上限に関した制御までは行なっていないようです。
そもそもトークン上限に対応するためどのような制御を加えるべきか、から考える必要がありそうです。
既に大分長くなってしまったので、今回はここで区切ろうかと思ったのですが、最後にDynamoDBChatMessageHistoryのソースコードだけ確認しておこうと思います。

DynamoDBChatMessageHistory

コードはnode_modulesから読みやすい拡張子のファイルを参照しています。
親クラスのBaseListChatMessageHistoryには特筆するような具体的な実装がないので触れません。

まず、該当部分のコード全体を先に引用しておきます。

node_modules/langchain/dist/stores/message/dynamodb.js
import { DynamoDBClient, GetItemCommand, UpdateItemCommand, DeleteItemCommand, } from "@aws-sdk/client-dynamodb";
import { BaseListChatMessageHistory, } from "../../schema/index.js";
import { mapChatMessagesToStoredMessages, mapStoredMessagesToChatMessages, } from "./utils.js";
export class DynamoDBChatMessageHistory extends BaseListChatMessageHistory {
    get lc_secrets() {
        return {
            "config.credentials.accessKeyId": "AWS_ACCESS_KEY_ID",
            "config.credentials.secretAccessKey": "AWS_SECRETE_ACCESS_KEY",
            "config.credentials.sessionToken": "AWS_SESSION_TOKEN",
        };
    }
    constructor({ tableName, sessionId, partitionKey, sortKey, messageAttributeName, config, }) {
        super();
        Object.defineProperty(this, "lc_namespace", {
            enumerable: true,
            configurable: true,
            writable: true,
            value: ["langchain", "stores", "message", "dynamodb"]
        });
        Object.defineProperty(this, "tableName", {
            enumerable: true,
            configurable: true,
            writable: true,
            value: void 0
        });
        Object.defineProperty(this, "sessionId", {
            enumerable: true,
            configurable: true,
            writable: true,
            value: void 0
        });
        Object.defineProperty(this, "client", {
            enumerable: true,
            configurable: true,
            writable: true,
            value: void 0
        });
        Object.defineProperty(this, "partitionKey", {
            enumerable: true,
            configurable: true,
            writable: true,
            value: "id"
        });
        Object.defineProperty(this, "sortKey", {
            enumerable: true,
            configurable: true,
            writable: true,
            value: void 0
        });
        Object.defineProperty(this, "messageAttributeName", {
            enumerable: true,
            configurable: true,
            writable: true,
            value: "messages"
        });
        Object.defineProperty(this, "dynamoKey", {
            enumerable: true,
            configurable: true,
            writable: true,
            value: void 0
        });
        this.tableName = tableName;
        this.sessionId = sessionId;
        this.client = new DynamoDBClient(config ?? {});
        this.partitionKey = partitionKey ?? this.partitionKey;
        this.sortKey = sortKey;
        this.messageAttributeName =
            messageAttributeName ?? this.messageAttributeName;
        this.dynamoKey = {};
        this.dynamoKey[this.partitionKey] = { S: this.sessionId };
        if (this.sortKey) {
            this.dynamoKey[this.sortKey] = { S: this.sortKey };
        }
    }
    async getMessages() {
        const params = {
            TableName: this.tableName,
            Key: this.dynamoKey,
        };
        const response = await this.client.send(new GetItemCommand(params));
        const items = response.Item
            ? response.Item[this.messageAttributeName]?.L ?? []
            : [];
        const messages = items
            .map((item) => ({
            type: item.M?.type.S,
            data: {
                role: item.M?.role?.S,
                content: item.M?.text.S,
            },
        }))
            .filter((x) => x.type !== undefined && x.data.content !== undefined);
        return mapStoredMessagesToChatMessages(messages);
    }
    async clear() {
        const params = {
            TableName: this.tableName,
            Key: this.dynamoKey,
        };
        await this.client.send(new DeleteItemCommand(params));
    }
    async addMessage(message) {
        const messages = mapChatMessagesToStoredMessages([message]);
        const params = {
            TableName: this.tableName,
            Key: this.dynamoKey,
            ExpressionAttributeNames: {
                "#m": this.messageAttributeName,
            },
            ExpressionAttributeValues: {
                ":empty_list": {
                    L: [],
                },
                ":m": {
                    L: messages.map((message) => {
                        const dynamoSerializedMessage = {
                            M: {
                                type: {
                                    S: message.type,
                                },
                                text: {
                                    S: message.data.content,
                                },
                            },
                        };
                        if (message.data.role) {
                            dynamoSerializedMessage.M.role = { S: message.data.role };
                        }
                        return dynamoSerializedMessage;
                    }),
                },
            },
            UpdateExpression: "SET #m = list_append(if_not_exists(#m, :empty_list), :m)",
        };
        await this.client.send(new UpdateItemCommand(params));
    }
}

コンストラクタ

一部抜粋します。

this.tableName = tableName;
this.sessionId = sessionId;
this.client = new DynamoDBClient(config ?? {});
this.partitionKey = partitionKey ?? this.partitionKey;
this.sortKey = sortKey;
this.messageAttributeName =
    messageAttributeName ?? this.messageAttributeName;
this.dynamoKey = {};
this.dynamoKey[this.partitionKey] = { S: this.sessionId };
if (this.sortKey) {
    this.dynamoKey[this.sortKey] = { S: this.sortKey };
}

Dynamodbのクライアントの作成(DynamoDBClient)
テーブル名、パーティションキー、ソートキーの指定、メッセージを格納するカラムの指定をしています。

this.dynamoKeyはレコードの主キーを定義しています。
コードを見て分かるように Sで文字列型で限定されてしまっているので注意したいです。
また、パーティションキーは動的に与えられるようですが、ソートキーは指定できるものの大分使い方が限定されそうです。
※ どういう用途を想定しているのか気になりますが、今回は特にソートキーを必要としないこともあり、深追いしません。

getMessages

一部を抜粋します。

const params = {
    TableName: this.tableName,
    Key: this.dynamoKey,
};
const response = await this.client.send(new GetItemCommand(params));

主キーでレコードを1件取得します。レコード内の指定カラム(デフォルトmessages)に複数メッセージが格納されます。

また、他の履歴レコードを一覧取得するようなメソッドは用意されていません。

clear

async clear() {
    const params = {
        TableName: this.tableName,
        Key: this.dynamoKey,
    };
    await this.client.send(new DeleteItemCommand(params));
}

主キーでレコードを1件削除しています。
複数レコードを削除するメソッドは用意されていません。

addMessage

主キーで特定した1件のレコードをUpdateItemしています。
過去にレコードの構造に変更があったようで、mapChatMessagesToStoredMessagesの中に変換処理が入っています。

現在のメッセージの要素は

  • type: 発話者の種類。ai, humanなど。他に、systemfunctionchatがありますが、今回はまだ登場しません。今後出現した時、特記すべき点があれば扱います。ただし、chatは古いフォーマットのような気がします。
  • text: メッセージ本文

まとめと次回について

今回はサンプルコードの範囲程度ですが、実際に動かしながら仕様を確認しました。
また、参考程度にDynamoDBChatMessageHistoryの実装も確認しました。
しかし、トークン上限などやり取りを繰り返した際の処理に課題があることもわかりました。
他にも、実用的なシステムを作るにあたって、直近の会話履歴(トークン上限の範囲内)に収まらない長期的な記憶も必要かもしれません。
次回は、LangChainのChainやMemoryを中心に既存の実装で活用できる機能がないか確認したいと思います。また、必要な記憶の実装を洗い出し、LangChainで実現できないものがあれば拡張を検討してみたいと思います。

Discussion