AIとの会話をGitで管理する方法

2024/11/13に公開

概要

休日に「GPTとの会話、全部記録して後から分析できたら面白そうだな〜」と思って、Git でちょちょいと管理してみたんです。
これが色んな遊び方ができそうだったので共有します!

「あの時なんて話してたっけ?」ってすぐ検索できる
過去のどの時点の会話でも完全に復元できる(タイムマシンみたい)
色んな分析や集計もできそう(データ分析の練習台として👾)

コードは全部で100行程度なのでかんたん、休日のちょっとした実験にピッタリかもです。
正直、「ものは試し!」くらいの気持ちで始めただけなんですが、
改良を考えてみたら面白い使い方とか分析方法が思いつくかもしれません。

とりあえず実装の仕組みを説明していきます。
コードも載せてますが、雰囲気で読んでもらえれば🙆‍♂️

インフラの理解

ollama
open-webui
dockerの知識、pythonの知識

構成図

ドメイン

プロジェクトの中核となる「ドメイン」は、特定の技術(GitやOllamaなど)に依存しない、純粋なビジネスロジックを表現する部分です。

たとえば「AIと会話して、その履歴を保存する」という機能は、「どこかにAIがいて」「どこかに保存場所がある」ことだけを前提に設計できます。AIが具体的にOllamaなのかChatGPTなのか、保存先がGitなのかデータベースなのかは、この時点では考えなくて良いのです。

このように技術的な実装から切り離すことで、仕様変更に強く、テストが書きやすく、コードの意図が明確になります。以下のコードは、そのドメインレイヤーの実装です。

chat/bot.py

from typing import Protocol, List, Dict, Generator, Union, Iterator, Any


class Bot(Protocol):
    def chat(self, messages: List[Dict[str, str]]) -> Generator[Dict[str, Any], None, None]:
        """
        メッセージ履歴を受け取り、応答をストリーミングで返す
        """
        ...

chat/repository.py

class Repository(Protocol):
    def save(self, user_id: str, chat_id: str, messages: List[Dict[str, str]]) -> None:
        """
        会話を永続化する
        """
        ...

chat/manager.py

botにメッセージを取り次いだり、双方の会話をrepositoryに保存したりする、チャットのマネージャーさんです

class Manager:
    def __init__(self, bot: Bot, repository: Repository):
        self.bot = bot
        self.repository = repository

    def mediate(
        self,
        user_id: str,
        chat_id: str,
        messages: List[dict]
    ) -> Generator[Dict[str, Any], None, None]:
        """
        新しいメッセージを処理し、応答をストリーミングで返す

        Args:
            user_id: ユーザーID
            chat_id: チャットID
            messages: メッセージ履歴

        Returns:
            Union[str, Generator, Iterator]: API応答のストリーム
        """
        # ユーザーからのメッセージを保存
        self.repository.save(user_id, chat_id, messages)

        # ストリーミングレスポンスを生成
        response_stream = self.bot.chat(messages)

        return self._stream_and_save(response_stream, user_id, chat_id, messages)

    def _stream_and_save(
        self,
        response_stream: Generator[Dict[str, Any], None, None],
        user_id: str,
        chat_id: str,
        messages: List[dict],
    ) -> Generator[Dict[str, Any], None, None]:
        """
        レスポンスをストリームしながら内容を蓄積し、完了後に保存する
        """
        full_response = ""

        for chunk in response_stream:
            if chunk.get('message', {}).get('content'):
                content = chunk['message']['content']
                full_response += content
            yield chunk

        # ストリーミング完了後、履歴を保存
        assistant_message = {
            "role": "assistant",
            "content": full_response,
        }

        messages.append(assistant_message)
        self.repository.save(user_id, chat_id, messages)

実装

前章で示したドメインを、実際の技術を使って実装していきます。ここでは具体的に:

  • AIの対話にはOllamaを
  • 履歴の保存にはGitを
  • コントローラーにはopen-webuiのpipelinesを

使用しています。

先ほどのドメインレイヤーで定義したインターフェース(Bot、Repository)に対して、それぞれの具体的な実装(OllamaBot、GitRepository)を提供します。この方式のメリットは、例えばOllamaをChatGPTに置き換えたい場合、新しいChatGPTBotクラスを作るだけで、他のコードは一切変更する必要がないという点です。

また、これらの実装をopen-webuiのパイプラインとして統合することで、ブラウザから簡単に利用できるUIを提供しています。以下が各実装のコードです:

chat/ollama_bot.py

from typing import Dict, List, Any, Generator, Literal, cast
from ollama import Client, Message


