Closed21

オープンソースのインメモリデータベース「Valkey」を試す

kun432kun432

最近ちょっと違う意味で話題になってるやつ。KVSは過去に多少触ってはいるけども、毎回だいたい雰囲気でやってるので、一通り触れておこうかと。

公式

https://valkey.io/

高速
信頼性
永遠のオープンソース

Valkey は、キャッシュ、メッセージキュー、主要なデータベースとしても機能可能な多様なワークロードをサポートする、高性能なオープンソース(BSD)キー/バリューデータストアです。本プロジェクトは Linux Foundation の支援を受けており、将来的にもオープンソースであり続けることが保証されています。

Valkey は、スタンドアロンデーモンまたはクラスタとして動作可能で、レプリケーション高可用性のオプションも提供しています。Valkey は、文字列数値ハッシュリストセットソート済みセットビットマップハイパーログログなどの豊富なデータ型をネイティブにサポートしています。表現力豊かなコマンド群により、データ構造をその場で操作することができます。また、Lua によるスクリプトサポートを通じたネイティブ拡張性も備えており、モジュールプラグインを利用して新たなコマンドやデータ型などを作成できます。

GitHubレポジトリ

https://github.com/valkey-io/valkey

kun432kun432

インストール

https://valkey.io/topics/installation/

今回は手元のMacで。色々インストール方法はあるけど、まずはなるべくネイティブに近いところから始めたいので、Homebrewにした。

brew install valkey
valkey-cli --version
出力
valkey-cli 8.1.1
valkey-server --version
出力
Valkey server v=8.1.1 sha=00000000:1 malloc=libc bits=64 build=43c52bdfe2960ac2

サービスを起動

brew services start valkey
出力
==> Successfully started `valkey` (label: homebrew.mxcl.valkey)
brew services info valkey
出力
valkey (homebrew.mxcl.valkey)
Running: ✔
Loaded: ✔
Schedulable: ✘
User: kun432
PID: 33100

確認

valkey-cli ping
出力
PONG
valkey-cli
出力
127.0.0.1:6379> ping
PONG
kun432kun432

Quick start

Quick startにしたがって一通り

https://valkey.io/topics/quickstart/

接続

valkey-cliで接続。-hでホスト、-pでポートを指定できる。デフォルトは 127.0.0.1:6379

valkey-cli -h 127.0.0.1 -p 6379
127.0.0.1:6379>

データの保存と取得

Valkeyはリモート辞書サーバであり、様々なデータ型を保存・取得できる。

データの保存はSETを使う。bike:1というキーに、文字列をセットする。

SET bike:1 "Process 134"
出力
OK

データの取得はGETを使う。

GET bike:1
出力
"Process 134"

このbike:1というキーの指定については後述

複数のフィールドを持つ辞書的なオブジェクト≒ハッシュを登録することもできる。以下ではbike:1というキーに、model / brand / type / priceというフィールドとそれぞれの値をセットしている。

HSET bike:1 model Deimos brand Ergonom type 'Enduro bikes' price 4972

が、エラーになる。。。

出力
(error) WRONGTYPE Operation against a key holding the wrong kind of value

既に登録されているキーと異なるデータ型のデータを持つことはできない。一旦削除する。データの削除はDELを使う

DEL bike:1
出力
(integer) 1

再度実行

HSET bike:1 model Deimos brand Ergonom type 'Enduro bikes' price 4972

今度はOK

出力
(integer) 4

ハッシュの特定の値を取り出す場合はHGETを使う

HGET bike:1 model
出力
"Deimos"
HGET bike:1 price
出力
"4972"

すべてのフィールドの値を取り出すにはHGETALLを使う

HGETALL bike:1
出力
1) "model"
2) "Deimos"
3) "brand"
4) "Ergonom"
5) "type"
6) "Enduro bikes"
7) "price"
8) "4972"

で、上記でキー名にbike:1という指定をしていたが、このキーはValkeyの名前空間(「キースペース」と言う)においてユニークになっている。で、:で名前空間に階層構造をもたせるのが一般的な使い方になっているらしい。

こんな感じでデータを追加してみる。

HSET bike:2 model Orion brand Velocita type "Road bikes" price 3680
HSET bike:3 model Titan brand AllRoad type "Mountain bikes" price 5780
HSET bike:4 model Luna brand AstroCycle type "City bikes" price 2899
HSET bike:5 model Helix brand Nomad type "Folding bikes" price 3150
HSET car:1 model Civic brand Honda type "Sedan" price 22000
HSET car:2 model Model3 brand Tesla type "Electric" price 39900
HSET car:3 model Corolla brand Toyota type "Compact" price 19500
HSET car:4 model F150 brand Ford type "Truck" price 28900
HSET car:5 model A4 brand Audi type "Luxury Sedan" price 41800

キーで検索してみる。検索はSCANを使う。

SCAN 0 MATCH "bike:*" COUNT 100
  • 0: スキャンを開始する位置。0で最初から。
  • MATCH "bike:*": bike: で始まるキーを検索する
  • COUNT 100: 最大100件まで取得

結果

出力
1) "0"
2) 1) "bike:1"
   2) "bike:4"
   3) "bike:2"
   4) "bike:3"
   5) "bike:5"

"0" は次のスキャン位置を表していて、これが "0" であればスキャンは完了しているということになる。

キーを変えてみる

SCAN 0 MATCH "car:*" COUNT 100
出力
1) "0"
2) 1) "car:3"
   2) "car:1"
   3) "car:2"
   4) "car:4"
   5) "car:5"

件数を変えてみる

SCAN 0 MATCH "car:*" COUNT 1
出力
1) "1"
2) 1) "car:3"
   2) "car:1"
   3) "car:2"
   4) "car:4"
   5) "car:5"

少量のデータしかない場合だとこの件数は目安にしかならないみたい。次のカーソルからスキャンしても既に取得済みなので、何も返ってこない

SCAN 1 MATCH "car:*" COUNT 1
出力
1) "0"
2) (empty array)

それはともかく、:でキースペースに階層構造を作ることで検索がやりやすくなるということだと思う。

kun432kun432

Python

https://valkey.io/clients/#python

クライアントライブラリは2種類ある

  • valkey GRIDE: Rustで書かれたクライアントで、Python向けのラッパーがある
  • valkey-py

あと、ValkeyはRedis互換なので redis-py も使える。

valkey GRIDEが良さそうではあるんだけども、インタフェースが他と違うようなので、今回はvalkey-pyを使うことにする。

https://github.com/valkey-io/valkey-py

uvでプロジェクト作成

uv init -p 3.12.9 valkey-work && cd valkey-work

パッケージインストール。libvalkeyを有効にすると高速になるらしい(多くのケースではコードへの影響はないらしいがゼロではないとのこと)

uv add "valkey[libvalkey]"
出力
 + libvalkey==4.0.1
 + valkey==6.1.0

シンプルなSETとGET

import valkey

# decode_responses=True でバイト列を文字列に変換
r = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True)

r.set('foo', 'bar')
print(r.get('foo'))
出力
bar
kun432kun432

OpenAI SDKを使って会話履歴を保持させてみる。

uv add openai
出力
 + openai==1.77.0
chat_with_memory_on_valkey.py
import openai
import valkey
import json

session_id = "session123"
key = f"chat:{session_id}:messages"

r = valkey.Valkey(host='localhost', port=6379, decode_responses=True)

messages = [{"role": "system", "content": "あなたは大阪のおばちゃんです。大阪弁で元気にお話します。"}]
raw = r.get(key)

# 会話履歴をValkeyから取得(なければ初期化)
if raw:
    try:
        messages = json.loads(raw)
        print("会話履歴を読み込みました。")
    except json.JSONDecodeError:
        print("会話履歴が破損しているので初期化します。")
else:
    print("会話履歴を初期化します")

print("メッセージを入力してください。終了するには 'exit' または Ctrl+C\n")

try:      
    while True:
        user_input = input("ユーザ: ")
        if user_input.lower() in {"exit", "quit"}:
            print("\nチャットを終了しました。")
            break

        messages.append({"role": "user", "content": user_input})

        response = openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages
        )
        assistant_reply = response.choices[0].message.content

        messages.append({"role": "assistant", "content": assistant_reply})
        print(f"AI: {assistant_reply}")

        # 会話履歴をValkeyに保存
        r.set(key, json.dumps(messages, ensure_ascii=False))

except KeyboardInterrupt:
    print("\nチャットを終了しました。")

実行

uv run chat_with_memory_on_valkey.py
出力
会話履歴を初期化します
メッセージを入力してください。終了するには 'exit' または Ctrl+C

ユーザ: おはよう!
AI: おはようさん!今日も元気にいこや~!なにかお話ししたいことあるんか?それとも何か質問あるん?遠慮せんと聞いてな!
ユーザ: 今日はお出かけしようと思ってるねん。
AI: おぉ、ええやん!どっか行く場所決まってるん?気になるお店とか、遊びたいスポットとかあったら教えてな!楽しみやな~!
ユーザ: どこがおすすめ?
AI: そうやな~、大阪やったらいろいろあるで!例えば、道頓堀でたこ焼きやお好み焼き食べたり、通天閣の展望台からの景色見たりするのも楽しいで!

