Closed9

LangChain QA BotでAWSリソースの使い所を考えてみる

kun432kun432

概要

LangChainで QA Bot with DocumentsをAWS環境で作るのであれば、いろいろAWSリソースを使いたい。ということでふわっと考えてみる。

前提

基本はAPI Gateway+Lamdbaで、それ以外のLangChainのコンポーネントについてはなるべくマネージドを使いたい。主にこの辺になると思う。

  • Index
  • Memory

とりあえず、実際にできるかどうか?は横においておいて、ざっとできそうかどうか?で見ていく。

kun432kun432

Index

自前ドキュメントを扱うためのIndexについてはさらに以下のコンポーネントに分かれる。

  • Document Loader
  • Vector Store
  • Retriever

Document Loader

https://python.langchain.com/en/latest/modules/indexes/document_loaders.html

Documentそのものを読み込んでくるならば、まあ無難にS3を使うのが良いと思う。boto3で直接読み込んでもいいわけだし。

ただいちいちDocument読み込んでチャンクに分割してEmbeddingsに変換してDBに入れて・・・みたいなことを毎回やる必要はない。予めIndex化したものを読み込むか、都度参照するほうが良さそう。レスポンスのこともあるし。

ということで、ここはインデックスを作る際の参考程度。

Vector Store

https://python.langchain.com/en/latest/modules/indexes/vectorstores.html

LangChainでビルトインされているものでAWSサービス使えそうなのは以下。

  • OpenSearch/ElasticSearch → OpenSearch Service
  • PGVector → RDS
  • Redis → ElastiCache

OpenSearchやRDSは、データサイズやレスポンス要件で変わってくるのかもしれないけど、個人的にはちょっと仰々しい気がする。ElastiCacheがまだマシという雰囲気かなぁ、個人的に使ったことないのでまだ仰々しすぎる気もする。知らんけど。

いっそpineconeみたいな別サービスを使うのがいい気もする。

Retriever

Retrieverはあまりサービス関係なさそう。この時点でデータがVectorStoreにあれば標準的なvector store retrieverで取れるわけだし。あえてサービスと紐付けるならこの辺なのかな?

  • ElasticSearch BM25 → OpenSearch Service

LlamaIndexをつかったアプローチ

IndexであればLlamaIndexと組み合わせるのも良さそう。

https://llamahub.ai/

やってみた

Index周りはデータサイズ次第で変わってくるとは思いつつも、まずはシンプルにやるならこんな感じで良さそう。

  • ドキュメントを予めVector Index化したJSONファイルをS3においておく
  • Lambdaから上記を読み込んで検索して使う

LlamaIndexのチュートリアルで使われているPaul GrahamのEssayをサンプルに、SageMaker notebooksで試してみた。

まずファイルからVector Index化。

!pip install boto3 openai langchain llama-index
!git clone https://github.com/jerryjliu/gpt_index.git
%cd gpt_index/examples/paul_graham_essay/data
!ls     # paul_graham_essay.txt

VectorIndex化

import os
os.environ["OPENAI_API_KEY"] = "xxxxxxxxxxxx"
from llama_index import GPTSimpleVectorIndex, SimpleDirectoryReader

documents = SimpleDirectoryReader('data').load_data()
index = GPTSimpleVectorIndex.from_documents(documents)
index.save_to_disk('sample_index.json')

作成されたJSONファイルをs3に持っていく

from pathlib import Path
import boto3
import io

s3 = boto3.resource('s3')
bucket = s3.Bucket('sample_bucket')

KEY = 'sample_index.json'
ORIGIN_PATH = Path('sample_index.json')

bucket.upload_file(str(ORIGIN_PATH), KEY)

次に読み出す場合。

S3上にVector Index化されたJSONファイルを呼び出して使う。

import boto3
from langchain.agents import initialize_agent, Tool
from langchain.tools import BaseTool
from langchain.llms import OpenAI
from llama_index import GPTSimpleVectorIndex

s3 = boto3.resource('s3')
json = s3.Bucket("sample_bucket").Object("sample_index.json").get()['Body'].read()

index = GPTSimpleVectorIndex.load_from_string(json)

tools = [
    Tool(
        name="Paul Graham Essay",
        func=lambda q: str(index.query(q)),
        description="Useful for the generating the answers about Paul Graham",
        return_direct=False
    ),
]

llm = OpenAI(temperature=0)
agent = initialize_agent(tools, llm, agent="zero-shot-react-description", verbose=True)

