⚙️

MariaDB WHERE INクエリの直列分割処理で最適なチャンクサイズを実測データで検証する

に公開

はじめに

大量のデータを扱う際、WHERE INクエリのパフォーマンスは重要な課題です。特に、数万件のIDを一度に処理する場合、単一のクエリではパフォーマンスが低下することがあります。

この記事では、直列分割処理によるWHERE INクエリの最適化について、実際のベンチマーク結果に基づいて詳しく分析します。

問題の背景

単一WHERE INクエリの問題

-- 50000件のIDを一度に処理
SELECT * FROM test_records 
WHERE id IN (?, ?, ?, ..., ?) -- 50000個のパラメータ

このような大量のパラメータを含むクエリには以下の問題があります:

  • メモリ使用量の増加: 大量のパラメータをメモリに保持
  • クエリプランの複雑化: データベースエンジンの最適化が困難
  • 実行時間の増加: テーブルスキャンが発生する可能性

解決策: 直列分割処理

直列分割処理では、大きなクエリを小さなチャンクに分割して順次実行します:

// 50000件を1000件ずつ50チャンクに分割
let chunks: Vec<Vec<String>> = ids.chunks(1000).map(|chunk| chunk.to_vec()).collect();

for chunk in &chunks {
    let _ = self.db.where_in_query(chunk).await?; // 1000件ずつ処理
}

ベンチマーク環境

テストデータ

  • テーブル: test_records
  • レコード数: 100,212件
  • インデックス: PRIMARY KEY (id)
  • データベース: MariaDB

テストパターン

  • 単一WHERE IN: 50000件を一度に処理
  • 直列分割: 500件〜20000件のチャンクサイズで分割処理
  • 測定項目: 実行時間、パフォーマンス向上率、実行計画

実測結果

1. 単一WHERE INクエリの結果

-- 実行計画
id: 1, select_type: SIMPLE, table: test_records, type: ALL, 
possible_keys: PRIMARY, key: , key_len: , ref: , rows: 100212, Extra: Using where

-- 実行時間
Single WHERE IN time: 448.00ms

問題点:

  • type: ALL (テーブルスキャン)
  • 全100,212行をスキャン
  • PRIMARYキーが使用されていない

2. 直列分割処理の結果

チャンクサイズ チャンク数 実行時間 パフォーマンス向上 アクセスタイプ 効率性
500件 100チャンク 394.00ms 1.14倍 range
1000件 50チャンク 391.00ms 1.15倍 range
2000件 25チャンク 373.00ms 1.20倍 range 最高
3000件 17チャンク 374.00ms 1.20倍 range 最高
4000件 13チャンク 383.00ms 1.17倍 range
5000件 10チャンク 398.00ms 1.13倍 range
10000件 5チャンク 384.00ms 1.17倍 range
20000件 3チャンク 424.00ms 1.06倍 ALL

重要な発見

1. 最適なチャンクサイズ

🏆 最適チャンクサイズ: 2000件

  • 実行時間: 373.00ms
  • パフォーマンス向上: 1.20倍
  • チャンク数: 25チャンク

2. 実行計画の変化パターン

インデックスが効く範囲 (500件〜10000件)

-- 実行計画
type: range, key: PRIMARY, key_len: 144, rows: [チャンクサイズ]

-- 特徴
- インデックスを使用した効率的な範囲スキャン
- 処理行数がチャンクサイズと比例
- 一貫したパフォーマンス向上

インデックスが効かなくなる境界 (20000件)

-- 実行計画
type: ALL, rows: 100212, key: (なし)

-- 特徴
- テーブルスキャンに変化
- パフォーマンス向上率が大幅に低下
- 単一WHERE INと同様の問題が発生

3. パフォーマンス向上のパターン