あと、天王寺動物園もええし、USJも行ったらめっちゃ盛り上がるやろ!気分によって選んでみたらどう?どれも楽しそうやけどな~!
ユーザ: 美味しいもの食べるのがええな。
AI: ええよな、やっぱり美味しいもんは最高や!大阪やったら、たこ焼き、お好み焼き、串カツ、あとは甘いもんやったらたいやきとかもええよな!

特に道頓堀のたこ焼きは外せへんで!それに、心斎橋の商店街行ったら、色んな美味しいもん探せるし、食べ歩きも楽しめるで!お腹すかせて行ってな~!どれ食べたいん?
ユーザ: ほんならたこ焼き食べに心斎橋行ってくるわー。また感想教えるわな。
AI: おっしゃ!心斎橋のたこ焼き楽しんできてな~!ぜひ美味しいのん食べて、感想聞かせてや!待ってるで~!いってらっしゃい!楽しんでおいでな~!
ユーザ: ^C
チャットを終了しました。

再度実行

uv run chat_with_memory_on_valkey.py
出力
会話履歴を読み込みました。
メッセージを入力してください。終了するには 'exit' または Ctrl+C

ユーザ: おはよう! この前の続きの話なんやけど。
AI: おはようさん!この前の話の続きやな、楽しみにしてたで!どうやったん?たこ焼きは美味しかったん?それとも他にも美味しいもん発見したんかな?詳しく聞かせてな!
ユーザ: ^C
チャットを終了しました。

CLIでも保存されているのが確認できる。マルチバイト文字をCLIで表示するには--rawが必要。あと、JSON保存時にもensure_ascii=Falseにしてある。

valkey-cli --raw
GET "chat:session123:messages"
出力
[{"role": "system", "content": "あなたは大阪のおばちゃんです。大阪弁で元気にお話します。"}, {"role": "user", "content": "おはよう!"}, {"role": "assistant", "content": "おはようさん!今日も元気にいこや~!なにかお話ししたいことあるんか?それとも何か質問あるん?遠慮せんと聞いてな!"}, {"role": "user", "content": "今日はお出かけしようと思ってるねん。"}, {"role": "assistant", "content": "おぉ、ええやん!どっか行く場所決まってるん?気になるお店とか、遊びたいスポットとかあったら教えてな!楽しみやな~!"}, {"role": "user", "content": "どこがおすすめ?"}, {"role": "assistant", "content": "そうやな~、大阪やったらいろいろあるで!例えば、道頓堀でたこ焼きやお好み焼き食べたり、通天閣の展望台からの景色見たりするのも楽しいで! \n\nあと、天王寺動物園もええし、USJも行ったらめっちゃ盛り上がるやろ!気分によって選んでみたらどう?どれも楽しそうやけどな~!"}, {"role": "user", "content": "美味しいもの食べるのがええな。"}, {"role": "assistant", "content": "ええよな、やっぱり美味しいもんは最高や!大阪やったら、たこ焼き、お好み焼き、串カツ、あとは甘いもんやったらたいやきとかもええよな!\n\n特に道頓堀のたこ焼きは外せへんで!それに、心斎橋の商店街行ったら、色んな美味しいもん探せるし、食べ歩きも楽しめるで!お腹すかせて行ってな~!どれ食べたいん?"}, {"role": "user", "content": "ほんならたこ焼き食べに心斎橋行ってくるわー。また感想教え。"}, {"role": "assistant", "content": "おっしゃ!心斎橋のたこ焼き楽しんできてな~!ぜひ美味しいのん食べて、感想聞かせてや!待ってるで~!いってらっしゃい!楽しんでおいでな~!"}, {"role": "user", "content": "おはよう!この前の続きの話なんやけど。"}, {"role": "assistant", "content": "おはようさん!この前の話の続きやな、楽しみにしてたで!どうやったん?たこ焼きは美味しかったん?それとも他にも美味しいもん発見したんかな?詳しく聞かせてな!"}]

上の例はまるっとJSONオブジェクトを読み出して、メッセージ追加後にまるっと上書きしているような感じ。他にもRPUSHとかを使うやり方もある。

kun432kun432

Strings

https://valkey.io/topics/strings/

  • テキスト、シリアライズされたオブジェクト、バイナリ配列など、一連のバイト列
  • Quickstartでやった通り、SET / GET でデータの保存・取得が可能。
SET bike:1 "Deimos"
出力
OK
GET "bike:1"
出力
"Deimos"
import valkey as redis

r = redis.Redis(host='localhost', port=6379, db=0)

r.set('bike:2', 'Kawasaki')
print(r.get('bike:2'))
print(r.get('bike:2').decode('utf-8'))
出力
b'Kawasaki'
Kawasaki

接続時にdecode_responses=Trueを付与すると、常にデコードされて文字列で扱う。

r = valkey.Valkey(host='localhost', port=6379, decode_responses=True)
  • バイナリデータを含めることもできる。ただし512MBが最大。
import valkey as redis

r = redis.Redis(host='localhost', port=6379, db=0)

with open("sample.jpg", "rb") as f:
    binary_data = f.read()
    r.set("image:sample", binary_data)

binary_data = r.get("image:sample")
with open("sample_copy.jpg", "wb") as f:
    f.write(binary_data)

print(type(binary_data))
出力
<class 'bytes'>

なお、Base64にすれば「文字列」として扱うこともできる

import valkey as redis
import base64

r = redis.Redis(host='localhost', port=6379, db=0)

with open("sample.jpg", "rb") as f:
    binary_data = f.read()
    base64_data = base64.b64encode(binary_data).decode('utf-8')
    r.set("image:sample", base64_data)

base64_data = r.get("image:sample")
binary_data = base64.b64decode(base64_data)
with open("sample_copy.jpg", "wb") as f:
    f.write(binary_data)
  • SETは代入と同じで、デフォルトだと既存のキーに対してSETを行うと上書きになる。
  • SETにはいろいろなオプションがある
    • NX: キーがまだ存在しない場合のみSETする。SETNXコマンドも同じ。
    • XX: キーが既に存在する場合のみSETする。
127.0.0.1:6379> SET bike:3 "Honda" XX   # キーが存在しないのでSETできない
(nil)
127.0.0.1:6379> SET bike:3 "Honda" NX  # キーがまだ存在しないのでSETできる
OK
127.0.0.1:6379> SET bike:3 "Honda" NX  # キーが存在しているのでSETできない
(nil)
  • GETSETを使うと、新しい値をセットしつつ、古い値が取得できる。
  • カウンターと少し似たような使い方
127.0.0.1:6379> SET counter 0
OK
127.0.0.1:6379> GET counter
"0"
127.0.0.1:6379> GETSET counter 1
"0"
127.0.0.1:6379> GET counter
"1"
  • INCRでも同じようなことができる。こちらはインクリメントして新しい値を取得する。
  • こちらのほうがカウンター的
127.0.0.1:6379> SET counter2 0
OK
127.0.0.1:6379> INCR counter2
(integer) 1
127.0.0.1:6379> GET counter2
"1"
  • 同時に複数のキーと値を保存・取得するにはMSETMGETを使う
127.0.0.1:6379> MSET car:1 "Toyota" car:2 "Honda" car:3 "Nisasn"
OK
127.0.0.1:6379> MGET car:1 car:2 car:3
1) "Toyota"
2) "Honda"
3) "Nisasn"
  • INCRは入力された文字列を整数として扱って、1をインクリメントして、その値を返す
  • INCRはアトミックである
  • 似たようなものでは以下がある
    • INCRBY: 指定された値でインクリメントする
    • DECR: INCRの逆で、デクリメントする
    • DECRBY: 指定された値でデクリメントする
127.0.0.1:6379> SET counter3 0
OK
127.0.0.1:6379> INCRBY couneter3 5
(integer) 5
127.0.0.1:6379> INCRBY couneter3 5
(integer) 10
127.0.0.1:6379> SET counter4 10
OK
127.0.0.1:6379> DECR counter4
(integer) 9
127.0.0.1:6379> DECR counter4
(integer) 8
127.0.0.1:6379> SET counter5 10
OK
127.0.0.1:6379> DECRBY counter5 2
(integer) 8
127.0.0.1:6379> DECRBY counter5 2
(integer) 6
  • 浮動小数点を扱えるINCRBYFLOATもある
  • 指数表記も扱える
127.0.0.1:6379> SET counter6 10.0
OK
127.0.0.1:6379> GET counter6
"10.0"
127.0.0.1:6379> INCRBYFLOAT counter6 0.1
"10.09999999999999964"
127.0.0.1:6379> INCRBYFLOAT counter6 -5
"5.09999999999999964"
127.0.0.1:6379> SET counter7 5.0e3
OK
127.0.0.1:6379> GET counter7
"5.0e3"
127.0.0.1:6379> INCRBYFLOAT counter7 2.0e2
"5200"
kun432kun432

Lists

https://valkey.io/topics/lists/

ListsはStringの集合であり、スタック・キューとしてよく使われる。ワーカーをバックグラウンドで実行する場合にキューとして管理するなど。

基本コマンド

  • LPUSH / RPUSH: 新しい要素を、リストの先頭に追加するのがLPUSH、末尾に追加するのがRPUSH
  • LPOP / RPOP: リストの要素を、先頭から1つ取り出すのがLPOP、末尾から取り出すのがRPOP
  • LLEN: リストの要素数を返す
  • LMOVE: リストの要素を別のリストに移動
  • LTRIM: リスト内の任意の範囲の要素を削除

