👀

RAGのTutorialやってみた part4

2024/12/21に公開

前回したこと

from langgraph.graph import START, StateGraph

graph_builder = StateGraph(State).add_sequence([retrieve, generate])
graph_builder.add_edge(START, "retrieve")
graph = graph_builder.compile()

retrieveとgenerateを使用してStateGraphのインスタンスを作りました。このインスタンスを今回は使用してRAGを体験していきます。

LangGraph

実際graphを使用していきます。

sync・async・streamについて

ここでは同期処理・非同期処理・ストリーム処理について説明します。もうわかっているよという方は飛ばして結構です。

graphはsyncとasyncとstreamに対応しています。syncというの同期処理のことで、ある処理Aをした時のその処理が終わるの待つのが同期処理です。その処理が終わらないと次の処理に行けないということです。これは普段書いているコードです。

import time
def process_A():
    time.sleep(1000)
    print(1)
    return 1

def process_B(num):
    time.sleep(100)
    print(num)
    added_num = num + 1
    return added_num

a = process_A() #Aという処理が終わるのを待つ
b = process_B(a) #Aという処理が終わったらBという処理をする 

timeというライブラリのsleepメソッドを使用すると、指定した秒数だけ待ちます。
この場合、process_Aで1000秒待って、1をプリントして、1を変数aに返し、それをprocess_Bに渡し、process_Bで100秒待って、受け取った値(a)に1を足し最後にその値を返却するという流れです。
感覚的は当たり前ですが、これを同期処理(sync)と言います。

次にasyncです。

import asyncio

if __name__ == '__main__':
    async def process_A():
        print('process A', flush=True)
        await asyncio.sleep(5)
        return 1

    def process_B():
        print('process B')
        return 2

    async def sample():
        a = await process_A()
        b = process_B()
        await asyncio.sleep(5)
        print(a, b)
        return a + b

    async def main():
        task = asyncio.create_task(sample())

        while not task.done():
            print('Task is not done. Loading')
            await asyncio.sleep(1)

        print('Task is done. Finished')
        result = await task
        print(f'Task result: {result}')

    asyncio.run(main())

と実行すると

Task is not done. Loading
process A
Task is not done. Loading
Task is not done. Loading
Task is not done. Loading
Task is not done. Loading
process B
Task is not done. Loading
Task is not done. Loading
Task is not done. Loading
Task is not done. Loading
Task is not done. Loading

このように表示されます。
そして10秒まつと

1 2
Task is done. Finished
Task result: 3

が追加で出力されます
awaitは非同期処理(async)を待ちますという宣言です。(同期処理にさせる)
上の例から、asyncの使い方にはルールがあります。

  1. async関数の中でしかawaitは使えない
    • asyncの中でawait asyncを使用するとき。
    • async関数をawaitせずに実行すると<coroutine object>というオブジェクトが返ってくる。
  2. task.done()などで待ち状態なのかそれとも処理が全て終わった状態なのかを判別して処理を分けることができる
    • これがasync処理の特徴です

他にも用途があるかもしれませんが、処理待ちなら〜で処理後なら〜のように処理を分けて使用する印象があります。LangGraphもこれに対応しているということです。

最後にストリーム処理(stream)これは常に通信し続けるものです。小川のように情報が常に流れ続けるイメージです。流れ続けて入ってくる情報を一つずつ処理できます。非同期だと全て情報が出揃うまで待たないといけないですが、streamは今入ってきている断片的な情報も取得できます。チャットなどはastreamです。チャットルームに入ると常にどこかのチャットデータベースを監視し、その情報が常に流れ続けるからリアルタイムでチャットができるのです。
コードでの例は示しませんがイメージが湧いたでしょうか?

graphでRAGの体験

同期処理

result = graph.invoke({"question": "What is Task Decomposition?"})

print(f'Context: {result["context"]}\n\n')
print(f'Answer: {result["answer"]}')

非同期処理

result = await graph.ainvoke(...)

このようにasync関数内部で使用することができます

stream処理(nodeごと)

for step in graph.stream(
    {"question": "What is Task Decomposition?"}, stream_mode="updates"
):
    print(f"{step}\n\n----------------\n")

出力

これはノードの処理をstreamで小出しにして送っています。そのため、先にretrieveの結果をprintして、ちょっと待った後に、generateの結果がprintされます。stream_modeをupdatesにすれば、このようなstreamにできるようです

stream処理(tokenごと)

for message, metadata in graph.stream(
    {"question": "What is Task Decomposition?"}, stream_mode="messages"
):
    print(message.content, end="|")

これはtokenをstreamで小出しにして渡しています。そのため、あるワードが届いたら表示が繰り返されます。

astream処理も対応しているようで、これはstreamで情報が流れてくる間に別の処理もしたいときに使えるようです。

もうすでにここでRAGを体験しています。今回はここまでですが、少しイメージが掴めたでしょうか?
次回はカスタムプロンプトやクエリ分析をやってみたいと思います。
https://zenn.dev/kurutazoku/articles/a387fe4091855b

Discussion