💎

Redis Cluster 環境でのバッチ処理最適化

に公開

概要

Redis Clusterにおいて、異なるスロットに分散されたキーに対するバッチ処理を効率的に実行する方法について説明する。特に、MULTIコマンドの制約とPIPELINEを活用した解決策に焦点を当てる。

Redis Clusterは、データを複数のノードに分散させるために、キーをハッシュ化して16,384個のスロットのいずれかに割り当てる。これによりスケーラビリティを確保しているが、異なるスロットにまたがるキーへの一括操作には制約が生まれる。


Redis Clusterでの制約

CROSSSLOTエラーの発生

Redis Clusterでは、異なるスロットに配置されたキーに対する一括操作(DEL, MGETなど)でエラーが発生する。

# ❌ エラーが発生する例
keys = ["key1", "key2", "key3"]  # 異なるスロットに配置
client.del(keys)
# => Redis::CommandError: CROSSSLOT Keys in request don't hash to the same slot

MULTIコマンドの制約

同様に、MULTIトランザクション内で異なるスロットのキーを操作することもできない。

# ❌ 異なるスロットのキーを含むMULTIは失敗
client.multi do |multi|
  multi.del("key1")  # スロット A
  multi.del("key2")  # スロット B(異なるスロット)
end
# => Redis::CommandError: CROSSSLOT Keys in request don't hash to the same slot

解決策:PIPELINEの活用

PIPELINEは異なるスロットでも実行できる

PIPELINEは、複数のコマンドをまとめて送信し、応答をまとめて受け取るための機能である。コマンドは個別に実行されるため、異なるスロット間のキー操作でも正常に動作する。

# ✅ PIPELINEは異なるスロットのキーでも動作
client.pipelined do |pipeline|
  pipeline.del("key1")  # スロット A
  pipeline.del("key2")  # スロット B
  pipeline.del("key3")  # スロット C
end

⚠️ 注意: 当たり前だが、異なるnodeに対してはpipelinedは実行できない。あくまで、同一のnode かつ、異なるスロットに対しても動作する、という話。

IOの効率化

  • PIPELINEの本質はトランザクションではなく、ネットワークI/Oの最適化にある。
  • 個別実行時: コマンドの数だけネットワークの往復が発生する。

PIPELINE使用時: 複数のコマンドを1回のネットワーク往復で実行できる。

# 従来の非効率な方法(n回のIO)
keys.each { |key| client.del(key) }

# 効率的な方法(1回のIO)
client.pipelined do |pipeline|
  keys.each { |key| pipeline.del(key) }
end

実装パターン

バッチ処理でのPIPELINE実装

大量のキーを一度に処理する場合、メモリ使用量を考慮して適切なサイズのバッチに分割し、PIPELINEを実行するのが一般的である。

# RedisAdapterでの実装例
def delete_in_batches(keys, batch_size = 500)
  return if keys.blank?

  keys.each_slice(batch_size) do |chunked_keys|
    pipelined do |pipeline|
      chunked_keys.each do |key|
        pipeline.del(key)
      end
    end
  end
end

全ノードに対する処理パターン

SCANのように、クラスタ内の全ノードに対してコマンドを実行したい場合は、各ノードのクライアントを個別に操作する必要がある。多くのクライアントライブラリは、クラスタ内の各マスターノードへの接続を管理する機能を提供している。

# 全ノードからパターンに一致するキーを検索し、バッチ削除する例
def find_and_delete_keys(pattern, batch_size = 500)
  # redis-rbの場合、`_client.cluster.nodes`で各ノード情報にアクセスできる
  # CLUSTER NODES コマンド: https://redis.io/docs/latest/commands/cluster-nodes/
  store._client.cluster.nodes.each do |node|
    next unless node.master? # マスターノードのみを対象

    # SCANは個別のノードで実行する必要がある
    node.scan_each(match: pattern, count: 100).each_slice(batch_size) do |keys|
      # 取得したキーをバッチ削除
      delete_in_batches(keys, batch_size) unless keys.empty?
    end
  end
end

重要な注意点

1. トランザクション保証の違い

MULTIとPIPELINEの最も重要な違いは、アトミック性(トランザクション保証)の有無にある。