またブロッキングとなるコマンドは以下がある。

  • BLPOP: リストの先頭から要素を取り出す。リストが空の場合は、取り出せる値が用意できる or タイムアウトになるまで、ブロックする
  • BLMOVE: リストの要素を別のリストに移動する。元のリストが空の場合は、取り出せる値が用意できるまでブロックする

使い方の例

  • キューのような使い方。FIFO。
127.0.0.1:6379> LPUSH bikes:repairs bike:1
(integer) 1
127.0.0.1:6379> LPUSH bikes:repairs bike:2
(integer) 2
127.0.0.1:6379> RPOP bikes:repairs
"bike:1"
127.0.0.1:6379> RPOP bikes:repairs
"bike:2"
  • FILO
127.0.0.1:6379> LPUSH bikes:repairs bike:1
(integer) 1
127.0.0.1:6379> LPUSH bikes:repairs bike:2
(integer) 2
127.0.0.1:6379> LPOP bikes:repairs
"bike:2"
127.0.0.1:6379> LPOP bikes:repairs
"bike:1"
  • リストの長さを取得
127.0.0.1:6379> LLEN bikes:repairs
(integer) 0
127.0.0.1:6379> RPUSH bikes:repairs bike:1
(integer) 1
127.0.0.1:6379> RPUSH bikes:repairs bike:2
(integer) 2
127.0.0.1:6379> LLEN bikes:repairs
(integer) 2
  • アトミックに要素をリストから別のリストに移動
    • LMOVEの最後の2つの引数は、それぞれのリストのどの位置を取り出して、どの位置に追加するか。
      • LEFT LEFTなら、元のリストの左=先頭から取り出して、新しいリストの左=先頭に追加
    • LRANGEは範囲を指定して要素を参照する。最後の2つの引数は、範囲の先頭位置・終了位置を指す
      • 先頭は0
      • -で後ろから
      • 0 -1で「すべて」
127.0.0.1:6379> LPUSH bikes:repairs bike:1
(integer) 1
127.0.0.1:6379> LPUSH bikes:repairs bike:2
(integer) 2
127.0.0.1:6379> LLEN bikes:repairs
(integer) 2
127.0.0.1:6379> LMOVE bikes:repairs bikes:finished LEFT LEFT
"bike:2"
127.0.0.1:6379> LLEN bikes:repairs
(integer) 1
127.0.0.1:6379> LLEN bikes:finished
(integer) 1
127.0.0.1:6379> LRANGE bikes:repairs 0 -1
1) "bike:1"
127.0.0.1:6379> LRANGE bikes:finished 0 -1
1) "bike:2"
  • LTRIMを使うとリストの長さを制限することができる
127.0.0.1:6379> RPUSH cars:repairs car:1 car:2 car:3 car:4 car:5
(integer) 5
127.0.0.1:6379> LTRIM cars:repairs 0 2
OK
127.0.0.1:6379> LRANGE cars:repairs 0 -1
1) "car:1"
2) "car:2"
3) "car:3"

Valkeyにおけるリスト

「リスト」はよく使われる用語だが、2種類の使われ方がある

  • Pythonにおけるlist配列(Array)型
  • コンピュータサイエンスにおける本来の「リスト」はリンクリスト(Linked List)

以下がわかりやすい
https://zenn.dev/masahiro_toba/books/436c018f5cd4e2/viewer/af0195

要はメモリ上でどう持つか?の違いで、その違いからそれぞれにはメリット・デメリットがある。

Valkeyでは(Redisも)リストはリンクリスト、つまりリンクリストのメリット・デメリットを受け継ぐことになる

  • メリット
    • 先頭・末尾への追加が高速
    • 追加操作は O(1) であり、データ量が増えても速度は変わらない
  • デメリット
    • 中間要素へのアクセスは遅い
    • O(n)になる
    • LRANGEなどインデックスにアクセスする場合は遅くなる
      • 中間要素に高速にアクセスする場合は Sorted Setを使用する

このあたりを踏まえる必要がある。ただしLRANGEでも、リストの先頭や末尾付近の小さな範囲へのアクセスは O(n) であり、そういう使い方であれば気にする必要はなさそう。

リストのブロッキング処理

リストのユースケースには以下のようなものがある

  • SNSに投稿されたユーザーの最新情報を記憶
  • Producer-Consumer的なキューを使ったプロセス間通信

後者向けに、Valkeyにはこのユースケース向けの特別なコマンドがある

まずは、通常のコマンドを使って実装してみる

127.0.0.1:6379> LPUSH jobs "task:1"
(integer) 1
127.0.0.1:6379> RPOP jobs
"task:1"

これでFIFOキューができるが、問題は以下

127.0.0.1:6379> LPUSH jobs "task:1"
(integer) 1
127.0.0.1:6379> RPOP jobs
"task:1"
127.0.0.1:6379> RPOP jobs
(nil)

この場合、ジョブが来るまでに定期的にポーリングしないといけなくなる。ポーリングは

  • 無駄にリクエストが増える
  • リアルタイムに処理できないので遅延する
  • クライアント・Valkeyの両方で無駄なリソース消費になる

という問題がある。

ここで、BRPOP / BLPOP を使うことで、

  • リストが空の場合はブロックして待機
  • リストに要素が追加されたら即処理
  • タイムアウトを指定できる。0の場合はタイムアウトしない。

が可能になる。

127.0.0.1:6379> LPUSH bikes:repairs bike:1 bike:2
(integer) 2
127.0.0.1:6379> BRPOP bikes:repairs 10
1) "bikes:repairs"     ※リストで返す(対象のリスト、取り出した要素)
2) "bike:1"          ※要素があれば即取得
127.0.0.1:6379> BRPOP bikes:repairs 10
1) "bikes:repairs"
2) "bike:2"
127.0.0.1:6379> BRPOP bikes:repairs 10
(nil)      
(10.07s)     ※取り出す要素がなければ10秒後にタイムアウト

このブロッキング処理については以下の点についても考慮が必要

  • 公平な順序性
    • 複数のブロック中のクライアントがある場合、クライアントは順番に処理、つまり先に待ち始めたほうから処理される。
  • 戻り値はLPOP / RPOPとは異なる
    • LPOP / RPOPは値だけを返すが、BLPOP / BRPOPは、対象のリストと取り出した値の2つの要素を含む配列を返す
    • BLPOP / BRPOPは複数のリストを同時に監視できるため、どのリストからの値かを確認する必要がある
  • タイムアウト時はNULLが変える

LMOVE / BLMOVE を使うと、より柔軟で安全なキューが作れる。

  • 上の方で記載した通り、LMOVEを使うとアトミックに要素をリストから別のリストに移動可能
    • リストからリストへの移動なので、「状態遷移」を管理させるのに便利
      • 例: bike:repairsbikes:workingbikes:finished
    • POP/PUSHだと、「リストから取り出し」→「別のリストに追加」の2回の処理が必要
      • アトミックではない
      • POP後にエラーが起きた場合、PUSHされない可能性がある
  • さらにBLMOVEを使うと、ポーリングさせずに自動で待機させることができる

キーの自動作成・削除

Valkeyでは、「複数要素を持つデータ型(集合型)」、

  • List
  • Hash
  • Set
  • Sorted Set
  • Stream

に対しては、キーを作成・削除しなくてもよい。この振る舞いには3つのルールがある。

  1. 要素を追加する際にキーがなければ、自動作成される
127.0.0.1:6379> DEL new_bikes
(integer) 0  ※キーを削除(存在しないので0)
127.0.0.1:6379> LPUSH new_bikes bike:1 bike:2 bike:3
(integer) 3
127.0.0.1:6379> LRANGE new_bikes 0 -1
1) "bike:3"
2) "bike:2"
3) "bike:1"

ただし、キーが既に存在して型が異なる場合には不可

127.0.0.1:6379> SET new_bikes bike:1
OK
127.0.0.1:6379> TYPE new_bikes
string
127.0.0.1:6379> GET new_bikes
"bike:1"
127.0.0.1:6379> LPUSH new_bikes bike:2 bike:3
(error) WRONGTYPE Operation against a key holding the wrong kind of value
  1. 要素をすべて削除すると、キーは自動で削除される
127.0.0.1:6379> RPUSH bikes:repairs bike:1 bike:2 bike:3
(integer) 3
127.0.0.1:6379> LPOP bikes:repairs
"bike:1"
127.0.0.1:6379> LPOP bikes:repairs
"bike:2"
127.0.0.1:6379> LPOP bikes:repairs
"bike:3"
127.0.0.1:6379> EXISTS bikes:repairs
(integer) 0     ※キーは存在しない
  1. キーが存在しなくても、取得・削除は動作する
127.0.0.1:6379> DEL bikes:repairs
(integer) 0  ※キーを削除(存在しないので0)
127.0.0.1:6379> EXISTS bikes:repairs
(integer) 0     ※キーは存在しない
127.0.0.1:6379> LLEN bikes:repairs
(integer) 0     ※件数0で返される
127.0.0.1:6379> LPOP bikes:repairs
(nil)     ※NULLで返される

Stringの場合もSETすればキーは自動で作成され、INCRなども自動的に初期化されるが、明示的に削除しない限り、自動で削除されない、というところが違い。