500件   → 1.14倍 (オーバーヘッド大)
1000件  → 1.15倍 (オーバーヘッド中)
2000件  → 1.20倍 (バランス最適) ← ピーク
3000件  → 1.20倍 (バランス最適) ← ピーク
4000件  → 1.17倍 (バランス良好)
5000件  → 1.13倍 (バランス中)
10000件 → 1.17倍 (バランス良好)
20000件 → 1.06倍 (インデックス効率低下)

技術的な分析

1. インデックス効率の境界線

// インデックスが効くチャンクサイズの上限
let max_efficient_chunk_size = 10000; // 実測値

// 最適なチャンクサイズの計算
let optimal_chunk_size = sqrt(total_records * efficiency_factor);
// 50000件の場合: sqrt(50000 * 0.1) ≈ 70件

// 実際の最適値: 2000件
// 実用的な制約とオーバーヘッドを考慮した結果

2. オーバーヘッドと効率性のバランス

// チャンク数によるオーバーヘッド
let overhead = chunk_count * query_overhead;

// 500件: 100 * overhead (オーバーヘッド大)
// 2000件: 25 * overhead (バランス最適)
// 20000件: 3 * overhead (オーバーヘッド小、インデックス効率低下)

// インデックス効率
let index_efficiency = if chunk_size <= 10000 { "high" } else { "low" };

3. メモリ使用量の最適化

// 単一クエリ
let single_memory = 50000 * 36; // 1.8MB (UUID文字列)

// 分割クエリ(2000件チャンク)
let split_memory = 2000 * 36;   // 72KB

// メモリ削減率: 25倍

実装例

Rustでの最適化された実装

pub async fn optimized_split_processing(
    &self,
    ids: &[String],
    chunk_size: usize,
) -> Result<Vec<TestRecord>> {
    // 適応的なチャンクサイズ選択
    let optimal_chunk_size = if ids.len() > 10000 {
        chunk_size.min(2000) // 大きなデータセットでは2000件を上限
    } else {
        chunk_size
    };

    let chunks: Vec<Vec<String>> = ids
        .chunks(optimal_chunk_size)
        .map(|chunk| chunk.to_vec())
        .collect();

    let mut all_records = Vec::new();
    
    // 直列処理(並列処理のオーバーヘッドを避ける)
    for chunk in &chunks {
        let records = self.db.where_in_query(chunk).await?;
        all_records.extend(records);
    }

    Ok(all_records)
}

環境別の最適設定

// 高パフォーマンス環境
let high_performance_config = QueryConfig {
    chunk_size: 2000,           // 最適値
    max_parallel_tasks: 25,     // 25チャンク
    target: "maximum_performance",
};

// DB負荷軽減環境
let load_balance_config = QueryConfig {
    chunk_size: 1000,           // より小さなチャンク
    max_parallel_tasks: 50,     // 50チャンク
    target: "minimum_db_load",
};

// バランス環境
let balanced_config = QueryConfig {
    chunk_size: 3000,           // 最適範囲内
    max_parallel_tasks: 17,     // 17チャンク
    target: "balanced_performance",
};

最適化の推奨事項

1. チャンクサイズの選択基準

  • 500件〜2000件: 高パフォーマンス、高負荷軽減
  • 2000件〜5000件: バランス最適、標準運用
  • 5000件〜10000件: 中程度のパフォーマンス、安定性重視
  • 10000件以上: 非推奨(インデックス効率低下)

2. 環境別の最適化戦略

本番環境(高負荷時)

let chunk_size = 1000; // 負荷軽減重視

開発・テスト環境

let chunk_size = 2000; // パフォーマンス重視

バッチ処理環境

let chunk_size = 3000; // バランス重視

3. 監視と調整

// パフォーマンス監視
let performance_metrics = monitor_split_processing_performance(
    chunk_size,
    execution_time,
    memory_usage,
    db_load
);

// 適応的な調整
if performance_metrics.db_load > threshold {
    chunk_size = chunk_size / 2; // チャンクサイズを半分に
}

直列分割実行のメリット

1. データベース負荷の軽減

接続プールの効率的な活用

