🔥

マルチモデルOllamaベンチマークのコード解説

2024/12/19に公開

コードの全体構造

ベンチマークツールは以下の主要なコンポーネントで構成されています:

  1. OllamaClientクラス:APIとの通信を管理
  2. ベンチマーク実行機能:複数モデルのテスト実行
  3. 結果集計機能:モデル間の比較分析
  4. ロギング機能:詳細な実行ログの記録

主要コンポーネントの詳細解説

2.1 OllamaClientクラス

class OllamaClient:
    def __init__(self, host: str = "http://localhost:11434"):
        """Ollamaクライアントの初期化"""
        self.host = host
        self.base_url = f"{host}/api"
        pynvml.nvmlInit()
        logger.info(f"Initialized Ollama client with host: {host}")

重要なポイント:

  • NVMLの初期化によるGPUモニタリング機能の有効化
  • ログ機能の統合によるデバッグのしやすさ
  • ホスト設定のカスタマイズ可能性

2.2 GPU監視機能

def get_gpu_memory(self) -> Dict[str, int]:
    """GPU使用メモリを取得"""
    memory_info = {}
    deviceCount = pynvml.nvmlDeviceGetCount()
    for i in range(deviceCount):
        handle = pynvml.nvmlDeviceGetHandleByIndex(i)
        info = pynvml.nvmlDeviceGetMemoryInfo(handle)
        name = pynvml.nvmlDeviceGetName(handle)
        if isinstance(name, bytes):
            name = name.decode('utf-8')
        memory_info[name] = info.used // 1024 // 1024  # MB単位
    return memory_info

実装のポイント:

  • マルチGPU環境の自動検出
  • メモリ使用量のMB単位への変換
  • バイト文字列のデコード処理による互換性確保

2.3 テキスト生成機能

def generate(self, model: str, prompt: str, stream: bool = True) -> dict:
    """テキスト生成を実行"""
    url = f"{self.base_url}/generate"
    data = {
        "model": model,
        "prompt": prompt,
        "stream": stream
    }
    
    if stream:
        response = requests.post(url, json=data, stream=True)
        full_response = ""
        for line in response.iter_lines():
            if line:
                json_response = json.loads(line)
                if 'response' in json_response:
                    full_response += json_response['response']
                if json_response.get('done', False):
                    return {
                        'response': full_response,
                        'total_duration': json_response.get('total_duration', 0),
                        'load_duration': json_response.get('load_duration', 0),
                        'prompt_eval_count': json_response.get('prompt_eval_count', 0),
                        'eval_count': json_response.get('eval_count', 0)
                    }

実装の特徴:

  • ストリーミングモードのサポート
  • 詳細な性能メトリクスの収集
  • エラーハンドリングの組み込み

2.4 ベンチマーク実行機能

def run_benchmark(client: OllamaClient, 
                 model: str, 
                 prompts: List[str], 
                 output_dir: str):
    """ベンチマークを実行して結果をCSVに保存"""
    logger.info(f"Starting benchmark for model: {model}")
    results = []
    
    # 出力ディレクトリが存在しない場合は作成
    os.makedirs(output_dir, exist_ok=True)
    
    # モデル名から使用可能なファイル名を生成
    safe_model_name = model.replace(':', '_').replace('/', '_')
    output_file = os.path.join(output_dir, f"benchmark_results_{safe_model_name}.csv")
    
    for i, prompt in enumerate(prompts, 1):
        # 生成前のGPUメモリを記録
        initial_memory = client.get_gpu_memory()
        
        # 生成実行と時間計測
        start_time = time.time()
        try:
            response = client.generate(model, prompt)
            success = True
        except Exception as e:
            logger.error(f"Error generating with model {model}: {str(e)}")
            response = {}
            success = False
            
        end_time = time.time()

重要な機能:

  • モデルごとの独立したCSVファイル生成
  • エラー発生時のグレースフルな処理
  • 詳細なメモリ使用状況の追跡

2.5 結果集計機能