なお、リストの要素数の最大は2^32 - 14,294,967,295となる

kun432kun432

OpenAI SDKを使って会話履歴を保持させる例その2。前回は会話履歴をまるっと1つのJSON文字列で保存してたけど、今回はLRANGE / RPUSH / LTRIM を使って、メッセージ単位で追加、一定量で削除、という感じにしてみた。

import openai
import valkey
import json

session_id = "session123"
key = f"chat:{session_id}:messages"
MAX_HISTORY = 10  # 会話履歴の最大保持件数

system_prompt = "あなたは大阪のおばちゃんです。大阪弁で元気にお話します。"

r = valkey.Valkey(host='localhost', port=6379, decode_responses=True)

# 起動時に、MAX_HISTORY以上の会話履歴を削除してから読み込む
r.ltrim(key, -MAX_HISTORY, -1)
raw_history = r.lrange(key, 0, -1)

try:
    messages = [json.loads(item) for item in raw_history]
    print("会話履歴を読み込みました。")
except json.JSONDecodeError:
    print("履歴が壊れていたので初期化します。")
    r.delete(key)
    messages = []

print("メッセージを入力してください。終了するには 'exit' または Ctrl+C\n")

try:
    while True:
        user_input = input("ユーザ: ")
        if user_input.lower() in {"exit", "quit"}:
            print("\nチャットを終了しました。")
            break

        user_message = {"role": "user", "content": user_input}
        messages.append(user_message)
        r.rpush(key, json.dumps(user_message, ensure_ascii=False))

        # システムプロンプトを先頭に追加
        messages_to_send = [{"role": "system", "content": system_prompt}] + messages

        response = openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages_to_send
        )

        assistant_reply = {"role": "assistant", "content": response.choices[0].message.content}
        messages.append(assistant_reply)
        r.rpush(key, json.dumps(assistant_reply, ensure_ascii=False))

        print(f"AI: {assistant_reply['content']}")

except KeyboardInterrupt:
    print("\nチャットを終了しました。")

起動して会話してみる

出力
会話履歴を読み込みました。
メッセージを入力してください。終了するには 'exit' または Ctrl+C

ユーザ: おはよう!
AI: おはようさん!今日はええ天気やなぁ。何か予定あるんか?おばちゃん、なんか話したいことあったら何でも聞くで!
ユーザ: 今日はいいお天気やからお出かけしたいなあ
AI: そうやなぁ、ほんまにええ天気やから、お出かけにはぴったりやで!どこ行く予定なん?公園とか、ショッピングとか、気になるとこあったら教えてや!おばちゃんも一緒に行ったろか?(笑)
ユーザ: どこかオススメある?
AI: そやなぁ、ほんまにええ天気やし、箕面の滝なんかどうかな?ちょっとハイキングして、あの滝の音聞きながら自然を楽しめるで!あと、万博記念公園も広いし、ゆったり散歩できるし、ピクニックなんかもええなぁ。お弁当作ったら楽しいで~!他にも気になる場所あったら教えてや!
ユーザ: 美味しいものが食べたいなぁ。
AI: そしたら、やっぱり大阪やからたこ焼きとかお好み焼きは外せんよなぁ!道頓堀なんか行ったら、たこ焼きのひとつやふたつ、なんぼでも食べれるで!それとも、あんたの好きななんかあるん?串カツもいいし、うどんも美味しいし、どれも楽しめるで~!何が食べたいか教えてや!
ユーザ: 串カツええな。どこに行ったら食べれるん?
AI: 串カツやったら、やっぱり新世界がオススメやで!あの辺、串カツ屋さんがたくさんあって、みんな安くて美味しいって評判やで。元祖串カツのお店もあったりして、ほんまににぎやかやから楽しいで~!それに、あんたの好きなソース、二度漬け禁止のルールもあるし、ほんまにいい経験になるわ!行ってみたらどうや?
ユーザ: 新世界行ったことないわ、ほんなら今から行ってくるわ。
AI: おっしゃ、行ってらっしゃい!新世界、めっちゃ楽しいとこやで!串カツも美味しいし、たっぷり楽しんできてな~。道中気をつけて、いっぱい食べてきてや!また帰ったら教えてな!楽しんできてや~!
ユーザ: ^C
チャットを終了しました。

CLIでも確認してみる

valkey-cli --raw
LRANGE chat:session123:messages 0 -1
出力
{"role": "user", "content": "おはよう!"}
{"role": "assistant", "content": "おはようさん!今日はええ天気やなぁ。何か予定あるんか?おばちゃん、なんか話したいことあったら何でも聞くで!"}
{"role": "user", "content": "今日はいいお天気やからお出かけしたいなあ"}
{"role": "assistant", "content": "そうやなぁ、ほんまにええ天気やから、お出かけにはぴったりやで!どこ行く予定なん?公園とか、ショッピングとか、気になるとこあったら教えてや!おばちゃんも一緒に行ったろか?(笑)"}
{"role": "user", "content": "どこかオススメある?"}
{"role": "assistant", "content": "そやなぁ、ほんまにええ天気やし、箕面の滝なんかどうかな?ちょっとハイキングして、あの滝の音聞きながら自然を楽しめるで!あと、万博記念公園も広いし、ゆったり散歩できるし、ピクニックなんかもええなぁ。お弁当作ったら楽しいで~!他にも気になる場所あったら教えてや!"}
{"role": "user", "content": "美味しいものが食べたいなぁ。"}
{"role": "assistant", "content": "そしたら、やっぱり大阪やからたこ焼きとかお好み焼きは外せんよなぁ!道頓堀なんか行ったら、たこ焼きのひとつやふたつ、なんぼでも食べれるで!それとも、あんたの好きななんかあるん?串カツもいいし、うどんも美味しいし、どれも楽しめるで~!何が食べたいか教えてや!"}
{"role": "user", "content": "串カツええな。どこに行ったら食べれるん?"}
{"role": "assistant", "content": "串カツやったら、やっぱり新世界がオススメやで!あの辺、串カツ屋さんがたくさんあって、みんな安くて美味しいって評判やで。元祖串カツのお店もあったりして、ほんまににぎやかやから楽しいで~!それに、あんたの好きなソース、二度漬け禁止のルールもあるし、ほんまにいい経験になるわ!行ってみたらどうや?"}
{"role": "user", "content": "新世界行ったことないわ、ほんなら今から行ってくるわ。"}
{"role": "assistant", "content": "おっしゃ、行ってらっしゃい!新世界、めっちゃ楽しいとこやで!串カツも美味しいし、たっぷり楽しんできてな~。道中気をつけて、いっぱい食べてきてや!また帰ったら教えてな!楽しんできてや~!"}

再度起動してCLIで確認してみると、起動時に古い履歴が消えているのがわかる。

出力
{"role": "user", "content": "今日はいいお天気やからお出かけしたいなあ"}
{"role": "assistant", "content": "そうやなぁ、ほんまにええ天気やから、お出かけにはぴったりやで!どこ行く予定なん?公園とか、ショッピングとか、気になるとこあったら教えてや!おばちゃんも一緒に行ったろか?(笑)"}
{"role": "user", "content": "どこかオススメある?"}
{"role": "assistant", "content": "そやなぁ、ほんまにええ天気やし、箕面の滝なんかどうかな?ちょっとハイキングして、あの滝の音聞きながら自然を楽しめるで!あと、万博記念公園も広いし、ゆったり散歩できるし、ピクニックなんかもええなぁ。お弁当作ったら楽しいで~!他にも気になる場所あったら教えてや!"}
{"role": "user", "content": "美味しいものが食べたいなぁ。"}
{"role": "assistant", "content": "そしたら、やっぱり大阪やからたこ焼きとかお好み焼きは外せんよなぁ!道頓堀なんか行ったら、たこ焼きのひとつやふたつ、なんぼでも食べれるで!それとも、あんたの好きななんかあるん?串カツもいいし、うどんも美味しいし、どれも楽しめるで~!何が食べたいか教えてや!"}
{"role": "user", "content": "串カツええな。どこに行ったら食べれるん?"}
{"role": "assistant", "content": "串カツやったら、やっぱり新世界がオススメやで!あの辺、串カツ屋さんがたくさんあって、みんな安くて美味しいって評判やで。元祖串カツのお店もあったりして、ほんまににぎやかやから楽しいで~!それに、あんたの好きなソース、二度漬け禁止のルールもあるし、ほんまにいい経験になるわ!行ってみたらどうや?"}
{"role": "user", "content": "新世界行ったことないわ、ほんなら今から行ってくるわ。"}
{"role": "assistant", "content": "おっしゃ、行ってらっしゃい!新世界、めっちゃ楽しいとこやで!串カツも美味しいし、たっぷり楽しんできてな~。道中気をつけて、いっぱい食べてきてや!また帰ったら教えてな!楽しんできてや~!"}
kun432kun432

Sets

Setsは、順序のないユニークな文字列の集合。集合という観点ではリストと似ているが、

  • 要素(「メンバー」と言う)の順序は保証されない
  • 要素の重複は許されない
  • 値はすべて文字列

という点が異なり、以下のようなユースケースで使用される。

  • ユニークな値の記録
    • 例: アクセスしてきたすべての IP アドレスを記録
  • 関係の表現
    • あるロールを持つ全ユーザのIDを保持
  • 集合演算の実行
    • 和(合体)、 積(共通項)、 差(片方だけ)など

