🕌

RAGベンチマーク

2025/02/14に公開
src/Benchmark/manager.py
import os
import glob
import csv
import logging
from typing import List, Dict
import pandas as pd
import asyncio

from ..utils.downloader import AsyncDownloader
from ..PDFExtractor.extractor import AsyncPDFExtractor

class BenchmarkDataManager:
    LOG_FILE = "benchmark_data_manager.log"
    SOURCE_DOCUMENTS_URL = "hf://datasets/allganize/RAG-Evaluation-Dataset-JA/documents.csv"
    SOURCE_EVAL_RESULTS_URL = "hf://datasets/allganize/RAG-Evaluation-Dataset-JA/rag_evaluation_result.csv"
    DOCUMENT_DIR = "benchmark_documents" # ソースドキュメントの保存先ディレクトリ
    BENCHMARK_DATA_DIR = "benchmark_knowledge_data" # ソースドキュメントを加工したデータの保存先ディレクトリ

    SOURCE_DOCUMENTS_FILE = "documents.csv"
    SOURCE_EVAL_RESULTS_FILE = "rag_evaluation_result.csv"
    SOURCE_QUESTIONS_FILE = "questions.csv"

    csv_file_path: str # BenchmarkデータのCSVファイルパス
    data: List[Dict] # Benchmarkデータのリスト
    logger: logging.Logger # ロガー

    def __init__(self, csv_file_path: str=None):
        """
        初期化時にCSVファイルを指定し、データをロードする。
        
        Args:
            csv_file_path (str): CSVファイルのパス
        """
        self.csv_file_path = csv_file_path
        self.data = []
        self.logger = self._setup_logger()

        os.makedirs(self.BENCHMARK_DATA_DIR, exist_ok=True)
        os.makedirs(self.DOCUMENT_DIR, exist_ok=True)

    def _setup_logger(self):
        """
        ロガーのセットアップ。

        Returns:
            logging.Logger: ログ用のロガーインスタンス
        """
        logger = logging.getLogger("BenchmarkDataManager")
        handler = logging.FileHandler(self.LOG_FILE)
        formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        return logger
    
    ### ソースデータの読み込む処理 ###

    def download_source(self):
        """
        ソースデータをダウンロードする。
        """
        self._download_source_list()
        self._download_source_documents()

    def _download_source_list(self):
        """
        ソースドキュメントリストのcsvをダウンロードする。
        """
        # ソースドキュメントリストをダウンロード
        documents_df = pd.read_csv(self.SOURCE_DOCUMENTS_URL)
        documents_df.to_csv(f"{self.DOCUMENT_DIR}/{self.SOURCE_DOCUMENTS_FILE}", index=True)
        self.logger.info(f"ソースドキュメントリストをダウンロードしました: {self.SOURCE_DOCUMENTS_URL}. 保存先: {self.DOCUMENT_DIR}/{self.SOURCE_DOCUMENTS_FILE}")

        # ソース評価結果をダウンロード
        eval_results_df = pd.read_csv(self.SOURCE_EVAL_RESULTS_URL)
        eval_results_df.to_csv(f"{self.DOCUMENT_DIR}/{self.SOURCE_EVAL_RESULTS_FILE}", index=True)
        self.logger.info(f"ソース評価結果をダウンロードしました: {self.SOURCE_EVAL_RESULTS_URL}. 保存先: {self.DOCUMENT_DIR}/{self.SOURCE_EVAL_RESULTS_FILE}")
        
        # ソース質問リストを作成
        question_df = eval_results_df.loc[:, ["question", "target_answer", "target_file_name", "target_page_no", "domain", "type"]]
        question_df.to_csv(f"{self.DOCUMENT_DIR}/{self.SOURCE_QUESTIONS_FILE}", index=True)
        self.logger.info(f"ソース質問リストを作成しました. 保存先: {self.DOCUMENT_DIR}/{self.SOURCE_QUESTIONS_FILE}")

    def _download_source_documents(self):
        download_tasks = []

        documents_df = pd.read_csv(f"{self.DOCUMENT_DIR}/{self.SOURCE_DOCUMENTS_FILE}")
        # Fetch the documents
        for idx, row in documents_df.iterrows():
            domain = row["domain"]
            url = row["url"]
            filename = row["file_name"]
            save_dir = os.path.join(self.DOCUMENT_DIR, domain)

            download_tasks.append([url, save_dir, filename])

        downloader = AsyncDownloader()

        # イベントループを実行
        asyncio.create_task(downloader.download_all(download_tasks))

        total = len(documents_df)
        downloaded = len(glob.glob(f"{self.DOCUMENT_DIR}/**/*.pdf", recursive=True))
        if total == downloaded:
            self.logger.info(f"ソースドキュメントをすべてダウンロードしました: {downloaded}/{total}")
        else:
            self.logger.error(f"ソースドキュメントのダウンロードに失敗しました: {downloaded}/{total}. {downloader.LOG_FILE} を確認してください。")

    
    ### ナレッジデータの作成処理 ###

    async def create_knowledge_data(self, use_llm: bool = False):
        """
        ナレッジデータを作成する。
        """
        save_name = "document_page_data.csv"
        if use_llm:
            save_name = "document_page_data(pdfminer_with_llm).csv"
        else:
            save_name = "document_page_data(pdfminer).csv"

        csv_header = [
            "document_id",
            "domain",
            "title",
            "page",
            "url",
            "file_name",
            "publisher",
            "page_number",
            "page_numebr(zero_indexed)",
            "content"
        ]

        documents_df = pd.read_csv(f"{self.DOCUMENT_DIR}/{self.SOURCE_DOCUMENTS_FILE}")
        total = len(documents_df)
        extractor = AsyncPDFExtractor()

        with open(f"{self.BENCHMARK_DATA_DIR}/{save_name}", "w", newline="", encoding="utf-8") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(csv_header)
            for idx, row in documents_df.iterrows():
                print(f"Processing {idx + 1}/{total}")
                domain = row["domain"]
                filename = row["file_name"]
                save_dir = os.path.join(self.BENCHMARK_DATA_DIR
                                        , domain)

                doc_path = os.path.join(save_dir, filename)
                if not os.path.exists(doc_path):
                    print(f"Failed to download {doc_path}")
                    continue

                
                if use_llm:
                    # Extract text from the PDF with LLM
                    doc_datas: list[AsyncPDFExtractor.DocData] = await extractor.process_pdf_with_llm(doc_path)
                    # datas.extend([DocData(page=d.page, domain=domain, filename=filename, content=d.formatted_content) for d in doc_datas])
                    for d in doc_datas:
                        writer.writerow([idx, domain, row["title"], row["page"], row["url"], filename, row["publisher"], d.page+1, d.page, d.formatted_content.replace("\n", r"\n")])
                
                else:
                    # Extract text from the PDF
                    doc_datas: list[AsyncPDFExtractor.DocData] = await extractor.extract_doc_data(doc_path)
                    # datas.extend([DocData(page=d.page, domain=domain, filename=filename, content=d.content) for d in doc_datas])
                    for d in doc_datas:
                        writer.writerow([idx, domain, row["title"], row["page"], row["url"], filename, row["publisher"], d.page+1, d.page, d.content.replace("\n", r"\n")])        
        

    ### benchmark_knowledge_dataの処理 ###

    def load_csv(self, csv_file_path: str = None):
        """
        CSVファイルを読み込み、データを内部に保持する。
        """
        if csv_file_path:
            self.csv_file_path = csv_file_path
        try:
            with open(self.csv_file_path, "r", encoding="utf-8") as file:
                reader = csv.DictReader(file)
                self.data = [row for row in reader]
                self.logger.info(f"CSVファイルを正常に読み込みました: {self.csv_file_path}")
        except Exception as e:
            self.logger.error(f"CSVファイルの読み込み中にエラーが発生しました: {e}")
            raise e

    def get_data_by_domain(self, domain: str) -> List[Dict]:
        """
        指定されたドメインのデータを取得する。

        Args:
            domain (str): 取得するデータのドメイン

        Returns:
            List[Dict]: ドメインに該当するデータのリスト
        """
        filtered_data = [row for row in self.data if row.get("domain") == domain]
        if not filtered_data:
            self.logger.warning(f"指定されたドメインに該当するデータが見つかりません: {domain}")
        return filtered_data

    def get_content_for_knowledge(self, domain: str) -> List[str]:
        """
        指定されたドメインのデータからcontentカラムを取得する。

        Args:
            domain (str): 取得するデータのドメイン

        Returns:
            List[str]: ドメインに該当するcontentカラムのリスト
        """
        data = self.get_data_by_domain(domain)
        return [row.get("content", "") for row in data]
    
    def get_domains(self) -> List[str]:
        """
        データに含まれるドメインの一覧を取得する。

        Returns:
            List[str]: データに含まれるドメインの一覧
        """
        return list(set(row.get("domain") for row in self.data))

    def log_error(self, message: str):
        """
        エラーメッセージをログに記録する。

        Args:
            message (str): 記録するエラーメッセージ
        """
        self.logger.error(message)

