ValkeyでPubSubを試す
Valkey一通り触ってみて、
次はPubSubもやってみる。
PubSub関連のコマンドは以下
概要
ValkeyにおけるPubSubの特徴
- メッセージは永続化しない
- サブスクライブしてない時のメッセージはロスト
- 永続化が必要な場合はStreamを検討
- データベース番号は無関係
- DB0にPUBLISHしても、DB1でSUBSCRIBEすれば受け取れる
- 環境ごとにチャネル名にプレフィックスを付けてスコープ管理がおすすめ
基本コマンドは以下
-
PUBLISH
: チャネルに発行 -
SUBCRIBE
: チャネルの購読を開始 -
UNSUBSCRIBE
: チャネルの購読を停止
他にもあるが後ほど。
とりあえず動かしてみる
valkey-serverはすでに起動しているものとする。
ターミナルを2つ開いて、1つはSubscriber、もう一つをPublisherとする。それぞれvalkey-cli
で接続しておく。
valkey-cli
127.0.0.1:6379>
まずSubscriber側でSUBSCRIBE
SUBSCRIBE my_channel
1) "subscribe"
2) "my_channel"
3) (integer) 1
Reading messages... (press Ctrl-C to quit or any key to type command)
次にPublisher側でPUBLISH
PUBLISH my_channel "This is a test message"
(integer) 1
Subscriber側のターミナルには以下が出力される
1) "message"
2) "my_channel"
3) "This is a test message"
この時、Subscriber側で適当なキーを入力すると以下となる。
127.0.0.1:6379(subscribed mode)>
どうやらこの"subscribed mode"だとSUBSCRIBEは一応継続しているみたいで、この状態のときにPublisher側でメッセージをPUBLISHすると、画面には出力されないのだが、再度SUBSCRIBEコマンドを実行すると確認できるみたい。
SUBSCRIBE my_channel
1) "message"
2) "my_channel"
3) "This is a test message during subscribed mode" <-- Subscriberがsubscribed modeの間にPublisherからPublishしたメッセージ
1) "subscribe"
2) "my_channel"
3) (integer) 1
なお、valkey-cli の場合、UNSUBSCRIBEについては、
valkey-cli を使用する場合、サブスクライブモードでは UNSUBSCRIBE や PUNSUBSCRIBE などのコマンドは使用できません。valkey-cli はコマンドを受け付けず、Ctrl-C でモードを終了するしか方法がありませんのでご注意ください。
とあるのだが、"subscribed mode"でUNSUBSCRIBEを実行するとなんかちゃんと受け付けているように見えるのだけど、違うのかな?
UNSUBSCRIBE my_channel
1) "unsubscribe"
2) "my_channel"
3) (integer) 0
配信セマンティクス
Valkey Pub/Sub の配信セマンティクスは at-most-once
-
at-most-once
- メッセージは「一度だけ送られる」
- 送信後に Subscriber で処理エラーやネットワーク断が起きても再送はされず、そのメッセージはロストししてしまう
- 再送なしにすることで、シンプル&高速性を重視
- データ永続化やACKを待つ仕組みがないため、レイテンシーが低く、オーバーヘッドも小さい
- 信頼性が必要なケース
- より強い配信保証が必要な場合は、ロスト対策・再送機構を持つ Streams を検討
- Streams は at-most-once に加え at-least-once(「少なくとも一度は届く」)をサポート
プッシュメッセージの形式
Valkey サーバが Subscriber に送るプッシュメッセージは、配列返信(array-reply)で3つの要素から構成される。上の方で簡単にPubSubを試したが、その際の出力を見るとわかりやすい。
1つ目はメッセージの「種類」で、この種類が以下のどちらかで、残りの2つの要素の内容が異なる様子。
-
subscribe
/unsubscribe
message
subscribe
/ unsubscribe
1. チャネルを購読開始・購読停止する際の通知
- メッセージの種類:
subscribe
またはunsubscribe
- チャンネル名
- 現在購読しているチャネル数
valkey-cliでSUBSCRIBEしたときの出力を再掲
Subscriber側ターミナルのvalkey-cliSUBSCRIBE my_channel
出力1) "subscribe" 2) "my_channel" 3) (integer) 1 Reading messages... (press Ctrl-C to quit or any key to type command)
実際にはRESP(Redis Serialization Protocol)で送信されるので多分こんな感じのメッセージが返ってきていることになるみたい。
*3
$9
subscribe
$10
my_channel
:1
*3
で3要素の配列、$n
で後続のデータのバイト長を表してデータの区切りを示している。RESPには2と3の2つのバージョンがあるみたいだが、PubSubでは気にしなくて良さそう。
message
2. チャネルにPUBLISHされたメッセージ
- メッセージの種類:
message
- 発行元のチャンネル名
- メッセージ本文
valkey-cliでPUBLISHされたときのSubscriber側の出力を再掲
次にPublisher側でPUBLISH
Publisher側ターミナルのvalkey-cliPUBLISH my_channel "This is a test message"
P出力(integer) 1
Subscriber側のターミナルには以下が出力される
Subscriber側のターミナル出力1) "message" 2) "my_channel" 3) "This is a test message"
データベースとスコーピング
Pub/SubはDB番号を無視する
- ValkeyのPub/Subはキー空間(key space)には一切干渉しない
- どのDB番号でPUBLISHしても、どのDB番号でSUBSCRIBEしても同じチャンネルに届く
Subscriber側で、データベース番号1を指定(-n 1
)してSUBSCRIBE
valkey-cli -n 1 SUBSCRIBE my_channel
1) "subscribe"
2) "my_channel"
3) (integer) 1
Publisher側で、データベース番号10を指定(-n 10
)してPUBLISH
valkey-cli -n 10 PUBLISH my_channel "message from database 10"
(integer) 1
Subscriber側のターミナルには以下が出力される。
1) "message"
2) "my_channel"
3) "message from database 10"
したがって、DB番号では分けることができないため、名前で分ける必要がある。例えば、test
/staging
/production
みたいな環境名とか。
valkey-cli SUBSCRIBE production:my_channel
valkey-cli PUBLISH production:my_channel "message from production"
パターン購読
PSUBSCRIBE
/ PUNSUBSCRIBE
で、glob形式のワイルドカードを使うと、複数チャネルを購読開始・停止できる。
例えば、news.*
というチャネルを購読する。
valkey-cli PSUBSCRIBE news.*
1) "psubscribe"
2) "news.*"
3) (integer) 1
通常のSUBSCRIBEと同じメッセージ形式になっている。
PUBLISHしてみる
valkey-cli PUBLISH news.channel_1 "message from channel 1"
(integer) 1
valkey-cli PUBLISH news.channel_2 "message from channel 2"
(integer) 1
Subscriber側の出力は以下となる。
1) "pmessage"
2) "news.*"
3) "news.channel_1"
4) "message from channel 1"
1) "pmessage"
2) "news.*"
3) "news.channel_2"
4) "message from channel 2"
こちらはメッセージのフォーマットが通常のものとは違って4要素になっている。
- メッセージの種類:
pmessage
- マッチしたチャネルのパターン
- 実際のチャネル
- メッセージ
注意すべき点としては、以下の場合に同じメッセージを複数回受信してしまう可能性がある
- 複数のパターン購読にマッチ
- 通常のチャネル購読にパターン購読がマッチ
実際にやってみる。Subscriber側。
valkey-cli
SUBSCRIBE foo
チャネルfoo
を購読。このあと適当にキー入力してsubscribed modeに。
1) "subscribe"
2) "foo"
3) (integer) 1
Reading messages... (press Ctrl-C to quit or any key to type command)
127.0.0.1:6379(subscribed mode)>
f*
でパターン購読
PSUBSCRIBE f*
2つのチャネルを購読しているのがわかる。
1) "psubscribe"
2) "f*"
3) (integer) 2
別ターミナルでPUBLISHしてみる。
valkey-cli PUBLISH foo "message from foo"
Subscriber側の出力は以下のように同じメッセージをそれぞれ受信しているが、それぞれ形式が違うのがわかる。
1) "message"
2) "foo"
3) "message from foo"
1) "pmessage"
2) "f*"
3) "foo"
4) "message from foo"
シャーディングPub/Sub
クラスタモードでグローバルにPubSubすると、全ノードにメッセージが送信されるため、場合によってはトラフィックに影響する可能性がある。これをシャード単位に限定して送信するのがシャーディングPub/Sub。この場合に使用するコマンドは以下となる。
- SSUBSCRIBE
- SUNSUBSCRIBE
- SPUBLISH
チャネル名がシャードになるみたいだけど、クラスタモードはちょっとパス。
あと、「キースペース通知」というのがある。
これは、Valkeyのデータセットに対して起こった操作(キーの削除、リストへのLPUSH、キーの期限切れ、など)を 特殊なPub/Subチャンネルを通してリアルタイムに「イベント」として受け取れる。
少し運用用途かなという気もするけど、アプリケーションでも何かしら使えるかもしれない。
永続化が必要な場合はStreamを検討
ということで、Streamを使ったPubSubについても見ていく。Streamそのものについては以下。
Listening for new items with XREAD
同じストリームの「あるタイミング以降の新しいメッセージを複数のクライアントに配信」したい場合は、
-
XREAD
(特に BLOCK オプション)を複数クライアントで並列に実行 -
XADD
でストリームに新規データが追加されるたび、待機中の全クライアントに同じメッセージが届く
だけで、 fan-out なPub/Subは実現できる。
ターミナルを2つ開いて、valkey-cliを起動し、以下をそれぞれのターミナルで実施。
XREAD BLOCK 0 STREAMS race:france $
これがSuscriber側になる。
さらにもう1つターミナルを開いて、valkey-cliから以下を実施。こちらがPublisher想定になる。
XADD race:france * rider Castilla speed 30.2 position 1 location_id 1
"1751177491153-0"
Publisher側の出力は2つとも以下となる。
1) 1) "race:france"
2) 1) 1) "1751177491153-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "30.2"
5) "position"
6) "1"
7) "location_id"
8) "1"
(34.33s)
ただし、この場合はPub/Subとは違ってメッセージは削除されない。XDEL
/XTRIM
で削除する必要がある。
Consumer groups
同じストリームを複数のワーカーで分散処理したい場合はConsumer groupsを使う
- ストリームに来たメッセージを「グループ」に割り振る
- 各ワーカー(Consumer)が自分に割り当てられたメッセージだけを処理
- 処理後に ACK で完了を通知 → 未ACKは再度配信可能
Consumer groupsは以下について一定の保証を提供する
- メッセージの一意配信
- 各メッセージは必ず異なるConsumerにだけ渡される。
- 同じメッセージが複数のConsumerに重複して配信されることはない
-
Consumerの識別と永続状態
- Consumerは名前で識別(case-sensitive)される。再接続しても「自分が受け取ったメッセージ」「保留中のメッセージ」をそのまま引き継ぐことが可能。
- ただし、名前はクライアント側でユニークである必要がある。
-
「まだ誰にも消費されていない最初のID」を管理
- 新しく読み込むときは「前回最後に渡したID」の次から取得できる。
- 何度同じグループで読み始めても、既読を飛ばして未読分だけを受け取れる
-
ACKによる明示的な完了通知
- 処理が終わったメッセージは XACK で明示的にサーバに知らせる必要がある
- これによりConsumer gropから削除される。
-
保留中メッセージのトラッキング
- 配信済みだがまだACKされていない「保留中」のメッセージを管理。
- 自分に割り当てられた履歴だけが見れるため、落ちたときの再処理や別ワーカーへの再割当(XPENDING/XCLAIM)が可能
主なコマンド
-
XGROUP
: グループの作成・削除などの管理 -
XREADGROUP
: グループ経由でストリームを読む -
XACK
: 処理済みをマークする
Consumer groupの作成
valkey-cliで。まず、一旦全部のキーを削除
FLUSHDB
XGROUP CREATE ストリーム名 グループ名
でグループを作成する。$
でこれ以降のエントリのみを対象にする。
XGROUP CREATE race:kokura kokura_jockeys $
エラー。キーは事前に存在する必要がある。ただし、MKSTREAM
を付与すると、グループ作成時にキーも自動的に作成してくれる。
(error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.
ということでMKSTREAM
をつけてグループを作成。
XGROUP CREATE race:kokura kokura_jockeys $ MKSTREAM
OK
メッセージを送信
各ストリームにメッセージをまとめて送信しておく
XADD race:kokura * jockey Kawada
XADD race:kokura * jockey Matsuyama
XADD race:kokura * jockey R_Sakai
XADD race:kokura * jockey Danno
XADD race:kokura * jockey Miyuki
XADD race:kokura * jockey Takasugi
メッセージの取得
XREADGROUP グループ名 ワーカー名
で、グループのメッセージを1件取得する。>
はConsumer groupでのみ使える特別なIDで、「未配信」のメッセージを指す。
XREADGROUP GROUP kokura_jockeys worker1 COUNT 1 BLOCK 0 STREAMS race:kokura >
1) 1) "race:kokura"
2) 1) 1) "1751842246479-0"
2) 1) "jockey"
2) "Kawada"
別のワーカーで。
XREADGROUP GROUP kokura_jockeys worker2 COUNT 1 BLOCK 0 STREAMS race:kokura >
1) 1) "race:kokura"
2) 1) 1) "1751842246485-0"
2) 1) "jockey"
2) "Matsuyama"
各ワーカーごとにメッセージの処理が個別に行われていることがわかる。
ただしこれらは各ワーカーがまだ「保留中」であって、「完了」されたわけではない。各ワーカーの保留中=処理中のメッセージを見るにはIDに0
を指定する。以下、ワーカー名が違う点に注意。
XREADGROUP GROUP kokura_jockeys worker1 STREAMS race:kokura 0
1) 1) "race:kokura"
2) 1) 1) "1751842246479-0"
2) 1) "jockey"
2) "Kawada"
XREADGROUP GROUP kokura_jockeys worker2 STREAMS race:kokura 0
1) 1) "race:kokura"
2) 1) 1) "1751842246485-0"
2) 1) "jockey"
2) "Matsuyama"
処理済みを返すにはXACK
を使う。worker1で処理を完了したものとする。
XACK race:kokura kokura_jockeys 1751842246479-0
(integer) 1
worker1の保留中メッセージからは消える
XREADGROUP GROUP kokura_jockeys worker1 STREAMS race:kokura 0
1) 1) "race:kokura"
2) (empty array)
新しいworker-3で複数のメッセージを同時に取得してみる。
XREADGROUP GROUP kokura_jockeys worker3 COUNT 2 BLOCK 0 STREAMS race:kokura >
1) 1) "race:kokura"
2) 1) 1) "1751842246487-0"
2) 1) "jockey"
2) "R_Sakai"
2) 1) "1751842246487-1"
2) 1) "jockey"
2) "Danno"
XPENDING
を使うとconsumer group内の保留状態を監視できる。
XPENDING race:kokura kokura_jockeys
以下のように出力される
1) (integer) 3 # ワーカーによって保留中なメッセージ総数
2) "1751842246485-0" # 保留中メッセージの最も下位のID
3) "1751842246487-1" # 保留中メッセージの最も上位のID
4) 1) 1) "worker2" # 各ワーカー(コンシュマー)ごとの保留メッセージ数
2) "1"
2) 1) "worker3"
2) "2"
開始ID・終了IDをXRANGE
と同じように- +
で指定して件数を指定するとより細かく確認できる。
XPENDING race:kokura kokura_jockeys - + 10
以下のようにより詳細に確認できる。メッセージが配信された回数がなぜか2になっているものがあるが、おそらくID>
で「取得」したのと、ID0
で指定して「確認」したからじゃないかなぁ。確認用途の場合はXREADGROUP
は使わずにXPENDING
を使うのが良さそう。
1) 1) "1751842246485-0"
2) "worker2"
3) (integer) 861235 # ワーカーが取得してからのアイドル時間
4) (integer) 2 # メッセージが配信された回数
2) 1) "1751842246487-0"
2) "worker3"
3) (integer) 436678
4) (integer) 1
3) 1) "1751842246487-1"
2) "worker3"
3) (integer) 436678
4) (integer) 1