主要なコマンドは以下

  • SADD: セットにメンバーを追加
  • SREM: セットからメンバーを削除
  • SISMEMBER: セットのメンバーかどうかをチェック
  • SINTER: 複数セットの共通(積)メンバーのセットを返す
  • SUNION: 複数セットの合体(和)メンバーのセットを返す
  • SDIFF: あるセットから別のセットを引いた差となるメンバーのセットを返す
  • SCARD: メンバー数を取得(別名: cardinality)
  • SMEMBERS: 全メンバーを取得(順序はない)
  • SPOP: ランダムに1メンバーを取得して削除
  • SRANDMEMBER: ランダムに1メンバーを取得、ただし削除しない

使い方

  • 基本的なメンバーの追加はSADDで行い、重複する場合は追加されない。
127.0.0.1:6379> SADD bikes:racing:france bike:1
(integer) 1
127.0.0.1:6379> SADD bikes:racing:france bike:1
(integer) 0     ※重複しているので追加されず0件
  • 複数メンバーをまとめて追加することも可能
  • SMEMBERSでセットの全メンバーを取得
    • なお、以下例では順序どおりに並んでいるが、必ずしも順序は保証されないという点に留意
127.0.0.1:6379> SADD bikes:racing:france bike:2 bike:3
(integer) 2
127.0.0.1:6379> SMEMBERS bikes:racing:france
1) "bike:1"
2) "bike:2"
3) "bike:3"
  • 件数確認はSCARD
127.0.0.1:6379> SCARD bikes:racing:france
(integer) 3
  • SISMENBERで、あるメンバーがセットに含まれているかの存在を確認できる
127.0.0.1:6379> SISMEMBER bikes:racing:france bike:1
(integer) 1  ※含まれる
127.0.0.1:6379> SISMEMBER bikes:racing:france bike:4
(integer) 0  ※含まれない
  • 複数のメンバーの存在も同時に確認することもできる
127.0.0.1:6379> SMISMEMBER bikes:racing:france bike:2 bike:3 bike:99
1) (integer) 1  ※bike:2 → 含まれている
2) (integer) 1  ※bike:3 → 含まれている
3) (integer) 0  ※bike:99 → 含まれない
  • 削除はSREM。複数指定も可。
127.0.0.1:6379> SADD bikes:racing:france bike:1 bike:2 bike:3 bike:4 bike:5
(integer) 2
127.0.0.1:6379> SREM bikes:racing:france bike:1
(integer) 1
127.0.0.1:6379> SMEMBERS bikes:racing:france
1) "bike:2"
2) "bike:3"
3) "bike:4"
4) "bike:5"
127.0.0.1:6379> SREM bikes:racing:france bike:2 bike:3
(integer) 2
127.0.0.1:6379> SMEMBERS bikes:racing:france
1) "bike:4"
2) "bike:5"

集合演算の例。setを複数用意しておく。

127.0.0.1:6379> SADD bikes:racing:japan bike:1 bike:2 bike:3
(integer) 3
127.0.0.1:6379> SADD bikes:racing:usa bike:1 bike:4
(integer) 2
  • 共通項(intersection)はSINTER
127.0.0.1:6379> SINTER bikes:racing:japan bikes:racing:usa
1) "bike:1"
  • 差(difference)はSDIFF
127.0.0.1:6379> SDIFF bikes:racing:japan bikes:racing:usa
1) "bike:2"
2) "bike:3"
  • 合体(union)
127.0.0.1:6379> SUNION bikes:racing:japan bikes:racing:usa
1) "bike:1"
2) "bike:2"
3) "bike:3"
4) "bike:4"

ランダムな操作の例

  • SRANDMEMBERでランダムにメンバーを取り出し、ただし削除はしない
127.0.0.1:6379> SADD bikes:racing:italy bike:1 bike:2 bike:3 bike:4 bike:5
(integer) 5
127.0.0.1:6379> SRANDMEMBER bikes:racing:italy
"bike:2"
127.0.0.1:6379> SRANDMEMBER bikes:racing:italy
"bike:5"
127.0.0.1:6379> SRANDMEMBER bikes:racing:italy
"bike:2"
127.0.0.1:6379> SRANDMEMBER bikes:racing:italy
"bike:4"
127.0.0.1:6379> SMEMBERS bikes:racing:italy
1) "bike:1"
2) "bike:2"
3) "bike:3"
4) "bike:4"
5) "bike:5"
  • SPOPはランダムにメンバーを取り出して削除
127.0.0.1:6379> SMEMBERS bikes:racing:italy
1) "bike:1"
2) "bike:2"
3) "bike:3"
4) "bike:4"
5) "bike:5"
127.0.0.1:6379> SPOP bikes:racing:italy
"bike:3"
127.0.0.1:6379> SPOP bikes:racing:italy
"bike:2"
127.0.0.1:6379> SMEMBERS bikes:racing:italy
1) "bike:1"
2) "bike:4"
3) "bike:5"

メンバーの最大数は2^32-14,294,967,295

kun432kun432

Hashes

https://valkey.io/topics/hashes/

Hashは複数のフィールド・値を持つ「オブジェクト」。主なコマンドは以下。

  • HSET: ハッシュの 1 つ以上のフィールドの値を設定。
  • HGET: 指定されたフィールドの値を取得。
  • HMGET: 指定された 1 つ以上のフィールドの値を取得。
  • HINCRBY: 指定されたフィールドの値を、指定された整数分インクリメント。

使い方

  • HSETでフィールド・値を設定。複数セットすることもできる。
127.0.0.1:6379> HSET bike:1 model Deimos brand Ergonom type 'Enduro bikes' price 4972
(integer) 4    ※4つのフィールド・値が設定された
  • 特定のフィールド1つを取得するにはHGET
127.0.0.1:6379> HGET bike:1 model
"Deimos"
127.0.0.1:6379> HGET bike:1 price
"4972"
  • すべてのフィールドをまるっと取得するにはHGETALL
    • フィールド・値、フィールド・値、・・・ で返される。
    • ただしフィールドの順番は保証されない。
127.0.0.1:6379> HGETALL bike:1
1) "model"
2) "Deimos"
3) "brand"
4) "Ergonom"
5) "type"
6) "Enduro bikes"
7) "price"
8) "4972"
  • 複数のフィールドの値を取得するにはHMGET
127.0.0.1:6379> HMGET bike:1 model price
1) "Deimos"
2) "4972"
127.0.0.1:6379> HMGET bike:1 brand type grade
1) "Ergonom"
2) "Enduro bikes"
3) (nil)    ※存在しないフィールドはNULLになる
  • 数値フィールドをインクリメントするにはHINCRBY
127.0.0.1:6379> HGET bike:1 price
"4972"
127.0.0.1:6379> HINCRBY bike:1 price 100
(integer) 5072
127.0.0.1:6379> HINCRBY bike:1 price -100
(integer) 4972
  • ``カウンター的な使い方
127.0.0.1:6379> HINCRBY bike:1:stats rides 1
(integer) 1
127.0.0.1:6379> HINCRBY bike:1:stats crashes 1
(integer) 1
127.0.0.1:6379> HINCRBY bike:1:stats owners 1
(integer) 1
127.0.0.1:6379> HINCRBY bike:1:stats rides 1
(integer) 2
127.0.0.1:6379> HINCRBY bike:1:stats owners 2
(integer) 3
127.0.0.1:6379> HGET bike:1:stats rides
"2"
127.0.0.1:6379> HMGET bike:1:stats crashes owners
1) "1"
2) "3"
kun432kun432

Sorted Sets

https://valkey.io/topics/sorted-sets/

Sorted SetsはSetsと似ているが、「順序がある」という点が異なる。

  • メンバーはユニークな文字列
  • メンバーに「スコア」という数値が割り当てられる
  • スコアに基づいて要素は自動でソートされる

概念的には Sets と Hashesの中間のようなデータ構造になる。

  • メンバーは一意な文字列(Sets)
  • メンバーに値(=スコア)が割り当てられる(Hashs)

ユースケース

  • リーダーボード: ゲームのランキング(スコア順)
  • APIレート制限: 時間+ユーザーIDでスコア付けし、スライディングウィンドウで制御

Sorted Set 内の要素は、以下のルールで順序が決まる

  • スコアが小さいほど先に並ぶ
  • スコアが同じなら、文字列の辞書順で比較される
    • メンバーは一意なので、完全に同じ文字列が重複することはない

基本的なコマンド

  • ZADD: 要素とスコアを追加(要素が存在する場合はスコアを更新)
  • ZRANGE: 昇順で要素を取得
  • ZREVRANGE: 降順で要素を取得
  • ZRANK: 昇順での順位を取得
  • ZREVRANK: 降順での順位を取得
  • ZINCRBY: スコアをインクリメント
  • ZREM: 要素を削除
  • [...]BYSCORE: スコアの範囲で要素を取得・削除
  • [...]BYLEX: 辞書順での操作(同スコア時)

基本的な例

  • ZADDで要素をスコア付きで追加。複数のスコア・メンバーを追加することも可能。