# 使用例
if __name__ == "__main__":
    # CSVファイルパスを指定
    manager = BenchmarkDataManager("benchmark_knowledge_data/document_page_data(pdfminer_with_llm).csv")

    # ドメインごとのデータを取得
    domain_data = manager.get_data_by_domain("it")
    print("ドメインデータ カウント:", len(domain_data))

    # ナレッジ登録用のcontentカラムを取得
    knowledge_content = manager.get_content_for_knowledge("it")
    print("ナレッジ登録データ カウント:", len(knowledge_content))
src/utils/downloader.py
import asyncio
import aiohttp
import os
import pathlib
from aiohttp import ClientTimeout
from tqdm.asyncio import tqdm
import logging

class AsyncDownloader:
    LOG_FILE = "AsyncDownloader_error.log"
    def __init__(self, concurrency: int = 5):
        """
        非同期ダウンロードクラス。

        Parameters
        ----------
        concurrency : int, default 5
            並列ダウンロードの上限数。
        """
        self.semaphore = asyncio.Semaphore(concurrency)
        logging.basicConfig(
            filename=self.LOG_FILE,
            level=logging.ERROR,
            format="%(asctime)s [%(levelname)s] %(message)s",
        )

    async def fetch(self, session: aiohttp.ClientSession, url: str, save_path: str):
        """
        URLからファイルを非同期でダウンロードする。

        Parameters
        ----------
        session : aiohttp.ClientSession
            非同期HTTPセッション。
        url : str
            ダウンロードするファイルのURL。
        save_path : str
            保存先のファイルパス。
        """
        try:
            if os.path.exists(save_path):
                print(f"File already exists: {save_path}")
                return
            async with self.semaphore:
                async with session.get(url, timeout=ClientTimeout(total=30)) as response:
                    response.raise_for_status()
                    pathlib.Path(save_path).parent.mkdir(parents=True, exist_ok=True)
                    with open(save_path, "wb") as f:
                        async for chunk in response.content.iter_chunked(1024):
                            f.write(chunk)
            print(f"Downloaded: {url} -> {save_path}")
        except Exception as e:
            logging.error(f"Failed to download {url}. save_path {save_path}. Error: {str(e)}")

    async def download_all(self, tasks: list):
        """
        リスト内のすべてのダウンロードタスクを実行する。

        Parameters
        ----------
        tasks : list
            [url, 保存先ディレクトリ, ファイル名] のリスト。
        """
        async with aiohttp.ClientSession() as session:
            download_tasks = []
            for url, directory, filename in tasks:
                save_path = os.path.join(directory, filename)
                download_tasks.append(self.fetch(session, url, save_path))

            for task in tqdm(asyncio.as_completed(download_tasks), total=len(download_tasks)):
                await task