agent.run("What did the author do growing up?")

Lambdaで動かすように書き換える必要はあるけど、取り回しが単純で良さそう。JSONのファイルサイズと読み込み時のレスポンスは気になるところだけど、

https://dev.classmethod.jp/articles/llamaindex-lambda-query/

上記にある通り、

  • エフェメラルストレージを使う
  • handler外にインデックスを読み出す

等の対応でなんとかなりそう。LLMのレスポンス時間そのものはプロンプト頑張るしかない。

kun432kun432

Memory

https://python.langchain.com/en/latest/modules/memory/how_to_guides.html

会話履歴のため。ざっと見る限りは以下。

  • Postgres Chat Message History → RDS
  • Redis Chat Message History → ElastiCache

ただしAPIリファレンスを見ると、DynamoDBも使える模様。

https://python.langchain.com/en/latest/_modules/langchain/memory/chat_message_histories/dynamodb.html

以下のサイトでもDynamoDBを使っているが、boto3を使っている様子。

https://qiita.com/hideki/items/ebd3b0e83912c2d72453

DynamoDBが良さそう。

まあdictでファイルに出力しておいてというのもアリではある。どこまでの会話履歴を残すかによるけど。

やってみた

DynamoDBChatMessageHistoryのドキュメントはまったくないのだが、以下のRedisChatMessageHistoryが参考になる。

https://python.langchain.com/en/latest/modules/memory/examples/agent_with_memory_in_db.html

予めDynamoDB側にtableを作成しておく。このときインデックスを"SessionId"という名前で作っておくこと。

では、まずは単純なChatMessageHistoryだけ。DynamoDBChatMessageHistoryでテーブル名とインデックスキーとなるセッションIDを指定する。あとはadd_user_message/add_ai_messageで会話を追加していくだけ。

from langchain.memory import ConversationBufferMemory
from langchain.memory.chat_message_histories import DynamoDBChatMessageHistory

message_history = DynamoDBChatMessageHistory(table_name="langchain-sample", session_id='s000001')
message_history.add_user_message("hi!")
message_history.add_ai_message("whats up?")
message_history.add_user_message("I'm great. How about you?")
message_history.add_ai_message("I'm great too. Thanks!")

message_history.messages

結果

[HumanMessage(content='hi!', additional_kwargs={}),
 AIMessage(content='whats up?', additional_kwargs={}),
 HumanMessage(content="I'm great. How about you?", additional_kwargs={}),
 AIMessage(content="I'm great too. Thanks!", additional_kwargs={})]

DynamoDB側も見てみる。

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('langchain-sample')

ret = table.get_item(Key={"SessionId": "s000001"})
ret["Item"]

ちゃんとDynamoDB側に会話履歴が入っているのがわかる。

{'SessionId': 's000001',
 'History': [{'type': 'human',
   'data': {'content': 'hi!', 'additional_kwargs': {}}},
  {'type': 'ai', 'data': {'content': 'whats up?', 'additional_kwargs': {}}},
  {'type': 'human',
   'data': {'content': "I'm great. How about you?", 'additional_kwargs': {}}},
  {'type': 'ai',
   'data': {'content': "I'm great too. Thanks!", 'additional_kwargs': {}}}]}

ではシンプルにConversationChainで使ってみる。

from langchain.llms import OpenAI
from langchain.chains import ConversationChain
from langchain.memory import ConversationBufferMemory
from langchain.memory.chat_message_histories import DynamoDBChatMessageHistory
import os

os.environ["OPENAI_API_KEY"] = "xxxxxxxxxxxx"

llm = OpenAI(temperature=0)

message_history = DynamoDBChatMessageHistory(table_name="langchain-sample", session_id='s000001')

memory = ConversationBufferMemory(chat_memory=message_history)

conversation = ConversationChain(
    llm=llm, 
    verbose=True, 
    memory=memory
)

conversation.predict(input="Hi there!")
> Entering new ConversationChain chain...
Prompt after formatting:
The following is a friendly conversation between a human and an AI. The AI is talkative and provides lots of specific details from its context. If the AI does not know the answer to a question, it truthfully says it does not know.

Current conversation:

Human: Hi there!
AI:

> Finished chain.
" Hi there! It's nice to meet you. How can I help you today?"
conversation.predict(input="How's weather there?")
> Entering new ConversationChain chain...
Prompt after formatting:
The following is a friendly conversation between a human and an AI. The AI is talkative and provides lots of specific details from its context. If the AI does not know the answer to a question, it truthfully says it does not know.