class OllamaBot:
    def __init__(self, host: str = "http://ollama:11434"):
        self.client = Client(host=host)

    def chat(self, messages: List[Dict[str, str]]) -> Generator[Dict[str, Any], None, None]:
        ollama_messages = [Message(
            role=cast(
                Literal['user', 'assistant', 'system', 'tool'], m['role']
            ),
            content=m['content']
        ) for m in messages]
        stream = self.client.chat(
            model='jaahas/gemma-2-9b-it-abliterated',
            messages=ollama_messages,
            stream=True
        )

        for chunk in stream:
            if chunk.get('message', {}).get('content'):
                yield dict(chunk)

chat/git_repository.py

import os
import json
import git
from typing import Dict, List


class GitRepository:
    def __init__(self, base_path: str = "./repositories"):
        self.base_path = base_path
        if not os.path.exists(base_path):
            os.makedirs(base_path)

    def save(self, user_id: str, chat_id: str, messages: List[Dict[str, str]]) -> None:
        # Initialize user repository
        repo_path = os.path.join(self.base_path, user_id)
        if not os.path.exists(repo_path):
            os.makedirs(repo_path)
            repo = git.Repo.init(repo_path)
        else:
            repo = git.Repo(repo_path)

        # Save messages
        file_path = os.path.join(repo_path, f"{chat_id}.json")
        with open(file_path, 'w') as f:
            json.dump({"messages": messages}, f, indent=2, ensure_ascii=False)

        # Commit
        repo.index.add([f"{chat_id}.json"])
        repo.index.commit(f'Update chat {chat_id}')

pipelines/chathub.py

from typing import List, Union, Generator, Iterator, Optional
from chathub import chat
import json
from datetime import datetime


class Pipeline:
    def __init__(self):
        self.name = "ChatHub Pipeline"
        bot = chat.OllamaBot()
        repository = chat.GitRepository()
        self.manager = chat.Manager(bot, repository)
        pass

    async def on_startup(self):
        print(f"on_startup:{__name__}")
        pass

    async def on_shutdown(self):
        print(f"on_shutdown:{__name__}")
        pass

    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        body["pipeline_metadata"] = {"chat_id": body["chat_id"]}
        return body

    def convert_stream(self, stream) -> Generator[bytes, None, None]:
        for item in stream:
            created = int(datetime.fromisoformat(
                item['created_at'].replace('Z', '+00:00')).timestamp())

            response = {
                "id": "chatcmpl-421",
                "object": "chat.completion.chunk",
                "created": created,
                "model": item["model"],
                "system_fingerprint": "fp_ollama",
                "choices": [{
                    "index": 0,
                    "delta": {
                        "role": item["message"]["role"],
                        "content": item["message"]["content"]
                    },
                    "finish_reason": "stop" if item["done"] else None
                }]
            }

            yield f"data: {json.dumps(response)}".encode()

    def pipe(
        self, user_message: str, model_id: str, messages: List[dict], body: dict
    ) -> Union[str, Generator, Iterator]:
        # This is where you can add your custom pipelines like RAG.
        if "user" in body:
            print("######################################")
            print(f'# User: {body["user"]["name"]} ({body["user"]["id"]})')
            print(f"# Message: {user_message}")
            print("######################################")

        try:
            stream = self.manager.mediate(
                body["user"]["id"], body["pipeline_metadata"]["chat_id"], messages)

            return self.convert_stream(stream)
        except Exception as e:
            return f"Error: {e}"

まとめ

以下のようにして会話します
webui
すると、Gitリポジトリが作成され、会話が逐一Commitされます


GitHubリポジトリを爆速で全文検索の記事にある方法で、このリポジトリに対して検索を行うこともできます

会話履歴をGitで管理する中で、現状のコミットメッセージはシンプルすぎるかもしれないと感じています。例えば「これはユーザーの入力なのか、AIの応答なのか」といったメタ情報もコミットメッセージに含められると、会話の流れを後から追跡しやすくなりそうです。リポジトリの引数を少し修正するだけで実現できそうなので、そのうち試してみたいと思います。

このプロジェクトを通じて、AIプラットフォームの開発において気づいたことがあります。最近はノーコードツールが人気ですが、実際にコードを書いて開発する方が、システムの理解も管理も容易になることが多いと感じています。また、RAG(Retrieval-Augmented Generation)やAI同士の対話など、さまざまな拡張が可能です。これからも、プログラマティックにAIのオーケストレーションを探求していきたいと考えています。

興味深い発見として、人間の会話データの規模は意外とコンパクトです。一人の人間が一生涯で交わす会話量であっても、Gitで管理できる程度のサイズに収まると考えられます。これは、個人の知識や経験をデジタルで管理・活用することの可能性を示唆しています。

Discussion