Redis の公式Goライブラリ go-redis の話
本ブログではRedisの公式GOライブラリに昇格したgo-redisを紹介します。
If you’re new to Go-Redis, please do explore it! Version 9 adds support for the RESP3 protocol, introduces a new hooks API, improves pipeline retries, and allows performance monitoring via OpenTelemetry.
パイプライン、トランザクション、楽観的ロック、排他制御、レートリミット、PubSubや、マルチスレッドで実行した場合の競合状態も見ていきます。ソースコードはこちらからご覧になれます。
ローカル環境にRedisをインストール
Redisをローカルのmacにインストールします。
brew install redis
Redisサーバを起動します。
redis-server
Dockerで起動もできます。
docker run -p 6379:6379 --name some-redis -d redis
CLIクライアントを起動します。
redis-cli
続けて下記のキャプチャのように、対話型でコマンドを実行してみてください。
Hello World
簡単なHello World
プログラムを書いて動作確認しましょう。
go-redis
をインストールします。V8とレポジトリの場所が変わっているのでご注意ください。
go get github.com/redis/go-redis/v9
package main
import (
"context"
"github.com/redis/go-redis/v9"
)
func main() {
var ctx = context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password set
DB: 0, // use default DB
PoolSize: 1000,
})
rdb.Set(ctx, "mykey1", "hoge", 0) // キー名 mykey1で文字列hogeをセット
ret, err := rdb.Get(ctx, "mykey1").Result() // キー名mykey1を取得
if err != nil {
println("Error: ", err)
return
}
println("Result: ", ret)
}
rdb.Set(キー、値)とrdb.Get(キー)で値の設定と取得をします。また、Set()で有効期限を設定できます。(0は無限)
パイプライン
パイプラインとは複数のRedisコマンドを一つにまとめて投げるものです。コマンドを1回1回投げるよりも、バッチ(バルク)でまとめて投げれば1回のラウンドトリップで済み速くなります。
rdb.Pipelined()
に一連のRedisコマンドをクロージャで渡します。以下のプログラムではカウンターを2回インクリメントしています。
func main() {
// 初期化は同じなので、これ以降省略します。
pipeLine(rdb)
}
func pipeLine(rdb *redis.Client) {
var incr *redis.IntCmd // カウンター
_, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
incr = pipe.Incr(ctx, "pipelined_counter") // コマンド1
incr = pipe.Incr(ctx, "pipelined_counter") // コマンド2
pipe.Expire(ctx, "pipelined_counter", time.Hour)
return nil
})
if err != nil {
panic(err)
}
// The value is available only after the pipeline is executed.
fmt.Println(incr.Val()) // 実行結果を確認する。結果: 2
}
マルチスレッドで同時に実行
さてここでクイズです。マルチスレッドで同時にパイプラインを実行したらどうなるでしょうか?
const THREADS = 2 # スレッド(goroutine)数
const NumOfGet = 1
func raceCondition(rdb *redis.Client) {
var result []string
// お約束のマルチスレッド処理
var wg sync.WaitGroup
for i := 0; i < THREADS; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// パイプライン処理をパラレル実行
ret := pipeLine(rdb)
result = append(result, ret)
}()
}
wg.Wait()
fmt.Printf("done. result counter> %v \n", result) // 結果を確認
}
func pipeLine(rdb *redis.Client) string {
key := "pipelined_counter" // カウンター
// cmdsにはコマンドの戻り値がリストで返ります。
cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Incr(ctx, key) // カウンターをインクリメント
for i := 0; i < NumOfGet; i++ {
pipe.Get(ctx, key) // カウンターの値を取得
}
return nil
})
value, err := cmds[len(cmds)-1].(*redis.StringCmd).Result()
return value
}
2つのスレッドで実施します。それぞれのスレッドは、パイプラインでインクリメントを1回、取得(GET)を1回実行します。各スレッドの最後のGETの値をリストで返します。
実行します。
done. result counter> [1 2]
スレッド1が1、スレッド2が2を返しました。特に問題(干渉)はなさそうです。
次に、NumOfGet
にとんでもなく大きな数字をセットして実行します。
const NumOfGet = 1000
なんとどちらのスレッドも同じ値を返しました!
done. result counter> [2 2]
RDB的な表現をすればノンリピータブルリード(ファジーリード)が発生した訳です。トランザクション中に別のトランザクションの更新が見えてしまう現象です。
pipe.Incr(ctx, key)
for i := 0; i < NumOfGet; i++ {
// 1000回Get()を実行している間に別のスレッドがIncr()を実行する
pipe.Get(ctx, key) // 同じ操作で違う結果になる(ファジーリード)
}
干渉を防いでみよう
Redisのトランザクションを使用すると回避できます。以下のように、Pipelined
をTxPipelined
に修正します。
RDBに例えるとトランザクションの分離レベルがシリアライザブルになります。つまり、スレッド1とスレッド2を逐次的に実行したのと同じ結果になります。
- cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
+ cmds, err := rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
楽観的ロック
注意点としてRedisのトランザクションでは前のコマンドの戻り値を、次のコマンドの引数に使用するようなことはできません。例えば、インクリメントする処理をトランザクション内で書けません。
tran
ret := get(key1) // 0
set(key1, ret+1) // 1
end
トランザクションの開始前に値を取得しインクリメントする。
ret := get(key1)
ret++
<- ここで他のトランザクションにkey1を更新される可能性はある
tran
set(key1, ret)
end
ただこの場合、key1の値がtran開始前に他のトランザクションによって更新されている可能性があります。つまり、2つのスレッドが同時に実行すると、結果が1になる(2ではなく)。そこで、他者の更新を検出し、上書きしないようにするために楽観的ロックを使用します。
少しコードが複雑なので、まずはJS風の疑似コードで説明します。いきなり具体的なコードを暴力的に見せるよりも、抽象度を上げた方が理解しやすいと思いました。
RedisではWATCHコマンドを使用して楽観的ロックを実装します。
key = "キー名"
rdb.Watch(ctx, (tx) => {
n = tx.Get(key)
n++
try tx.Set(key) catch print("別のスレッドに先越された!")
}, key)
ReactJSのuseEffect()
のような感じです。useEffectの場合は値が変更したらクロージャを実行しますが、Watchの場合は値が更新されたら失敗させます。
失敗時のリトライ処理を付け加えます。
key = "キー名"
rdb.Watch(ctx, (tx) => {
n = tx.Get(key)
n++
loop {
try
tx.Set(key)
break // 成功したらループを抜ける
catch
print("別のスレッドに先越された!")
continue // リトライ
}
}, key)
成功するまでループでグルグル回すわけです。実際にはバックオフやリトライ回数を有限にします。
さて、もう一息です。tx.Set()
をパイプライン処理にします。
key = "キー名"
rdb.Watch(ctx, (tx) => {
n = tx.Get(key)
n++
loop {
try
tx.TxPipelined(pipe => {
pipe.Set(key) // パイプラインで実行します。
}
break
catch
print("別のスレッドに先越された!")
continue
}
}, key)
お待たせしました。
ちゃんとしたGoのコードをお見せします。
// Increment transactional increments the key using GET and SET commands.
func increment(rdb *redis.Client, key string) error {
// Transactional function.
txf := func(tx *redis.Tx) error {
n, err := tx.Get(ctx, key).Int() // カウンター
if err != nil && err != redis.Nil {
return err
}
n++ // カウンターをインクリメント
// Operation is committed only if the watched keys remain unchanged.
_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Set(ctx, key, n, 0) // カウンターを更新
return nil
})
return err
}
// 成功するまでループ。リトライ回数はmaxRetries
for i := 0; i < maxRetries; i++ {
err := rdb.Watch(ctx, txf, key)
if err == nil {
return nil // 成功したらループを抜ける
}
if err == redis.TxFailedErr {
fmt.Println("Optimistic lock lost. Retry.")
continue // 失敗したらループの先頭へ
}
return err
}
return errors.New("increment reached maximum number of retries")
}
RedisのWATCHの実装にロールバックは不要であることが述べられています。シングルスレッドであるため、トランザクション開始時にWATCH変数が更新さてないことが確認されれば良いからです。
The key point about optimistic locking is that when a WATCHed key is changed, the whole transaction is discarded immediately when the client commits it using EXEC. Redis has a main, single-threaded command execution loop, so when the transaction queue is being executed no other command will run. This means that Redis transactions have a true serializable isolation level, and also means that no rollback mechanism is required to implement WATCH. Redis公式
排他ロック
Redisのキーを排他ロックとして利用する裏技です。Go言語のsync.Mutex
に似ています。
Kubernetesなど、複数インスタンスの実行環境では、sync.Mutexは使えません。Goのランタイム内での排他ロックだからです。別のインスタンスでは別のGoランタイムが動いています。
そこでRedisのキーを複数インスタンス間の排他ロックとして使用できます。
rdb.SetNX(key)
でキーを作成(ロック取得)します。すでにキーが存在する場合はエラーになります。つまり他のスレッドがすでにロックを取得済みということになります。
rdb.Del()
でキーを削除(ロック解放)します。
func ExampleLock(rdb *redis.Client) error {
rLock := "Mutex" // sync.Mutexみたいな
// キーの作成。すでにキーが存在するとエラーになる。
nx, err := rdb.SetNX(ctx, rLock, true, 10*time.Second).Result()
if err != nil {
return err
}
if !nx {
// キーの作成失敗。ロックできず。
return errors.New("cannot to get a lock as someone else has it")
} else {
fmt.Println("get a lock") // ロック成功
}
// deferでロック解放
defer rdb.Del(ctx, rLock)
// CRITICAL SECTION
// ここで共有リソースを操作する
counter += 1
return nil
}
レートリミット
ネットワークのトラフィックに一定の制限を加える手法をレートリミットと言います。サーバがダウンしないように、もしくはオートスケールなクラウドの料金が高額にならないようにリミッターを設置しておくわけです。
redis_rateライブラリを使うと簡単にできます。
go get github.com/go-redis/redis_rate/v10
プログラムを書きます。
func main() {
rdb := redis.NewClient(&redis.Options{...})
limiter := redis_rate.NewLimiter(rdb)
// 100リクエスト送信
for i := 0; i < 100; i++ {
res, err := limiter.Allow(ctx, "user-ip:192.168.5.789", redis_rate.PerSecond(10))
if err != nil {
panic(err)
}
if res.Allowed == 1 {
println("return 200 ok")
} else {
println("return 429 too many requests")
}
time.Sleep(50 * time.Millisecond)
}
}
実行します。
redis_rate.PerSecond()
で単位時間あたりのリクエスト数を設定します。キー名はなんでも良いのですが、DoS攻撃対策であればuser-ip:(クライアントのIPアドレス)
にしたり、特定のエンドポイントを制限するならurl:/openai/chat-complete
などとします。
内部的にはuser-ip:192.168.5.789
というキーをRedisに作成してリクエスト数をカウントしているのかと思います。
レートリミットのアルゴリズムにはleaky bucketとsliding windowがあります。redis_rate
は前者になります。
このサンプルプログラムでは、バケツの底から10個/秒の水玉
が漏れる(バケツの水が減る)ことになります。
PubSub
Goチャネルのようにメッセージキューを使用して非同期にメッセージを送受信できます。注意点としては、メッセージは全てのサブスクライバーにブロードキャストされます。最初にメッセージを取得した一人だけには限定したい場合は、一工夫必要です。後でご紹介します。
また、メッセージ送信時に誰もサブスクライブしていなかった場合は、そのメッセージは揮発します。その文脈では同期的になります。
パブリッシャーとサブスクライバーの2つのプログラムを作成します。
func main() {
rdb := redis.NewClient(&redis.Options{...})
pubsub := rdb.Subscribe(ctx, "mychannel1") // サブスク開始
defer pubsub.Close()
ch := pubsub.Channel() // Goチャネル型(chan)を返す
for msg := range ch { // メッセージを受け取る
fmt.Println(msg.Channel, msg.Payload)
}
// ここに到達することはない
}
Redisのmychannel1
チャネルをサブスクします。Goチャネルを取得しイテレータを使用してメッセージを受信します。GoチャネルでRedisチャネルを抽象化するとこはエレガントですね。
次にパブリッシャー側です。
func main() {
rdb := redis.NewClient(&redis.Options{...})
for i := 0; i < 10; i++ {
msg := fmt.Sprintf("msg:%d", i) // メッセージのペイロード
err := rdb.Publish(ctx, "mychannel1", msg).Err() // 送信
if err != nil {
panic(err)
}
fmt.Println("publish:", msg)
time.Sleep(1 * time.Second) // スリープ
}
}
スリープを入れながらメッセージを10個送信します。
サブスクライバー、パブリッシャーの順番で起動します。逆だとメッセージが揮発するため受信できません。
マルチスレッドにしてみる
さて、サブスクライバー側をマルチスレッドにしてみましょう。複数のKubernetesインスタンスで同じチャネルをサブスクしていると思ってください。
func raceCondition(ctx context.Context, rdb *redis.Client) {
var wg sync.WaitGroup
for i := 0; i < THREADS; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// 複数のスレッドでサブスク
subscriber(ctx, rdb)
}()
}
wg.Wait()
}
スレッド数を2で実行した結果です。ご覧のように両方のスレッドが同じメッセージを受信しています。
一つのサブスクライバーだけ受信させる
ユースケースによってはこの仕様は困るかと思います。例えば、バックエンドでメールを送信する場合や送金処理などです。Kubernetesのマイクロサービスで複数インタンスを起動している場合、メールが何度も飛んでしまいます。
解決方法として、以前説明した排他ロックrdb.SetNX
を利用します。
排他ロックを取得できたスレッドだけがメッセージを処理すれば良いです。ロックの取得に失敗したスレッドは何もしません。それぞれのメッセージにIDを持たせます。1メッセージ=1ロックになります。
nx, err := rdb.SetNX(ctx, "メッセージID", true, 1*time.Hour).Result()
何か共通の処理を実装する場合は、ラッパー関数を作成するのが便利です。そこで、LockMessage
関数を作成し、その中で排他ロック処理を入れます。
func LockMessage(ctx context.Context, rdb *redis.Client, msg *redis.Message, action Action) error {
// 手抜きでペイロード自体をキーにします。実際にはユニークなメッセージIDをペイロードに保持します。
rLock := msg.Payload
// 排他ロックの取得
nx, err := rdb.SetNX(ctx, rLock, true, 1*time.Hour).Result()
if err != nil {
return err // something wrong with Redis or network
}
if !nx {
return nil // ロック取得失敗。何もしない。
} else {
fmt.Printf("lock on %s \n", rLock) // ロックをゲット!
}
// メッセージの処理
err = action(msg)
if err != nil {
return err // 厳密にやるなら、リカバリーバッチなどで処理できなかったメッセージをリトライします。
}
return nil
}
呼び出し部分です。
for msg := range ch {
// ラッパー関数のクロージャにやりたいことを渡します。
err := LockMessage(ctx, rdb, msg, func(msg *redis.Message) error {
// メッセージを処理します
fmt.Printf("process msg: thread #%d, %v \n", i, msg.Payload)
time.Sleep(10 * time.Second) // なんかやってるフリ
return nil
})
if err != nil {
fmt.Println(err)
}
}
スレッド数3で実行した結果です。上手くいきましたね。1つのメッセージを1つのスレッドのみが処理しています。Kubernetesのマイクロサービスとしてサブスクライバーを実装すればスケーラブルなシステムが実現できます。マイクロサービスなので、サブスク側だけRustで実装する技も使えます。クラウドの使用量も抑えることができます。
参考
Discussion