Redisのいい感じの使い方(+アンチパターン)
Redisとは
Redis(Remote Dictionary Server)は、インメモリで動作するキー・バリュー型のデータベースです。メモリ上でデータを保持するため、従来のディスクベースのデータベースと比較して圧倒的に高速な読み書きを実現できます。
主な特徴
- 高速性: メモリ上でのデータ操作により、マイクロ秒単位の応答時間
- 豊富なデータ型: 文字列、リスト、セット、ハッシュ、ソート済みセットなど
- 永続化: RDBスナップショットやAOF(Append Only File)による永続化機能
- レプリケーション: マスター・スレーブ構成による高可用性
- クラスタリング: 水平スケーリングに対応
サンプルコード
こちらのGithubリポジトリ にて、本投稿で紹介しているコードを実際に実行することができます。
理解の一助になりましたら。
Redisを使って嬉しいパターン
1. キャッシュレイヤーとしての活用
最も一般的で効果的な使い方
import redis
import json
from datetime import timedelta
r = redis.Redis(host='localhost', port=6379, db=0)
def get_user_profile(user_id):
# キャッシュから取得を試行
cache_key = f"user_profile:{user_id}"
cached_data = r.get(cache_key)
if cached_data:
return json.loads(cached_data)
# キャッシュにない場合はDBから取得
user_data = fetch_from_database(user_id)
# キャッシュに保存(1時間のTTL)
r.setex(cache_key, timedelta(hours=1), json.dumps(user_data))
return user_data
メリット:
- データベースへの負荷軽減
- レスポンス時間の大幅な短縮
- スケーラビリティの向上
2. セッション管理
Webアプリケーションでのセッション情報の保存
from datetime import timedelta
import json
import redis
r = redis.Redis(host="redis", port=6379, db=0)
def create_session(user_id, session_data):
session_id = generate_session_id()
session_key = f"session:{session_id}"
# セッション情報を30分のTTLで保存
r.setex(
session_key,
timedelta(minutes=30),
json.dumps(
{
"user_id": user_id,
"session_data": session_data,
}
),
)
return session_id
def get_session(session_id):
r = redis.Redis(host="redis", port=6379, db=0)
session_key = f"session:{session_id}"
session_data = r.get(session_key)
if session_data:
# アクセス時にTTLを延長
r.expire(session_key, timedelta(minutes=30))
return json.loads(session_data)
return None
メリット:
- 自動的なセッション有効期限管理
- 複数サーバー間でのセッション共有
- 高速なセッション検索
3. リアルタイム集計・カウンター
アクセス数やいいね数など、高頻度更新データの一次ストレージとして
from datetime import datetime
import redis
r = redis.Redis(host="redis", port=6379, db=0)
def increment_page_view(page_id):
key = f"page_views:{page_id}"
return r.incr(key)
def add_user_to_dau(user_id):
today = datetime.now().strftime("%Y-%m-%d")
key = f"dau:{today}"
return r.pfadd(key, user_id) # HyperLogLogを使用
def get_daily_active_users():
today = datetime.now().strftime("%Y-%m-%d")
key = f"dau:{today}"
return r.pfcount(key) # HyperLogLogを使用
メリット:
- アトミックな操作による正確性
- 高速な集計処理
- HyperLogLogによるメモリ効率的な概算カウント
4. 分散ロック
複数プロセス間での結合度を疎に保った排他制御
import time
import uuid
import redis
r = redis.Redis(host="redis", port=6379, db=0)
def acquire_lock(lock_name, timeout=10):
identifier = uuid.uuid4().hex
end = time.time() + timeout
while time.time() < end:
if r.set(lock_name, identifier, nx=True, ex=timeout):
return identifier
time.sleep(0.001)
return False
def release_lock(lock_name, identifier):
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
return r.eval(lua_script, 1, lock_name, identifier)
メリット:
- 分散環境での安全な排他制御
- デッドロックの防止
- タイムアウト機能による安全性
5. Pub/Sub(発行・購読)
リアルタイム通知システム
Publisher
import json
import redis
r = redis.Redis(host="redis", port=6379, db=0)
def send_notification(channel, message):
r.publish(channel, json.dumps(message))
send_notification("notifications", "Hello, World!")
Subscriber
import json
import logging
import redis
logger = logging.getLogger()
r = redis.Redis(host="redis", port=6379, db=0)
def message_handler(message):
data = json.loads(message["data"])
logger.debug("Received message", extra={"data": data})
pubsub = r.pubsub()
pubsub.subscribe("notifications")
for message in pubsub.listen():
if message["type"] == "message":
message_handler(message)
メリット:
- リアルタイム通信
- 疎結合なアーキテクチャ
- スケーラブルな通知システム
こういう使い方しちゃだめなパターン
❌ アンチパターン1: プライマリデータベースとしての使用
ある大手SNS (仮名: SocialNet) は、ユーザー登録時に user:{id} を Redis だけで永続化。数年の運用後、Redis が 3 回クラスタ再構築時にメモリ破損し、全ユーザーデータが失われた。
永続化保証のない Redis への依存は致命的。データベースは「永続化DB + キャッシュ」の組み合わせが基本。
# ダメな例
def save_user_permanently(user_data):
r.set(f"user:{user_data['id']}", json.dumps(user_data))
# 永続化の保証がない!
問題点:
- メモリベースのため、サーバー再起動でデータ消失のリスク
- 大量データでメモリ不足になる可能性
- トランザクション機能が限定的
正しいアプローチ:
# 良い例
def save_user(user_data):
# プライマリDBに保存
database.save_user(user_data)
# キャッシュにも保存(TTL付き)
cache_key = f"user:{user_data['id']}"
r.setex(cache_key, timedelta(hours=1), json.dumps(user_data))
❌ アンチパターン2: 大きなオブジェクトの保存
オンラインレポートサービス(仮名: Reportify) で、PDF ファイルをそのまま report:{id} に pickle 化して保存。ピーク時に 10 MB × 50,000 件が同時ロードされ、実メモリ 200 GB に対して 500GB でリソース不足のためクラスタ全体がダウン。
大きいバイナリを Redis に直接格納すると、メモリ圧迫・ネットワーク遅延が爆発。
# ダメな例
def cache_large_report(report_id, report_data):
# 10MBのレポートデータをそのまま保存
r.set(f"report:{report_id}", pickle.dumps(report_data))
問題点:
- メモリ使用量の急激な増加
- ネットワーク転送時間の増大
- 他のキーへの影響
正しいアプローチ:
# 良い例
def cache_report_metadata(report_id, metadata):
# メタデータのみキャッシュ
r.setex(f"report_meta:{report_id}", timedelta(hours=2),
json.dumps(metadata))
def cache_report_summary(report_id, summary):
# サマリー情報のみキャッシュ
r.setex(f"report_summary:{report_id}", timedelta(hours=6),
json.dumps(summary))
❌ アンチパターン3: TTLの未設定
E‑commerce ストア(仮名: ShopRight) は、カート情報を cart:{user} に TTL なしで保存。セッションが切れたユーザーのカートデータが永遠に残り、クラスタ全体で 30 GB の古いデータが蓄積。
TTL を忘れると「メモリリーク」と呼ばれる状態に。古いデータは不要であれば必ず期限を設定すべき。
# ダメな例
def cache_user_data(user_id, data):
r.set(f"user:{user_id}", json.dumps(data))
# TTLが設定されていない!
問題点:
- 古いデータがメモリに永続的に残る
- メモリリークの原因
- データの整合性問題
正しいアプローチ:
# 良い例
def cache_user_data(user_id, data):
r.setex(f"user:{user_id}", timedelta(hours=1), json.dumps(data))
# または
def cache_with_default_ttl(key, data, ttl_hours=1):
r.setex(key, timedelta(hours=ttl_hours), json.dumps(data))
❌ アンチパターン4: 適切でないデータ構造の選択
チャットアプリ(仮名: TalkBox) は、ユーザーごとのメッセージ履歴を Set で保存。Set は順序を保持しないため、タイムライン表示が乱れ、ユーザーから「メッセージが前後している」と苦情。
データ構造は用途に合わせて選択しないと機能不具合やパフォーマンス低下が起きる。
# ダメな例:リストを使うべき場所でセットを使用
def add_user_activity(user_id, activity):
key = f"activities:{user_id}"
r.sadd(key, json.dumps(activity)) # 順序が保持されない
問題点:
- データの特性に合わないデータ構造
- パフォーマンスの劣化
- 機能的な制限
正しいアプローチ:
# 良い例:適切なデータ構造を選択
def add_user_activity(user_id, activity):
key = f"activities:{user_id}"
activity_data = json.dumps({
'activity': activity,
'timestamp': time.time()
})
r.lpush(key, activity_data) # リストで順序を保持
r.ltrim(key, 0, 99) # 最新100件のみ保持
❌ アンチパターン5: 接続の非効率な管理
クラウドサービス(仮名: CloudAPI) は、各 API リクエストで毎回 redis.Redis() を生成。
リクエスト数が 10,000/秒に達した際、接続確立のオーバーヘッドが CPU を圧迫し、レスポンスタイム 200 ms → 1,500 ms に急増。
接続プールを使わないと、頻繁な接続確立・解放がボトルネックになる。
# ダメな例
def get_cached_data(key):
# 毎回新しい接続を作成
r = redis.Redis(host='localhost', port=6379)
data = r.get(key)
r.close()
return data
問題点:
- 接続オーバーヘッドの増大
- パフォーマンスの劣化
- リソースの無駄使い
正しいアプローチ:
# 良い例:接続プールを使用
connection_pool = redis.ConnectionPool(
host='localhost',
port=6379,
max_connections=10
)
r = redis.Redis(connection_pool=connection_pool)
def get_cached_data(key):
return r.get(key)
発展: Redisクラスター構成
Redis ホスト1台でトラフィックが捌けなくなってきた時に検討したい、複数台でのインフラ構成。
1. Redis Sentinel + Master/Replica 構成
概要
1台の Master に対して複数台の Replica を配置し、Sentinel が自動フェイルオーバーを担当する構成です。
Master 障害時には Sentinel が Replica を昇格させ、サービス継続を担保します。
メリット
高可用性 (Master 障害時に自動昇格)。
読み込み負荷を Replica に分散できる。
デメリット
書き込みは Master に集中するため、スケールアウトには限界がある。
適用例
書き込みはそこまで多くないが「読み込みリクエストが多い」キャッシュ用途。
Sentinel1 や Master が停止しても、Sentinel2 や Master に昇格した Slave1 がサービスを継続します。
from datetime import datetime, timezone
from redis.sentinel import Sentinel
# Sentinel 経由で接続
sentinel = Sentinel(
[("redis-sentinel1", 26379), ("redis-sentinel2", 26379)], socket_timeout=0.5
)
# masterに書き込み
now = datetime.now(tz=timezone.utc)
master = sentinel.master_for("mycache", socket_timeout=0.5)
master.set("last_updated", now.isoformat())
# replicaから読み込み
replica = sentinel.slave_for("mycache", socket_timeout=0.5)
print(replica.get("foo")) # => b'bar'
2. Redis Cluster (水平分散)
概要
Cluster モードを有効化し、キーを ハッシュスロット (16,384) にマッピングして複数ノードに分散保存する仕組みです。
各ノードに Replica を持たせることで高可用性も実現できます。
メリット
読み書きを複数ノードに分散でき、真のスケールアウトが可能。
各ノードに Replica を置いて高可用性を実現できる。
デメリット
一部のコマンド(MGET, MSET, KEYS など)が制約される。
運用がやや複雑。
適用例
大規模キャッシュ / セッション管理 / リアルタイム分析。
from redis.cluster import ClusterNode, LoadBalancingStrategy, RedisCluster
# Redis Cluster 接続
startup_nodes = [
ClusterNode("redis1-cluster", 6379),
ClusterNode("redis2-cluster", 6379),
ClusterNode("redis3-cluster", 6379),
]
rc = RedisCluster(
startup_nodes=startup_nodes,
load_balancing_strategy=LoadBalancingStrategy.RANDOM_REPLICA,
decode_responses=True,
)
# 書き込み(キーは自動的にハッシュスロットに割り当てられる)
rc.set("user:1001", "Alice")
rc.set("user:1002", "Bob")
# 読み込み
print(rc.get("user:1001")) # Alice
print(rc.get("user:1002")) # Bob
3. マネージドサービス (AWS ElastiCache for Redis)
概要
AWS の ElastiCache for Redis は、クラスタリング・フェイルオーバー・バックアップ・監視を含めて フルマネージド で提供されるサービスです。
運用負荷を大幅に軽減でき、数分でスケーラブルな Redis クラスタを構築できます。
メリット
運用コストが低い (障害監視、バックアップ、フェイルオーバーを AWS が管理)。
スケールが容易 (数クリック / API でノード増設可能)。
デメリット
AWS 依存 (マルチクラウド戦略には制約)。
自前構築よりコストは高め。
適用例
運用リソースを最小化したい場合。
キャッシュをクラウドインフラに統合したい場合。
接続例 (Python, boto3 + redis-py)
import redis
import boto3
# boto3 で ElastiCache のエンドポイント取得 (例: describe-cache-clusters)
client = boto3.client("elasticache", region_name="ap-northeast-1")
response = client.describe_cache_clusters(CacheClusterId="my-redis-cluster", ShowCacheNodeInfo=True)
endpoint = response["CacheClusters"][0]["CacheNodes"][0]["Endpoint"]["Address"]
port = response["CacheClusters"][0]["CacheNodes"][0]["Endpoint"]["Port"]
# redis-pyで接続
r = redis.Redis(host=endpoint, port=port, decode_responses=True)
r.set("foo", "bar")
print(r.get("foo")) # bar
まとめ
Redisは適切に使用すれば、アプリケーションのパフォーマンスを大幅に向上させる強力なツールです。しかし、その高速性とシンプルさに惑わされて不適切な使い方をしてしまうと、逆に問題を引き起こす可能性があります。
また、シングルノードから始めて、必要に応じてクラスター構成へと発展させることで、スケーラビリティと高可用性を両立できます。
Redisを活用する際の鉄則:
- キャッシュとして使う: プライマリデータベースの代替ではなく、補完として
- TTLを必ず設定: メモリリークを防ぐため
- 適切なデータサイズ: 大きなオブジェクトは分割・圧縮を検討
- データ構造の選択: 用途に応じた最適なデータ型を選択
- 接続管理の最適化: 接続プールを活用してオーバーヘッドを削減
これらのポイントを意識して、Redisを効果的に活用していきましょう。
Redisをクラスター化して利用する際の鉄則:
- クラスター対応: スケール要件に応じた適切な構成選択
- 監視とメンテナンス: クラスター健康状態の継続的な監視
段階的にRedisの活用レベルを上げて、システムの成長に合わせて最適な構成を選択していきましょう。
Discussion