if __name__ == "__main__":
    # ダウンロードタスクの例
    download_tasks = [
        ["https://example.com/file1.jpg", "downloads/images", "file1.jpg"],
        ["https://example.com/file2.jpg", "downloads/images", "file2.jpg"],
        ["https://example.com/file3.jpg", "downloads/docs", "file3.jpg"],
    ]

    # 並列数を指定してダウンローダを作成
    downloader = AsyncDownloader(concurrency=3)

    # イベントループを実行
    # await downloader.download_all(download_tasks)
    asyncio.run(downloader.download_all(download_tasks))
src/PDFExtractor/extractor.py
import asyncio
import os, time
import pathlib
from tqdm.asyncio import tqdm
import logging
import pdfminer
import pdfminer.high_level
from dataclasses import dataclass
from pdf2image import convert_from_path
import openai
import aiofiles
import base64

from .prompt import SYSTEM_PROMPT, USER_PROMPT

# pdfminerを使ってPDFからテキストを抽出する
class AsyncPDFExtractor:
    concurrency: int
    retry: int
    wait_time: int
    LOG_FILE = "AsyncPDFExtractor_error.log"

    @dataclass
    class DocData:
        page: int
        filename: str
        content: str
        formatted_content: str | None = None
        
    def __init__(self, concurrency: int = 5, retry: int = 10, wait_time: int = 30):
        self.semaphore = asyncio.Semaphore(concurrency)
        self.retry = retry
        self.wait_time = wait_time
        logging.basicConfig(
            filename=self.LOG_FILE,
            level=logging.ERROR,
            format="%(asctime)s [%(levelname)s] %(message)s",
        )

    async def extract_by_llm(self, text_content: str, image_path: str) -> str:
        openai_client = openai.AsyncAzureOpenAI(
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version=os.getenv("OPENAI_API_VERSION")
        )

        for try_c in range(self.retry):
            try:
                # 画像をBase64エンコードする
                async with aiofiles.open(image_path, 'rb') as image_file:
                    image_data = await image_file.read()
                base64_image = base64.b64encode(image_data).decode('utf-8')

                response = await openai_client.chat.completions.create(
                    model="gpt-4o",
                    messages=[
                        {"role": "system", "content": SYSTEM_PROMPT},
                        {
                            "role": "user",
                            "content": [
                                {
                                    "type": "text",
                                    "text": USER_PROMPT.format(pdfminer_text=text_content),
                                },
                                {
                                    "type": "image_url",
                                    "image_url": {"url": f"data:image/png;base64,{base64_image}"},
                                },
                            ],
                        }
                    ],
                    temperature=0.0
                )
                return response.choices[0].message.content
            except Exception as e:
                if "429" in str(e):
                    logging.warning(f"Rate Limit to process with LLM: {try_c+1}/{self.retry} {self.wait_time}秒待機します。")
                    time.sleep(self.wait_time)
                    continue
                else:
                    logging.error(f"Failed to process with LLM: {str(e)}")
                    return ""

    def get_pages(self, pdf_path: str) -> int:
        # zero-indexed page numbers
        return [r.pageid - 1 for r in pdfminer.high_level.extract_pages(pdf_path)]
    
    async def extract_text(
        self, 
        pdf_path: str, 
        page_numbers: list[str]=None # List of zero-indexed page numbers to extract.
    ) -> str:
        async with self.semaphore:
            content_text = pdfminer.high_level.extract_text(pdf_path, page_numbers=page_numbers)
            return content_text
        
    async def extract_doc_data(self, pdf_path: str) -> list[DocData]:
        pages = self.get_pages(pdf_path)
        doc_data = []
        for page in pages:
            content = await self.extract_text(pdf_path, [page])
            doc_data.append(self.DocData(page=page, filename=pdf_path, content=content))
        return doc_data
    
    async def extract_page_image(self, pdf_path: str, page_number: int, output_dir: str = "images") -> str:
        try:
            pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
            images = convert_from_path(pdf_path, first_page=page_number + 1, last_page=page_number + 1, dpi=300)
            image_path = os.path.join(output_dir, f"page_{page_number + 1}.png")
            images[0].save(image_path, "PNG")
            return image_path
        except Exception as e:
            logging.error(f"Failed to convert PDF page to image: {str(e)}")
            return ""

    async def process_pdf_with_llm(self, pdf_path: str) -> list[DocData]:
        try:
            doc_data = await self.extract_doc_data(pdf_path)
            tasks = []

            async def process_page(data: AsyncPDFExtractor.DocData):
                async with self.semaphore:
                    image_path = await self.extract_page_image(pdf_path, data.page)
                    if image_path:
                        formatted_content = await self.extract_by_llm(data.content, image_path)
                        data.formatted_content = formatted_content
                        # 画像ファイルを削除
                        try:
                            os.remove(image_path)
                            logging.info(f"Deleted temporary image: {image_path}")
                        except Exception as e:
                            logging.error(f"Failed to delete temporary image {image_path}: {str(e)}")
                        return formatted_content
                    return ""

            for data in doc_data:
                tasks.append(process_page(data))

            results = await asyncio.gather(*tasks)
            return doc_data
        except Exception as e:
            logging.error(f"Failed to process PDF with LLM: {str(e)}")
            return []