方式 トランザクション 異なるスロット対応 パフォーマンス
MULTI ACID保証 高速(IO削減)
PIPELINE トランザクションなし 高速(IO削減)
個別実行 トランザクションなし 低速(IO多数)

2. PIPELINEの制約事項

PIPELINE内のコマンドはアトミックではないため、途中のコマンドが失敗しても、後続のコマンドは実行される。

# ⚠️ PIPELINEではトランザクション保証がない
client.set("key1", "string_value")

results = client.pipelined do |pipeline|
  pipeline.del("key_non_existent") # 失敗しない(結果は0)
  pipeline.incr("key1")           # ❌ 文字列にINCRを実行しようとしてエラー
  pipeline.del("key3")           # ✅ 前のコマンドが失敗しても実行される
end

# results[1] には Redis::CommandError オブジェクトが入る

3. エラーハンドリング

PIPELINEの実行結果はコマンドごとの応答配列として返ってくるため、個別の結果を確認してエラーをハンドリングできる。

# PIPELINEの結果確認
results = client.pipelined do |pipeline|
  keys.each { |key| pipeline.del(key) }
end

# 個別の結果を確認
results.each_with_index do |result, index|
  if result.is_a?(Redis::CommandError)
    Rails.logger.error "Failed to delete #{keys[index]}: #{result.message}"
  end
end

パフォーマンス比較

実測例(1,000キーの削除)

方式 実行時間 ネットワーク往復 メモリ使用量
個別実行 1000ms 1000回
PIPELINE 50ms 1回
バッチ化PIPELINE 50ms 1回

※ メモリ使用量について: PIPELINE では、一度に多くのコマンドをサーバーに送信し、その応答をまとめて受け取るため、クライアント側・サーバー側双方で一時的に応答を保持するためのバッファメモリを消費する。バッチサイズを適切に設定することが重要である。


ベストプラクティス

1. 適切なバッチサイズの選択

メモリ使用量とパフォーマンスのバランスを取るため、バッチサイズは 500〜1000程度が推奨される。これはアプリケーションの特性に合わせて調整すべきである。

2. エラー処理の実装

PIPELINE全体が失敗する可能性も考慮し、rescue節でフォールバック処理を実装することが堅牢性を高める。

def safe_delete_in_batches(keys, batch_size = 500)
  keys.each_slice(batch_size) do |chunked_keys|
    begin
      pipelined do |pipeline|
        chunked_keys.each { |key| pipeline.del(key) }
      end
    rescue Redis::CommandError => e
      Rails.logger.error "Batch delete failed: #{e.message}"
      # フォールバック: 個別削除
      chunked_keys.each { |key| del(key) rescue nil }
    end
  end
end

3. 監視とロギング

バッチ処理の実行時間や処理件数をロギングすることは、パフォーマンスのボトルネック特定やトラブルシューティングに役立つ。

# ※命名は適当
def delete_in_batches_with_monitoring(keys, batch_size = 500)
  start_time = Time.current
  keys.each_slice(batch_size).with_index do |chunked_keys, batch_index|
    batch_start = Time.current
    pipelined do |pipeline|
      chunked_keys.each { |key| pipeline.del(key) }
    end
    batch_duration = (Time.current - batch_start) * 1000
    Rails.logger.info "Batch #{batch_index + 1}: #{chunked_keys.size} keys deleted in #{batch_duration.round(2)}ms"
  end
  total_duration = (Time.current - start_time) * 1000
  Rails.logger.info "Total: #{keys.size} keys deleted in #{total_duration.round(2)}ms"
end

まとめ

Redis Cluster環境でのバッチ処理においては、以下の点を理解することが重要である。

  • MULTI: 同一スロット内でのみ使用可能。ACID保証が必要な場合に選択する。
  • PIPELINE: 異なるスロット間でも使用可能。トランザクション保証はないが、I/Oを劇的に効率化できる。
  • 個別実行: 最もシンプルだが非効率であり、大量のキー処理には不向き。

CROSSSLOTエラーに直面した際は、PIPELINEを適切なバッチサイズで活用することで、Redis Clusterの制約を回避しながら高いパフォーマンスを実現できる。ただし、トランザクション保証がなくなる点には十分な注意が必要であり、必要であればアプリケーションレベルでの制御を検討すべきである。

Discussion