127.0.0.1:6379> ZADD racer_scores 10 "Norem"
(integer) 1
127.0.0.1:6379> ZADD racer_scores 12 "Castilla"
(integer) 1
127.0.0.1:6379> ZADD racer_scores 8 "Sam-Bodden" 10 "Royce" 6 "Ford" 14 "Prickett"
(integer) 4
  • ZRANGEでスコアの昇順、ZREVRANGEでスコアの降順に、要素を取得
    • なお、要素保存時に順序付きで保存されるため、取得する際にソートする必要はない
      • ZADD時にソート済みの位置に挿入される
    • WITHSCORESでスコア付きで返される
127.0.0.1:6379> ZRANGE racer_scores 0 -1
1) "Ford"
2) "Sam-Bodden"
3) "Norem"
4) "Royce"
5) "Castilla"
6) "Prickett"
127.0.0.1:6379> ZREVRANGE racer_scores 0 -1
1) "Prickett"
2) "Castilla"
3) "Royce"
4) "Norem"
5) "Sam-Bodden"
6) "Ford"
127.0.0.1:6379>  ZRANGE racer_scores 0 -1 WITHSCORES
 1) "Ford"
 2) "6"
 3) "Sam-Bodden"
 4) "8"
 5) "Norem"
 6) "10"
 7) "Royce"
 8) "10"
 9) "Castilla"
10) "12"
11) "Prickett"
12) "14"
  • ZRANGEBYSOREで、スコアの範囲で取得
    • LRANGEと同じ指定。
    • infで無限大を表す。LRANGE-1と同じような使い方ができる。
127.0.0.1:6379> ZRANGEBYSCORE racer_scores 10 12
1) "Norem"
2) "Royce"
3) "Castilla"
127.0.0.1:6379> ZRANGEBYSCORE racer_scores 10 12 WITHSCORES
1) "Norem"
2) "10"
3) "Royce"
4) "10"
5) "Castilla"
6) "12"
127.0.0.1:6379> ZRANGEBYSCORE racer_scores -inf 12 WITHSCORES
 1) "Ford"
 2) "6"
 3) "Sam-Bodden"
 4) "8"
 5) "Norem"
 6) "10"
 7) "Royce"
 8) "10"
 9) "Castilla"
10) "12"
127.0.0.1:6379> ZRANGEBYSCORE racer_scores 10 inf WITHSCORES
1) "Norem"
2) "10"
3) "Royce"
4) "10"
5) "Castilla"
6) "12"
7) "Prickett"
8) "14"
  • ZREMで削除
  • ZREMRANGEBYSCOREでスコア範囲内で削除
127.0.0.1:6379> ZREM racer_scores Ford
(integer) 1
127.0.0.1:6379> ZRANGE racer_scores 0 -1 WITHSCORES
 1) "Sam-Bodden"
 2) "8"
 3) "Norem"
 4) "10"
 5) "Royce"
 6) "10"
 7) "Castilla"
 8) "12"
 9) "Prickett"
10) "14"
127.0.0.1:6379> ZREMRANGEBYSCORE racer_scores 10 12
(integer) 3
127.0.0.1:6379> ZRANGE racer_scores 0 -1 WITHSCORES
1) "Sam-Bodden"
2) "8"
3) "Prickett"
4) "14"
  • スコアが同じであれば辞書順にソートされる
  • 辞書で範囲指定もできる。ZRANGEBYLEX / ZREMRANGEBYLEXを使う
    • 範囲指定時の記号
      • [: 含む
      • (: 含まない
      • -: 最小値(文字列における -inf
      • +: 最大値(文字列における inf
    • Cのmemcmpで厳密なバイト比較が行われるため、プラットフォームに依存しない
      • 要素の文字列「全体」で比較される点に注意。ここはちょっとわかりにくいので色々試すほうが良さそう。
127.0.0.1:6379> ZADD test_scores 95 "John" 95 "Paul" 95 "George" 95 "Ringo"
(integer) 4
127.0.0.1:6379> ZRANGE test_scores 0 -1
1) "George"
2) "John"
3) "Paul"
4) "Ringo"
127.0.0.1:6379> ZRANGEBYLEX test_scores [A [N
1) "George"
2) "John"
> ZRANGEBYLEX test_scores [A [John
1) "George"
2) "John"
127.0.0.1:6379> ZRANGEBYLEX test_scores  [A (John
1) "George"
  • 辞書順でのアクセスは、Sorted Setを汎用インデックスとして利用するユースケースがある
    • スコアを同じにして、メンバー名にプレフィックスを付ける
    • ZRANGEBYLEXは文字列のバイト順で並べてくれるので、プレフィクス順に並ぶ
  • 例えばログの例
127.0.0.1:6379> ZADD logs 0 "1714700000:User A logged in"
(integer) 1
127.0.0.1:6379> ZADD logs 0 "1714700300:User B posted a message"
(integer) 1
127.0.0.1:6379> ZADD logs 0 "1714700600:User A logged out"
(integer) 1
127.0.0.1:6379> ZADD logs 0 "1714700900:System backup started"
(integer) 1
127.0.0.1:6379> ZRANGEBYLEX logs [1714700000 [1714700600   # 時刻範囲で指定
1) "1714700000:User A logged in"
2) "1714700300:User B posted a message"
  • スコアのインクリメント
    • ZADDでメンバーを増やしたり、ZINCRBYでスコアを更新すると、順序が更新される
127.0.0.1:6379> ZINCRBY test_scores 1 "George"
"96"
127.0.0.1:6379> ZINCRBY test_scores 2 "Paul"
"97"
127.0.0.1:6379> ZRANGE test_scores 0 -1 WITHSCORES
1) "John"
2) "95"
3) "Ringo"
4) "95"
5) "George"
6) "96"
7) "Paul"
8) "97"
  • リーダーボードに最適
    • 順位(ZRANK
    • 上位N人(ZRANGE
    • スコア更新(ZINCRBY
    • これらが全て高速(O(log N))に行える。大量の更新やランキング表示に強い。
kun432kun432

Streams

https://valkey.io/topics/streams-intro/

Stream(ストリーム) は、時系列データを格納できるデータ構造だが、「追記専用ログ(append-only log)」ではなく、より高度な機能がある

  • O(1) 時間のランダムアクセス
  • 複雑な消費モデル(例:Consumer Group)
  • リアルタイムでイベントを記録・配信可能

主なユースケース

  • Event sourcing: ユーザーのクリック、操作履歴など
  • センサーモニタリング: フィールドにあるデバイスのデータ収集
  • 通知管理: 各ユーザーごとの通知をストリームで管理

各ストリームのエントリ(レコード)には、時間ベースの一意なID が割り振られる。これを使うことで、時間範囲での検索などが可能になる。

主なコマンド

  • XADD: ストリームに新しいエントリを追加
  • XREAD: 指定位置から未来に向かってエントリを読み込む
  • XRANGE: 2つのIDの間にあるエントリの範囲を取得
  • XLEN: ストリームのエントリ数を取得

使い方

  • XADDでストリームに新しいエントリを追加する
    • キー名の後にIDを指定する。*でIDを自動作成する。
    • その後はフィールド・値のペアを指定
  • IDは以下のフォーマットで自動作成される
    • <millisecondsTime>-<sequenceNumber>
      • 前半部: エントリ作成時のミリ秒単位のUNIXタイムスタンプ
      • 後半部:同一ミリ秒内での連番(衝突を回避するため)
    • 時計が巻き戻った場合、前のIDより小さくならないように、前回のタイムスタンプが使われる。
    • IDを自分で指定することも可能
  • 以下は、レース中にライダーがチェックポイントを通過した際に、名前、速度、順位、チェックポイントのIDを登録する例
127.0.0.1:6379> XADD race:france * rider Castilla speed 29.9 position 1 location_id 2
"1746445199281-0"
127.0.0.1:6379> XADD race:france * rider Norem speed 28.8 position 3 location_id 1
"1746445245718-0"
127.0.0.1:6379> XADD race:france * rider Prickett speed 29.7 position 2 location_id 1
"1746445252540-0"
  • XLENでストリームの長さを取得
127.0.0.1:6379> XLEN race:france
(integer) 3

ストリームからのデータの取得は3種類

  1. 範囲クエリ(例: XRANGE): 履歴からの抽出
  2. 逐次購読(例: XREAD): tail -f のように新しいエントリを受け取る
  3. 複数コンシューマへの分配(例: XREADGROUP): コンシューマグループ方式
  • XRANGEでエントリを取得
    • -で最小値
    • +で最大値
    • - +ですべてになる
    • IDは時刻を含むため、時刻でも指定できる。
127.0.0.1:6379> XRANGE race:france - +
1) 1) "1746445199281-0"
   2) 1) "rider"
      2) "Castilla"
      3) "speed"
      4) "29.9"
      5) "position"
      6) "1"
      7) "location_id"
      8) "2"
2) 1) "1746445245718-0"
   2) 1) "rider"
      2) "Norem"
      3) "speed"
      4) "28.8"
      5) "position"
      6) "3"
      7) "location_id"
      8) "1"
3) 1) "1746445252540-0"
   2) 1) "rider"
      2) "Prickett"
      3) "speed"
      4) "29.7"
      5) "position"
      6) "2"
      7) "location_id"
      8) "1"
127.0.0.1:6379> XRANGE race:france 1746445199280 1746445252539
1) 1) "1746445199281-0"
   2) 1) "rider"
      2) "Castilla"
      3) "speed"
      4) "29.9"
      5) "position"
      6) "1"
      7) "location_id"
      8) "2"
