go-redisのClusterClientとClusterOptionsの挙動についてソースコードから理解する
はじめに
初めまして。REALITY株式会社でサーバーエンジニアをしている松田と申します。最近は「ドンキーコング バナンザ」や「黒神話:悟空」など、霊長類が主人公のゲームをプレイしています。
今回はredis/go-redis/v9のClusterClient
とClusterOptions
の挙動について、ソースコードを参照しながら解説していきます。
背景
弊社ではMemorystore for Redisで独立したインスタンスを複数台立てて自前でクラスター運用しており、主にキャッシュ処理のために利用しています。
自前で、というのをもう少し具体的に説明すると、Redisクラスターモードを使用せず、アプリケーション側でコマンドの引数のキーを元に独自にノードを選択しコマンドを発行する運用です。
しかし最近この自前クラスター運用が辛くなってきました。
そこで、自前Redisクラスターに向いているキャッシュの処理をクラスターモードを利用したValkeyクラスター(Memorystore for Valkey)に移行していくプロジェクトを進めています。
ところで弊社ではサーバサイドの開発に主にGolangを使用しており、Redisクライアントとしてgo-redisを使用しています。ValkeyとRedisには基本的に互換性があるため、Valkeyクラスターのクライアントとしてもgo-redisを使うことができます。
これまではスタンドアロン構成へのアクセスしかなかったため、Client
とOptions
の理解だけで足りていました。しかしクラスターモードではClusterClient
とClusterOptions
の挙動理解が不可欠です。加えて関心もあり、実際にソースコードを読んでClusterClient
が内部的にどういう処理をしているかを把握することにしました。
本記事ではソースコードを読み解き、その動作と内部構造を解説します(※公式仕様として保証されるものではありません)。
ClusterClient
やClusterOptions
については公式ドキュメントや外部資料が現状充実しているとは言えず、実際に実験をしていても理由が分からない挙動に遭遇することが何度かありました。そういった方々への一助となれば幸いです。
想定読者
Valkey(Redis)クラスターのスロットなどの基本的な概念を把握しており、redis/go-redis/v9のClusterClient
とClusterOptions
の挙動をソースコードレベルで理解したい方
この記事を読んで得られるもの
-
ClusterClient
がクラスター構成をどのように管理しているか -
ClusterClient
がコマンドを発行する際の基本的な内部フロー -
ClusterOptions
の各種パラメータが内部でどのように使われているか
前提
今回コードを読むのに使ったのはredis/go-redis/v9のmasterでコミットバージョンは7b4a537aef9e2670fff6990e81f278021e7c1499 です。
ClusterClient
にはPub/Subやトランザクション、パイプラインなどたくさんの重要な機能がサポートされていますが、今回はこれらについては対象外とし、ClusterOptions
と関係するClusterClient
の骨組みに絞って解説します。
ClusterClient
のコンポーネント
ClusterClient
の構造体のコードを覗いてみると、次のような定義となっています(ソースコード)。
type ClusterClient struct {
opt *ClusterOptions
nodes *clusterNodes
state *clusterStateHolder
cmdsInfoCache *cmdsInfoCache
cmdable
hooksMixin
}
このうち、ClusterClient
を理解するにあたって重要なコンポーネントはclusterNodes
、clusterStateHolder
、cmdsInfoCache
の3つです。さらにこれらを構成する部品として、clusterNode
とclusterState
があります。
この節では、まずクラスター構成に関する4つのコンポーネント(clusterNode
、clusterNodes
、clusterState
、clusterStateHolder
)について解説し、最後にコマンド情報を管理するcmdsInfoCache
について説明します。
あらかじめClusterClient
内の主要コンポーネントの関係性を簡略化した図を置いておきます。内部処理を追う前に全体像を把握しておくと理解しやすくなるかもしれません。
ClusterClient
├── ClusterOptions (ClusterClient初期化時のオプション)
├── clusterNodes (ClusterClientが管理しているノード情報の集合)
│ └── map[addr]*clusterNode
│ └── Client(各ノードにコマンドを発行するためのクライアント)
├── clusterStateHolder (最新のクラスター構成を管理するコンポーネント)
│ └── clusterState (クラスター構成のスナップショット)
└── cmdsInfoCache (各コマンドがリードオンリーかどうかを保持)
clusterNode
clusterNode
はクラスター中の1ノードとの接続を管理する構造体であり、定義は次のようになっています(ソースコード)。
type clusterNode struct {
Client *Client
latency uint32 // atomic
generation uint32 // atomic
failing uint32 // atomic
loaded uint32 // atomic
// last time the latency measurement was performed for the node, stored in nanoseconds from epoch
lastLatencyMeasurement int64 // atomic
}
Client
フィールドを持っていることから想像ができるように、ClusterClient
が各ノードに向けてコマンドを発行する際はこのclusterNode
が所有しているClient
が使われます。
このClient
フィールドは次のnewClusterNode
関数内で初期化されます。
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
opt := clOpt.clientOptions()
opt.Addr = addr
node := clusterNode{
Client: clOpt.NewClient(opt),
}
node.latency = math.MaxUint32
if clOpt.RouteByLatency {
go node.updateLatency()
}
return &node
}
Client: clOpt.NewClient(opt)
がノードに接続するクライアントを初期化している箇所です。ClusterOptions
のNewClient
フィールドが設定されていなかった場合は、デフォルトで通常のNewClient
関数(ソースコード)が代わりに利用されます。
また、この構造体ではノードに関する健全性やPing値も管理しています。健全性の処理にはfailing
とloaded
、Ping値の処理にはlatency
とlastLatencyMeasurement
が使われます。ただしPing値の処理は、ClusterOptions
でRouteByLatency
をtrueにした場合のみ行われます。
generation
については少し補足します。go-redisが管理しているクラスター構成やノード情報は、スケールインやスケールアウト、ノードのフェイルオーバーなどのイベントに追従して更新する必要があります。古いクラスター情報は保持していても意味がないため破棄しますが、その判定のために各クラスター構成やノード情報にバージョン番号のようなものを付与しています。この役割を担っているのがgeneration
です。
clusterNodes
clusterNodes
は先ほど説明したclusterNode
を束ねる構造体です。定義は次のようになります(ソースコード)。
type clusterNodes struct {
opt *ClusterOptions
mu sync.RWMutex
addrs []string
nodes map[string]*clusterNode
activeAddrs []string
closed bool
onNewNode []func(rdb *Client)
generation uint32 // atomic
}
nodes
は、アドレス値をキーとして対応するclusterNode
を取得するmapです。
generation
は、グローバルに持っている最新のクラスター構成の世代番号を表します。
clusterNodes
はNewClusterClient
内で1度だけ生成され、以降そのClusterClient
内で使い回されます。
そのため、このclusterNodes
はClusterClient
における全世代のclusterNode
を集約する構造体となります。
したがって、generation
が10のclusterNodes
のnodes
の中に、generation
が1のclusterNode
が格納されている瞬間が存在し得ます(これは後述するclusterStateHolder
が呼び出すclusterNodes.GC
によって時間が経てば解消されます)。
closed
は、ClusterClient.Close()
が呼び出されたかどうかのフラグです。trueの場合、それ以降の接続を伴う処理はエラーとなります。
addrs
には、ClusterOptions.Addrs
も含め、これまでにclusterNode
で管理されたノードのアドレスが格納されます。
一方activeAddrs
には古い世代のノードアドレスは含まれず、現在利用中のノードアドレスのみが格納されると思っていただければ大丈夫です。
clusterState
clusterStateはクラスター構成のスナップショット情報を管理する構造体です(ソースコード)。
type clusterState struct {
nodes *clusterNodes
Masters []*clusterNode
Slaves []*clusterNode
slots []*clusterSlot
generation uint32
createdAt time.Time
}
Mastersはクラスターのマスターノード、Slavesはレプリカノードを表します。generation
はこのクラスター構成の世代番号です。clusterNodes
で出てきたやつですね。
clusterSlot
は次のような定義の構造体であり、スロット値の区間とそれらを担当しているノード群がどれであるかの情報を保持しています(ソースコード)。
type clusterSlot struct {
start int
end int
nodes []*clusterNode
}
clusterState
は次のようなnewClusterState
という関数で作成されます(ソースコード)。
func newClusterState(
nodes *clusterNodes, slots []ClusterSlot, origin string,
) (*clusterState, error) {
c := clusterState{
nodes: nodes,
slots: make([]*clusterSlot, 0, len(slots)),
generation: nodes.NextGeneration(), // 最新の世代番号+1が入る
createdAt: time.Now(), // 作成時刻を保持しておく
}
...(省略)
for _, slot := range slots {
var nodes []*clusterNode
for i, slotNode := range slot.Nodes {
addr := slotNode.Addr
...(省略)
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
return nil, err
}
node.SetGeneration(c.generation)
nodes = append(nodes, node)
if i == 0 { // CLUSTER SLOTSコマンドの仕様により、0番目にマスターノードの情報が入る
c.Masters = appendIfNotExist(c.Masters, node)
} else {
c.Slaves = appendIfNotExist(c.Slaves, node)
}
}
c.slots = append(c.slots, &clusterSlot{
start: slot.Start,
end: slot.End,
nodes: nodes,
})
}
sort.Sort(clusterSlotSlice(c.slots))
time.AfterFunc(time.Minute, func() {
nodes.GC(c.generation)
})
return &c, nil
}
長いので上から順に紐解いていきます。
まず引数の定義から見ていきましょう。
nodes
はこれまでも説明してきたclusterNode
を集約する構造体ですね。
slots
はCLUSTER SLOTS
コマンドの実行結果から生成されたデータ、あるいはClusterOptions
のClusterSlots
が渡されている場合はそれの実行結果が直接渡されます。
origin
はCLUSTER SLOTS
コマンドを発行したノードのアドレスが入ります。
次にgeneration: nodes.NextGeneration(),
についてです。
nodes
が最新の世代番号を持っているので、新しいclusterState
(クラスター構成のスナップショット)はその世代番号の次の番号をフィールドとして保持することになります。
次にループ処理の中身を見ていきます。
for _, slot := range slots {
var nodes []*clusterNode
for i, slotNode := range slot.Nodes {
addr := slotNode.Addr
...(省略)
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
return nil, err
}
node.SetGeneration(c.generation)
nodes = append(nodes, node)
if i == 0 { // CLUSTER SLOTSコマンドの仕様により、0番目にマスターノードの情報が入る
c.Masters = appendIfNotExist(c.Masters, node)
} else {
c.Slaves = appendIfNotExist(c.Slaves, node)
}
}
c.slots = append(c.slots, &clusterSlot{
start: slot.Start,
end: slot.End,
nodes: nodes,
})
}
この部分では各スロット情報をもとにclusterNode
を作成しています。
スロット情報に対応する各ノードのアドレスをもとにclusterNode
を取得あるいはnewClusterNode
により生成したのち(c.nodes.GetOrCreate
)、世代番号を設定し、マスターノードあるいはレプリカノードとして管理対象に追加しています。
CLUSTER SLOTS
コマンドは0番目にマスターノードの情報を返し、それ以降はレプリカノードの情報を返すので(参考)、i == 0
かどうかでマスターノードかどうかの判定ができます。
そして各スロットの区間に対応するノード群をc.slots
として保持するようにしています。
コード中で「…(省略)」と書いてある箇所について
上記のコード中で「…(省略)」と書いてある箇所についてはループバックアドレスに関する処理を行っています。今回は本質ではないので省略していますがなぜそのような処理が必要なのかなど興味がある方はこちらのPRが参考になるかもしれません。
では最後のnodes.GC
を呼び出している部分について見ていきましょう。
time.AfterFunc(time.Minute, func() {
nodes.GC(c.generation)
})
newClusterState
を呼び出してから、つまり新しいクラスター構成のスナップショットを作成してから1分後にnodes.GC
を呼び出すような処理です。nodes.GC
は以下のようなコードとなっています(ソースコード)。
// GC removes unused nodes.
func (c *clusterNodes) GC(generation uint32) {
var collected []*clusterNode
c.mu.Lock()
c.activeAddrs = c.activeAddrs[:0]
now := time.Now()
for addr, node := range c.nodes {
if node.Generation() >= generation {
c.activeAddrs = append(c.activeAddrs, addr)
if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
go node.updateLatency()
}
continue
}
delete(c.nodes, addr)
collected = append(collected, node)
}
c.mu.Unlock()
for _, node := range collected {
_ = node.Client.Close()
}
}
この処理の中で、古い世代のclusterNode
をclusterNodes
から削除したり、各ノードのPing値を計算していたりします。
また、activeAddrs
の更新もこのタイミングで行われており、古いclusterNode
が保持するClient
についてのClose
処理も行われています。
次にいったんコードについての説明を止めてclusterState
がスロットに関する情報を持っている理由について話します。
Valkeyクラスターの各ノードではそれぞれ特定の範囲のスロットを管轄しており、それ以外のスロットに属するキーを含むコマンドを実行しようとした際にはMOVED
やASK
を応答で返すようになっています。
MOVED
やASK
にはそのキーのスロットを管轄しているノード、すなわち正しい向き先のノードに関するアドレス情報が入っているので、エラーからその情報を読み取ってリクエストを投げれば正しいノードにコマンドを発行することはできますが、毎回それをやるとコマンドのたびに無駄に1リクエスト走らせることになってしまい処理効率の低下につながります。
しかしキーのスロットの計算式は仕様で決まっていて公開されており(Valkeyのページ)、実際go-redisでもinternal/hashtag/hashtag.go
のSlot
という関数でスロットの計算を行っています(ソースコード)。
よってどのノードがどのスロットを担当しているかを把握しておけば無駄にMOVED
やASK
に遭遇することを抑えられるのです。
したがって、スロット情報を保持するclusterState
ではコマンドを実行するノードの選択ロジックも実装されています。
例えばSET
などのデータを破壊的に変更するコマンドについて、処理するノードを決めるメソッドのslotMasterNode
があります(ソースコード)。
func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
nodes := c.slotNodes(slot)
if len(nodes) > 0 {
return nodes[0], nil
}
return c.nodes.Random()
}
slotNodes
がやっている処理は基本的に引数のslot
を含むようなスロットの範囲を持つclusterNode
のスライスを返しているだけです。
戻り値にはマスターノードだけでなくレプリカノードが含まれている可能性がありますが、0番目にマスターノードの情報が入っているという前提で実装されているので、nodes[0]
を返せばマスターノードを選択できるのです。
それとは別にslotSlaveNode
というメソッドもあります(ソースコード)。
func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
nodes := c.slotNodes(slot)
switch len(nodes) {
case 0:
return c.nodes.Random()
case 1:
return nodes[0], nil
case 2:
slave := nodes[1]
if !slave.Failing() && !slave.Loading() {
return slave, nil
}
return nodes[0], nil
default:
var slave *clusterNode
for i := 0; i < 10; i++ {
n := rand.Intn(len(nodes)-1) + 1
slave = nodes[n]
if !slave.Failing() && !slave.Loading() {
return slave, nil
}
}
// All slaves are loading - use master.
return nodes[0], nil
}
}
詳細な説明は省略しますが、c.slotNodes
の戻り値の0番目に入るのはマスターノードであることを考えると、レプリカノードがある場合はそれをランダムに選択しようとしている処理になっているということがわかります。
Failing
やLoading
というのはネットワーク疎通の問題や起動直後かどうかを表していると考えてください。つまり健全なレプリカノードを極力選択しようとしているメソッドだということです。
さて、ここで難しいのはクラスター構成は常に固定のものではなく、スケールインやスケールアウトなどによって変動したり、あるいはノードのフェイルオーバーなどにより容易に変更されてしまうことです。
したがってその辺りのクラスター構成の変更をうまいこと実行中のコマンドに影響を出さずにclusterNodes
に反映させるようにする処理の一つがGC
であり、clusterState
を追従させるコンポーネントが次のclusterStateHolder
となるのです。
clusterStateHolder
clusterStateHolder
は主にclusterState
の更新に関して責務を持つ構造体です(ソースコード)。
type clusterStateHolder struct {
load func(ctx context.Context) (*clusterState, error)
state atomic.Value
reloading uint32 // atomic
}
この構造体の主な役割は、現在のクラスター構成のスナップショット(clusterState
)を保持し、必要に応じてそれを更新することです。state
に入るのは現在のclusterState
です。並行にアクセスされうるため、atomic.Value
として保持しています。load
はクラスター状態を更新する処理であり、通常は以下のClusterClient.loadState
が設定されます(ソースコード)。
func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
if c.opt.ClusterSlots != nil {
slots, err := c.opt.ClusterSlots(ctx)
if err != nil {
return nil, err
}
return newClusterState(c.nodes, slots, "")
}
addrs, err := c.nodes.Addrs()
if err != nil {
return nil, err
}
var firstErr error
for _, idx := range rand.Perm(len(addrs)) {
addr := addrs[idx]
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
if firstErr == nil {
firstErr = err
}
continue
}
slots, err := node.Client.ClusterSlots(ctx).Result()
if err != nil {
if firstErr == nil {
firstErr = err
}
continue
}
return newClusterState(c.nodes, slots, addr)
}
/*
* No node is connectable. It's possible that all nodes' IP has changed.
* Clear activeAddrs to let client be able to re-connect using the initial
* setting of the addresses (e.g. [redis-cluster-0:6379, redis-cluster-1:6379]),
* which might have chance to resolve domain name and get updated IP address.
*/
c.nodes.mu.Lock()
c.nodes.activeAddrs = nil
c.nodes.mu.Unlock()
return nil, firstErr
}
loadState
のやっていることを簡略化して説明すると、接続できる可能性があるアドレスについてランダムにピックしていき、CLUSTER SLOTS
コマンドを発行してスロットと対応するノードのマッピング情報を取得します。
そしてそれを使ってnewClusterState
を呼び出し、新しいclusterState
を作成している、という感じです。
なおClusterOptions.ClusterSlots
が設定されている場合についてはCLUSTER SLOTS
を呼び出さずにすませることができます。
具体的には後述の「ClusterOptionsをちょっと詳しく解説」についてをみていただければと思います。
続きです、まだreloading
というフィールドが残っていますが一旦後回しにします。clusterStateHolder
の役目は極力、本当のクラスター構成と齟齬がないclusterState
を提供することです。例えばclusterState.Get
メソッドを見てみましょう(ソースコード)。
func (c *clusterStateHolder) Get(ctx context.Context) (*clusterState, error) {
v := c.state.Load()
if v == nil {
return c.Reload(ctx)
}
state := v.(*clusterState)
if time.Since(state.createdAt) > 10*time.Second {
c.LazyReload()
}
return state, nil
}
Get
がやっていることは単にc.state
、つまりフィールドとして保持しているclusterState
を返すだけではありません。
state
の作成時刻(=createdAt
)から10秒経過していたのならばc.LazyReload()
という現在のc.state
を新しいclusterState
に置き換えるメソッドを呼び出します。
これにより、clusterStateHolder
経由でclusterState
を取得すれば本物のクラスター構成と近いスナップショットが手に入るというわけです。
ただ後々出てきますが、実際にはclusterState
の更新処理はGet
の中での時間経過判定以外でも発火します。
つまりどういうことが起きるかというとクラスター情報の更新処理が非常に頻繁に呼び出されてしまうケースがあるのです。
しかし、例えば1秒以内にクラスターの構成が複数回変わるなどということは想定しづらいです。
したがって、更新処理自体は必要なタイミングで都度行いたいけれど、かといって無駄にクラスター情報を取得しまくってもパフォーマンス低下につながったり処理のブロックに繋がるだけだからいい感じにしたいという需要が発生します。
それを実現するのに一役買っているのがLazyReload
、そしてそれが扱うclusterStateHolder
のフィールドreloading
です(LazyReload
という名前を見てすでにピンときていた方もいらっしゃると思います)。
LazyReload
のコードは次のようになります(ソースコード)。
func (c *clusterStateHolder) LazyReload() {
if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
return
}
go func() {
defer atomic.StoreUint32(&c.reloading, 0)
_, err := c.Reload(context.Background())
if err != nil {
return
}
time.Sleep(200 * time.Millisecond)
}()
}
やっていること自体は至極シンプルです。
reloading
フィールドが0でなければ(つまりreload中の場合は)即座に処理を終了します。
そうでなかった場合はreloading
フィールドを1にし、そしてc.Reload
、具体的にはClusterClient.loadState
の処理を呼び出し、clusterStateHolder.state
に新しく格納する処理をゴルーチンで動かします。
このままだと単なる更新処理ですが、defer
でreloading
フィールドを0に戻すのに200ミリ秒待っているところがポイントです。
これによりこの200ミリ秒の間はLazyReload
を呼ばれても多重実行になることはありません。
以上がclusterState
をいい感じに最新に保つ努力をしているコンポーネント、clusterStateHolder
の説明でした。
cmdsInfoCache
最後のコンポーネントcmdsInfoCache
は、COMMAND
コマンドの実行結果を保存するキャッシュであり、実態としては次のようなmapです。
{
"get": &CommandInfo{
Name: "get",
Arity: 2,
Flags: []string{"readonly", "fast"},
ACLFlags: []string{"@read", "@string", "@fast"},
FirstKeyPos: 1,
LastKeyPos: 1,
StepCount: 1,
ReadOnly: true,
},
"set": &CommandInfo{
Name: "set",
Arity: -3,
Flags: []string{"write", "denyoom"},
ACLFlags: []string{"@write", "@string", "@slow"},
FirstKeyPos: 1,
LastKeyPos: 1,
StepCount: 1,
ReadOnly: false,
},
....
}
COMMAND
の実行は初期化時に渡されるClusterClient.cmdInfo
を通して行なっており、sync.Once
のようなもの(ソースコード)を使って一つのClusterClient
につきただ一度だけCOMMAND
を発行するように制御されています(ソースコード)。
sync.Onceのようなものとは?
sync.Once
との違いはerror
を戻り値として返すこと、およびエラー時は再実行できるようにしていることです。
func (c *cmdsInfoCache) Get(ctx context.Context) (map[string]*CommandInfo, error) {
err := c.once.Do(func() error {
cmds, err := c.fn(ctx)
if err != nil {
return err
}
lowerCmds := make(map[string]*CommandInfo, len(cmds))
// Extensions have cmd names in upper case. Convert them to lower case.
for k, v := range cmds {
lowerCmds[internal.ToLower(k)] = v
}
c.cmds = lowerCmds
return nil
})
return c.cmds, err
}
cmdsInfoCache
は、あるコマンドがデータに破壊的な変更を及ぼさない、リードオンリーなものかどうかを判定するのに使っています。
例えばGET
はリードオンリーなコマンドですが、SET
はリードオンリーではありません。
そして基本的にはリードオンリーでないコマンドはクラスターのレプリカノードでは実行することができません。
ClusterOptions
にはGET
やMGET
などのリードオンリーなコマンドをレプリカノードに向けて発行するためのオプションReadOnly
がありますが、その際このcmdsInfoCache
が保持しているデータを用いてレプリカノードに向けてもいいコマンドかどうかを判断しているというわけです。
なお、COMMAND
コマンドが返すデータの中には、キーがコマンドの引数のどの位置に存在しているかを表すプロパティが存在しています。
なのでこれを使ってスロット値の計算に使うキーを抽出することも可能だと思われるのですが、現在のgo-redisでは TODO: Use the data in CommandInfo to determine the first key position.
というコメントがついており記事を執筆した2025年8月25日時点では対応されていないようです(参考)。
これで主要コンポーネントに関する説明は終わりです。今度は処理フローの解説に入っていきます。
ClusterClientの処理フロー
次にClusterClient
が実際にどのようにコマンドを実行しているのかを追っていきましょう。
初期化
まずNewClusterClient
のコードについて見ていきます。
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
if opt == nil {
panic("redis: NewClusterClient nil options")
}
opt.init()
c := &ClusterClient{
opt: opt,
nodes: newClusterNodes(opt),
}
c.state = newClusterStateHolder(c.loadState)
c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
c.cmdable = c.Process
c.initHooks(hooks{
dial: nil,
process: c.process,
pipeline: c.processPipeline,
txPipeline: c.processTxPipeline,
})
return c
}
initHooks
やcmdable
についてはいったん気にしなくて良いです。今回注目すべきは opt.init()
です。これにより、ClusterOptions
内でゼロ値になっていた値がClusterOptions
が設定するデフォルトの値に変更されます。具体的なコードは以下の通りです。
func (opt *ClusterOptions) init() {
switch opt.MaxRedirects {
case -1:
opt.MaxRedirects = 0
case 0:
opt.MaxRedirects = 3
}
if opt.RouteByLatency || opt.RouteRandomly {
opt.ReadOnly = true
}
if opt.PoolSize == 0 {
opt.PoolSize = 5 * runtime.GOMAXPROCS(0)
}
if opt.ReadBufferSize == 0 {
opt.ReadBufferSize = proto.DefaultBufferSize
}
if opt.WriteBufferSize == 0 {
opt.WriteBufferSize = proto.DefaultBufferSize
}
switch opt.ReadTimeout {
case -1:
opt.ReadTimeout = 0
case 0:
opt.ReadTimeout = 3 * time.Second
}
switch opt.WriteTimeout {
case -1:
opt.WriteTimeout = 0
case 0:
opt.WriteTimeout = opt.ReadTimeout
}
if opt.MaxRetries == 0 {
opt.MaxRetries = -1
}
switch opt.MinRetryBackoff {
case -1:
opt.MinRetryBackoff = 0
case 0:
opt.MinRetryBackoff = 8 * time.Millisecond
}
switch opt.MaxRetryBackoff {
case -1:
opt.MaxRetryBackoff = 0
case 0:
opt.MaxRetryBackoff = 512 * time.Millisecond
}
if opt.NewClient == nil {
opt.NewClient = NewClient
}
}
基本的には妥当な値設定に見えるのですが、MaxRetries
に何も指定しなかった場合は-1、すなわちリトライがされない設定になっているところが気になるかもしれません。ネットワーク疎通の問題によりコマンドの実行が一時的に失敗するなどは十分考えられるケースであり、そのようなケースに対する対応としてリトライ設定を入れるというのは一般的なものです。にもかかわらずMaxRetries
を-1にするような設定がデフォルトで大丈夫なのでしょうか?結論から言うとClusterClient
においては問題ないような挙動になっています。それではClusterClient
がどのようなフローでコマンドを実行しているかについて見ていきましょう。
コマンド実行
ClusterClient.Get
を例に出して説明します。まず、ClusterClient
の定義を再掲します。
type ClusterClient struct {
opt *ClusterOptions
nodes *clusterNodes
state *clusterStateHolder
cmdsInfoCache *cmdsInfoCache
cmdable
hooksMixin
}
Get
の処理が定義されているのは、このうちのcmdable
です。cmdable
の定義は
type cmdable func(ctx context.Context, cmd Cmder) error
となっており、cmdable
自体に以下のようなGet
メソッドが定義されています(ソースコード)。
func (c cmdable) Get(ctx context.Context, key string) *StringCmd {
cmd := NewStringCmd(ctx, "get", key)
_ = c(ctx, cmd)
return cmd
}
ClusterClient.Get
を呼び出す時に実行されているのはこのcmdable
に定義されているメソッドなわけですね。さて、NewClusterClient
のコードを読むと以下のような箇所がありました。
c.cmdable = c.Process
つまり、ClusterClient.Get(ctx, key)
が呼び出された時に何が起こるかと言うと、ClusterClient.Process(ctx, NewStringCmd(ctx, "get", key))
が実行されていたわけです。
そしてClusterClient.Process
の定義を読み進めていくと、実態がClusterClient.process
という以下のメソッドであることがわかります(フックについては説明の簡略化のために今回は省略します)。
このprocess
メソッドがClusterClient
がコマンドを実行する処理フローの中心部ということになります(ソースコード)。
func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
slot := c.cmdSlot(cmd, -1)
var node *clusterNode
var moved bool
var ask bool
var lastErr error
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
// MOVED and ASK responses are not transient errors that require retry delay; they
// should be attempted immediately.
if attempt > 0 && !moved && !ask {
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
return err
}
}
if node == nil {
var err error
node, err = c.cmdNode(ctx, cmd.Name(), slot)
if err != nil {
return err
}
}
if ask {
ask = false
pipe := node.Client.Pipeline()
_ = pipe.Process(ctx, NewCmd(ctx, "asking"))
_ = pipe.Process(ctx, cmd)
_, lastErr = pipe.Exec(ctx)
} else {
lastErr = node.Client.Process(ctx, cmd)
}
// If there is no error - we are done.
if lastErr == nil {
return nil
}
if isReadOnly := isReadOnlyError(lastErr); isReadOnly || lastErr == pool.ErrClosed {
if isReadOnly {
c.state.LazyReload()
}
node = nil
continue
}
// If slave is loading - pick another node.
if c.opt.ReadOnly && isLoadingError(lastErr) {
node.MarkAsFailing()
node = nil
continue
}
var addr string
moved, ask, addr = isMovedError(lastErr)
if moved || ask {
c.state.LazyReload()
var err error
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
return err
}
continue
}
if shouldRetry(lastErr, cmd.readTimeout() == nil) {
// First retry the same node.
if attempt == 0 {
continue
}
// Second try another node.
node.MarkAsFailing()
node = nil
continue
}
return lastErr
}
return lastErr
}
ちょっと長いので上から順番に読み進めていきましょう。
slot := c.cmdSlot(cmd, -1)
はコマンドに含まれるキーからスロット値を計算する処理です。
NewStringCmd(ctx, "get", key)
のようにcmd
には引数となるキーが指定されているのでそのキーに対して CRC16(key) mod 16384
を計算することにより、スロット値を計算できます。
なおキーの値を計算するにあたって、コマンド中に出現するキーの位置を取得する必要があるのですが、現在はアドホックな処理によって実現されています(参考)。
そして次にループの処理です。maxRedirects+1
回分だけ処理が実行されます。
まず指数バックオフの部分について見ていきます。
// MOVED and ASK responses are not transient errors that require retry delay; they
// should be attempted immediately.
if attempt > 0 && !moved && !ask {
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
return err
}
}
これは、前回処理に失敗した理由が、応答にMOVED
でもASK
でもないエラーが返ってきた場合に行われる処理です。
MOVED
、あるいはASK
はコマンドを実行するノードの指定を間違ったことを表すので即座にリトライが可能です。
なのでそれ以外の原因、例えばネットワーク疎通に問題が起きているときやクラスターがダウンしている時などはjitterありの指数バックオフで一定以上の時間をあけてからリトライしています。
なおこの際の指数バックオフの最小待ち時間と最大待ち時間の設定として、ClusterOptions
のMinRetryBackoff
とMaxRetryBackoff
が使用されます。
次はコマンドを発行するノードを選択するロジックのコードです。
if node == nil {
var err error
node, err = c.cmdNode(ctx, cmd.Name(), slot)
if err != nil {
return err
}
}
cmdNode
によってコマンドを発行するべきclusterNode
が返されます。cmdNode
については重要な処理なので実際にコードを見て見ましょう(ソースコード)。
func (c *ClusterClient) cmdNode(
ctx context.Context,
cmdName string,
slot int,
) (*clusterNode, error) {
state, err := c.state.Get(ctx)
if err != nil {
return nil, err
}
if c.opt.ReadOnly {
cmdInfo := c.cmdInfo(ctx, cmdName)
if cmdInfo != nil && cmdInfo.ReadOnly {
return c.slotReadOnlyNode(state, slot)
}
}
return state.slotMasterNode(slot)
}
c.state.Get(ctx)
の戻り値であるstate
はclusterState
型の値です。clusterState
はクラスター構成のスナップショットを表現する構造体でした。
c.state
はclusterStateHolder
なので、Get
の戻り値は極力最新のクラスター構成を反映しようという処理が通された結果得られたclusterState
になります。
そしてClusterOptions
にてReadOnly
をtrueに設定していたかいないかでその後のノード選択ロジックが変わります。
trueにしていた場合、コマンドがリードオンリーなものかどうかの情報をcmdsInfoCache
経由で取得します。
そしてリードオンリーだった場合については専用のノード選択ロジック処理を実行します。
ここで注意すべきなのはコマンドがリードオンリーじゃなかった場合についてはエラーになるのではなく、単にstate.slotMasterNode
、すなわちマスターノードを選択するロジックにフォールバックされるということですね。
次です。ここまでの処理でコマンドを発行するべきノードを決定できたので、実際にコマンドを発行する処理に入ります。その部分は以下のコードです。
if ask {
ask = false
pipe := node.Client.Pipeline()
_ = pipe.Process(ctx, NewCmd(ctx, "asking"))
_ = pipe.Process(ctx, cmd)
_, lastErr = pipe.Exec(ctx)
} else {
lastErr = node.Client.Process(ctx, cmd)
}
// If there is no error - we are done.
if lastErr == nil {
return nil
}
ask
は前回の処理の応答としてASK
が返ってきたかどうかを表すフラグです。
ASK
が返ってきたということは、コマンドの引数となるキーのスロット値を担当している担当のノードが変更中であるということです。
ASK
が返されたキーについて処理をする場合は、ASKING
コマンドを発行したのち、リダイレクト先のノードに対してコマンドを発行する、というフローに従って処理を行う必要があります。
ASKING
の後に発行されるコマンドは同一コネクション上で、順序通りに行われることを保証しなければならないのでパイプラインを使っています。詳しくは https://valkey.io/topics/cluster-spec/#ask-redirection をご覧ください。
ask
がfalseの場合は、選択されたclusterNode
が保持しているClient
を使ってコマンドの実行処理を行います。
いずれにせよエラーを吐かずに正常終了したらそこでprocess
は終わりです。
そして、残りの処理は全てエラーを吐いた場合の対応です。
if isReadOnly := isReadOnlyError(lastErr); isReadOnly || lastErr == pool.ErrClosed {
if isReadOnly {
c.state.LazyReload()
}
node = nil
continue
}
最初にReadOnlyError
かどうか、あるいは接続プールから取得したコネクションが、何らかの理由で既に閉じられていたかどうかを判定しています。
ReadOnlyError
は平たくいうとレプリカノードに向けてリードオンリーでないコマンドを発行してしまった時に出るエラーです(正確にはRedis側の設定次第でレプリカノードにリードオンリーでないコマンドを発行することは可能)。
レプリカノードに向けてリードオンリーでないコマンドを発行したということは、go-redis側でマスターノードだと思っていたものがレプリカノードに変わっているということを表しているのでクラスター構成を最新のものに合わせる必要があります。
したがってc.state.LazyReload()
で保持しているクラスター構成情報の更新をしています。また、次のループでノードを再選択するよう、 node = nil
をしています。
// If slave is loading - pick another node.
if c.opt.ReadOnly && isLoadingError(lastErr) {
node.MarkAsFailing()
node = nil
continue
}
この部分も似たような処理ですね。isLoadingError
はインスタンスが起動中でまだコマンドを受け付けられない状態であるかどうかを判定しています。
この場合後からノードが使用可能になることが予想されるので、クラスター構成を更新する必要はないですが、かと言ってそのノードを直近で使用してもコマンドが実行できないのでnode.MarkAsFailing()
で15秒間不健全扱いにし、ノード選択ロジックで極力選ばれないようにしています。
var addr string
moved, ask, addr = isMovedError(lastErr)
if moved || ask {
c.state.LazyReload()
var err error
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
return err
}
continue
}
エラー判定処理も終わりが近づいてきました。isMovedError
はコマンドの応答にMOVED
あるいはASK
が返ってきたかどうかを判定します。addr
はリダイレクト先のアドレスです。
moved
またはask
がtrueの場合は今管理しているクラスター構成のスナップショットと実際のクラスター構成が異なるということを意味しているので、ここでもc.state.LazyReload()
を走らせます。
また、リダイレクト先のノードを次回のループで使えばコマンドは成功するはずなので、それを使うために node, err = c.nodes.GetOrCreate(addr)
でclusterNode
の取得あるいは作成処理をしつつnode
に代入しています。
if shouldRetry(lastErr, cmd.readTimeout() == nil) {
// First retry the same node.
if attempt == 0 {
continue
}
// Second try another node.
node.MarkAsFailing()
node = nil
continue
}
shouldRetry
はその名の通り対象のエラーについて、リトライをしても大丈夫かどうかを判定する処理です。
例えばlastErr
がcontext.DeadlineExceeded
だったりしたらfalseになります。
逆にcontext.DeadlineExceeded
以外のタイムアウトエラーで、かつ実行したコマンドがBLPOP
コマンドのように引数にtimeout
を設定するようなものだった場合、再実行するのではなくそのまま終了するのが望ましい挙動なのでshouldRetry
はfalseになります。
また、attempt == 0
、つまり初回のループの時はnode.MarkAsFailing()
を実行しませんが2回目以降は実行するようになっています。
以上でprocess
の説明が終わりました。
これでmaxRedirects
が設定されていればmaxRetry
が-1でも問題ない理由がわかったのではないでしょうか。
つまりmaxRedirect
によって回数が制御されるprocess
という処理自体が元々指数バックオフによるリトライの仕組みを内包していたからです。
ClusterOptionsをちょっと詳しく解説
ここまでのコードリーディングによりClusterClient
がどのような処理をしているのかわかりました。お付き合いいただきありがとうございます。最後に、今まで見てきたClusterClient
の実装を踏まえてGoDocにあるClusterOptions
の公開フィールドについて見ていきましょう。
ReadOnly、RouteByLatency、RouteRandomly
ReadOnly
をtrue、あるいはRouteByLatency
とRouteRandomly
のいずれかをtrueにした場合、clusterNode
の選択ロジックはClusterClient.slotReadOnlyNode
によって行われるのでした。
func (c *ClusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {
if c.opt.RouteByLatency {
return state.slotClosestNode(slot)
}
if c.opt.RouteRandomly {
return state.slotRandomNode(slot)
}
return state.slotSlaveNode(slot)
}
コードを見ていただいたらわかる通り、
-
RouteByLatency
がtrueの場合 -
RouteRandomly
がtrueの場合 -
ReadOnly
だけがtrueの場合
とで挙動が異なります。slotSlaveNode
の処理はすでに紹介しました。では、slotRandomNode
のコードを見てみましょう(ソースコード)。
func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) {
nodes := c.slotNodes(slot)
if len(nodes) == 0 {
return c.nodes.Random()
}
if len(nodes) == 1 {
return nodes[0], nil
}
randomNodes := rand.Perm(len(nodes))
for _, idx := range randomNodes {
if node := nodes[idx]; !node.Failing() {
return node, nil
}
}
return nodes[randomNodes[0]], nil
}
注目すべきところはrandomNodes := rand.Perm(len(nodes))
以降の処理ですね。
nodes[0]
にマスターノードに関する情報が入っている前提で実装されていることはこれまでにお伝えしてきましたが、slotRandomNode
の処理においてはインデックス0、すなわちマスターノードを特別扱いしていないのです。
すなわちRouteRandomly
をtrueにした場合、マスターノード含めて全ての対象となるノードの中からランダムにコマンドを発行するノードが選択されます。
同様にslotClosestNode
についてもマスターノードが選ばれるロジックになっています。
まとめると、レプリカが優先的に選択されるのはslotSlaveNode
が実行される「ReadOnly
だけがtrueの場合」だけなのです。
ドキュメントを見ると
// Enables read-only commands on slave nodes.
ReadOnly bool
// Allows routing read-only commands to the closest master or slave node.
// It automatically enables ReadOnly.
RouteByLatency bool
// Allows routing read-only commands to the random master or slave node.
// It automatically enables ReadOnly.
RouteRandomly bool
とあり、確かにRouteByLatency
やRouteRandomly
がマスターノードにもコマンドを飛ばすことは読み取れるのですが、ReadOnly
だけをtrueにしたときにレプリカノードだけにコマンドを飛ばせるようにできることをこれだけから読み取るのはさすがに難しくないか…?と思いました。
ClusterSlots
newClusterState
の引数として渡す、どのスロット範囲をどのノードが対応しているか、ということを表すために使われているフィールドでした。これを設定すると、本来内部でCLUSTER SLOTS
コマンドを実行してスロット情報を取得するところをカスタマイズできます。
// Optional function that returns cluster slots information.
// It is useful to manually create cluster of standalone Redis servers
// and load-balance read/write operations between master and slaves.
// It can use service like ZooKeeper to maintain configuration information
// and Cluster.ReloadState to manually trigger state reloading.
ClusterSlots func(context.Context) ([]ClusterSlot, error)
GoDocのコメントに書いてある通り、例えば今までの弊社のようなスタンドアロンのRedisインスタンス群で自前クラスターを組んでいるようなケースに対してもClusterClient
を活用することができます。自前でノードの選択アルゴリズムを実装する必要がなくなるので便利ですね。また、Valkeyクラスター(あるいはRedisクラスター)を使った開発中のよくあるユースケースとして本番環境では実際のクラスターを使いたいけれど、ローカルの自動テストなどではValkey一台だけを立てたい、みたいな事例を散見します。すでに世の中に出ている記事などを拝見すると、ラッパーを作ったり、あるいはgo-redisの場合はUniversalClient
を使うというケースが多いようです。UniversalClient
を使えるのであれば使ってしまうのがてっとり早いとは思いますが、何らかの事情によりClusterClient
のまま使いたいという場合にはClusterOptions.ClusterSlots
に以下のような関数を設定することによりClusterClient
のまま扱うこともできちゃったりします。
func clusterSlots(ctx context.Context) ([]redis.ClusterSlot, error) {
return []redis.ClusterSlot{
{
Start: 0,
End: 16383, // スロットの最大値
Nodes: []ClusterNode{{Addr: "127.0.0.1:6379"}},
},
}
}
この手法は実際go-redis自身のテストコードにも使われています(参考)。
MinRetryBackoff、MaxRetryBackoff
clusterNode
が保持するClient
の初期化時にも使われるのですが、先に見たようにClusterClient
のprocess
の中のループにおける指数バックオフにも使われます。具体的なコードは以下の通りです。
func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}
個人的にはClient
の設定に使われている値をそのままClusterClient
の内部でも使うということに違和感がありましたが、MinRetryBackoff
などを流用しても動作上も支障は確かにないのかもなーということで納得しました。
Addrs
CLUSTER SLOTS
を呼び出すための初期シードとなるためのアドレスです。なので、クラスターの全ノードのアドレスを格納していなくても大丈夫ですが、指定したアドレスのノードがダウンしている可能性も鑑みて複数個設定するのが望ましいと思われます。
MaxRedirects
process
でみましたね。最大のリダイレクトの回数、もといClusterClient.process
内のループを最大何回繰り返すかを決めるための設定値です。
NewClient
clusterNode
が保持するClient
を作成するために使うことができるフィールドです。例えば公式ガイドにあるように、インスタンスごとに認証情報が異なっている場合などに活用できます。NewClient
フィールドを1から自作することはほぼなくて、基本的にはNewClient
関数を内部で呼び出す前にOptions
の値を改変するために使われるケースが多いのではないでしょうか?弊社では自動テストを実行する際、Docker ComposeでValkeyクラスターを立ち上げているのですが、CLUSTER SLOTS
コマンドで返ってくるアドレスをそのまま使ってしまうとホスト上で動いているGoアプリケーションから接続できなくなってしまうのでアドレスを変換する処理に使っています。
その他フィールド
すごい雑にまとめてしまいましたが、具体的には以下のフィールドです。
- ClientName
- Dialer
- OnConnect
- Protocol
- Username
- Password
- CredentialsProvider
- CredentialsProviderContext
- StreamingCredentialsProvider
- MaxRetries
- MinRetryBackoff
- MaxRetryBackoff
- DialTimeout
- ReadTimeout
- WriteTimeout
- PoolFIFO
- PoolSize
- PoolTimeout
- MinIdleConns
- MaxIdleConns
- MaxActiveConns
- ConnMaxIdleTime
- ConnMaxLifetime
- ReadBufferSize
- WriteBufferSize
- TLSConfig
- DisableIndentity
- IdentitySuffix
- UnstableResp3
- ContextTimeoutEnabled
以上のフィールドはclusterNode
が保持するClient
を生成する際にClientOptions
として渡されるものです(ContextTimeoutEnabled
についてはClusterClient
の処理においても使われていますがトランザクションやパイプライン関係のものなのでもあるので今回は割愛します)。これがわかれば、例えばGoDoc内のClusterOptions
のPoolSize
に関して、// applies per cluster node and not for the whole cluster
というコメントがついている意味がすんなり理解できるのではないでしょうか。
おわりに
今回説明した内容以外にも、例えばクラスターの重要な挙動としてレプリカノード上でコマンドを実行するためには事前にREADONLY
を発行しないといけずその辺の処理をどこでやっているかとか、他にも色々説明したいことはあるのですがそれを説明するには余白と僕の根性が足りず…orzただ今回解説した骨組みがわかっていれば、前よりは比較的楽にコードを辿れるのではないでしょうか?そうなっていただければ頑張って記事を書いた身としては幸いです。
なお、本記事はREALITYという会社として初めて技術記事をnoteではなくZennに投稿しようという試みのもと投稿された記事でもあります。
noteとはまた違った読者の方々にも届くと嬉しいなー。
REALITYでは現在サーバーエンジニアを募集しています。
Discussion