🕌
RAGベンチマーク
src/Benchmark/manager.py
import os
import glob
import csv
import logging
from typing import List, Dict
import pandas as pd
import asyncio
from ..utils.downloader import AsyncDownloader
from ..PDFExtractor.extractor import AsyncPDFExtractor
class BenchmarkDataManager:
LOG_FILE = "benchmark_data_manager.log"
SOURCE_DOCUMENTS_URL = "hf://datasets/allganize/RAG-Evaluation-Dataset-JA/documents.csv"
SOURCE_EVAL_RESULTS_URL = "hf://datasets/allganize/RAG-Evaluation-Dataset-JA/rag_evaluation_result.csv"
DOCUMENT_DIR = "benchmark_documents" # ソースドキュメントの保存先ディレクトリ
BENCHMARK_DATA_DIR = "benchmark_knowledge_data" # ソースドキュメントを加工したデータの保存先ディレクトリ
SOURCE_DOCUMENTS_FILE = "documents.csv"
SOURCE_EVAL_RESULTS_FILE = "rag_evaluation_result.csv"
SOURCE_QUESTIONS_FILE = "questions.csv"
csv_file_path: str # BenchmarkデータのCSVファイルパス
data: List[Dict] # Benchmarkデータのリスト
logger: logging.Logger # ロガー
def __init__(self, csv_file_path: str=None):
"""
初期化時にCSVファイルを指定し、データをロードする。
Args:
csv_file_path (str): CSVファイルのパス
"""
self.csv_file_path = csv_file_path
self.data = []
self.logger = self._setup_logger()
os.makedirs(self.BENCHMARK_DATA_DIR, exist_ok=True)
os.makedirs(self.DOCUMENT_DIR, exist_ok=True)
def _setup_logger(self):
"""
ロガーのセットアップ。
Returns:
logging.Logger: ログ用のロガーインスタンス
"""
logger = logging.getLogger("BenchmarkDataManager")
handler = logging.FileHandler(self.LOG_FILE)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
### ソースデータの読み込む処理 ###
def download_source(self):
"""
ソースデータをダウンロードする。
"""
self._download_source_list()
self._download_source_documents()
def _download_source_list(self):
"""
ソースドキュメントリストのcsvをダウンロードする。
"""
# ソースドキュメントリストをダウンロード
documents_df = pd.read_csv(self.SOURCE_DOCUMENTS_URL)
documents_df.to_csv(f"{self.DOCUMENT_DIR}/{self.SOURCE_DOCUMENTS_FILE}", index=True)
self.logger.info(f"ソースドキュメントリストをダウンロードしました: {self.SOURCE_DOCUMENTS_URL}. 保存先: {self.DOCUMENT_DIR}/{self.SOURCE_DOCUMENTS_FILE}")
# ソース評価結果をダウンロード
eval_results_df = pd.read_csv(self.SOURCE_EVAL_RESULTS_URL)
eval_results_df.to_csv(f"{self.DOCUMENT_DIR}/{self.SOURCE_EVAL_RESULTS_FILE}", index=True)
self.logger.info(f"ソース評価結果をダウンロードしました: {self.SOURCE_EVAL_RESULTS_URL}. 保存先: {self.DOCUMENT_DIR}/{self.SOURCE_EVAL_RESULTS_FILE}")
# ソース質問リストを作成
question_df = eval_results_df.loc[:, ["question", "target_answer", "target_file_name", "target_page_no", "domain", "type"]]
question_df.to_csv(f"{self.DOCUMENT_DIR}/{self.SOURCE_QUESTIONS_FILE}", index=True)
self.logger.info(f"ソース質問リストを作成しました. 保存先: {self.DOCUMENT_DIR}/{self.SOURCE_QUESTIONS_FILE}")
def _download_source_documents(self):
download_tasks = []
documents_df = pd.read_csv(f"{self.DOCUMENT_DIR}/{self.SOURCE_DOCUMENTS_FILE}")
# Fetch the documents
for idx, row in documents_df.iterrows():
domain = row["domain"]
url = row["url"]
filename = row["file_name"]
save_dir = os.path.join(self.DOCUMENT_DIR, domain)
download_tasks.append([url, save_dir, filename])
downloader = AsyncDownloader()
# イベントループを実行
asyncio.create_task(downloader.download_all(download_tasks))
total = len(documents_df)
downloaded = len(glob.glob(f"{self.DOCUMENT_DIR}/**/*.pdf", recursive=True))
if total == downloaded:
self.logger.info(f"ソースドキュメントをすべてダウンロードしました: {downloaded}/{total}")
else:
self.logger.error(f"ソースドキュメントのダウンロードに失敗しました: {downloaded}/{total}. {downloader.LOG_FILE} を確認してください。")
### ナレッジデータの作成処理 ###
async def create_knowledge_data(self, use_llm: bool = False):
"""
ナレッジデータを作成する。
"""
save_name = "document_page_data.csv"
if use_llm:
save_name = "document_page_data(pdfminer_with_llm).csv"
else:
save_name = "document_page_data(pdfminer).csv"
csv_header = [
"document_id",
"domain",
"title",
"page",
"url",
"file_name",
"publisher",
"page_number",
"page_numebr(zero_indexed)",
"content"
]
documents_df = pd.read_csv(f"{self.DOCUMENT_DIR}/{self.SOURCE_DOCUMENTS_FILE}")
total = len(documents_df)
extractor = AsyncPDFExtractor()
with open(f"{self.BENCHMARK_DATA_DIR}/{save_name}", "w", newline="", encoding="utf-8") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(csv_header)
for idx, row in documents_df.iterrows():
print(f"Processing {idx + 1}/{total}")
domain = row["domain"]
filename = row["file_name"]
save_dir = os.path.join(self.BENCHMARK_DATA_DIR
, domain)
doc_path = os.path.join(save_dir, filename)
if not os.path.exists(doc_path):
print(f"Failed to download {doc_path}")
continue
if use_llm:
# Extract text from the PDF with LLM
doc_datas: list[AsyncPDFExtractor.DocData] = await extractor.process_pdf_with_llm(doc_path)
# datas.extend([DocData(page=d.page, domain=domain, filename=filename, content=d.formatted_content) for d in doc_datas])
for d in doc_datas:
writer.writerow([idx, domain, row["title"], row["page"], row["url"], filename, row["publisher"], d.page+1, d.page, d.formatted_content.replace("\n", r"\n")])
else:
# Extract text from the PDF
doc_datas: list[AsyncPDFExtractor.DocData] = await extractor.extract_doc_data(doc_path)
# datas.extend([DocData(page=d.page, domain=domain, filename=filename, content=d.content) for d in doc_datas])
for d in doc_datas:
writer.writerow([idx, domain, row["title"], row["page"], row["url"], filename, row["publisher"], d.page+1, d.page, d.content.replace("\n", r"\n")])
### benchmark_knowledge_dataの処理 ###
def load_csv(self, csv_file_path: str = None):
"""
CSVファイルを読み込み、データを内部に保持する。
"""
if csv_file_path:
self.csv_file_path = csv_file_path
try:
with open(self.csv_file_path, "r", encoding="utf-8") as file:
reader = csv.DictReader(file)
self.data = [row for row in reader]
self.logger.info(f"CSVファイルを正常に読み込みました: {self.csv_file_path}")
except Exception as e:
self.logger.error(f"CSVファイルの読み込み中にエラーが発生しました: {e}")
raise e
def get_data_by_domain(self, domain: str) -> List[Dict]:
"""
指定されたドメインのデータを取得する。
Args:
domain (str): 取得するデータのドメイン
Returns:
List[Dict]: ドメインに該当するデータのリスト
"""
filtered_data = [row for row in self.data if row.get("domain") == domain]
if not filtered_data:
self.logger.warning(f"指定されたドメインに該当するデータが見つかりません: {domain}")
return filtered_data
def get_content_for_knowledge(self, domain: str) -> List[str]:
"""
指定されたドメインのデータからcontentカラムを取得する。
Args:
domain (str): 取得するデータのドメイン
Returns:
List[str]: ドメインに該当するcontentカラムのリスト
"""
data = self.get_data_by_domain(domain)
return [row.get("content", "") for row in data]
def get_domains(self) -> List[str]:
"""
データに含まれるドメインの一覧を取得する。
Returns:
List[str]: データに含まれるドメインの一覧
"""
return list(set(row.get("domain") for row in self.data))
def log_error(self, message: str):
"""
エラーメッセージをログに記録する。
Args:
message (str): 記録するエラーメッセージ
"""
self.logger.error(message)
# 使用例
if __name__ == "__main__":
# CSVファイルパスを指定
manager = BenchmarkDataManager("benchmark_knowledge_data/document_page_data(pdfminer_with_llm).csv")
# ドメインごとのデータを取得
domain_data = manager.get_data_by_domain("it")
print("ドメインデータ カウント:", len(domain_data))
# ナレッジ登録用のcontentカラムを取得
knowledge_content = manager.get_content_for_knowledge("it")
print("ナレッジ登録データ カウント:", len(knowledge_content))
src/utils/downloader.py
import asyncio
import aiohttp
import os
import pathlib
from aiohttp import ClientTimeout
from tqdm.asyncio import tqdm
import logging
class AsyncDownloader:
LOG_FILE = "AsyncDownloader_error.log"
def __init__(self, concurrency: int = 5):
"""
非同期ダウンロードクラス。
Parameters
----------
concurrency : int, default 5
並列ダウンロードの上限数。
"""
self.semaphore = asyncio.Semaphore(concurrency)
logging.basicConfig(
filename=self.LOG_FILE,
level=logging.ERROR,
format="%(asctime)s [%(levelname)s] %(message)s",
)
async def fetch(self, session: aiohttp.ClientSession, url: str, save_path: str):
"""
URLからファイルを非同期でダウンロードする。
Parameters
----------
session : aiohttp.ClientSession
非同期HTTPセッション。
url : str
ダウンロードするファイルのURL。
save_path : str
保存先のファイルパス。
"""
try:
if os.path.exists(save_path):
print(f"File already exists: {save_path}")
return
async with self.semaphore:
async with session.get(url, timeout=ClientTimeout(total=30)) as response:
response.raise_for_status()
pathlib.Path(save_path).parent.mkdir(parents=True, exist_ok=True)
with open(save_path, "wb") as f:
async for chunk in response.content.iter_chunked(1024):
f.write(chunk)
print(f"Downloaded: {url} -> {save_path}")
except Exception as e:
logging.error(f"Failed to download {url}. save_path {save_path}. Error: {str(e)}")
async def download_all(self, tasks: list):
"""
リスト内のすべてのダウンロードタスクを実行する。
Parameters
----------
tasks : list
[url, 保存先ディレクトリ, ファイル名] のリスト。
"""
async with aiohttp.ClientSession() as session:
download_tasks = []
for url, directory, filename in tasks:
save_path = os.path.join(directory, filename)
download_tasks.append(self.fetch(session, url, save_path))
for task in tqdm(asyncio.as_completed(download_tasks), total=len(download_tasks)):
await task
if __name__ == "__main__":
# ダウンロードタスクの例
download_tasks = [
["https://example.com/file1.jpg", "downloads/images", "file1.jpg"],
["https://example.com/file2.jpg", "downloads/images", "file2.jpg"],
["https://example.com/file3.jpg", "downloads/docs", "file3.jpg"],
]
# 並列数を指定してダウンローダを作成
downloader = AsyncDownloader(concurrency=3)
# イベントループを実行
# await downloader.download_all(download_tasks)
asyncio.run(downloader.download_all(download_tasks))
src/PDFExtractor/extractor.py
import asyncio
import os, time
import pathlib
from tqdm.asyncio import tqdm
import logging
import pdfminer
import pdfminer.high_level
from dataclasses import dataclass
from pdf2image import convert_from_path
import openai
import aiofiles
import base64
from .prompt import SYSTEM_PROMPT, USER_PROMPT
# pdfminerを使ってPDFからテキストを抽出する
class AsyncPDFExtractor:
concurrency: int
retry: int
wait_time: int
LOG_FILE = "AsyncPDFExtractor_error.log"
@dataclass
class DocData:
page: int
filename: str
content: str
formatted_content: str | None = None
def __init__(self, concurrency: int = 5, retry: int = 10, wait_time: int = 30):
self.semaphore = asyncio.Semaphore(concurrency)
self.retry = retry
self.wait_time = wait_time
logging.basicConfig(
filename=self.LOG_FILE,
level=logging.ERROR,
format="%(asctime)s [%(levelname)s] %(message)s",
)
async def extract_by_llm(self, text_content: str, image_path: str) -> str:
openai_client = openai.AsyncAzureOpenAI(
azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
api_key=os.getenv("AZURE_OPENAI_API_KEY"),
api_version=os.getenv("OPENAI_API_VERSION")
)
for try_c in range(self.retry):
try:
# 画像をBase64エンコードする
async with aiofiles.open(image_path, 'rb') as image_file:
image_data = await image_file.read()
base64_image = base64.b64encode(image_data).decode('utf-8')
response = await openai_client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{
"role": "user",
"content": [
{
"type": "text",
"text": USER_PROMPT.format(pdfminer_text=text_content),
},
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{base64_image}"},
},
],
}
],
temperature=0.0
)
return response.choices[0].message.content
except Exception as e:
if "429" in str(e):
logging.warning(f"Rate Limit to process with LLM: {try_c+1}/{self.retry} {self.wait_time}秒待機します。")
time.sleep(self.wait_time)
continue
else:
logging.error(f"Failed to process with LLM: {str(e)}")
return ""
def get_pages(self, pdf_path: str) -> int:
# zero-indexed page numbers
return [r.pageid - 1 for r in pdfminer.high_level.extract_pages(pdf_path)]
async def extract_text(
self,
pdf_path: str,
page_numbers: list[str]=None # List of zero-indexed page numbers to extract.
) -> str:
async with self.semaphore:
content_text = pdfminer.high_level.extract_text(pdf_path, page_numbers=page_numbers)
return content_text
async def extract_doc_data(self, pdf_path: str) -> list[DocData]:
pages = self.get_pages(pdf_path)
doc_data = []
for page in pages:
content = await self.extract_text(pdf_path, [page])
doc_data.append(self.DocData(page=page, filename=pdf_path, content=content))
return doc_data
async def extract_page_image(self, pdf_path: str, page_number: int, output_dir: str = "images") -> str:
try:
pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
images = convert_from_path(pdf_path, first_page=page_number + 1, last_page=page_number + 1, dpi=300)
image_path = os.path.join(output_dir, f"page_{page_number + 1}.png")
images[0].save(image_path, "PNG")
return image_path
except Exception as e:
logging.error(f"Failed to convert PDF page to image: {str(e)}")
return ""
async def process_pdf_with_llm(self, pdf_path: str) -> list[DocData]:
try:
doc_data = await self.extract_doc_data(pdf_path)
tasks = []
async def process_page(data: AsyncPDFExtractor.DocData):
async with self.semaphore:
image_path = await self.extract_page_image(pdf_path, data.page)
if image_path:
formatted_content = await self.extract_by_llm(data.content, image_path)
data.formatted_content = formatted_content
# 画像ファイルを削除
try:
os.remove(image_path)
logging.info(f"Deleted temporary image: {image_path}")
except Exception as e:
logging.error(f"Failed to delete temporary image {image_path}: {str(e)}")
return formatted_content
return ""
for data in doc_data:
tasks.append(process_page(data))
results = await asyncio.gather(*tasks)
return doc_data
except Exception as e:
logging.error(f"Failed to process PDF with LLM: {str(e)}")
return []
src/PDFExtractor/prompt.py
SYSTEM_PROMPT = """以下の情報を元に、指定されたPDFページをMarkdown形式の純粋なテキストに変換してください。
- **画像データ**: そのページ全体のレイアウトや表形式を含む画像データ
- **抽出されたテキスト**: pdfminerで抽出されたそのページのテキストデータ(正確な文章を含む)
### 注意事項
1. **pdfminerのテキストをベース**にし、正確な文章として利用してください。
2. **画像データ**は以下の用途に使用してください:
- 表形式が壊れている場合に正確に補完。
- 見出しや箇条書きのフォーマットが壊れている場合に修正。
- テキストが配置されている順序や構造を正確に保つため。
3. 画像データのフォーマットでMarkdown形式に変換するのが困難な場合は、「(※llm: 要確認箇所)」と困難な箇所のテキストの後ろに追記してください。
4. 出力は**Markdown形式の純粋なテキスト**とし、コードブロックや装飾で囲まないでください(例: 「```markdown」などは使用しないでください)。
### 入力情報
- **画像データ**: [ページ全体のスクリーンショットやスキャン画像]
- **pdfminerテキスト**: [ページのテキスト]
必要に応じて、不足している内容があれば質問してください。
### 出力例
1. 見出し(例: `# 見出し`)
2. 箇条書きリスト(例: `- リストアイテム`)
3. 表(例: `| 見出し1 | 見出し2 |`)
最適なMarkdownフォーマットを適用し、フォーマットの壊れた箇所を修正してください。
"""
USER_PROMPT = "# pdfminerテキスト\n{pdfminer_text}"
import pandas as pd
import os
save_dir = "012_results"
os.makedirs(save_dir, exist_ok=True)
question_csv = "benchmark_documents/question.csv"
question_df = pd.read_csv(question_csv)
def rank_questions_by_domain(domain: str, search_class)->tuple[pd.DataFrame, float]:
print(f"Ranking questions for domain: {domain}")
datas = data_manager.get_data_by_domain(domain)
document_indexs = [[d["document_id"], d["page_number"], d["file_name"]] for d in datas]
# document_corpus = [d["content"] for d in datas]
document_corpus = []
for d in datas:
if d["content"]:
document_corpus.append(d["content"])
else:
document_corpus.append("-")
search_class.embedding_corpus(document_corpus)
domain_question_df = question_df[(question_df["domain"] == domain) & (question_df["type"] != "image")]
domain_question_reuslt_df = domain_question_df.copy()
domain_question_reuslt_df["rank"] = -1
domain_question_reuslt_df["rank_score"] = -1.0
domain_question_reuslt_df["in_a_rank"] = False
for row in domain_question_df.itertuples():
# print(row.question)
(ranked_indices, ranked_scores) = search_class.rank_documents(row.question)
for i, (index, score) in enumerate(zip(ranked_indices, ranked_scores)):
(document_id, page_number, file_name) = document_indexs[index]
if file_name == row.target_file_name and page_number == str(row.target_page_no):
domain_question_reuslt_df.at[row.Index, "rank"] = i + 1
domain_question_reuslt_df.at[row.Index, "rank_score"] = score
domain_question_reuslt_df.at[row.Index, "in_a_rank"] = True
break
score = len(domain_question_reuslt_df[domain_question_reuslt_df["in_a_rank"] == True])/len(domain_question_reuslt_df)
print(f"Domain: {domain}, Score: {score}")
return (domain_question_reuslt_df, score)
if __name__ == "__main__":
mixpr_tfidf = MixPR(TfidfVectorizerWrapper())
total_question_reuslt_df = pd.DataFrame()
scores = []
for domain in data_manager.get_domains():
(domain_question_reuslt_df, score) = rank_questions_by_domain(domain, mixpr_tfidf)
total_question_reuslt_df = pd.concat([total_question_reuslt_df, domain_question_reuslt_df])
scores.append([domain, score])
print(scores)
# テキストは改行させない
total_question_reuslt_df.to_csv(os.path.join(save_dir, "total_question_reuslt(tfidf).csv"), index=False)
Discussion