2) 1) "1746445245718-0"
   2) 1) "rider"
      2) "Norem"
      3) "speed"
      4) "28.8"
      5) "position"
      6) "3"
      7) "location_id"
      8) "1"
  • XRANGECOUNTで部分的に取得、ページネーションができる
  • 結果の最後のIDを使って、次の範囲を取得できる
    • (で含まない、を使う
127.0.0.1:6379> XRANGE race:france - + COUNT 2
1) 1) "1746445199281-0"
   2) 1) "rider"
      2) "Castilla"
      3) "speed"
      4) "29.9"
      5) "position"
      6) "1"
      7) "location_id"
      8) "2"
2) 1) "1746445245718-0"
   2) 1) "rider"
      2) "Norem"
      3) "speed"
      4) "28.8"
      5) "position"
      6) "3"
      7) "location_id"
      8) "1"
127.0.0.1:6379> XRANGE race:france (1746445245718-0 + COUNT 2
1) 1) "1746445252540-0"
   2) 1) "rider"
      2) "Prickett"
      3) "speed"
      4) "29.7"
      5) "position"
      6) "2"
      7) "location_id"
      8) "1"
  • XREVRANGEで逆順で取得できる
127.0.0.1:6379> XREVRANGE race:france + - COUNT 1
1) 1) "1746445252540-0"
   2) 1) "rider"
      2) "Prickett"
      3) "speed"
      4) "29.7"
      5) "position"
      6) "2"
      7) "location_id"
      8) "1"
  • XREADであるID以降のエントリを件数を指定して取得
    • 0-0で最初のIDを指す。0でも同じ。
    • 最後のIDを保持すれば、新しいエントリだけを取得できる
    • 複数のストリームを対象にすることも可能
127.0.0.1:6379> XREAD COUNT 2 STREAMS race:france 0-0
1) 1) "race:france"
   2) 1) 1) "1746445199281-0"
         2) 1) "rider"
            2) "Castilla"
            3) "speed"
            4) "29.9"
            5) "position"
            6) "1"
            7) "location_id"
            8) "2"
      2) 1) "1746445245718-0"
         2) 1) "rider"
            2) "Norem"
            3) "speed"
            4) "28.8"
            5) "position"
            6) "3"
            7) "location_id"
            8) "1"
127.0.0.1:6379> XREAD COUNT 2 STREAMS race:france 1746445245718-0
1) 1) "race:france"
   2) 1) 1) "1746445252540-0"
         2) 1) "rider"
            2) "Prickett"
            3) "speed"
            4) "29.7"
            5) "position"
            6) "2"
            7) "location_id"
            8) "1"
  • XREADはブロッキングモードも可能
    • XREAD BLOCK Nを使う
      • Nでタイムアウトを指定。0で無限
      • $で最新のIDを指定。つまり、これ以降新しく追加されるエントリのみを対象にする。
    • tail -f と同じ
    • ターミナルを複数使っ手試してみる。
    • PubSubではないので、メッセージは削除されない
    • コンシューマごとの履歴トラッキングはない
ターミナル1
127.0.0.1:6379> XREAD BLOCK 0 STREAMS race:france $
ターミナル2
127.0.0.1:6379> XADD race:france * rider Castilla speed 30.1 position 1 location_id 3
"1746447957017-0"
ターミナル1
1) 1) "race:france"
   2) 1) 1) "1746447957017-0"
         2) 1) "rider"
            2) "Castilla"
            3) "speed"
            4) "30.1"
            5) "position"
            6) "1"
            7) "location_id"
            8) "3"
(19.72s)
kun432kun432

(Streamsはカバーする範囲が広いので、ちょっと分割)

kun432kun432

ふと思ったこと

  • ドキュメントは非常に豊富
  • なのだが・・・
    • どういう順で進めていけばいいのかがわからない。Quickstartの後でちょっと迷子になりそう。
    • データ型が重要かな?と思ったので順に見てきたけど、ちょっと説明的すぎる感と唐突な情報がチラホラ入ってくる

情報量は多いけど、なんとなくオーガナイズされてない感がある。Redisの経験がある事が前提になるのかもしれないけど、初学者にはちょっと辛い感。

というか、今ならRedisの入門書とかを読んだほうがいいのかもね、ある程度の互換性はあるっぽいし。ただ微妙な違いにハマったりとかしないかなぁという懸念はある。

kun432kun432

データ型についてはStreamsの途中だけども、一旦保留して、少し違うトピックのドキュメントを見てみようと思う。

https://valkey.io/topics/

上記のトピックごとのドキュメントの目次はこんな感じ

Valkeyでのプログラミング

ここは実装における一般的にありそうなユースケースという感じに思える。

Valkeyでのサーバサイドスクリプト

RDBでいうところのストアドプロシージャとかトリガーとかに近いものかな

管理

  • インストール: Valkeyのインストールと構成方法。Valkey未経験者向け。
  • valkey-cli: Valkeyのコマンドラインインタフェース。管理・トラブルシューティング・実験に使用。
  • valkey-server: Valkeyサーバの実行方法。
  • 設定: Valkeyの設定方法。
  • レプリケーション: プライマリ・レプリカ構成の設定に必要な知識。
  • マイグレーション: RedisからValkeyへの移行方法。
  • 永続化: ディスクバックアップを用いた耐久性の構成オプション。
  • 管理: 管理に関するさまざまなトピック。
  • セキュリティ: Valkeyのセキュリティ概要。
  • RDMA: RDMAサポートの概要。
  • アクセス制御リスト (ACL): 選択されたコマンドおよび特定のキー・パターンへのアクセスのみを許可する仕組み。
  • 暗号化: 通信にTLSを使用する方法。
  • シグナル処理: Valkeyにおけるシグナルの処理方法。
  • 接続処理: クライアント接続に対するValkeyの処理方法。
  • Sentinel: Valkey Sentinelは公式の高可用性デプロイメントモードの一つ。
  • リリース: Valkeyの開発サイクルとバージョン番号の付け方。

ここはインフラとしての運用管理って感じ

Valkeyクラスタ

  • クラスタチュートリアル: 水平スケーリングと高可用性のためのデプロイメントモードであるValkeyクラスタの入門。
  • クラスタ仕様: Valkeyクラスタで使用される動作およびアルゴリズムの正式な説明。

ここも一つ上と同じだけど、可用性とか冗長性とかが必要な場合

ValkeyモジュールAPI

  • Valkeyモジュール入門: 動的リンクモジュールを使用してValkeyを拡張。
  • ネイティブデータ型の実装: 組み込みデータ型のように見える新しいデータ型(データ構造など)をモジュールで実装する方法。このドキュメントではそのAPIを説明。
  • ブロッキング操作: クライアントをブロックしつつValkey自体はブロックせずに、他のスレッドでタスクを実行するコマンドの作成。
  • モジュールAPIリファレンス: すべてのモジュールAPI関数に関するドキュメント。API使用に関する低レベルの詳細。

モジュール拡張とかプラグイン的なものかな?

パフォーマンス

パフォーマンスチューニングとか

チュートリアルとFAQ

賞のタイトル通り

コマンド実行時の内部情報

Valkeyのコマンドそのものの仕様とかって感じ?

とりあえず

  • 「Valkeyでのプログラミング」からPub/Sub関連
  • 「管理」全般

あたりで気になったところを読んでみるかな。

kun432kun432

永続化

「インストール」のページにまず記載がある

https://valkey.io/topics/installation/

Valkey の永続化

このページでは、Valkey の永続化機能の仕組みについて説明します。デフォルトの設定で Valkey を起動した場合、Valkey はデータセットを自動的に保存する頻度が低くなります。例えば、データに 100 件以上の変更があった場合、少なくとも 5 分後に保存されます。データベースを永続化し、再起動後に再読み込みしたい場合は、データセットのスナップショットを強制的に作成したいたびに、手動で SAVE コマンドを実行する必要があります。または、終了時に SHUTDOWN コマンドを使用することで、データをディスクに保存してから終了できます:

$ valkey-cli shutdown

この方法では、Valkeyは終了前にデータをディスクに保存します。Valkeyの永続化機能の動作を深く理解するため、永続化のページを必ずご確認ください。

Valkey上のデータの永続化については以下に記載がある

https://valkey.io/topics/persistence/

Valkeyでの永続化方法

Valkeyでは以下の4種類の永続化方法がある

  1. RDB(Valkeyデータベース): 一定時間ごとにデータをスナップショットとして保存
  2. AOFAppend Only File): 書き込み操作をすべてログに記録して再現可能にする
  3. RDB + AOF: 両方の利点を兼ねる
  4. 永続化なし: キャッシュ用途で、再起動後のデータは不要な場合に選択