src/PDFExtractor/prompt.py
SYSTEM_PROMPT = """以下の情報を元に、指定されたPDFページをMarkdown形式の純粋なテキストに変換してください。

- **画像データ**: そのページ全体のレイアウトや表形式を含む画像データ
- **抽出されたテキスト**: pdfminerで抽出されたそのページのテキストデータ(正確な文章を含む)

### 注意事項
1. **pdfminerのテキストをベース**にし、正確な文章として利用してください。
2. **画像データ**は以下の用途に使用してください:
   - 表形式が壊れている場合に正確に補完。
   - 見出しや箇条書きのフォーマットが壊れている場合に修正。
   - テキストが配置されている順序や構造を正確に保つため。
3. 画像データのフォーマットでMarkdown形式に変換するのが困難な場合は、「(※llm: 要確認箇所)」と困難な箇所のテキストの後ろに追記してください。
4. 出力は**Markdown形式の純粋なテキスト**とし、コードブロックや装飾で囲まないでください(例: 「```markdown」などは使用しないでください)。

### 入力情報
- **画像データ**: [ページ全体のスクリーンショットやスキャン画像]
- **pdfminerテキスト**: [ページのテキスト]

必要に応じて、不足している内容があれば質問してください。

### 出力例
1. 見出し(例: `# 見出し`)
2. 箇条書きリスト(例: `- リストアイテム`)
3. 表(例: `| 見出し1 | 見出し2 |`)

最適なMarkdownフォーマットを適用し、フォーマットの壊れた箇所を修正してください。
"""

