LangChain QA BotでAWSリソースの使い所を考えてみる
概要
LangChainで QA Bot with DocumentsをAWS環境で作るのであれば、いろいろAWSリソースを使いたい。ということでふわっと考えてみる。
前提
基本はAPI Gateway+Lamdbaで、それ以外のLangChainのコンポーネントについてはなるべくマネージドを使いたい。主にこの辺になると思う。
- Index
- Memory
とりあえず、実際にできるかどうか?は横においておいて、ざっとできそうかどうか?で見ていく。
Index
自前ドキュメントを扱うためのIndexについてはさらに以下のコンポーネントに分かれる。
- Document Loader
- Vector Store
- Retriever
Document Loader
Documentそのものを読み込んでくるならば、まあ無難にS3を使うのが良いと思う。boto3で直接読み込んでもいいわけだし。
ただいちいちDocument読み込んでチャンクに分割してEmbeddingsに変換してDBに入れて・・・みたいなことを毎回やる必要はない。予めIndex化したものを読み込むか、都度参照するほうが良さそう。レスポンスのこともあるし。
ということで、ここはインデックスを作る際の参考程度。
Vector Store
LangChainでビルトインされているものでAWSサービス使えそうなのは以下。
- OpenSearch/ElasticSearch → OpenSearch Service
- PGVector → RDS
- Redis → ElastiCache
OpenSearchやRDSは、データサイズやレスポンス要件で変わってくるのかもしれないけど、個人的にはちょっと仰々しい気がする。ElastiCacheがまだマシという雰囲気かなぁ、個人的に使ったことないのでまだ仰々しすぎる気もする。知らんけど。
いっそpineconeみたいな別サービスを使うのがいい気もする。
Retriever
Retrieverはあまりサービス関係なさそう。この時点でデータがVectorStoreにあれば標準的なvector store retrieverで取れるわけだし。あえてサービスと紐付けるならこの辺なのかな?
- ElasticSearch BM25 → OpenSearch Service
LlamaIndexをつかったアプローチ
IndexであればLlamaIndexと組み合わせるのも良さそう。
やってみた
Index周りはデータサイズ次第で変わってくるとは思いつつも、まずはシンプルにやるならこんな感じで良さそう。
- ドキュメントを予めVector Index化したJSONファイルをS3においておく
- Lambdaから上記を読み込んで検索して使う
LlamaIndexのチュートリアルで使われているPaul GrahamのEssayをサンプルに、SageMaker notebooksで試してみた。
まずファイルからVector Index化。
!pip install boto3 openai langchain llama-index
!git clone https://github.com/jerryjliu/gpt_index.git
%cd gpt_index/examples/paul_graham_essay/data
!ls # paul_graham_essay.txt
VectorIndex化
import os
os.environ["OPENAI_API_KEY"] = "xxxxxxxxxxxx"
from llama_index import GPTSimpleVectorIndex, SimpleDirectoryReader
documents = SimpleDirectoryReader('data').load_data()
index = GPTSimpleVectorIndex.from_documents(documents)
index.save_to_disk('sample_index.json')
作成されたJSONファイルをs3に持っていく
from pathlib import Path
import boto3
import io
s3 = boto3.resource('s3')
bucket = s3.Bucket('sample_bucket')
KEY = 'sample_index.json'
ORIGIN_PATH = Path('sample_index.json')
bucket.upload_file(str(ORIGIN_PATH), KEY)
次に読み出す場合。
S3上にVector Index化されたJSONファイルを呼び出して使う。
import boto3
from langchain.agents import initialize_agent, Tool
from langchain.tools import BaseTool
from langchain.llms import OpenAI
from llama_index import GPTSimpleVectorIndex
s3 = boto3.resource('s3')
json = s3.Bucket("sample_bucket").Object("sample_index.json").get()['Body'].read()
index = GPTSimpleVectorIndex.load_from_string(json)
tools = [
Tool(
name="Paul Graham Essay",
func=lambda q: str(index.query(q)),
description="Useful for the generating the answers about Paul Graham",
return_direct=False
),
]
llm = OpenAI(temperature=0)
agent = initialize_agent(tools, llm, agent="zero-shot-react-description", verbose=True)
agent.run("What did the author do growing up?")
Lambdaで動かすように書き換える必要はあるけど、取り回しが単純で良さそう。JSONのファイルサイズと読み込み時のレスポンスは気になるところだけど、
上記にある通り、
- エフェメラルストレージを使う
- handler外にインデックスを読み出す
等の対応でなんとかなりそう。LLMのレスポンス時間そのものはプロンプト頑張るしかない。
Memory
会話履歴のため。ざっと見る限りは以下。
- Postgres Chat Message History → RDS
- Redis Chat Message History → ElastiCache
ただしAPIリファレンスを見ると、DynamoDBも使える模様。
以下のサイトでもDynamoDBを使っているが、boto3を使っている様子。
DynamoDBが良さそう。
まあdictでファイルに出力しておいてというのもアリではある。どこまでの会話履歴を残すかによるけど。
やってみた
DynamoDBChatMessageHistoryのドキュメントはまったくないのだが、以下のRedisChatMessageHistoryが参考になる。
予めDynamoDB側にtableを作成しておく。このときインデックスを"SessionId"という名前で作っておくこと。
では、まずは単純なChatMessageHistoryだけ。DynamoDBChatMessageHistoryでテーブル名とインデックスキーとなるセッションIDを指定する。あとはadd_user_message/add_ai_messageで会話を追加していくだけ。
from langchain.memory import ConversationBufferMemory
from langchain.memory.chat_message_histories import DynamoDBChatMessageHistory
message_history = DynamoDBChatMessageHistory(table_name="langchain-sample", session_id='s000001')
message_history.add_user_message("hi!")
message_history.add_ai_message("whats up?")
message_history.add_user_message("I'm great. How about you?")
message_history.add_ai_message("I'm great too. Thanks!")
message_history.messages
結果
[HumanMessage(content='hi!', additional_kwargs={}),
AIMessage(content='whats up?', additional_kwargs={}),
HumanMessage(content="I'm great. How about you?", additional_kwargs={}),
AIMessage(content="I'm great too. Thanks!", additional_kwargs={})]
DynamoDB側も見てみる。
import boto3
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('langchain-sample')
ret = table.get_item(Key={"SessionId": "s000001"})
ret["Item"]
ちゃんとDynamoDB側に会話履歴が入っているのがわかる。
{'SessionId': 's000001',
'History': [{'type': 'human',
'data': {'content': 'hi!', 'additional_kwargs': {}}},
{'type': 'ai', 'data': {'content': 'whats up?', 'additional_kwargs': {}}},
{'type': 'human',
'data': {'content': "I'm great. How about you?", 'additional_kwargs': {}}},
{'type': 'ai',
'data': {'content': "I'm great too. Thanks!", 'additional_kwargs': {}}}]}
ではシンプルにConversationChainで使ってみる。
from langchain.llms import OpenAI
from langchain.chains import ConversationChain
from langchain.memory import ConversationBufferMemory
from langchain.memory.chat_message_histories import DynamoDBChatMessageHistory
import os
os.environ["OPENAI_API_KEY"] = "xxxxxxxxxxxx"
llm = OpenAI(temperature=0)
message_history = DynamoDBChatMessageHistory(table_name="langchain-sample", session_id='s000001')
memory = ConversationBufferMemory(chat_memory=message_history)
conversation = ConversationChain(
llm=llm,
verbose=True,
memory=memory
)
conversation.predict(input="Hi there!")
> Entering new ConversationChain chain...
Prompt after formatting:
The following is a friendly conversation between a human and an AI. The AI is talkative and provides lots of specific details from its context. If the AI does not know the answer to a question, it truthfully says it does not know.
Current conversation:
Human: Hi there!
AI:
> Finished chain.
" Hi there! It's nice to meet you. How can I help you today?"
conversation.predict(input="How's weather there?")
> Entering new ConversationChain chain...
Prompt after formatting:
The following is a friendly conversation between a human and an AI. The AI is talkative and provides lots of specific details from its context. If the AI does not know the answer to a question, it truthfully says it does not know.
Current conversation:
Human: Hi there!
AI: Hi there! It's nice to meet you. How can I help you today?
Human: How's weather there?
AI:
> Finished chain.
" The weather here is sunny and warm. The temperature is currently 75 degrees Fahrenheit and the humidity is low. It's a great day for outdoor activities!"
ちゃんと会話に含まれているのがわかる。DynamoDB側にもちゃんと入っていた。
{'SessionId': 's000001',
'History': [{'type': 'human',
'data': {'content': 'Hi there!', 'additional_kwargs': {}}},
{'type': 'ai',
'data': {'content': " Hi there! It's nice to meet you. How can I help you today?",
'additional_kwargs': {}}},
{'type': 'human',
'data': {'content': "How's weather there?", 'additional_kwargs': {}}},
{'type': 'ai',
'data': {'content': " The weather here is sunny and warm. The temperature is currently 75 degrees Fahrenheit and the humidity is low. It's a great day for outdoor activities!",
'additional_kwargs': {}}}]}
ただ、DynamoDBChatMessageHistoryのコードを見るとちょっと書き方が古い様子。boto3はあまり詳しくないのだけど、この辺。
後はこれを全部組み合わせるだけ。
全然関係ないけど、元々は↓をやりたい、かつ、AWS上でどう構築するか?というが個人的なモチベーションだった。
まだ整理ができていないけど、ちょっと考えてみた。
- LlamaIndexとLangChainを組み合わせるパターンは、LlamaIndexをLangChain Agentの単にインデックス検索ツールとして使うパターン。LlamaIndexのメソッドを使ってドキュメント検索している。
- このときToolとしてはCustom Toolとして使っている
Contextual Compression Retrieverは、その名の通りretriver。retrieverはドキュメント検索の部分を抽象化したインタフェース。となると、LlamaIndexで作成したインデックスに対して、LangChainのVectorStoreクラスとしてas_retrieverメソッドでアクセスできる必要があると思う。
で、元々やりたかったのはVectorStoreをAWSリソースに入れておいていちいちVector化せずに読み出すだけにしたい、ということだったので話が変わってくる。
- S3を単にストレージとして使うならば、LangChainのVectorStoreクラスで扱える形式である必要にする。例えばFAISSならindexファイル一つだけで良い様子。
- OpenSearch/ElasticSearch、PGVector、Redis等のAWSサービスで使えそうなものにする。
- Pinecone等のVectorSearchに特化したサービスを使う。
つまりLangChainネイティブな方向になるということ。ともうLlamaIndexを使う必要性がなくなる。
LlamaIndexのメリットは、
- シンプルなインタフェース
- LlamaHubにあるいろいろなLoader
だと思っているのだけど、LangChainと組み合わせて使う場合には、
LlamaIndex provides both Tool abstractions for a Langchain agent as well as a memory module.
https://gpt-index.readthedocs.io/en/latest/how_to/integrations/using_with_langchain.html
という点を踏まえて、やりたいことが実現できるかを考える必要がある、というのは今の認識。
あとはやっぱり組み合わせて使うと、こういうケースで両方調べないといけないってのがしんどいかもしれない。
(LlamaIndexをそこまで使い込んでるわけではないので間違ってたらご容赦ください)
ざっと見た感じ、ファイルベースのものは読み出し・保存のインタフェースが用意されてる。
- FAISS
- Annoy
- Chroma
以前にChromaは少し試してみた限りはディレクトリの中にデータが保存されるように思える。S3だとチョット使いづらいかもしれない。
FAISSとAnnoyはどうやら1ファイルで使える様子。
この辺に比較もある。
ざっくり見た感じはとりあえずFAISSが良さそうな雰囲気を感じたので、これで進めてみる。
とりあえずFAISSもディレクトリになっていた。
from langchain.document_loaders import UnstructuredFileLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
loader = UnstructuredFileLoader("sample.txt")
documents = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size = 500, chunk_overlap = 100, length_function = len,)
docs = text_splitter.split_documents(documents)
embeddings = OpenAIEmbeddings()
db = FAISS.from_documents(docs, embeddings)
db.save_local("sample_index")
!ls -lt
drwxrwxr-x 2 ec2-user ec2-user 4096 Apr 30 14:06 sample_index
! ls -lt sample_index
-rw-rw-r-- 1 ec2-user ec2-user 129069 Apr 30 14:09 index.faiss
-rw-rw-r-- 1 ec2-user ec2-user 30726 Apr 30 14:09 index.pkl
LangChainのソースはこの辺
なるほど、インデックスそのもの(.index)と、docstore と index_to_docstore_id がpickle化されたもの(.pkl)を含んだディレクトリとして出力される様子。
いろいろ考えたけど、考えるのが面倒になったので、ディレクトリごとコピーすることにした。雑に書いてみた。
import os
import boto3
from langchain.document_loaders import UnstructuredFileLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
bucket_name = "sample_bucket"
orig_docs_dir = "orig_docs"
file_name = "sample.txt"
os.environ['OPENAI_API_KEY'] = "xxxxxxxxxx"
def upload_dir_s3(dirpath, s3bucket):
for root,dirs,files in os.walk(dirpath):
for file in files:
s3bucket.upload_file(os.path.join(root,file),os.path.join(root,file))
def download_dir_s3(dirpath, s3bucket):
for obj in s3bucket.objects.filter(Prefix = dirpath):
if not os.path.exists(os.path.dirname(obj.key)):
os.makedirs(os.path.dirname(obj.key))
s3bucket.download_file(obj.key, obj.key)
s3 = boto3.resource('s3')
s3_bucket = s3.Bucket(bucket_name)
download_dir_s3(orig_docs_dir, s3_bucket)
loader = UnstructuredFileLoader(f"{orig_docs_dir}/{file_name}")
documents = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size = 500, chunk_overlap = 100, length_function = len,)
docs = text_splitter.split_documents(documents)
embeddings = OpenAIEmbeddings()
db = FAISS.from_documents(docs, embeddings)
db.save_local("sample_index")
upload_dir_s3("sample_index", s3_bucket)
- S3バケットにある"orig_docs"ディレクトリをまるっとコピー
- ファイルを読み出してベクトル化
- save_localメソッドでディスク上にディレクトリとして出力。上の例だと"sample_index"
- S3バケットにディレクトリごとまるっとコピー
これでベクトル化されたインデックスがS3上に作成される。
でインデックス作成と実際に呼び出すのは別プロセスとしてやりたいので、呼び出すときはこんな感じ。上とさして変わらない。
import os
import boto3
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
bucket_name = "my-sagemaker-notebook-bucket"
index_dir = "sample_index"
os.environ['OPENAI_API_KEY'] = "xxxxxxxxx"
def download_dir_s3(dirpath, s3bucket):
for obj in s3bucket.objects.filter(Prefix = dirpath):
if not os.path.exists(os.path.dirname(obj.key)):
os.makedirs(os.path.dirname(obj.key))
s3bucket.download_file(obj.key, obj.key)
s3 = boto3.resource('s3')
s3_bucket = s3.Bucket(bucket_name)
download_dir_s3(index_dir, s3_bucket)
embeddings = OpenAIEmbeddings()
db = FAISS.load_local(index_dir, embeddings)
(・・・)
このテーマとはちょっとずれるけど、改めてLangChainのChainとAgentsについて考えてみた。
- Chainにはいろいろな種類がある。
- Chainだけで完結することもできるし、(複数の)ChainをAgentのツールとして使うこともできる
で慣れていくといろんなことをやりたくなる。
- LLMへのプロンプト、そしてそのレスポンスをキャッシュしておいて、同じような質問が来たらLLMに問い合わせずにキャッシュで返したい
- 自前のドキュメントをベクトルDB化しておいて、そこから返したい
- 会話の履歴を保持しておいて、永続的なコンテキストとして渡したい
- 特定の入力に対してはフィルタを掛けたい
- LLMの応答結果を検証して適切なものだけを返したい
- その他色々・・・
- キャッシュをChainの手前で最初にかけたい。
全て満たせるような既存のChainがあればそれ使えばよいのだけど、自分の要件を満たせるChainがない、既存のChainの挙動を少し変えたい、独自のToolを使いたい、とか考え出すと、一気に複雑になる。LangChainのAgentとChainはやっぱり概念が少し曖昧な気がする。