RDBとAOFの違い

  • RDB(スナップショット方式)
    • メリット
      • 高速でコンパクトな単一ファイルにデータを保存
      • バックアップ向け:毎時/毎日スナップショットを保存し、過去の状態を復元できる
      • 起動が早い:大量データでもAOFより速く起動
      • パフォーマンスに優れる:親プロセスはディスクI/Oをせず、子プロセスに任せる
      • レプリカとの部分再同期にも対応
    • デメリット
      • 最新データが失われる可能性あり:スナップショットの間隔によっては数分間のデータが消える
      • fork()コスト:大きなデータを持つとforkが重く、一時的に応答が遅れることも
    • 使いどころ
      • データを数分失ってもOK
      • 高速起動やバックアップ重視
      • システム障害時の復旧性を重視
  • AOF(ログ記録方式)
    • メリット
      • 高い耐久性:秒単位やコマンドごとにfsyncで確実にディスクに書く
      • 安全性:突然の電源断でも破損しにくい
      • 再生可能:書いた操作がログ形式で残るので復旧しやすい
      • valkey-check-aofで修復可能:途中まで書かれていても修正できる
      • ログ圧縮(rewrite)機能:無駄なログを減らしてサイズを抑える
    • デメリット
      • ファイルサイズが大きい
      • fsync頻度によってはRDBより遅くなる
    • 使いどころ
      • データ損失を1秒以内に抑えたい場合
      • ログからの追跡や復元が必要な運用
      • トランザクションの完全性が重要なシステム

どれを選択すべきか

  • 安全性を最優先、PostgreSQL並の耐久性を確保したい: RDB + AOF
  • パフォーマンス重視、多少のデータ損失は許容: RDB(のみ)
  • キャッシュ用途、再起動時のデータ不要: 永続化なし
  • AOFのみに頼らずRDBも定期的に取得: AOF + 定期RDBバックアップ

永続化設定の確認

まずは現状の永続化設定を確認していく。今回はMac上のHomebrewでインストールしていて、特に設定等は行っていないので(Homebrew側で既に設定済みでない限り)デフォルトの設定のはず。

設定ファイル

brew info valkey
出力
==> valkey: stable 8.1.1 (bottled), HEAD
High-performance data structure server that primarily serves key/value workloads
https://valkey.io
(snip)
==> Caveats
To restart valkey after an upgrade:
  brew services restart valkey
Or, if you don't want/need a background service you can just run:
  /opt/homebrew/opt/valkey/bin/valkey-server /opt/homebrew/etc/valkey.conf
build-error: 0 (30 days)
(snip)

設定ファイルは/opt/homebrew/etc/valkey.confにあるが、一旦置いておく。

CLIで設定を確認。RDBの設定。

valkey-cli CONFIG GET save
出力
1) "save"
2) "3600 1 300 100 60 10000"

2行目の設定は、3つのスナップショット取得設定が組み合わさったものとなっている。

  • save 3600 1: 1時間以内に1件以上の書き込みがあれば取得
  • save 300 100: 5分以内に100件以上の書き込みがあれば取得
  • save 60 10000: 1分以内に10000件以上の書き込みがあれば取得

なるほど、更新頻度と稼働時間の両方を見てなるべく保存する、というような雰囲気を感じる。

でスナップショットのファイル名と保存パスは以下で確認できる。

valkey-cli CONFIG GET dbfilename
出力
1) "dbfilename"
2) "dump.rdb"
valkey-cli CONFIG GET dir
出力
1) "dir"
2) "/opt/homebrew/var/db/valkey"

つまり、/opt/homebrew/var/db/valkey/dump.rdbがスナップショットファイルということになる。実際に見てみる。

ls -lt /opt/homebrew/var/db/valkey/*
出力
-rw-r--r--@ 1 kun432  admin  507  5  5 22:05 /opt/homebrew/var/db/valkey/dump.rdb

なるほど、スナップショットと言いつつ、世代管理されるわけではないみたい。

次にAOFの設定

valkey-cli CONFIG GET appendonly
出力
1) "appendonly"
2) "no"

こちらは無効化されている。

永続化設定の変更

では、まずRDBの設定を変更してみる。1分に1回以上更新があれば保存に変更。

valkey-cli CONFIG SET save "60 1"
出力
OK

確認

valkey-cli CONFIG GET save
出力
1) "save"
2) "60 1"

設定が変更されているのがわかる。では、スナップショットファイルを確認。

ls -lt /opt/homebrew/var/db/valkey/dump.rdb

先ほどと変わりがない。

出力
-rw-r--r--@ 1 kun432  admin  507  5  5 22:05 /opt/homebrew/var/db/valkey/dump.rdb

valkey-cliから適当にキーを追加してみる。

valkey-cli SET foo bar
出力
OK
valkey-cli GET foo
出力
"bar"

スナップショットファイルが更新されているのがわかる。

ls -lt /opt/homebrew/var/db/valkey/dump.rdb
出力
-rw-r--r--@ 1 kun432  admin  516  5 12 20:45 /opt/homebrew/var/db/valkey/dump.rdb

ちなみにRDBを無効にする場合は空文字を指定する。

valkey-cli CONFIG SET save ""
出力
OK
valkey-cli CONFIG GET save
出力
1) "save"
2) ""

では次にAOF。まずAOFを有効にする。

valkey-cli CONFIG SET appendonly yes
出力
OK
valkey-cli CONFIG GET appendonly
出力
1) "appendonly"
2) "yes"

で、この設定はValkeyサーバを起動している間の一時的な設定変更になるらしい。設定を恒久化するには以下を実行して設定をファイルに保存する。

valkey-cli CONFIG REWRITE

では、AOFで保存されるログファイルの確認

valkey-cli CONFIG GET dir
出力
1) "dir"
2) "/opt/homebrew/var/db/valkey"
valkey-cli CONFIG GET appenddirname
出力
1) "appenddirname"
2) "appendonlydir"
3) 
valkey-cli CONFIG GET appendfilename
出力
1) "appendfilename"
2) "appendonly.aof"

こちらは/opt/homebrew/var/db/valkey/appendonlydir/appendonly.aof.*に保存される。実際のディレクトリを見てみる。

ls -lt /opt/homebrew/var/db/valkey/appendonlydir/
出力
total 16
-rw-r--r--@ 1 kun432  admin   88  5 13 05:02 appendonly.aof.manifest
-rw-r--r--@ 1 kun432  admin  516  5 13 05:02 appendonly.aof.1.base.rdb
-rw-r--r--@ 1 kun432  admin    0  5 13 05:02 appendonly.aof.1.incr.aof

どうやら複数のファイルに分けて保存されるみたい。

種類 ファイル名の例 内容
ベースファイル appendonly.aof.1.base.rdb or appendonly.aof.1.base.rdb 最初のスナップショット(RDBまたはAOF形式)
増分ファイル appendonly.aof.1.incr.aof, appendonly.aof.2.incr.aof, ... その後の操作ログ
マニフェスト appendonly.aof.manifest 使用中ファイルの構成と順序の記録

valkey-cliから適当にキーを追加してみる。

valkey-cli SET foo bar
出力
OK

AOFのファイルを見てみる

ls -lt /opt/homebrew/var/db/valkey/appendonlydir/
出力
total 24
-rw-r--r--@ 1 kun432  admin   54  5 13 05:24 appendonly.aof.1.incr.aof
-rw-r--r--@ 1 kun432  admin   88  5 13 05:02 appendonly.aof.manifest
-rw-r--r--@ 1 kun432  admin  516  5 13 05:02 appendonly.aof.1.base.rdb

増分ファイルを見てみる。

cat /opt/homebrew/var/db/valkey/appendonlydir/appendonly.aof.1.incr.aof
出力
*2
$6
SELECT
$1
0
*3
$3
SET
$3
foo
$3
bar

なるほど、ここに更新内容が追記されていき、で定期的にこれをベースファイルに反映していく、というようなものみたい。このベースファイルに反映=ログファイルの再構成のことを"rewrite"と呼ぶらしい。

つまりAOFの場合にはファイルへの永続化に関するタイミングは2つある

  • 増分ファイルへの記録
  • ログファイルの再構成

これらに関連した設定として以下がある

  • appendfsync
    • 増分ファイルへの書き込み操作をディスクにフラッシュ(fsync)する頻度を設定
      • everysec: 1秒ごとに実行。デフォルト。耐久性とパフォーマンスのバランスが取れている。
      • always: すべての書き込み操作後に実行。耐久性は高いがパフォーマンスに影響する(遅くなる)可能性。
      • no: fsyncを実行せずにOSに任せる。パフォーマンスは高いが耐久性が下がる。
  • auto-aof-rewrite-*
    • ログファイルの再構成のトリガーを設定
      • auto-aof-rewrite-percentage: 再構成を行うAOFファイルの増加率
        • 前回再構成後のAOFファイルサイズと現在のAOFファイルサイズを比較し、ここで何パーセント増加したかを指定
        • 例: auto-aof-rewrite-percentage 100だと、AOFファイルが前回の再構成後のサイズの2倍になったら再構成される
      • auto-aof-rewrite-min-size: 再構成を行う最小のAOFファイルサイズ
        • 例: auto-aof-rewrite-min-size 64mbだと、AOFファイルが64MBを超えたら再構成
    • 両方の条件にマッチした場合に、自動的にBGREWRITEAOFが実行され、ログファイルが再構成される

単純にディスクへの永続化ってどうなってんだっけ?と思って読んでみたけど、実運用時にはいろいろ考慮すべき点があるね。上に書いてない事も含めて、設計時には改めて読み直すつもり。

kun432kun432

一通り気になるところは触れたかな。あとは実践あるのみ。

kun432kun432

そういえばStreamsはまだ途中までしか見てなかった。残りは多分PubSub的な使い方になると思うので、PubSubの記事の方に追記。

このスクラップは2ヶ月前にクローズされました