Current conversation:
Human: Hi there!
AI:  Hi there! It's nice to meet you. How can I help you today?
Human: How's weather there?
AI:

> Finished chain.
" The weather here is sunny and warm. The temperature is currently 75 degrees Fahrenheit and the humidity is low. It's a great day for outdoor activities!"

ちゃんと会話に含まれているのがわかる。DynamoDB側にもちゃんと入っていた。

{'SessionId': 's000001',
 'History': [{'type': 'human',
   'data': {'content': 'Hi there!', 'additional_kwargs': {}}},
  {'type': 'ai',
   'data': {'content': " Hi there! It's nice to meet you. How can I help you today?",
    'additional_kwargs': {}}},
  {'type': 'human',
   'data': {'content': "How's weather there?", 'additional_kwargs': {}}},
  {'type': 'ai',
   'data': {'content': " The weather here is sunny and warm. The temperature is currently 75 degrees Fahrenheit and the humidity is low. It's a great day for outdoor activities!",
    'additional_kwargs': {}}}]}
kun432kun432

全然関係ないけど、元々は↓をやりたい、かつ、AWS上でどう構築するか?というが個人的なモチベーションだった。

https://blog.langchain.dev/improving-document-retrieval-with-contextual-compression/

まだ整理ができていないけど、ちょっと考えてみた。

  • LlamaIndexとLangChainを組み合わせるパターンは、LlamaIndexをLangChain Agentの単にインデックス検索ツールとして使うパターン。LlamaIndexのメソッドを使ってドキュメント検索している。
  • このときToolとしてはCustom Toolとして使っている

Contextual Compression Retrieverは、その名の通りretriver。retrieverはドキュメント検索の部分を抽象化したインタフェース。となると、LlamaIndexで作成したインデックスに対して、LangChainのVectorStoreクラスとしてas_retrieverメソッドでアクセスできる必要があると思う。

で、元々やりたかったのはVectorStoreをAWSリソースに入れておいていちいちVector化せずに読み出すだけにしたい、ということだったので話が変わってくる。

  • S3を単にストレージとして使うならば、LangChainのVectorStoreクラスで扱える形式である必要にする。例えばFAISSならindexファイル一つだけで良い様子。
  • OpenSearch/ElasticSearch、PGVector、Redis等のAWSサービスで使えそうなものにする。
  • Pinecone等のVectorSearchに特化したサービスを使う。

つまりLangChainネイティブな方向になるということ。ともうLlamaIndexを使う必要性がなくなる。
LlamaIndexのメリットは、

  • シンプルなインタフェース
  • LlamaHubにあるいろいろなLoader

だと思っているのだけど、LangChainと組み合わせて使う場合には、

LlamaIndex provides both Tool abstractions for a Langchain agent as well as a memory module.

https://gpt-index.readthedocs.io/en/latest/how_to/integrations/using_with_langchain.html

という点を踏まえて、やりたいことが実現できるかを考える必要がある、というのは今の認識。

あとはやっぱり組み合わせて使うと、こういうケースで両方調べないといけないってのがしんどいかもしれない。

(LlamaIndexをそこまで使い込んでるわけではないので間違ってたらご容赦ください)

kun432kun432

ざっと見た感じ、ファイルベースのものは読み出し・保存のインタフェースが用意されてる。

  • FAISS
  • Annoy
  • Chroma

以前にChromaは少し試してみた限りはディレクトリの中にデータが保存されるように思える。S3だとチョット使いづらいかもしれない。

FAISSとAnnoyはどうやら1ファイルで使える様子。

https://python.langchain.com/en/latest/modules/indexes/vectorstores/examples/faiss.html

https://python.langchain.com/en/latest/modules/indexes/vectorstores/examples/annoy.html

この辺に比較もある。

https://qiita.com/wasnot/items/20c4f30a529ae3ed5f52

ざっくり見た感じはとりあえずFAISSが良さそうな雰囲気を感じたので、これで進めてみる。

kun432kun432

とりあえずFAISSもディレクトリになっていた。

from langchain.document_loaders import UnstructuredFileLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS

loader = UnstructuredFileLoader("sample.txt")
documents = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size = 500, chunk_overlap = 100, length_function = len,)
docs = text_splitter.split_documents(documents)
embeddings = OpenAIEmbeddings()
db = FAISS.from_documents(docs, embeddings)
db.save_local("sample_index")
!ls -lt 
drwxrwxr-x 2 ec2-user ec2-user   4096 Apr 30 14:06 sample_index
! ls -lt sample_index
-rw-rw-r-- 1 ec2-user ec2-user 129069 Apr 30 14:09 index.faiss
-rw-rw-r-- 1 ec2-user ec2-user  30726 Apr 30 14:09 index.pkl