USER_PROMPT = "# pdfminerテキスト\n{pdfminer_text}"
import pandas as pd
import os
save_dir = "012_results"
os.makedirs(save_dir, exist_ok=True)

question_csv = "benchmark_documents/question.csv"

question_df = pd.read_csv(question_csv)

def rank_questions_by_domain(domain: str, search_class)->tuple[pd.DataFrame, float]:
    print(f"Ranking questions for domain: {domain}")
    datas = data_manager.get_data_by_domain(domain)
    document_indexs = [[d["document_id"], d["page_number"], d["file_name"]] for d in datas]
    # document_corpus = [d["content"] for d in datas]
    document_corpus = []
    for d in datas:
        if d["content"]:
            document_corpus.append(d["content"])
        else:
            document_corpus.append("-")


    search_class.embedding_corpus(document_corpus)

    domain_question_df = question_df[(question_df["domain"] == domain) & (question_df["type"] != "image")]
    domain_question_reuslt_df = domain_question_df.copy()
    domain_question_reuslt_df["rank"] = -1
    domain_question_reuslt_df["rank_score"] = -1.0
    domain_question_reuslt_df["in_a_rank"] = False

    for row in domain_question_df.itertuples():
        # print(row.question)
        (ranked_indices, ranked_scores) = search_class.rank_documents(row.question)
        for i, (index, score) in enumerate(zip(ranked_indices, ranked_scores)):
            (document_id, page_number, file_name) = document_indexs[index]

            if file_name == row.target_file_name and page_number == str(row.target_page_no):
                domain_question_reuslt_df.at[row.Index, "rank"] = i + 1
                domain_question_reuslt_df.at[row.Index, "rank_score"] = score
                domain_question_reuslt_df.at[row.Index, "in_a_rank"] = True
                break
    score = len(domain_question_reuslt_df[domain_question_reuslt_df["in_a_rank"] == True])/len(domain_question_reuslt_df)
    print(f"Domain: {domain}, Score: {score}")
    return (domain_question_reuslt_df, score)


if __name__ == "__main__":
    mixpr_tfidf = MixPR(TfidfVectorizerWrapper())
    total_question_reuslt_df = pd.DataFrame()
    scores = []
    for domain in data_manager.get_domains():
        (domain_question_reuslt_df, score) = rank_questions_by_domain(domain, mixpr_tfidf)

        total_question_reuslt_df = pd.concat([total_question_reuslt_df, domain_question_reuslt_df])
        scores.append([domain, score])
    print(scores)
    # テキストは改行させない
    total_question_reuslt_df.to_csv(os.path.join(save_dir, "total_question_reuslt(tfidf).csv"), index=False)

Discussion