// 直列実行では1つの接続を順次使用
let mut connection = pool.acquire().await?;
for chunk in &chunks {
    let _ = connection.execute(&query, &chunk).await?;
}

// 並列実行では複数接続を同時使用
let handles: Vec<_> = chunks.iter().map(|chunk| {
    let pool = pool.clone();
    tokio::spawn(async move {
        let conn = pool.acquire().await?;
        conn.execute(&query, chunk).await
    })
}).collect();

メリット:

  • 接続数の制御: 同時接続数を制限してDB負荷を抑制
  • リソース競合の回避: 複数クエリによるロック競合を防止
  • 安定したパフォーマンス: 予測可能な実行時間

メモリ使用量の最適化

// 直列実行: メモリ使用量を制御
let mut all_records = Vec::new();
for chunk in &chunks {
    let records = process_chunk(chunk).await?;
    all_records.extend(records);
    // 必要に応じてメモリを解放
    if all_records.len() > 10000 {
        all_records = all_records.drain(..).collect();
    }
}

// 並列実行: 全チャンクの結果を同時保持
let all_results = futures::future::join_all(handles).await?;
let all_records: Vec<_> = all_results.into_iter().flatten().collect();

2. 実行計画の最適化

インデックス効率の向上

-- 小規模チャンクでの実行計画
EXPLAIN SELECT * FROM test_records WHERE id IN (?, ?, ?, ...);
-- type: range, key: PRIMARY, rows: 1000

-- 大規模バッチでの実行計画
EXPLAIN SELECT * FROM test_records WHERE id IN (?, ?, ?, ..., ?); -- 50000個
-- type: ALL, rows: 100212 (テーブルスキャン)

メリット:

  • インデックス使用: 小規模チャンクでインデックスが効率的に動作
  • クエリプラン最適化: データベースエンジンが最適なプランを選択
  • 統計情報の活用: 正確な統計情報に基づく実行計画

クエリキャッシュの効率化

// 直列実行: 同じクエリパターンを繰り返し実行
for chunk in &chunks {
    // 同じクエリ構造で異なるパラメータ
    let query = "SELECT * FROM test_records WHERE id IN (?, ?, ?, ...)";
    let _ = execute_query(query, chunk).await?;
}

// データベースがクエリプランをキャッシュ
// 2回目以降は最適化されたプランを使用

3. エラーハンドリングとデバッグ

段階的な処理とエラー復旧

// 直列実行: エラーが発生したチャンクを特定
let mut successful_chunks = Vec::new();
let mut failed_chunks = Vec::new();

for (i, chunk) in chunks.iter().enumerate() {
    match process_chunk(chunk).await {
        Ok(records) => {
            successful_chunks.push((i, records));
            info!("Chunk {} processed successfully", i);
        }
        Err(e) => {
            failed_chunks.push((i, e));
            error!("Chunk {} failed: {}", i, e);
            // 他のチャンクは継続処理
        }
    }
}

// 失敗したチャンクのみ再処理
for (i, _) in &failed_chunks {
    info!("Retrying chunk {}", i);
    // 再処理ロジック
}

メリット:

  • エラーの局所化: 特定のチャンクの失敗が全体に影響しない
  • 部分的な成功: 一部のチャンクが成功していれば結果を活用
  • デバッグの容易さ: 問題のあるチャンクを特定しやすい

進捗監視とログ出力

// 直列実行: 詳細な進捗監視
let total_chunks = chunks.len();
for (i, chunk) in chunks.iter().enumerate() {
    let start = Instant::now();
    
    info!("Processing chunk {}/{} (size: {})", i + 1, total_chunks, chunk.len());
    
    let result = process_chunk(chunk).await?;
    let duration = start.elapsed();
    
    info!("Chunk {} completed in {:?}, records: {}", 
           i + 1, duration, result.len());
    
    // 進捗率の計算
    let progress = ((i + 1) as f64 / total_chunks as f64) * 100.0;
    info!("Progress: {:.1}%", progress);
}

4. リソース管理とスケーラビリティ