LangChainのソースはこの辺

https://github.com/hwchase17/langchain/blob/master/langchain/vectorstores/faiss.py#L430-L449

なるほど、インデックスそのもの(.index)と、docstore と index_to_docstore_id がpickle化されたもの(.pkl)を含んだディレクトリとして出力される様子。

いろいろ考えたけど、考えるのが面倒になったので、ディレクトリごとコピーすることにした。雑に書いてみた。

import os
import boto3
from langchain.document_loaders import UnstructuredFileLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS

bucket_name = "sample_bucket"
orig_docs_dir = "orig_docs"
file_name = "sample.txt"

os.environ['OPENAI_API_KEY'] = "xxxxxxxxxx"

def upload_dir_s3(dirpath, s3bucket):
    for root,dirs,files in os.walk(dirpath):
        for file in files:
            s3bucket.upload_file(os.path.join(root,file),os.path.join(root,file))

def download_dir_s3(dirpath, s3bucket):
    for obj in s3bucket.objects.filter(Prefix = dirpath):
        if not os.path.exists(os.path.dirname(obj.key)):
            os.makedirs(os.path.dirname(obj.key))
        s3bucket.download_file(obj.key, obj.key)
        
s3 = boto3.resource('s3')
s3_bucket = s3.Bucket(bucket_name)
download_dir_s3(orig_docs_dir, s3_bucket)

loader = UnstructuredFileLoader(f"{orig_docs_dir}/{file_name}")
documents = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size = 500, chunk_overlap = 100, length_function = len,)
docs = text_splitter.split_documents(documents)
embeddings = OpenAIEmbeddings()
db = FAISS.from_documents(docs, embeddings)
db.save_local("sample_index")

upload_dir_s3("sample_index", s3_bucket)
  • S3バケットにある"orig_docs"ディレクトリをまるっとコピー
  • ファイルを読み出してベクトル化
  • save_localメソッドでディスク上にディレクトリとして出力。上の例だと"sample_index"
  • S3バケットにディレクトリごとまるっとコピー

これでベクトル化されたインデックスがS3上に作成される。

でインデックス作成と実際に呼び出すのは別プロセスとしてやりたいので、呼び出すときはこんな感じ。上とさして変わらない。

import os
import boto3
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS

bucket_name = "my-sagemaker-notebook-bucket"
index_dir = "sample_index"

os.environ['OPENAI_API_KEY'] = "xxxxxxxxx"

def download_dir_s3(dirpath, s3bucket):
    for obj in s3bucket.objects.filter(Prefix = dirpath):
        if not os.path.exists(os.path.dirname(obj.key)):
            os.makedirs(os.path.dirname(obj.key))
        s3bucket.download_file(obj.key, obj.key)
        
s3 = boto3.resource('s3')
s3_bucket = s3.Bucket(bucket_name)
download_dir_s3(index_dir, s3_bucket)

embeddings = OpenAIEmbeddings()
db = FAISS.load_local(index_dir, embeddings)

(・・・)
kun432kun432

このテーマとはちょっとずれるけど、改めてLangChainのChainとAgentsについて考えてみた。

  • Chainにはいろいろな種類がある。
  • Chainだけで完結することもできるし、(複数の)ChainをAgentのツールとして使うこともできる

で慣れていくといろんなことをやりたくなる。

  • LLMへのプロンプト、そしてそのレスポンスをキャッシュしておいて、同じような質問が来たらLLMに問い合わせずにキャッシュで返したい
  • 自前のドキュメントをベクトルDB化しておいて、そこから返したい
  • 会話の履歴を保持しておいて、永続的なコンテキストとして渡したい
  • 特定の入力に対してはフィルタを掛けたい
  • LLMの応答結果を検証して適切なものだけを返したい
  • その他色々・・・
  • キャッシュをChainの手前で最初にかけたい。

全て満たせるような既存のChainがあればそれ使えばよいのだけど、自分の要件を満たせるChainがない、既存のChainの挙動を少し変えたい、独自のToolを使いたい、とか考え出すと、一気に複雑になる。LangChainのAgentとChainはやっぱり概念が少し曖昧な気がする。

このスクラップは2023/07/08にクローズされました