def aggregate_results(output_dir: str):
    """全モデルの結果を集計して比較用CSVを作成"""
    all_results = []
    csv_files = [f for f in os.listdir(output_dir) 
                 if f.startswith('benchmark_results_') and f.endswith('.csv')]
    
    for csv_file in csv_files:
        with open(os.path.join(output_dir, csv_file), 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            model_results = list(reader)
            if model_results:
                model_name = model_results[0]['model']
                avg_duration = sum(float(r['total_duration']) 
                                 for r in model_results) / len(model_results)
                avg_memory_change = {}
                
                # GPU別の平均メモリ変化を計算
                for key in model_results[0].keys():
                    if key.endswith('_memory_change_mb'):
                        gpu_name = key.replace('_memory_change_mb', '')
                        avg_memory_change[gpu_name] = sum(float(r[key]) 
                                 for r in model_results) / len(model_results)

集計処理のポイント:

  • モデル間の比較可能な指標の生成
  • GPU別のメモリ使用統計
  • 成功率と平均応答時間の算出

拡張とカスタマイズのポイント

3.1 新しいメトリクスの追加

# result辞書に新しいメトリクスを追加する例
result.update({
    'tokens_per_second': response.get('eval_count', 0) / 
                        (end_time - start_time),
    'memory_efficiency': response.get('eval_count', 0) / 
                        abs(memory_change if memory_change != 0 else 1)
})

3.2 カスタムプロンプトの設定

# プロンプトをカテゴリ別に整理
prompts = {
    'creative': [
        "1000文字のSF小説を書いてください。設定は宇宙船内での出来事です。",
        # 他の創作タスク
    ],
    'technical': [
        "Pythonで簡単なウェブスクレイピングプログラムを書いてください。",
        # 他の技術タスク
    ],
    'explanation': [
        "量子コンピューティングについて500文字で説明してください。",
        # 他の説明タスク
    ]
}

3.3 ロギング設定のカスタマイズ

# より詳細なログ設定の例
logger.add(
    "detailed_benchmark_{time}.log",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
    level="DEBUG",
    rotation="1 day",
    compression="zip"
)

エラーハンドリングとデバッグ

4.1 例外処理の実装

try:
    response = client.generate(model, prompt)
    success = True
except requests.exceptions.ConnectionError as e:
    logger.error(f"Connection error: {str(e)}")
    success = False
except json.JSONDecodeError as e:
    logger.error(f"JSON decode error: {str(e)}")
    success = False
except Exception as e:
    logger.error(f"Unexpected error: {str(e)}")
    success = False

4.2 デバッグモードの活用

if args.debug:
    logger.remove()
    logger.add(
        sys.stdout,
        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
               "<level>{level: <8}</level> | "
               "<cyan>{name}</cyan>:<cyan>{function}</cyan>:"
               "<cyan>{line}</cyan> - <level>{message}</level>",
        level="DEBUG"
    )

使用方法とベストプラクティス

5.1 基本的な使用方法

# 単一モデルでのベンチマーク
python ollama_benchmark.py --models llama2:latest

# 複数モデルでのベンチマーク
python ollama_benchmark.py --models llama2:latest gemma2:latest qwen2.5-coder:7b

# デバッグモードでの実行
python ollama_benchmark.py --models llama2:latest --debug

5.2 結果の分析

# Pandas を使用した結果の分析例
import pandas as pd

def analyze_results(comparison_file):
    df = pd.read_csv(comparison_file)
    
    # モデル別の性能分析
    performance_summary = df.groupby('model').agg({
        'average_duration': ['mean', 'std'],
        'success_rate': 'mean'
    })
    
    # メモリ使用量の分析
    memory_usage = df.filter(regex='_avg_memory_change$')
    
    return performance_summary, memory_usage

最適化とパフォーマンスチューニング

6.1 メモリ管理の最適化

# モデル間の待機時間の調整
time.sleep(5)  # モデル切り替え時のメモリクリア待機

# メモリ使用量の監視
if memory_change > threshold:
    logger.warning(f"High memory usage detected: {memory_change}MB")

6.2 並列処理の実装

from concurrent.futures import ThreadPoolExecutor

def parallel_benchmark(models, prompts, max_workers=3):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for model in models:
            future = executor.submit(run_benchmark, 
                                   client, model, prompts, output_dir)
            futures.append(future)
        
        # 結果の収集
        results = [f.result() for f in futures]

まとめ

このコードガイドでは、マルチモデルOllamaベンチマークツールの主要なコンポーネントと実装の詳細を解説しました。各セクションで示したコード例とベストプラクティスは、実際の運用環境での活用や、さらなる機能拡張の基礎として活用できます。

全体コード


import requests
import time
import json
from typing import Dict, List, Optional
import pynvml
import argparse
from datetime import datetime
import csv
from art import text2art
from loguru import logger
import sys
import os

# ロガーの設定
logger.remove()
logger.add(
    sys.stdout,
    format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
    level="INFO"
)
logger.add("ollama_benchmark_{time}.log")

def print_banner():
    """アプリケーションバナーを表示"""
    art = text2art("Ollama Benchmark", font='block')
    print("\033[94m" + art + "\033[0m")
    print("\033[92m" + "=" * 50 + "\033[0m")
    print("\033[93mMulti-Model K/V Context Quantization Performance Test\033[0m")
    print("\033[92m" + "=" * 50 + "\033[0m\n")

class OllamaClient:
    def __init__(self, host: str = "http://localhost:11434"):
        """Ollamaクライアントの初期化"""
        self.host = host
        self.base_url = f"{host}/api"
        pynvml.nvmlInit()
        logger.info(f"Initialized Ollama client with host: {host}")

    def get_gpu_memory(self) -> Dict[str, int]:
        """GPU使用メモリを取得"""
        memory_info = {}
        deviceCount = pynvml.nvmlDeviceGetCount()
        for i in range(deviceCount):
            handle = pynvml.nvmlDeviceGetHandleByIndex(i)
            info = pynvml.nvmlDeviceGetMemoryInfo(handle)
            name = pynvml.nvmlDeviceGetName(handle)
            if isinstance(name, bytes):
                name = name.decode('utf-8')
            memory_info[name] = info.used // 1024 // 1024  # MB単位
        return memory_info

    def list_models(self) -> List[Dict]:
        """利用可能なモデルの一覧を取得"""
        logger.info("Fetching available models...")
        response = requests.get(f"{self.base_url}/tags")
        models = response.json()['models']
        logger.info(f"Found {len(models)} models")
        return models

    def generate(self, model: str, prompt: str, stream: bool = True) -> dict:
        """テキスト生成を実行"""
        logger.info(f"Generating text with model: {model}")
        logger.debug(f"Prompt: {prompt[:100]}...")
        
        url = f"{self.base_url}/generate"
        data = {
            "model": model,
            "prompt": prompt,
            "stream": stream
        }
        
        if stream:
            response = requests.post(url, json=data, stream=True)
            full_response = ""
            for line in response.iter_lines():
                if line:
                    json_response = json.loads(line)
                    if 'response' in json_response:
                        full_response += json_response['response']
                    if json_response.get('done', False):
                        logger.success("Text generation completed")
                        return {
                            'response': full_response,
                            'total_duration': json_response.get('total_duration', 0),
                            'load_duration': json_response.get('load_duration', 0),
                            'prompt_eval_count': json_response.get('prompt_eval_count', 0),
                            'eval_count': json_response.get('eval_count', 0)
                        }
        else:
            response = requests.post(url, json=data)
            return response.json()

def run_benchmark(client: OllamaClient, 
                 model: str, 
                 prompts: List[str], 
                 output_dir: str):
    """ベンチマークを実行して結果をCSVに保存"""
    logger.info(f"Starting benchmark for model: {model}")
    results = []
    
    # 出力ディレクトリが存在しない場合は作成
    os.makedirs(output_dir, exist_ok=True)
    
    # モデル名から使用可能なファイル名を生成
    safe_model_name = model.replace(':', '_').replace('/', '_')
    output_file = os.path.join(output_dir, f"benchmark_results_{safe_model_name}.csv")
    
    for i, prompt in enumerate(prompts, 1):
        logger.info(f"Running test {i}/{len(prompts)}")
        
        # 生成前のGPUメモリを記録
        initial_memory = client.get_gpu_memory()
        
        # 生成実行
        start_time = time.time()
        try:
            response = client.generate(model, prompt)
            success = True
        except Exception as e:
            logger.error(f"Error generating with model {model}: {str(e)}")
            response = {}
            success = False
            
        end_time = time.time()
        
        # 生成後のGPUメモリを記録
        final_memory = client.get_gpu_memory()
        
        # 結果を記録
        result = {
            'model': model,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'prompt': prompt[:50],
            'success': success,
            'total_duration': end_time - start_time,
            'load_duration': response.get('load_duration', 0),
            'prompt_eval_count': response.get('prompt_eval_count', 0),
            'eval_count': response.get('eval_count', 0),
            'response_length': len(response.get('response', '')),
        }
        
        # 各GPUのメモリ使用量の変化を記録
        for gpu, initial in initial_memory.items():
            result[f'{gpu}_initial_memory_mb'] = initial
            result[f'{gpu}_final_memory_mb'] = final_memory[gpu]
            result[f'{gpu}_memory_change_mb'] = final_memory[gpu] - initial
            logger.info(f"GPU {gpu} memory change: {result[f'{gpu}_memory_change_mb']}MB")
        
        results.append(result)
        status = "succeeded" if success else "failed"
        logger.success(f"Test {i}/{len(prompts)} {status} in {result['total_duration']:.2f}s")

        # 各テスト後に少し待機してGPUメモリを解放
        time.sleep(2)

    # 結果をCSVに保存
    if results:
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=results[0].keys())
            writer.writeheader()
            writer.writerows(results)
        logger.success(f"Results for model {model} saved to {output_file}")