メモリ使用量の制御

// 直列実行: メモリ使用量を段階的に制御
let mut total_memory = 0;
let memory_limit = 100 * 1024 * 1024; // 100MB

for chunk in &chunks {
    let chunk_memory = estimate_memory_usage(chunk);
    
    if total_memory + chunk_memory > memory_limit {
        // メモリ制限に達した場合の処理
        info!("Memory limit reached, processing remaining chunks with cleanup");
        cleanup_memory();
        total_memory = 0;
    }
    
    let records = process_chunk(chunk).await?;
    total_memory += estimate_memory_usage(&records);
    
    info!("Current memory usage: {:.2}MB", total_memory as f64 / 1024.0 / 1024.0);
}

スケーラビリティの向上

// 直列実行: システムリソースに応じた適応
let system_memory = get_available_memory();
let optimal_chunk_size = calculate_optimal_chunk_size(system_memory);

let chunks: Vec<Vec<String>> = ids
    .chunks(optimal_chunk_size)
    .map(|chunk| chunk.to_vec())
    .collect();

info!("System memory: {}MB, optimal chunk size: {}", 
       system_memory / 1024 / 1024, optimal_chunk_size);

5. 運用とメンテナンス

監視とアラート

// 直列実行: 詳細なパフォーマンス監視
let mut performance_metrics = Vec::new();

for (i, chunk) in chunks.iter().enumerate() {
    let start = Instant::now();
    let before_memory = get_memory_usage();
    
    let result = process_chunk(chunk).await?;
    
    let duration = start.elapsed();
    let after_memory = get_memory_usage();
    let memory_delta = after_memory - before_memory;
    
    performance_metrics.push(ChunkMetrics {
        chunk_index: i,
        size: chunk.len(),
        duration,
        memory_delta,
        success: true,
    });
    
    // パフォーマンス閾値チェック
    if duration.as_millis() > 5000 {
        warn!("Chunk {} took too long: {:?}", i, duration);
    }
    
    if memory_delta > 50 * 1024 * 1024 { // 50MB
        warn!("Chunk {} used too much memory: {}MB", i, memory_delta / 1024 / 1024);
    }
}

設定の柔軟性

// 環境に応じた設定調整
#[derive(Debug, Clone)]
pub struct SerialProcessingConfig {
    pub chunk_size: usize,
    pub memory_limit: usize,
    pub timeout_per_chunk: Duration,
    pub retry_attempts: u32,
    pub enable_progress_logging: bool,
    pub enable_performance_monitoring: bool,
}

impl Default for SerialProcessingConfig {
    fn default() -> Self {
        Self {
            chunk_size: 2000, // 最適値
            memory_limit: 100 * 1024 * 1024, // 100MB
            timeout_per_chunk: Duration::from_secs(30),
            retry_attempts: 3,
            enable_progress_logging: true,
            enable_performance_monitoring: true,
        }
    }
}

まとめ

主要な発見

  1. 最適チャンクサイズ: 2000件(1.20倍の高速化)
  2. インデックス効率の境界: 10000件まで有効
  3. パフォーマンス向上: 1.06倍〜1.20倍の範囲
  4. メモリ使用量: 最大25倍の削減

実用的な推奨事項

  • 標準運用: 2000件チャンク
  • 高負荷時: 1000件チャンク
  • バランス重視: 3000件チャンク
  • 避けるべき: 10000件以上のチャンクサイズ

今後の課題

  • 異なるデータベースエンジンでの検証
  • より複雑な検索条件での最適化
  • 並列処理との組み合わせ効果
  • 動的なチャンクサイズ調整

参考資料


この記事は、実際のベンチマーク結果に基づいて作成されています。実測データによる分析により、直列分割処理での最適なチャンクサイズの選択が明確になりました。

実装の際は、環境や要件に応じて適切なチャンクサイズを選択し、継続的なパフォーマンス監視を行うことをお勧めします。

コラボスタイル Developers

Discussion