略解Redis Streams & コマンドチートシート
こんにちは、@key60228です。
私たちが開発しているAI Workerでは、AIエージェントのタスク管理、メッセージキューイング基盤としてRedis Streamsを採用しています[1]。
そこで本記事では、Redis Streamsの概要と関連するコマンドについて一通りまとめてみました。
Redis Streamsとは
Redis StreamsはRedis 5.0から追加された概念です。
stream自体は strings
や hashes
同様、いくつかあるデータ型の1つですが、
All that said, a Redis Stream is quite distinctly its own thing. It’s not exactly fair to say they are like structure foo but with feature bar. There is a unique capability of Redis Streams that sets it apart from any other existing data structure: consumer groups that allow various clients to consume a stream with their own position.
Redis 5.0 is here! -- Redis
とある通り、やや特殊な存在となっています。
リアルタイムなメッセージングシステムとして利用することができるという点ではRedis Pub/Subに似た印象を受けますが、Redis Streamsはもう少し高機能で、配信後もメッセージが揮発しなかったり、メッセージの配信状況を確認できたりします。
基本概念
Redis Streamsの各コマンドを理解する上で把握しておくべき基本概念がいくつかあります。
Stream
StreamはEntry (Message) を格納していくデータ構造です。
Redis Pub/SubにおけるChannelや、AWS SQSにおけるQueueに相当するもの、と考えると分かりやすいかもしれません。
Entry
Streamに追加される最小単位のデータをEntryと呼びます。
いわゆるQueueのMessageに相当するもので、Streamに対して追加されるデータは全てEntryとして扱われます。
各Stream内における一意なIDを持ち、内容はフィールドと値のペアで構成されます。
Consumer
ConsumerはStreamからEntryを読むクライアントのことを指します。
単純な読み取りだけであれば特別な設定をせずにEntryを取得できますが、単一のStreamに対して複数のConsumerで負荷分散を行う等の場合は次に説明するConsumer Groupと組み合わせて使うのが一般的です。
Consumer Group
Consumer Groupは、その名の通り、複数のConsumerをグループ化した概念です。
Redis StreamsではConsumer Group単位でメッセージの処理状況を管理するため、あるConsumerに配信されたがまだ完了通知されていないEntryを追跡したり、Consumer Group内の他のConsumerに再配信したりすることができます。
この処理状況を管理する仕組みの1つが次で説明するPending Entry List (PEL) です。
Pending Entry List (PEL)
Pending Entry List (PEL) は、先述の通り、配信済みだがまだACKされていないEntryの一覧です。
内部的にはConsumer Groupごとに管理されており、EntryがどのConsumerに配信されたか、配信されてからどれくらいの時間が経過しているか、などの情報を持っています。
Dockerでサッとコマンド (redis-cli) を試す
$ docker run -itd --rm --name redis_streams_test redis:7.4
$ docker exec -it redis_streams_test redis-cli
Entryの追加を行うコマンド
XADD] Entryを追加する
[XADD ${stream name} [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] ${threshold} [LIMIT ${count}]] ${id} ${field} ${value} [${field} ${value}...]
実行例
127.0.0.1:6379> XADD test * field1 value1
"1745907039108-0"
Options
${stream name} (required)
KeyとなるStreamの名前の指定。
NOMKSTREAM (optional)
Streamを作成するかどうかのオプション。
このオプションが指定されていて、かつStreamが存在しない場合は (nil)
が返ります。
127.0.0.1:6379> XADD test NOMKSTREAM * field1 value1
(nil)
127.0.0.1:6379> XLEN test
(integer) 0
MAXLEN (optional)
Entry追加後のStreamの最大長を指定するオプション。
=
は厳密な最大長を指定でき、~
はおおよその精度で最大長を指定できるオプションです。
演算子は省略することも可能で、省略した場合は =
が指定されたものとして扱われます。
パフォーマンス的には ~
の方が良いとされています。
127.0.0.1:6379> XLEN test
(integer) 5
127.0.0.1:6379> XADD test MAXLEN = 2 * field1 value1
"1745905673169-0"
127.0.0.1:6379> XLEN test
(integer) 2
また、 LIMIT
オプションを指定することで、最大長を超えた場合に削除するEntryの数を指定できます。
Entryの削除にはそれなりの計算コストがかかるため、予期せず長時間のブロッキングが発生することを避けるためのオプションだそうです。
なお、LIMIT
オプションを指定する場合、演算子は ~
である必要があります。
127.0.0.1:6379> XLEN test
(integer) 2000
127.0.0.1:6379> XADD test MAXLEN = 1000 LIMIT 100 * field1 value1
(error) ERR syntax error, LIMIT cannot be used without the special ~ option
127.0.0.1:6379> XADD test MAXLEN ~ 1000 LIMIT 100 * field1 value1
"1745907027051-0"
127.0.0.1:6379> XLEN test
(integer) 1941
127.0.0.1:6379> XADD test MAXLEN ~ 1000 LIMIT 100 * field1 value1
"1745907039108-0"
127.0.0.1:6379> XLEN test
(integer) 1842
127.0.0.1:6379> XADD test MAXLEN ~ 1000 LIMIT 100 * field1 value1
"1745907054628-0"
127.0.0.1:6379> XLEN test
(integer) 1743
127.0.0.1:6379> XADD test MAXLEN ~ 1000 LIMIT 100 * field1 value1
"1745907057669-0"
127.0.0.1:6379> XLEN test
(integer) 1644
MINID (optional)
MAXLEN
同様にStreamのtrimを行うオプションですが、こちらはStream内の最小IDを指定するオプションです。
MAXLEN
同様 =
と ~
の演算子が指定でき、LIMIT
オプションも指定することができます。
127.0.0.1:6379> XRANGE test - +
1) 1) "1745907461371-0"
...
2) 1) "1745907462112-0"
...
3) 1) "1745907462685-0"
...
4) 1) "1745907463290-0"
...
5) 1) "1745907463948-0"
...
127.0.0.1:6379> XADD test MINID = 1745907462685-0 * field1 value1
"1745907573491-0"
127.0.0.1:6379> XRANGE test - +
1) 1) "1745907462685-0"
...
2) 1) "1745907463290-0"
...
3) 1) "1745907463948-0"
...
4) 1) "1745907573491-0"
...
${id} (required)
Streamに追加するEntryのIDを指定します。
*
を指定するとRedisが現在のタイムスタンプをもとに自動で一意なIDを生成してくれます。
任意のIDを指定する場合はstream内で一意であることに加え、IDの形式は ${num}-${num}
である必要があったり、現存するEntryのIDよりも大きい必要があったりと、いくつかの制約があります。
通常はユーザー側でIDを設定・管理する必要はなく、ほとんどのケースで *
を使うことが推奨されています。
When a user specified an explicit ID to XADD, the minimum valid ID is 0-1, and the user must specify an ID which is greater than any other ID currently inside the stream, otherwise the command will fail and return an error. Usually resorting to specific IDs is useful only if you have another system generating unique IDs (for instance an SQL table) and you really want the Redis stream IDs to match the one of this other system.
XADD -- Redis
127.0.0.1:6379> XADD test 100-0 field1 value1
"100-0"
127.0.0.1:6379> XADD test * field1 value1
"1745909207620-0"
127.0.0.1:6379> XADD test 101-0 field1 value1
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
未来のタイムスタンプのIDを指定することも可能ではあり、それ以降のXADDで *
を指定した場合はsequence番号が更新されていきます。
127.0.0.1:6379> XADD test 1747000000000-0 field1 value1
"1747000000000-0"
127.0.0.1:6379> XADD test * field1 value1
"1747000000000-1"
127.0.0.1:6379> XADD test * field1 value1
"1747000000000-2"
127.0.0.1:6379> XADD test * field1 value1
"1747000000000-3"
${field} ${value} (required)
Entryのフィールド名と値を指定します。
複数のフィールドと値を指定することも可能です。
127.0.0.1:6379> XADD test * field1 value1
"1745909522915-0"
127.0.0.1:6379> XRANGE test - +
1) 1) "1745909522915-0"
2) 1) "field1"
2) "value1"
127.0.0.1:6379> XADD test * field1 value1 field2 value2
"1745909576214-0"
127.0.0.1:6379> XRANGE test - +
1) 1) "1745909522915-0"
2) 1) "field1"
2) "value1"
2) 1) "1745909576214-0"
2) 1) "field1"
2) "value1"
3) "field2"
4) "value2"
単純なEntryの取得を行うコマンド
XREAD] Entryを取得する
[XREAD [COUNT ${count}] [BLOCK ${milliseconds}] STREAMS ${stream name} [${stream name}...] ${id} [${id}...]
実行例
127.0.0.1:6379> XREAD STREAMS test test2 0-0 0-0
1) 1) "test"
2) 1) 1) "1745909522915-0"
2) 1) "field1"
2) "value1"
2) 1) "1745909576214-0"
2) 1) "field1"
2) "value1"
3) "field2"
4) "value2"
2) 1) "test2"
2) 1) 1) "1745910673926-0"
2) 1) "field1"
2) "value1"
Options
COUNT (optional)
取得するEntryの最大数を指定します。
指定しない場合は全ての該当するEntryを取得します。
127.0.0.1:6379> XLEN test
(integer) 5
127.0.0.1:6379> XREAD COUNT 2 STREAMS test 0-0
1) 1) "test"
2) 1) 1) "1745910234281-0"
2) 1) "field1"
2) "value1"
2) 1) "1745910235115-0"
2) 1) "field1"
2) "value1"
127.0.0.1:6379> XREAD STREAMS test 0-0
1) 1) "test"
2) 1) 1) "1745910234281-0"
2) 1) "field1"
2) "value1"
2) 1) "1745910235115-0"
2) 1) "field1"
2) "value1"
3) 1) "1745910235802-0"
2) 1) "field1"
2) "value1"
4) 1) "1745910236454-0"
2) 1) "field1"
2) "value1"
5) 1) "1745910236980-0"
2) 1) "field1"
2) "value1"
BLOCK (optional)
Entryを取得するまで待機する時間を指定するオプションです。
127.0.0.1:6379> XLEN test
(integer) 0
127.0.0.1:6379> XREAD BLOCK 30000 STREAMS test 0-0
# === 別プロセス ===
127.0.0.1:6379> XADD test * field1 value1
"1745910498197-0"
# ================
1) 1) "test"
2) 1) 1) "1745910498197-0"
2) 1) "field1"
2) "value1"
(8.34s)
指定しない場合は同期的に取得されます。
127.0.0.1:6379> XLEN test10
(integer) 0
127.0.0.1:6379> XREAD STREAMS test10 0-0
(nil)
また、0
が指定された場合は無期限に待機します。
${stream name} (required)
取得するStreamの名前を指定します。
複数のStreamを指定することも可能です。
複数のStreamを指定する場合はIDもそれぞれ指定する必要があります。
127.0.0.1:6379> XLEN test
(integer) 2
127.0.0.1:6379> XREAD STREAMS test 0-0
1) 1) "test"
2) 1) 1) "1745909522915-0"
2) 1) "field1"
2) "value1"
2) 1) "1745909576214-0"
2) 1) "field1"
2) "value1"
3) "field2"
4) "value2"
127.0.0.1:6379> XLEN test2
(integer) 1
127.0.0.1:6379> XREAD STREAMS test test2 0-0 0-0
1) 1) "test"
2) 1) 1) "1745909522915-0"
2) 1) "field1"
2) "value1"
2) 1) "1745909576214-0"
2) 1) "field1"
2) "value1"
3) "field2"
4) "value2"
2) 1) "test2"
2) 1) 1) "1745910673926-0"
2) 1) "field1"
2) "value1"
127.0.0.1:6379> XREAD STREAMS test test2 0-0
(error) ERR Unbalanced 'xread' list of streams: for each stream key an ID or '$' must be specified.
${id} (required)
取得する基準となるEntryのIDを指定します。
指定されたID 以降の Entryを取得します。
一見すると該当のIDのみを取得するように見えますが、実際には指定されたIDよりも大きいIDのEntryを全て取得するので注意が必要です。
127.0.0.1:6379> XRANGE test - +
1) 1) "1745910234281-0"
2) 1) "field1"
2) "value1"
2) 1) "1745910235115-0"
2) 1) "field1"
2) "value1"
3) 1) "1745910235802-0"
2) 1) "field1"
2) "value1"
4) 1) "1745910236454-0"
2) 1) "field1"
2) "value1"
5) 1) "1745910236980-0"
2) 1) "field1"
2) "value1"
# 3つ目のEntry IDを指定
127.0.0.1:6379> XREAD STREAMS test 1745910235802-0
1) 1) "test"
2) 1) 1) "1745910236454-0"
2) 1) "field1"
2) "value1"
2) 1) "1745910236980-0"
2) 1) "field1"
2) "value1"
# 5つ目のEntry IDを指定
127.0.0.1:6379> XREAD STREAMS test 1745910236980-0
(nil)
また、$
を指定すると「最新のEntry ID」を指定することができます。
BLOCK
オプションと組み合わせることで、Entryが新しく追加されるまで待機することができ、リアルタイムメッセージングに有用です。
XRANGE] Entryを範囲指定でIDが小さい順に取得する
[XRANGE ${stream name} ${start id} ${end id} [COUNT ${count}]
実行例
127.0.0.1:6379> XRANGE test - +
1) 1) "1745910234281-0"
2) 1) "field1"
2) "value1"
2) 1) "1745910235115-0"
2) 1) "field1"
2) "value1"
3) 1) "1745910235802-0"
2) 1) "field1"
2) "value1"
4) 1) "1745910236454-0"
2) 1) "field1"
2) "value1"
5) 1) "1745910236980-0"
2) 1) "field1"
2) "value1"
Options
${stream name} (required)
取得するStreamの名前を指定します。
${start id} ${end id} (required)
取得する範囲の起点と終点となるEntry IDを指定します。
-
を指定するとstream内の「最も古いEntry ID」を指定することができ、+
を指定すると「最も新しいEntry ID」を指定することができるため、startとendにそれぞれを指定することでstream内の全てのEntryを取得することができます。
該当するEntryがない場合は空配列が返ります。
127.0.0.1:6379> XLEN test
(integer) 5
127.0.0.1:6379> XRANGE test - +
1) 1) "1745910234281-0"
2) 1) "field1"
2) "value1"
2) 1) "1745910235115-0"
2) 1) "field1"
2) "value1"
3) 1) "1745910235802-0"
2) 1) "field1"
2) "value1"
4) 1) "1745910236454-0"
2) 1) "field1"
2) "value1"
5) 1) "1745910236980-0"
2) 1) "field1"
2) "value1"
127.0.0.1:6379> XRANGE test 1745910235802-0 1745910236454-0
1) 1) "1745910235802-0"
2) 1) "field1"
2) "value1"
2) 1) "1745910236454-0"
2) 1) "field1"
2) "value1"
127.0.0.1:6379> XRANGE test + -
(empty array)
COUNT (optional)
取得するEntryの最大数を指定します。
指定しない場合は全ての該当するEntryを取得します。
127.0.0.1:6379> XLEN test
(integer) 5
127.0.0.1:6379> XRANGE test - + COUNT 2
1) 1) "1745910234281-0"
2) 1) "field1"
2) "value1"
2) 1) "1745910235115-0"
2) 1) "field1"
2) "value1"
XREVRANGE] Entryを範囲指定でIDが大きい順に取得する
[XREVRANGE ${stream name} ${end id} ${start id} [COUNT ${count}]
実行例
127.0.0.1:6379> XREVRANGE test + -
1) 1) "1745910236980-0"
2) 1) "field1"
2) "value1"
2) 1) "1745910236454-0"
2) 1) "field1"
2) "value1"
3) 1) "1745910235802-0"
2) 1) "field1"
2) "value1"
4) 1) "1745910235115-0"
2) 1) "field1"
2) "value1"
5) 1) "1745910234281-0"
2) 1) "field1"
2) "value1"
Options
${stream name} (required)
取得するStreamの名前を指定します。
${end id} ${start id} (required)
取得する範囲の終点と起点となるEntry IDを指定します。
XRANGE 同様 -
と +
記号が使用可能です。
127.0.0.1:6379> XLEN test
(integer) 5
127.0.0.1:6379> XREVRANGE test + -
1) 1) "1745910236980-0"
2) 1) "field1"
2) "value1"
2) 1) "1745910236454-0"
2) 1) "field1"
2) "value1"
3) 1) "1745910235802-0"
2) 1) "field1"
2) "value1"
4) 1) "1745910235115-0"
2) 1) "field1"
2) "value1"
5) 1) "1745910234281-0"
2) 1) "field1"
2) "value1"
COUNT (optional)
取得するEntryの最大数を指定します。
127.0.0.1:6379> XREVRANGE test + - COUNT 2
1) 1) "1745910236980-0"
2) 1) "field1"
2) "value1"
2) 1) "1745910236454-0"
2) 1) "field1"
2) "value1"
Entryの削除を行うコマンド
XDEL] Entryを削除する
[XDEL ${stream name} ${id} [${id}...]
実行例
127.0.0.1:6379> XDEL test 1746000000000-0
(integer) 1
Options
${stream name} (required)
Streamの名前を指定します。
${id} (required)
削除する対象のEntry IDを指定します。
複数指定可能で、削除したEntryの数が返ります。
127.0.0.1:6379> XRANGE test - +
1) 1) "100-0"
2) 1) "field1"
2) "value1"
2) 1) "1745909207620-0"
2) 1) "field1"
2) "value1"
3) 1) "1746000000000-0"
2) 1) "field1"
2) "value1"
4) 1) "1747000000000-0"
2) 1) "field1"
2) "value1"
5) 1) "1747000000000-1"
2) 1) "field1"
2) "value1"
6) 1) "1747000000000-2"
2) 1) "field1"
2) "value1"
7) 1) "1747000000000-3"
2) 1) "field1"
2) "value1"
127.0.0.1:6379> XDEL test 1746000000000-0
(integer) 1
127.0.0.1:6379> XDEL test 1747000000000-1 1747000000000-2
(integer) 2
127.0.0.1:6379> XDEL test 999-9
(integer) 0
XTRIM] 古いEntryを削除する
[XTRIM ${stream name} <MAXLEN | MINID> [= | ~] threshold [LIMIT count]
実行例
127.0.0.1:6379> XTRIM test MAXLEN = 2
(integer) 2
Options
${stream name} (required)
Streamの名前を指定します。
MAXLEN | MINID (required)
削除する対象のEntryを指定するためのオプションです。
詳細はXADDの MAXLEN
/ MINID
オプションを参照してください。
127.0.0.1:6379> XRANGE test - +
1) 1) "1745907462685-0"
2) 1) "field1"
2) "value1"
2) 1) "1745907463290-0"
2) 1) "field1"
2) "value1"
3) 1) "1745907463948-0"
2) 1) "field1"
2) "value1"
4) 1) "1745907573491-0"
2) 1) "field1"
2) "value1"
127.0.0.1:6379> XTRIM test MAXLEN = 2
(integer) 2
127.0.0.1:6379> XRANGE test - +
1) 1) "1745907463948-0"
2) 1) "field1"
2) "value1"
2) 1) "1745907573491-0"
2) 1) "field1"
2) "value1"
Consumer Groupの設定・更新・削除を行うコマンド
XGROUP CREATE] Consumer Groupを作成する
[XGROUP CREATE ${stream name} ${group name} ${id} [MKSTREAM] [ENTRIESREAD ${entries id}]
実行例
127.0.0.1:6379> XGROUP CREATE test group $
OK
Options
${stream name} (required)
Consumer Groupを作成するStreamの名前を指定します。
${group name} (required)
Consumer Groupの名前を指定します。
既に存在しているConsumer Group名を指定した場合はエラーになります。
127.0.0.1:6379> XGROUP CREATE test group $
OK
127.0.0.1:6379> XGROUP CREATE test group $
(error) BUSYGROUP Consumer Group name already exists
${id} (required)
Consumer Groupの配信の起点となるEntry IDを指定します。
0
を指定するとStream内の最も古いEntry IDが指定され、XREAD同様に $
を指定するとStream内の最新のEntry IDが指定されます。
MKSTREAM (optional)
Groupの作成時にStreamが存在しない場合に作成するオプションです。
このオプションが指定されておらず、かつStreamが存在しない場合はエラーになります。
127.0.0.1:6379> XGROUP CREATE test group $
(error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.
127.0.0.1:6379> XGROUP CREATE test group $ MKSTREAM
OK
ENTRIESREAD (optional)
Consumer Groupの内部情報を指定するオプションです。
デフォルトは0で、基本的にユーザーが指定するケースはなく、Redis内部で使用されているオプションです。
XGROUP DESTROY] Consumer Groupを削除する
[XGROUP DESTROY ${stream name} ${group name}
実行例
127.0.0.1:6379> XGROUP DESTROY test group
(integer) 1
Options
${stream name} (required)
該当のStreamの名前を指定します。
存在しないStreamを指定した場合はエラーになります。
127.0.0.1:6379> XGROUP DESTROY test group
(error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.
${group name} (required)
削除するConsumer Groupの名前を指定します。
指定したConsumer Groupが存在しない場合もエラーにはなりません。
127.0.0.1:6379> XGROUP DESTROY test group
(integer) 1
127.0.0.1:6379> XGROUP DESTROY test group2
(integer) 0
XGROUP CREATECONSUMER] Consumerを作成する
[XGROUP CREATECONSUMER ${stream name} ${group name} ${consumer name}
実行例
127.0.0.1:6379> XGROUP CREATECONSUMER test group consumer
(integer) 1
Options
${stream name} (required)
Consumer Groupを作成するStreamの名前を指定します。
存在しないStreamを指定した場合はエラーになります。
127.0.0.1:6379> XGROUP CREATECONSUMER test group consumer
(error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.
${group name} (required)
Consumer Groupの名前を指定します。
指定したConsumer Groupが存在しない場合はエラーになります。
127.0.0.1:6379> XGROUP CREATECONSUMER test group consumer
(integer) 1
127.0.0.1:6379> XGROUP CREATECONSUMER test group2 consumer
(error) NOGROUP No such consumer group 'group2' for key name 'test'
${consumer name} (required)
Consumerの名前を指定します。
指定したConsumer名が既に存在している場合もエラーにはなりません。
127.0.0.1:6379> XGROUP CREATECONSUMER test group consumer
(integer) 1
127.0.0.1:6379> XGROUP CREATECONSUMER test group consumer
(integer) 0
XGROUP DELCONSUMER] Consumerを削除する
[XGROUP DELCONSUMER ${stream name} ${group name} ${consumer name}
実行例
127.0.0.1:6379> XGROUP CREATECONSUMER test group consumer
(integer) 1
Options
${stream name} (required)
Consumer Groupを作成するStreamの名前を指定します。
存在しないStreamを指定した場合はエラーになります。
127.0.0.1:6379> XGROUP DELCONSUMER test group consumer
(error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.
${group name} (required)
Consumer Groupの名前を指定します。
指定したConsumer Groupが存在しない場合はエラーになります。
127.0.0.1:6379> XGROUP DELCONSUMER test group2 consumer
(error) NOGROUP No such consumer group 'group2' for key name 'test'
${consumer name} (required)
削除するConsumerの名前を指定します。
指定したConsumer名が存在しない場合もエラーにはなりません。
127.0.0.1:6379> XGROUP CREATECONSUMER test group consumer
(integer) 1
127.0.0.1:6379> XGROUP DELCONSUMER test group consumer
(integer) 0
127.0.0.1:6379> XGROUP DELCONSUMER test group consumer
(integer) 0
XGROUP SETID] Consumer Groupの最後の配信Entry IDを設定する
[XGROUP SETID ${stream name} ${group name} ${id}
実行例
127.0.0.1:6379> XGROUP SETID test group $
OK
Options
${stream name} (required)
Consumer Groupを作成するStreamの名前を指定します。
存在しないStreamを指定した場合はエラーになります。
127.0.0.1:6379> XGROUP SETID test group $
(error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.
${group name} (required)
Consumer Groupの名前を指定します。
指定したConsumer Groupが存在しない場合はエラーになります。
127.0.0.1:6379> XGROUP SETID test group $
(error) NOGROUP No such consumer group 'group' for key name 'test'
${id} (required)
Consumer Groupの配信の起点となるEntry IDを指定します。
これもXREAD同様に $
を指定するとStream内の最新のEntry IDが指定されます。
ただし、このコマンドはXGROUP CREATEの ENTRIESREAD
オプションと同様に、ユーザーが使用するケースは稀です。
127.0.0.1:6379> XGROUP SETID test group $
OK
Consumer Groupを使ったEntryの取得を行うコマンド
XREADGROUP] EntryをConsumer Groupとして取得する
[XREADGROUP GROUP ${group name} ${consumer name} [COUNT ${count}] [BLOCK ${milliseconds}] [NOACK] STREAMS ${stream name} [${stream name}...] ${id} [${id}...]
実行例
127.0.0.1:6379> XREADGROUP GROUP group consumer STREAMS test >
1) 1) "test"
2) 1) 1) "1745906970475-0"
2) 1) "field1"
2) "value1"
2) 1) "1745906970475-1"
2) 1) "field1"
2) "value1"
3) 1) "1745906970476-0"
2) 1) "field1"
2) "value1"
Options
${group name} (required)
対象のConsumer Groupの名前を指定します。
指定したConsumer Groupが存在しない場合はエラーになります。
127.0.0.1:6379> XREADGROUP GROUP group consumer STREAMS test $
(error) NOGROUP No such key 'test' or consumer group 'group' in XREADGROUP with GROUP option
${consumer name} (required)
Consumerの名前を指定します。
指定したConsumerが存在しない場合は自動的に作成されます。
127.0.0.1:6379> XREADGROUP GROUP group consumer STREAMS test >
1) 1) "test"
2) 1) 1) "1745906970475-0"
2) 1) "field1"
2) "value1"
2) 1) "1745906970475-1"
2) 1) "field1"
2) "value1"
3) 1) "1745906970476-0"
2) 1) "field1"
2) "value1"
COUNT (optional)
取得するEntryの最大数を指定します。
詳細はXREADの COUNT
オプションを参照してください。
BLOCK (optional)
Entryを取得するまで待機する時間を指定するオプションです。
詳細はXREADの BLOCK
オプションを参照してください。
ただし、後述のIDパラメータで >
を指定していない場合はこのオプションは無視されます。
NOACK (optional)
完了通知を送る必要をなくすオプションです。
通常このオプションが指定されていない場合、XREADGROUPで取得されたEntryはPELに格納され、完了通知を待ちます。
このオプションを指定することで、Consumerから明示的に完了通知を送る必要がなくなります。
127.0.0.1:6379> XRANGE test - +
1) 1) "1745957036448-0"
2) 1) "field1"
2) "value1"
2) 1) "1745957037730-0"
2) 1) "field1"
2) "value1"
3) 1) "1745957038255-0"
2) 1) "field1"
2) "value1"
127.0.0.1:6379> XREADGROUP GROUP group consumer COUNT 1 STREAMS test >
1) 1) "test"
2) 1) 1) "1745957036448-0"
2) 1) "field1"
2) "value1"
127.0.0.1:6379> XPENDING test group
1) (integer) 1
2) "1745957036448-0"
3) "1745957036448-0"
4) 1) 1) "consumer"
2) "1"
127.0.0.1:6379> XREADGROUP GROUP group consumer COUNT 1 NOACK STREAMS test >
1) 1) "test"
2) 1) 1) "1745957037730-0"
2) 1) "field1"
2) "value1"
127.0.0.1:6379> XPENDING test group
1) (integer) 1
2) "1745957036448-0"
3) "1745957036448-0"
4) 1) 1) "consumer"
2) "1"
ただし、後述のIDパラメータで >
を指定していない場合はこのオプションは無視されます。
${stream name} (required)
取得するStreamの名前を指定します。
複数のStreamを指定することも可能ですが、その場合はIDもそれぞれ指定する必要があります。
詳細はXREADの stream name
オプションを参照してください。
${id} (required)
取得するEntryのIDを指定します。
IDは特殊記号 >
または任意のIDを指定することができますが、それぞれ挙動が異なります。
>
を指定すると、他のConsumerによってREADされていないEntryを取得することができます。
一方、>
以外の任意のIDを指定した場合は、該当のConsumerによって既にREADされた、指定のIDよりも大きなIDのEntryを取得します。
つまり、>
の場合はPELに格納されていないEntryを、それ以外のIDの場合はPELに格納されているEntryを取得することになります。
127.0.0.1:6379> XRANGE test - +
1) 1) "1745992556849-0"
2) 1) "field1"
2) "value1"
2) 1) "1745992557786-0"
2) 1) "field1"
2) "value1"
3) 1) "1745992558436-0"
2) 1) "field1"
2) "value1"
4) 1) "1745992559064-0"
2) 1) "field1"
2) "value1"
5) 1) "1745992559709-0"
2) 1) "field1"
2) "value1"
127.0.0.1:6379> XREADGROUP GROUP group consumer1 COUNT 1 STREAMS test >
1) 1) "test"
2) 1) 1) "1745992556849-0"
2) 1) "field1"
2) "value1"
127.0.0.1:6379> XREADGROUP GROUP group consumer2 COUNT 1 STREAMS test >
1) 1) "test"
2) 1) 1) "1745992557786-0"
2) 1) "field1"
2) "value1"
127.0.0.1:6379> XREADGROUP GROUP group consumer2 STREAMS test 0
1) 1) "test"
2) 1) 1) "1745992557786-0"
2) 1) "field1"
2) "value1"
XPENDING] PELのEntryを取得する
[XPENDING ${stream name} ${group name} [[IDLE ${milliseconds}] ${start} ${end} ${count} [${consumer name}]]
実行例
127.0.0.1:6379> XPENDING test group - + 1
1) 1) "1745992556849-0"
2) "consumer3"
3) (integer) 2947487
4) (integer) 2
Options
${stream name} (required)
Streamの名前を指定します。
${group name} (required)
Consumer Groupの名前を指定します。
IDLE (optional)
取得するEntryのIdle時間を指定するオプションです。
指定しない場合は全ての該当するEntryを取得します。
${start} ${end} ${count} (optional)
取得するEntryの起点ID、終点ID、最大数を指定するオプションです。
これらのオプションはセットとなっており、どれか1つのみ指定することはできません。
start
と end
についてはXRANGEと同様の特殊記号 (-
/ +
) が使用可能です。
127.0.0.1:6379> XPENDING test group
1) (integer) 2
2) "1745992556849-0"
3) "1745992557786-0"
4) 1) 1) "consumer1"
2) "1"
2) 1) "consumer3"
2) "1"
127.0.0.1:6379> XPENDING test group - + 1
1) 1) "1745992556849-0"
2) "consumer3"
3) (integer) 2947487
4) (integer) 2
${consumer name} (optional)
対象のEntryを保持しているConsumerの名前を指定するオプションです。
127.0.0.1:6379> XPENDING test group
1) (integer) 2
2) "1745992556849-0"
3) "1745992557786-0"
4) 1) 1) "consumer1"
2) "1"
2) 1) "consumer3"
2) "1"
127.0.0.1:6379> XPENDING test group - + 10 consumer1
1) 1) "1745992557786-0"
2) "consumer1"
3) (integer) 755467
4) (integer) 8
XCLAIM] PELのEntryを取得し、Consumerを再割り当てする (対象ID指定)
[XCLAIM ${stream name} ${group name} ${consumer name} ${minimum idle time} $id [$id...] [IDLE ${milliseconds}] [TIME ${milliseconds}] [RETRYCOUNT ${count}] [FORCE] [JUSTID] [LASTID ${last id}]
実行例
127.0.0.1:6379> XCLAIM test group consumer3 10000 1745992556849-0
1) 1) "1745992556849-0"
2) 1) "field1"
2) "value1"
Options
${stream name} (required)
Streamの名前を指定します。
${group name} (required)
Consumer Groupの名前を指定します。
${consumer name} (required)
Entryを再割り当てするConsumerの名前を指定します。
${minimum idle time} (required)
後述のIDパラメータで指定されたEntryのうち、ここで指定した時間以上が経過しているEntryを取得します。
127.0.0.1:6379> XPENDING test group
1) (integer) 2
2) "1745992556849-0"
3) "1745992557786-0"
4) 1) 1) "consumer1"
2) "1"
2) 1) "consumer2"
2) "1"
127.0.0.1:6379> XCLAIM test group consumer3 10000 1745992556849-0
1) 1) "1745992556849-0"
2) 1) "field1"
2) "value1"
127.0.0.1:6379> XPENDING test group
1) (integer) 2
2) "1745992556849-0"
3) "1745992557786-0"
4) 1) 1) "consumer2"
2) "1"
2) 1) "consumer3"
2) "1"
${id} (required)
取得するEntryのIDを指定します。
複数指定可能で、指定されたEntryのうち、先述のminimum idle timeが経過しているEntryを取得します。
この性質上、XPENDINGと組み合わせて使用されることが多いです。
IDLE (optional)
取得したEntryのIdle時間を更新するオプションです。
指定しない場合は取得したEntryのIdle時間はリセットされます。
TIME (optional)
IDLE
オプション同様、取得したEntryのIdle時間を更新するオプションです。
ただし、IDLE
オプションは相対的な時間を指定するのに対し、TIME
オプションはUNIX時間で絶対的な時間を指定します。
RETRYCOUNT (optional)
取得したEntryの配信回数を更新するオプションです。
通常配信回数はXREADGROUPで取得した際のみ更新されますが、このオプションを指定することでXCLAIM時も更新が可能です。
FORCE (optional)
指定されたEntryがまだ誰からもREADされていない場合でも、強制的に取得、PELに格納するオプションです。
127.0.0.1:6379> XRANGE test - +
1) 1) "1745909207620-0"
2) 1) "field1"
2) "value1"
2) 1) "1747000000000-0"
2) 1) "field1"
2) "value1"
3) 1) "1747000000000-3"
2) 1) "field1"
2) "value1"
127.0.0.1:6379> XPENDING test group
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)
127.0.0.1:6379> XCLAIM test group consumer 1000 1745909207620-0
(empty array)
127.0.0.1:6379> XPENDING test group
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)
127.0.0.1:6379> XCLAIM test group consumer 1000 1745909207620-0 FORCE
1) 1) "1745909207620-0"
2) 1) "field1"
2) "value1"
127.0.0.1:6379> XPENDING test group
1) (integer) 1
2) "1745909207620-0"
3) "1745909207620-0"
4) 1) 1) "consumer"
2) "1"
JUSTID (optional)
フィールドは取得せず、IDのみを取得するオプションです。
127.0.0.1:6379> XCLAIM test3 group consumer 1000 1745909207620-0 JUSTID
1) "1745909207620-0"
LASTID (optional)
Consumer Groupの最後の配信Entry IDを更新するオプションです。
既存の最終Entry IDよりも小さなIDを指定した場合は更新されません。
127.0.0.1:6379> XINFO GROUPS test
1) 1) "name"
2) "group"
3) "consumers"
4) (integer) 6
5) "pending"
6) (integer) 2
7) "last-delivered-id"
8) "1745992557786-0"
9) "entries-read"
10) (integer) 8
11) "lag"
12) (integer) 3
127.0.0.1:6379> XCLAIM test group consumer1 100 1745992557786-0 LASTID 0-0
1) 1) "1745992557786-0"
2) 1) "field1"
2) "value1"
127.0.0.1:6379> XINFO GROUPS test
1) 1) "name"
2) "group"
3) "consumers"
4) (integer) 6
5) "pending"
6) (integer) 2
7) "last-delivered-id"
8) "1745992557786-0"
9) "entries-read"
10) (integer) 8
11) "lag"
12) (integer) 3
XAUTOCLAIM] PELのEntryを取得し、Consumerの再割り当てする (起点ID指定)
[XAUTOCLAIM ${stream name} ${group name} ${consumer name} ${minimum idle time} ${start} [COUNT ${count}] [JUSTID]
実行例
127.0.0.1:6379> XREADGROUP GROUP group consumer STREAMS test >
1) 1) "test"
2) 1) 1) "1748356796708-0"
2) 1) "field"
2) "value"
Options
${stream name} (required)
該当のStreamの名前を指定します。
${group name} (required)
Consumer Group名を指定します。
${consumer name} (required)
Entryを再割り当てするConsumerの名前を指定します。
${minimum idle time} (required)
対象のEntryのIdle時間条件を指定します。
XCLAIM同様、指定されたIdle時間以上が経過しているEntryを取得します。
${start} (required)
COUNT (optional)
取得するEntryの最大数を指定します。
JUSTID (optional)
フィールドは取得せず、IDのみを取得するオプションです。
Entryの完了通知を行うコマンド
XACK] 完了通知を送る
[XACK ${stream name} ${group name} ${id} [${id}...]
実行例
127.0.0.1:6379> XACK test group 1745992557786-0
(integer) 1
Options
${stream name} (required)
対象のStreamの名前を指定します。
${group name} (required)
Consumer Groupの名前を指定します。
${id} (required)
完了するEntry IDを指定します。複数指定も可能です。
完了したEntryはPELから削除されますが、Streamからは削除されません。
127.0.0.1:6379> XPENDING test group
1) (integer) 2
2) "1745992556849-0"
3) "1745992557786-0"
4) 1) 1) "consumer1"
2) "1"
2) 1) "consumer3"
2) "1"
127.0.0.1:6379> XACK test group 1745992557786-0
(integer) 1
127.0.0.1:6379> XPENDING test group
1) (integer) 1
2) "1745992556849-0"
3) "1745992556849-0"
4) 1) 1) "consumer3"
2) "1"
127.0.0.1:6379> XRANGE test - +
1) 1) "1745992556849-0"
2) 1) "field1"
2) "value1"
2) 1) "1745992557786-0"
2) 1) "field1"
2) "value1"
3) 1) "1745992558436-0"
2) 1) "field1"
2) "value1"
4) 1) "1745992559064-0"
2) 1) "field1"
2) "value1"
5) 1) "1745992559709-0"
2) 1) "field1"
2) "value1"
指定したIDが存在しない場合もエラーにはなりませんが、何も起きません。
127.0.0.1:6379> XPENDING test group
1) (integer) 2
2) "1745992556849-0"
3) "1745992557786-0"
4) 1) 1) "consumer1"
2) "1"
2) 1) "consumer3"
2) "1"
127.0.0.1:6379> XACK test group 1745992558436-0
(integer) 0
127.0.0.1:6379> XPENDING test group
1) (integer) 2
2) "1745992556849-0"
3) "1745992557786-0"
4) 1) 1) "consumer1"
2) "1"
2) 1) "consumer3"
2) "1"
メタ情報の取得を行うコマンド
XLEN] StreamのEntry数を取得する
[XLEN ${stream name}
実行例
127.0.0.1:6379> XLEN test
(integer) 5
Options
${stream name} (required)
Streamの名前を指定します。
127.0.0.1:6379> XLEN test
(integer) 5
127.0.0.1:6379> XRANGE test - +
1) 1) "1745992556849-0"
2) 1) "field1"
2) "value1"
2) 1) "1745992557786-0"
2) 1) "field1"
2) "value1"
3) 1) "1745992558436-0"
2) 1) "field1"
2) "value1"
4) 1) "1745992559064-0"
2) 1) "field1"
2) "value1"
5) 1) "1745992559709-0"
2) 1) "field1"
2) "value1"
XINFO STREAM] Streamの情報を取得する
[XINFO STREAM ${stream name} [FULL [COUNT ${count}]]
実行例
127.0.0.1:6379> XINFO STREAM test
1) "length"
2) (integer) 5
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1745992559709-0"
9) "max-deleted-entry-id"
10) "0-0"
11) "entries-added"
12) (integer) 11
13) "recorded-first-entry-id"
14) "1745992556849-0"
15) "groups"
16) (integer) 1
17) "first-entry"
18) 1) "1745992556849-0"
2) 1) "field1"
2) "value1"
19) "last-entry"
20) 1) "1745992559709-0"
2) 1) "field1"
2) "value1"
Options
${stream name} (required)
Streamの名前を指定します。
FULL (optional)
情報を詳細に取得するオプションです。
127.0.0.1:6379> XINFO STREAM test FULL
1) "length"
2) (integer) 5
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1745992559709-0"
9) "max-deleted-entry-id"
10) "0-0"
11) "entries-added"
12) (integer) 11
13) "recorded-first-entry-id"
14) "1745992556849-0"
15) "entries"
16) 1) 1) "1745992556849-0"
2) 1) "field1"
2) "value1"
2) 1) "1745992557786-0"
2) 1) "field1"
2) "value1"
3) 1) "1745992558436-0"
2) 1) "field1"
2) "value1"
4) 1) "1745992559064-0"
2) 1) "field1"
2) "value1"
5) 1) "1745992559709-0"
2) 1) "field1"
2) "value1"
17) "groups"
18) 1) 1) "name"
2) "group"
3) "last-delivered-id"
4) "1745992557786-0"
5) "entries-read"
6) (integer) 8
7) "lag"
8) (integer) 3
9) "pel-count"
10) (integer) 1
11) "pending"
12) 1) 1) "1745992556849-0"
2) "consumer3"
3) (integer) 1745994936298
4) (integer) 2
13) "consumers"
14) 1) 1) "name"
2) "consumer1"
3) "seen-time"
4) (integer) 1745997278941
5) "active-time"
6) (integer) 1745997278941
7) "pel-count"
8) (integer) 0
9) "pending"
10) (empty array)
2) 1) "name"
2) "consumer2"
3) "seen-time"
4) (integer) 1745992736573
5) "active-time"
6) (integer) 1745992705234
7) "pel-count"
8) (integer) 0
9) "pending"
10) (empty array)
3) 1) "name"
2) "consumer3"
3) "seen-time"
4) (integer) 1745994936298
5) "active-time"
6) (integer) 1745994936298
7) "pel-count"
8) (integer) 1
9) "pending"
10) 1) 1) "1745992556849-0"
2) (integer) 1745994936298
3) (integer) 2
4) 1) "name"
2) "consumer4"
3) "seen-time"
4) (integer) 1745995494272
5) "active-time"
6) (integer) 1745995494272
7) "pel-count"
8) (integer) 0
9) "pending"
10) (empty array)
5) 1) "name"
2) "consumer5"
3) "seen-time"
4) (integer) 1745995962945
5) "active-time"
6) (integer) 1745995962945
7) "pel-count"
8) (integer) 0
9) "pending"
10) (empty array)
6) 1) "name"
2) "consumer6"
3) "seen-time"
4) (integer) 1745995552524
5) "active-time"
6) (integer) 1745995529967
7) "pel-count"
8) (integer) 0
9) "pending"
10) (empty array)
COUNT (optional)
StreamのEntry数とPELのEntry数を制限するオプションです。
127.0.0.1:6379> XINFO STREAM test FULL COUNT 1
1) "length"
2) (integer) 5
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1745992559709-0"
9) "max-deleted-entry-id"
10) "0-0"
11) "entries-added"
12) (integer) 11
13) "recorded-first-entry-id"
14) "1745992556849-0"
15) "entries"
16) 1) 1) "1745992556849-0"
2) 1) "field1"
2) "value1"
17) "groups"
18) 1) 1) "name"
2) "group"
3) "last-delivered-id"
4) "1745992557786-0"
5) "entries-read"
6) (integer) 8
7) "lag"
8) (integer) 3
9) "pel-count"
10) (integer) 1
11) "pending"
12) 1) 1) "1745992556849-0"
2) "consumer3"
3) (integer) 1745994936298
4) (integer) 2
13) "consumers"
14) 1) 1) "name"
2) "consumer1"
3) "seen-time"
4) (integer) 1745997278941
5) "active-time"
6) (integer) 1745997278941
7) "pel-count"
8) (integer) 0
9) "pending"
10) (empty array)
2) 1) "name"
2) "consumer2"
3) "seen-time"
4) (integer) 1745992736573
5) "active-time"
6) (integer) 1745992705234
7) "pel-count"
8) (integer) 0
9) "pending"
10) (empty array)
3) 1) "name"
2) "consumer3"
3) "seen-time"
4) (integer) 1745994936298
5) "active-time"
6) (integer) 1745994936298
7) "pel-count"
8) (integer) 1
9) "pending"
10) 1) 1) "1745992556849-0"
2) (integer) 1745994936298
3) (integer) 2
4) 1) "name"
2) "consumer4"
3) "seen-time"
4) (integer) 1745995494272
5) "active-time"
6) (integer) 1745995494272
7) "pel-count"
8) (integer) 0
9) "pending"
10) (empty array)
5) 1) "name"
2) "consumer5"
3) "seen-time"
4) (integer) 1745995962945
5) "active-time"
6) (integer) 1745995962945
7) "pel-count"
8) (integer) 0
9) "pending"
10) (empty array)
6) 1) "name"
2) "consumer6"
3) "seen-time"
4) (integer) 1745995552524
5) "active-time"
6) (integer) 1745995529967
7) "pel-count"
8) (integer) 0
9) "pending"
10) (empty array)
XINFO GROUPS] StreamのConsumer Groupの一覧を取得する
[XINFO GROUPS ${stream name}
実行例
127.0.0.1:6379> XINFO GROUPS test
1) 1) "name"
2) "group"
3) "consumers"
4) (integer) 6
5) "pending"
6) (integer) 1
7) "last-delivered-id"
8) "1745992557786-0"
9) "entries-read"
10) (integer) 8
11) "lag"
12) (integer) 3
2) 1) "name"
2) "group2"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1745992559709-0"
9) "entries-read"
10) (nil)
11) "lag"
12) (integer) 0
Options
${stream name} (required)
Streamの名前を指定します。
XINFO CONSUMERS] Consumer Groupに属するConsumerの一覧を取得する
[XINFO CONSUMERS ${stream name} ${group name}
実行例
127.0.0.1:6379> XINFO CONSUMERS test group
1) 1) "name"
2) "consumer1"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 2361192
7) "inactive"
8) (integer) 2361192
2) 1) "name"
2) "consumer2"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 6903560
7) "inactive"
8) (integer) 6934899
3) 1) "name"
2) "consumer3"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 4703835
7) "inactive"
8) (integer) 4703835
4) 1) "name"
2) "consumer4"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 4145861
7) "inactive"
8) (integer) 4145861
5) 1) "name"
2) "consumer5"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 3677188
7) "inactive"
8) (integer) 3677188
6) 1) "name"
2) "consumer6"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 4087609
7) "inactive"
8) (integer) 4110166
Options
${stream name} (required)
Streamの名前を指定します。
${group name} (required)
Consumer Groupの名前を指定します。
その他のコマンド
XSETID] Streamの最後の配信Entry IDを設定する
[XSETID ${stream name} ${last id} [ENTRIESADDED ${entries-added}] [MAXDELETEDID ${max-deleted-id}]
Redisが内部的に使用しているもので、Streamの最終配信Entry IDを更新するコマンドです。
まとめ
Redis Streamsの概要とコマンドについてまとめました。
Consumer GroupとPending Entry Listの概念がやや分かりづらい印象を最初は受けましたが、一度理解してしまえば気が利いていて良い機能だと思います。
最後に
AI Shiftではエンジニアの採用に力を入れています!
少しでも興味を持っていただけましたら、カジュアル面談でお話しませんか?
(オンライン・19時以降の面談も可能です!)
【面談フォームはこちら】
-
Cloud Pub/SubなどのマネージドメッセージングサービスではなくRedisを採用した背景等についてはまた別途… ↩︎
Discussion