def aggregate_results(output_dir: str):
    """全モデルの結果を集計して比較用CSVを作成"""
    all_results = []
    csv_files = [f for f in os.listdir(output_dir) if f.startswith('benchmark_results_') and f.endswith('.csv')]
    
    for csv_file in csv_files:
        with open(os.path.join(output_dir, csv_file), 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            model_results = list(reader)
            if model_results:
                model_name = model_results[0]['model']
                avg_duration = sum(float(r['total_duration']) for r in model_results) / len(model_results)
                avg_memory_change = {}
                
                # GPU別の平均メモリ変化を計算
                for key in model_results[0].keys():
                    if key.endswith('_memory_change_mb'):
                        gpu_name = key.replace('_memory_change_mb', '')
                        avg_memory_change[gpu_name] = sum(float(r[key]) for r in model_results) / len(model_results)
                
                all_results.append({
                    'model': model_name,
                    'average_duration': avg_duration,
                    'success_rate': sum(1 for r in model_results if r['success'] == 'True') / len(model_results) * 100,
                    **{f'{gpu}_avg_memory_change': change for gpu, change in avg_memory_change.items()}
                })
    
    if all_results:
        comparison_file = os.path.join(output_dir, 'model_comparison.csv')
        with open(comparison_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=all_results[0].keys())
            writer.writeheader()
            writer.writerows(all_results)
        logger.success(f"Model comparison saved to {comparison_file}")

def main():
    print_banner()
    
    parser = argparse.ArgumentParser(description='Multi-Model Ollama Benchmark Client')
    parser.add_argument('--models', nargs='+', default=['llama2:latest'],
                      help='Models to benchmark (space-separated list)')
    parser.add_argument('--output-dir', default='benchmark_results',
                      help='Output directory for results')
    parser.add_argument('--debug', action='store_true',
                      help='Enable debug logging')
    args = parser.parse_args()

    if args.debug:
        logger.remove()
        logger.add(sys.stdout, level="DEBUG")

    client = OllamaClient()
    
    # テスト用プロンプト
    prompts = [
        "1000文字のSF小説を書いてください。設定は宇宙船内での出来事です。",
        "Pythonで簡単なウェブスクレイピングプログラムを書いてください。BeautifulSoupを使用してください。",
        "量子コンピューティングについて500文字で説明してください。",
        "機械学習におけるバイアスとバリアンスのトレードオフについて説明してください。",
        "クリーンアーキテクチャの主要な原則について説明し、実装例を示してください。"
    ]

    logger.info(f"Testing models: {', '.join(args.models)}")
    logger.info("Checking available models...")
    
    available_models = {model['name']: model['digest'] for model in client.list_models()}
    print("\nAvailable models:")
    for name, digest in available_models.items():
        print(f"- {name}: {digest}")

    print("\n" + "=" * 50)
    
    # 各モデルに対してベンチマークを実行
    for model in args.models:
        if model in available_models:
            run_benchmark(client, model, prompts, args.output_dir)
        else:
            logger.warning(f"Model {model} not found in available models, skipping...")
        
        # モデル間で少し待機してGPUメモリを完全に解放
        time.sleep(5)
    
    # 結果を集計
    aggregate_results(args.output_dir)

if __name__ == "__main__":
    main()
    
    
# # 複数モデルでベンチマーク実行
# python ollama_benchmark.py --models llama2:latest gemma2:latest qwen2.5-coder:7b

# # カスタム出力ディレクトリを指定
# python ollama_benchmark.py --models llama2:latest llama3:latest --output-dir my_benchmark_results

# # デバッグモードで実行
# python ollama_benchmark.py --models llama2:latest --debug

<script async src="https://platform.twitter.com/widgets.js" charset="utf-8"></script>

Discussion