📖

go-redisのClusterClientとClusterOptionsの挙動についてソースコードから理解する

に公開

はじめに

初めまして。REALITY株式会社でサーバーエンジニアをしている松田と申します。最近は「ドンキーコング バナンザ」や「黒神話:悟空」など、霊長類が主人公のゲームをプレイしています。

今回はredis/go-redis/v9のClusterClientClusterOptionsの挙動について、ソースコードを参照しながら解説していきます。

背景

弊社ではMemorystore for Redisで独立したインスタンスを複数台立てて自前でクラスター運用しており、主にキャッシュ処理のために利用しています。

自前で、というのをもう少し具体的に説明すると、Redisクラスターモードを使用せず、アプリケーション側でコマンドの引数のキーを元に独自にノードを選択しコマンドを発行する運用です。

しかし最近この自前クラスター運用が辛くなってきました。

そこで、自前Redisクラスターに向いているキャッシュの処理をクラスターモードを利用したValkeyクラスター(Memorystore for Valkey)に移行していくプロジェクトを進めています。

ところで弊社ではサーバサイドの開発に主にGolangを使用しており、Redisクライアントとしてgo-redisを使用しています。ValkeyとRedisには基本的に互換性があるため、Valkeyクラスターのクライアントとしてもgo-redisを使うことができます。

これまではスタンドアロン構成へのアクセスしかなかったため、ClientOptionsの理解だけで足りていました。しかしクラスターモードではClusterClientClusterOptionsの挙動理解が不可欠です。加えて関心もあり、実際にソースコードを読んでClusterClientが内部的にどういう処理をしているかを把握することにしました。

本記事ではソースコードを読み解き、その動作と内部構造を解説します(※公式仕様として保証されるものではありません)。
ClusterClientClusterOptionsについては公式ドキュメントや外部資料が現状充実しているとは言えず、実際に実験をしていても理由が分からない挙動に遭遇することが何度かありました。そういった方々への一助となれば幸いです。

想定読者

Valkey(Redis)クラスターのスロットなどの基本的な概念を把握しており、redis/go-redis/v9のClusterClientClusterOptionsの挙動をソースコードレベルで理解したい方

この記事を読んで得られるもの

  • ClusterClientがクラスター構成をどのように管理しているか
  • ClusterClientがコマンドを発行する際の基本的な内部フロー
  • ClusterOptionsの各種パラメータが内部でどのように使われているか

前提

今回コードを読むのに使ったのはredis/go-redis/v9のmasterでコミットバージョンは7b4a537aef9e2670fff6990e81f278021e7c1499 です。

ClusterClientにはPub/Subやトランザクション、パイプラインなどたくさんの重要な機能がサポートされていますが、今回はこれらについては対象外とし、ClusterOptionsと関係するClusterClientの骨組みに絞って解説します。

ClusterClientのコンポーネント

ClusterClientの構造体のコードを覗いてみると、次のような定義となっています(ソースコード)。

type ClusterClient struct {
	opt           *ClusterOptions
	nodes         *clusterNodes
	state         *clusterStateHolder
	cmdsInfoCache *cmdsInfoCache
	cmdable
	hooksMixin
}

このうち、ClusterClientを理解するにあたって重要なコンポーネントはclusterNodesclusterStateHoldercmdsInfoCacheの3つです。さらにこれらを構成する部品として、clusterNodeclusterStateがあります。

この節では、まずクラスター構成に関する4つのコンポーネント(clusterNodeclusterNodesclusterStateclusterStateHolder)について解説し、最後にコマンド情報を管理するcmdsInfoCacheについて説明します。

あらかじめClusterClient内の主要コンポーネントの関係性を簡略化した図を置いておきます。内部処理を追う前に全体像を把握しておくと理解しやすくなるかもしれません。

ClusterClient
 ├── ClusterOptions (ClusterClient初期化時のオプション)
 ├── clusterNodes (ClusterClientが管理しているノード情報の集合)
 │    └── map[addr]*clusterNode
 │          └── Client(各ノードにコマンドを発行するためのクライアント)
 ├── clusterStateHolder (最新のクラスター構成を管理するコンポーネント)
 │    └── clusterState (クラスター構成のスナップショット)
 └── cmdsInfoCache (各コマンドがリードオンリーかどうかを保持)

clusterNode

clusterNodeはクラスター中の1ノードとの接続を管理する構造体であり、定義は次のようになっています(ソースコード)。

type clusterNode struct {
	Client *Client

	latency    uint32 // atomic
	generation uint32 // atomic
	failing    uint32 // atomic
	loaded     uint32 // atomic

	// last time the latency measurement was performed for the node, stored in nanoseconds from epoch
	lastLatencyMeasurement int64 // atomic
}

Clientフィールドを持っていることから想像ができるように、ClusterClientが各ノードに向けてコマンドを発行する際はこのclusterNodeが所有しているClientが使われます。

このClientフィールドは次のnewClusterNode関数内で初期化されます。

func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
	opt := clOpt.clientOptions()
	opt.Addr = addr
	node := clusterNode{
		Client: clOpt.NewClient(opt),
	}

	node.latency = math.MaxUint32
	if clOpt.RouteByLatency {
		go node.updateLatency()
	}

	return &node
}

Client: clOpt.NewClient(opt)がノードに接続するクライアントを初期化している箇所です。ClusterOptionsNewClientフィールドが設定されていなかった場合は、デフォルトで通常のNewClient関数(ソースコード)が代わりに利用されます。

また、この構造体ではノードに関する健全性やPing値も管理しています。健全性の処理にはfailingloaded、Ping値の処理にはlatencylastLatencyMeasurementが使われます。ただしPing値の処理は、ClusterOptionsRouteByLatencyをtrueにした場合のみ行われます。

generationについては少し補足します。go-redisが管理しているクラスター構成やノード情報は、スケールインやスケールアウト、ノードのフェイルオーバーなどのイベントに追従して更新する必要があります。古いクラスター情報は保持していても意味がないため破棄しますが、その判定のために各クラスター構成やノード情報にバージョン番号のようなものを付与しています。この役割を担っているのがgenerationです。

clusterNodes

clusterNodesは先ほど説明したclusterNodeを束ねる構造体です。定義は次のようになります(ソースコード)。

type clusterNodes struct {
	opt *ClusterOptions

	mu          sync.RWMutex
	addrs       []string
	nodes       map[string]*clusterNode
	activeAddrs []string
	closed      bool
	onNewNode   []func(rdb *Client)

	generation uint32 // atomic
}

nodesは、アドレス値をキーとして対応するclusterNodeを取得するmapです。
generationは、グローバルに持っている最新のクラスター構成の世代番号を表します。

clusterNodesNewClusterClient内で1度だけ生成され、以降そのClusterClient内で使い回されます。
そのため、このclusterNodesClusterClientにおける全世代のclusterNodeを集約する構造体となります。
したがって、generationが10のclusterNodesnodesの中に、generationが1のclusterNodeが格納されている瞬間が存在し得ます(これは後述するclusterStateHolderが呼び出すclusterNodes.GCによって時間が経てば解消されます)。

closedは、ClusterClient.Close()が呼び出されたかどうかのフラグです。trueの場合、それ以降の接続を伴う処理はエラーとなります。

addrsには、ClusterOptions.Addrsも含め、これまでにclusterNodeで管理されたノードのアドレスが格納されます。
一方activeAddrsには古い世代のノードアドレスは含まれず、現在利用中のノードアドレスのみが格納されると思っていただければ大丈夫です。

clusterState

clusterStateはクラスター構成のスナップショット情報を管理する構造体です(ソースコード)。

type clusterState struct {
	nodes   *clusterNodes
	Masters []*clusterNode
	Slaves  []*clusterNode

	slots []*clusterSlot

	generation uint32
	createdAt  time.Time
}

Mastersはクラスターのマスターノード、Slavesはレプリカノードを表します。generationはこのクラスター構成の世代番号です。clusterNodesで出てきたやつですね。

clusterSlotは次のような定義の構造体であり、スロット値の区間とそれらを担当しているノード群がどれであるかの情報を保持しています(ソースコード)。

type clusterSlot struct {
	start int
	end   int
	nodes []*clusterNode
}

clusterStateは次のようなnewClusterStateという関数で作成されます(ソースコード)。

func newClusterState(
	nodes *clusterNodes, slots []ClusterSlot, origin string,
) (*clusterState, error) {
	c := clusterState{
		nodes: nodes,

		slots: make([]*clusterSlot, 0, len(slots)),

		generation: nodes.NextGeneration(), // 最新の世代番号+1が入る
		createdAt:  time.Now(), // 作成時刻を保持しておく
	}

	...(省略)

	for _, slot := range slots {
		var nodes []*clusterNode
		for i, slotNode := range slot.Nodes {
			addr := slotNode.Addr
			...(省略)

			node, err := c.nodes.GetOrCreate(addr)
			if err != nil {
				return nil, err
			}

			node.SetGeneration(c.generation)
			nodes = append(nodes, node)


			if i == 0 { // CLUSTER SLOTSコマンドの仕様により、0番目にマスターノードの情報が入る
				c.Masters = appendIfNotExist(c.Masters, node)
			} else {
				c.Slaves = appendIfNotExist(c.Slaves, node)
			}
		}

		c.slots = append(c.slots, &clusterSlot{
			start: slot.Start,
			end:   slot.End,
			nodes: nodes,
		})
	}

	sort.Sort(clusterSlotSlice(c.slots))

	time.AfterFunc(time.Minute, func() {
		nodes.GC(c.generation)
	})

	return &c, nil
}

長いので上から順に紐解いていきます。

まず引数の定義から見ていきましょう。
nodesはこれまでも説明してきたclusterNodeを集約する構造体ですね。
slotsCLUSTER SLOTSコマンドの実行結果から生成されたデータ、あるいはClusterOptionsClusterSlotsが渡されている場合はそれの実行結果が直接渡されます。
originCLUSTER SLOTSコマンドを発行したノードのアドレスが入ります。

次にgeneration: nodes.NextGeneration(), についてです。
nodesが最新の世代番号を持っているので、新しいclusterState(クラスター構成のスナップショット)はその世代番号の次の番号をフィールドとして保持することになります。

次にループ処理の中身を見ていきます。

	for _, slot := range slots {
		var nodes []*clusterNode
		for i, slotNode := range slot.Nodes {
			addr := slotNode.Addr
			...(省略)

			node, err := c.nodes.GetOrCreate(addr)
			if err != nil {
				return nil, err
			}

			node.SetGeneration(c.generation)
			nodes = append(nodes, node)

			if i == 0 { // CLUSTER SLOTSコマンドの仕様により、0番目にマスターノードの情報が入る
				c.Masters = appendIfNotExist(c.Masters, node)
			} else {
				c.Slaves = appendIfNotExist(c.Slaves, node)
			}
		}

		c.slots = append(c.slots, &clusterSlot{
			start: slot.Start,
			end:   slot.End,
			nodes: nodes,
		})
	}

この部分では各スロット情報をもとにclusterNodeを作成しています。
スロット情報に対応する各ノードのアドレスをもとにclusterNodeを取得あるいはnewClusterNodeにより生成したのち(c.nodes.GetOrCreate)、世代番号を設定し、マスターノードあるいはレプリカノードとして管理対象に追加しています。
CLUSTER SLOTSコマンドは0番目にマスターノードの情報を返し、それ以降はレプリカノードの情報を返すので(参考)、i == 0かどうかでマスターノードかどうかの判定ができます。
そして各スロットの区間に対応するノード群をc.slotsとして保持するようにしています。

コード中で「…(省略)」と書いてある箇所について

上記のコード中で「…(省略)」と書いてある箇所についてはループバックアドレスに関する処理を行っています。今回は本質ではないので省略していますがなぜそのような処理が必要なのかなど興味がある方はこちらのPRが参考になるかもしれません。

では最後のnodes.GCを呼び出している部分について見ていきましょう。

	time.AfterFunc(time.Minute, func() {
		nodes.GC(c.generation)
	})

newClusterStateを呼び出してから、つまり新しいクラスター構成のスナップショットを作成してから1分後にnodes.GCを呼び出すような処理です。nodes.GCは以下のようなコードとなっています(ソースコード)。

// GC removes unused nodes.
func (c *clusterNodes) GC(generation uint32) {
	var collected []*clusterNode

	c.mu.Lock()

	c.activeAddrs = c.activeAddrs[:0]
	now := time.Now()
	for addr, node := range c.nodes {
		if node.Generation() >= generation {
			c.activeAddrs = append(c.activeAddrs, addr)
			if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
				go node.updateLatency()
			}
			continue
		}

		delete(c.nodes, addr)
		collected = append(collected, node)
	}

	c.mu.Unlock()

	for _, node := range collected {
		_ = node.Client.Close()
	}
}

この処理の中で、古い世代のclusterNodeclusterNodesから削除したり、各ノードのPing値を計算していたりします。
また、activeAddrsの更新もこのタイミングで行われており、古いclusterNodeが保持するClientについてのClose処理も行われています。

次にいったんコードについての説明を止めてclusterStateがスロットに関する情報を持っている理由について話します。

Valkeyクラスターの各ノードではそれぞれ特定の範囲のスロットを管轄しており、それ以外のスロットに属するキーを含むコマンドを実行しようとした際にはMOVEDASKを応答で返すようになっています。
MOVEDASKにはそのキーのスロットを管轄しているノード、すなわち正しい向き先のノードに関するアドレス情報が入っているので、エラーからその情報を読み取ってリクエストを投げれば正しいノードにコマンドを発行することはできますが、毎回それをやるとコマンドのたびに無駄に1リクエスト走らせることになってしまい処理効率の低下につながります。

しかしキーのスロットの計算式は仕様で決まっていて公開されており(Valkeyのページ)、実際go-redisでもinternal/hashtag/hashtag.goSlotという関数でスロットの計算を行っています(ソースコード)。
よってどのノードがどのスロットを担当しているかを把握しておけば無駄にMOVEDASKに遭遇することを抑えられるのです。

したがって、スロット情報を保持するclusterStateではコマンドを実行するノードの選択ロジックも実装されています。
例えばSETなどのデータを破壊的に変更するコマンドについて、処理するノードを決めるメソッドのslotMasterNodeがあります(ソースコード)。

func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) > 0 {
		return nodes[0], nil
	}
	return c.nodes.Random()
}

slotNodesがやっている処理は基本的に引数のslotを含むようなスロットの範囲を持つclusterNodeのスライスを返しているだけです。
戻り値にはマスターノードだけでなくレプリカノードが含まれている可能性がありますが、0番目にマスターノードの情報が入っているという前提で実装されているので、nodes[0]を返せばマスターノードを選択できるのです。

それとは別にslotSlaveNodeというメソッドもあります(ソースコード)。

func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	switch len(nodes) {
	case 0:
		return c.nodes.Random()
	case 1:
		return nodes[0], nil
	case 2:
		slave := nodes[1]
		if !slave.Failing() && !slave.Loading() {
			return slave, nil
		}
		return nodes[0], nil
	default:
		var slave *clusterNode
		for i := 0; i < 10; i++ {
			n := rand.Intn(len(nodes)-1) + 1
			slave = nodes[n]
			if !slave.Failing() && !slave.Loading() {
				return slave, nil
			}
		}

		// All slaves are loading - use master.
		return nodes[0], nil
	}
}

詳細な説明は省略しますが、c.slotNodesの戻り値の0番目に入るのはマスターノードであることを考えると、レプリカノードがある場合はそれをランダムに選択しようとしている処理になっているということがわかります。
FailingLoadingというのはネットワーク疎通の問題や起動直後かどうかを表していると考えてください。つまり健全なレプリカノードを極力選択しようとしているメソッドだということです。

さて、ここで難しいのはクラスター構成は常に固定のものではなく、スケールインやスケールアウトなどによって変動したり、あるいはノードのフェイルオーバーなどにより容易に変更されてしまうことです。
したがってその辺りのクラスター構成の変更をうまいこと実行中のコマンドに影響を出さずにclusterNodesに反映させるようにする処理の一つがGCであり、clusterStateを追従させるコンポーネントが次のclusterStateHolderとなるのです。

clusterStateHolder

clusterStateHolderは主にclusterStateの更新に関して責務を持つ構造体です(ソースコード)。

type clusterStateHolder struct {
	load func(ctx context.Context) (*clusterState, error)

	state     atomic.Value
	reloading uint32 // atomic
}

この構造体の主な役割は、現在のクラスター構成のスナップショット(clusterState)を保持し、必要に応じてそれを更新することです。stateに入るのは現在のclusterStateです。並行にアクセスされうるため、atomic.Valueとして保持しています。loadはクラスター状態を更新する処理であり、通常は以下のClusterClient.loadStateが設定されます(ソースコード)。

func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
	if c.opt.ClusterSlots != nil {
		slots, err := c.opt.ClusterSlots(ctx)
		if err != nil {
			return nil, err
		}
		return newClusterState(c.nodes, slots, "")
	}

	addrs, err := c.nodes.Addrs()
	if err != nil {
		return nil, err
	}

	var firstErr error

	for _, idx := range rand.Perm(len(addrs)) {
		addr := addrs[idx]

		node, err := c.nodes.GetOrCreate(addr)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		slots, err := node.Client.ClusterSlots(ctx).Result()
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		return newClusterState(c.nodes, slots, addr)
	}

	/*
	 * No node is connectable. It's possible that all nodes' IP has changed.
	 * Clear activeAddrs to let client be able to re-connect using the initial
	 * setting of the addresses (e.g. [redis-cluster-0:6379, redis-cluster-1:6379]),
	 * which might have chance to resolve domain name and get updated IP address.
	 */
	c.nodes.mu.Lock()
	c.nodes.activeAddrs = nil
	c.nodes.mu.Unlock()

	return nil, firstErr
}

loadStateのやっていることを簡略化して説明すると、接続できる可能性があるアドレスについてランダムにピックしていき、CLUSTER SLOTSコマンドを発行してスロットと対応するノードのマッピング情報を取得します。
そしてそれを使ってnewClusterStateを呼び出し、新しいclusterStateを作成している、という感じです。

なおClusterOptions.ClusterSlotsが設定されている場合についてはCLUSTER SLOTSを呼び出さずにすませることができます。
具体的には後述の「ClusterOptionsをちょっと詳しく解説」についてをみていただければと思います。

続きです、まだreloadingというフィールドが残っていますが一旦後回しにします。clusterStateHolderの役目は極力、本当のクラスター構成と齟齬がないclusterStateを提供することです。例えばclusterState.Getメソッドを見てみましょう(ソースコード)。

func (c *clusterStateHolder) Get(ctx context.Context) (*clusterState, error) {
	v := c.state.Load()
	if v == nil {
		return c.Reload(ctx)
	}

	state := v.(*clusterState)
	if time.Since(state.createdAt) > 10*time.Second {
		c.LazyReload()
	}
	return state, nil
}

Getがやっていることは単にc.state、つまりフィールドとして保持しているclusterStateを返すだけではありません。
stateの作成時刻(=createdAt)から10秒経過していたのならばc.LazyReload()という現在のc.stateを新しいclusterStateに置き換えるメソッドを呼び出します。
これにより、clusterStateHolder経由でclusterStateを取得すれば本物のクラスター構成と近いスナップショットが手に入るというわけです。

ただ後々出てきますが、実際にはclusterStateの更新処理はGetの中での時間経過判定以外でも発火します。
つまりどういうことが起きるかというとクラスター情報の更新処理が非常に頻繁に呼び出されてしまうケースがあるのです。
しかし、例えば1秒以内にクラスターの構成が複数回変わるなどということは想定しづらいです。

したがって、更新処理自体は必要なタイミングで都度行いたいけれど、かといって無駄にクラスター情報を取得しまくってもパフォーマンス低下につながったり処理のブロックに繋がるだけだからいい感じにしたいという需要が発生します。
それを実現するのに一役買っているのがLazyReload、そしてそれが扱うclusterStateHolderのフィールドreloadingです(LazyReloadという名前を見てすでにピンときていた方もいらっしゃると思います)。

LazyReloadのコードは次のようになります(ソースコード)。

func (c *clusterStateHolder) LazyReload() {
	if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
		return
	}
	go func() {
		defer atomic.StoreUint32(&c.reloading, 0)

		_, err := c.Reload(context.Background())
		if err != nil {
			return
		}
		time.Sleep(200 * time.Millisecond)
	}()
}

やっていること自体は至極シンプルです。
reloadingフィールドが0でなければ(つまりreload中の場合は)即座に処理を終了します。
そうでなかった場合はreloadingフィールドを1にし、そしてc.Reload、具体的にはClusterClient.loadStateの処理を呼び出し、clusterStateHolder.stateに新しく格納する処理をゴルーチンで動かします。

このままだと単なる更新処理ですが、deferreloadingフィールドを0に戻すのに200ミリ秒待っているところがポイントです。
これによりこの200ミリ秒の間はLazyReloadを呼ばれても多重実行になることはありません。

以上がclusterStateをいい感じに最新に保つ努力をしているコンポーネント、clusterStateHolderの説明でした。

cmdsInfoCache

最後のコンポーネントcmdsInfoCacheは、COMMANDコマンドの実行結果を保存するキャッシュであり、実態としては次のようなmapです。

{
	"get": &CommandInfo{
		Name: "get",
		Arity: 2,
		Flags: []string{"readonly", "fast"},
		ACLFlags: []string{"@read", "@string", "@fast"},
		FirstKeyPos: 1,
		LastKeyPos: 1,
		StepCount: 1,
		ReadOnly: true,
	},
	"set": &CommandInfo{
		Name: "set",
		Arity: -3,
		Flags: []string{"write", "denyoom"},
		ACLFlags: []string{"@write", "@string", "@slow"},
		FirstKeyPos: 1,
		LastKeyPos: 1,
		StepCount: 1,
		ReadOnly: false,
	},
	....
}

COMMANDの実行は初期化時に渡されるClusterClient.cmdInfoを通して行なっており、sync.Onceのようなもの(ソースコード)を使って一つのClusterClientにつきただ一度だけCOMMANDを発行するように制御されています(ソースコード)。

sync.Onceのようなものとは?

sync.Onceとの違いはerrorを戻り値として返すこと、およびエラー時は再実行できるようにしていることです。

func (c *cmdsInfoCache) Get(ctx context.Context) (map[string]*CommandInfo, error) {
	err := c.once.Do(func() error {
		cmds, err := c.fn(ctx)
		if err != nil {
			return err
		}

		lowerCmds := make(map[string]*CommandInfo, len(cmds))

		// Extensions have cmd names in upper case. Convert them to lower case.
		for k, v := range cmds {
			lowerCmds[internal.ToLower(k)] = v
		}

		c.cmds = lowerCmds
		return nil
	})
	return c.cmds, err
}

cmdsInfoCacheは、あるコマンドがデータに破壊的な変更を及ぼさない、リードオンリーなものかどうかを判定するのに使っています。
例えばGETはリードオンリーなコマンドですが、SETはリードオンリーではありません。
そして基本的にはリードオンリーでないコマンドはクラスターのレプリカノードでは実行することができません。

ClusterOptionsにはGETMGETなどのリードオンリーなコマンドをレプリカノードに向けて発行するためのオプションReadOnlyがありますが、その際このcmdsInfoCacheが保持しているデータを用いてレプリカノードに向けてもいいコマンドかどうかを判断しているというわけです。

なお、COMMANDコマンドが返すデータの中には、キーがコマンドの引数のどの位置に存在しているかを表すプロパティが存在しています。
なのでこれを使ってスロット値の計算に使うキーを抽出することも可能だと思われるのですが、現在のgo-redisでは TODO: Use the data in CommandInfo to determine the first key position. というコメントがついており記事を執筆した2025年8月25日時点では対応されていないようです(参考)。

これで主要コンポーネントに関する説明は終わりです。今度は処理フローの解説に入っていきます。

ClusterClientの処理フロー

次にClusterClientが実際にどのようにコマンドを実行しているのかを追っていきましょう。

初期化

まずNewClusterClientのコードについて見ていきます。

func NewClusterClient(opt *ClusterOptions) *ClusterClient {
	if opt == nil {
		panic("redis: NewClusterClient nil options")
	}
	opt.init()

	c := &ClusterClient{
		opt:   opt,
		nodes: newClusterNodes(opt),
	}

	c.state = newClusterStateHolder(c.loadState)
	c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
	c.cmdable = c.Process

	c.initHooks(hooks{
		dial:       nil,
		process:    c.process,
		pipeline:   c.processPipeline,
		txPipeline: c.processTxPipeline,
	})

	return c
}

initHookscmdableについてはいったん気にしなくて良いです。今回注目すべきは opt.init()です。これにより、ClusterOptions内でゼロ値になっていた値がClusterOptionsが設定するデフォルトの値に変更されます。具体的なコードは以下の通りです。

func (opt *ClusterOptions) init() {
	switch opt.MaxRedirects {
	case -1:
		opt.MaxRedirects = 0
	case 0:
		opt.MaxRedirects = 3
	}

	if opt.RouteByLatency || opt.RouteRandomly {
		opt.ReadOnly = true
	}

	if opt.PoolSize == 0 {
		opt.PoolSize = 5 * runtime.GOMAXPROCS(0)
	}
	if opt.ReadBufferSize == 0 {
		opt.ReadBufferSize = proto.DefaultBufferSize
	}
	if opt.WriteBufferSize == 0 {
		opt.WriteBufferSize = proto.DefaultBufferSize
	}

	switch opt.ReadTimeout {
	case -1:
		opt.ReadTimeout = 0
	case 0:
		opt.ReadTimeout = 3 * time.Second
	}
	switch opt.WriteTimeout {
	case -1:
		opt.WriteTimeout = 0
	case 0:
		opt.WriteTimeout = opt.ReadTimeout
	}

	if opt.MaxRetries == 0 {
		opt.MaxRetries = -1
	}
	switch opt.MinRetryBackoff {
	case -1:
		opt.MinRetryBackoff = 0
	case 0:
		opt.MinRetryBackoff = 8 * time.Millisecond
	}
	switch opt.MaxRetryBackoff {
	case -1:
		opt.MaxRetryBackoff = 0
	case 0:
		opt.MaxRetryBackoff = 512 * time.Millisecond
	}

	if opt.NewClient == nil {
		opt.NewClient = NewClient
	}
}

基本的には妥当な値設定に見えるのですが、MaxRetriesに何も指定しなかった場合は-1、すなわちリトライがされない設定になっているところが気になるかもしれません。ネットワーク疎通の問題によりコマンドの実行が一時的に失敗するなどは十分考えられるケースであり、そのようなケースに対する対応としてリトライ設定を入れるというのは一般的なものです。にもかかわらずMaxRetriesを-1にするような設定がデフォルトで大丈夫なのでしょうか?結論から言うとClusterClientにおいては問題ないような挙動になっています。それではClusterClientがどのようなフローでコマンドを実行しているかについて見ていきましょう。

コマンド実行

ClusterClient.Getを例に出して説明します。まず、ClusterClientの定義を再掲します。

type ClusterClient struct {
	opt           *ClusterOptions
	nodes         *clusterNodes
	state         *clusterStateHolder
	cmdsInfoCache *cmdsInfoCache
	cmdable
	hooksMixin
}

Getの処理が定義されているのは、このうちのcmdableです。cmdableの定義は

type cmdable func(ctx context.Context, cmd Cmder) error

となっており、cmdable自体に以下のようなGetメソッドが定義されています(ソースコード)。

func (c cmdable) Get(ctx context.Context, key string) *StringCmd {
	cmd := NewStringCmd(ctx, "get", key)
	_ = c(ctx, cmd)
	return cmd
}

ClusterClient.Getを呼び出す時に実行されているのはこのcmdableに定義されているメソッドなわけですね。さて、NewClusterClientのコードを読むと以下のような箇所がありました。

c.cmdable = c.Process

つまり、ClusterClient.Get(ctx, key)が呼び出された時に何が起こるかと言うと、ClusterClient.Process(ctx, NewStringCmd(ctx, "get", key)) が実行されていたわけです。
そしてClusterClient.Processの定義を読み進めていくと、実態がClusterClient.processという以下のメソッドであることがわかります(フックについては説明の簡略化のために今回は省略します)。
このprocessメソッドがClusterClientがコマンドを実行する処理フローの中心部ということになります(ソースコード)。

func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
	slot := c.cmdSlot(cmd, -1)
	var node *clusterNode
	var moved bool
	var ask bool
	var lastErr error
	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		// MOVED and ASK responses are not transient errors that require retry delay; they
		// should be attempted immediately.
		if attempt > 0 && !moved && !ask {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		if node == nil {
			var err error
			node, err = c.cmdNode(ctx, cmd.Name(), slot)
			if err != nil {
				return err
			}
		}

		if ask {
			ask = false

			pipe := node.Client.Pipeline()
			_ = pipe.Process(ctx, NewCmd(ctx, "asking"))
			_ = pipe.Process(ctx, cmd)
			_, lastErr = pipe.Exec(ctx)
		} else {
			lastErr = node.Client.Process(ctx, cmd)
		}

		// If there is no error - we are done.
		if lastErr == nil {
			return nil
		}
		if isReadOnly := isReadOnlyError(lastErr); isReadOnly || lastErr == pool.ErrClosed {
			if isReadOnly {
				c.state.LazyReload()
			}
			node = nil
			continue
		}

		// If slave is loading - pick another node.
		if c.opt.ReadOnly && isLoadingError(lastErr) {
			node.MarkAsFailing()
			node = nil
			continue
		}

		var addr string
		moved, ask, addr = isMovedError(lastErr)
		if moved || ask {
			c.state.LazyReload()

			var err error
			node, err = c.nodes.GetOrCreate(addr)
			if err != nil {
				return err
			}
			continue
		}

		if shouldRetry(lastErr, cmd.readTimeout() == nil) {
			// First retry the same node.
			if attempt == 0 {
				continue
			}

			// Second try another node.
			node.MarkAsFailing()
			node = nil
			continue
		}

		return lastErr
	}
	return lastErr
}

ちょっと長いので上から順番に読み進めていきましょう。

slot := c.cmdSlot(cmd, -1) はコマンドに含まれるキーからスロット値を計算する処理です。
NewStringCmd(ctx, "get", key) のようにcmdには引数となるキーが指定されているのでそのキーに対して CRC16(key) mod 16384 を計算することにより、スロット値を計算できます。
なおキーの値を計算するにあたって、コマンド中に出現するキーの位置を取得する必要があるのですが、現在はアドホックな処理によって実現されています(参考)。

そして次にループの処理です。maxRedirects+1回分だけ処理が実行されます。
まず指数バックオフの部分について見ていきます。

		// MOVED and ASK responses are not transient errors that require retry delay; they
		// should be attempted immediately.
		if attempt > 0 && !moved && !ask {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

これは、前回処理に失敗した理由が、応答にMOVEDでもASKでもないエラーが返ってきた場合に行われる処理です。
MOVED、あるいはASKはコマンドを実行するノードの指定を間違ったことを表すので即座にリトライが可能です。
なのでそれ以外の原因、例えばネットワーク疎通に問題が起きているときやクラスターがダウンしている時などはjitterありの指数バックオフで一定以上の時間をあけてからリトライしています。
なおこの際の指数バックオフの最小待ち時間と最大待ち時間の設定として、ClusterOptionsMinRetryBackoffMaxRetryBackoffが使用されます。

次はコマンドを発行するノードを選択するロジックのコードです。

		if node == nil {
			var err error
			node, err = c.cmdNode(ctx, cmd.Name(), slot)
			if err != nil {
				return err
			}
		}

cmdNodeによってコマンドを発行するべきclusterNodeが返されます。cmdNodeについては重要な処理なので実際にコードを見て見ましょう(ソースコード)。

func (c *ClusterClient) cmdNode(
	ctx context.Context,
	cmdName string,
	slot int,
) (*clusterNode, error) {
	state, err := c.state.Get(ctx)
	if err != nil {
		return nil, err
	}

	if c.opt.ReadOnly {
		cmdInfo := c.cmdInfo(ctx, cmdName)
		if cmdInfo != nil && cmdInfo.ReadOnly {
			return c.slotReadOnlyNode(state, slot)
		}
	}
	return state.slotMasterNode(slot)
}

c.state.Get(ctx)の戻り値であるstateclusterState型の値です。clusterStateはクラスター構成のスナップショットを表現する構造体でした。
c.stateclusterStateHolderなので、Getの戻り値は極力最新のクラスター構成を反映しようという処理が通された結果得られたclusterStateになります。

そしてClusterOptionsにてReadOnlyをtrueに設定していたかいないかでその後のノード選択ロジックが変わります。
trueにしていた場合、コマンドがリードオンリーなものかどうかの情報をcmdsInfoCache経由で取得します。
そしてリードオンリーだった場合については専用のノード選択ロジック処理を実行します。
ここで注意すべきなのはコマンドがリードオンリーじゃなかった場合についてはエラーになるのではなく、単にstate.slotMasterNode、すなわちマスターノードを選択するロジックにフォールバックされるということですね。

次です。ここまでの処理でコマンドを発行するべきノードを決定できたので、実際にコマンドを発行する処理に入ります。その部分は以下のコードです。

		if ask {
			ask = false

			pipe := node.Client.Pipeline()
			_ = pipe.Process(ctx, NewCmd(ctx, "asking"))
			_ = pipe.Process(ctx, cmd)
			_, lastErr = pipe.Exec(ctx)
		} else {
			lastErr = node.Client.Process(ctx, cmd)
		}
		
		// If there is no error - we are done.
		if lastErr == nil {
			return nil
		}

askは前回の処理の応答としてASKが返ってきたかどうかを表すフラグです。
ASKが返ってきたということは、コマンドの引数となるキーのスロット値を担当している担当のノードが変更中であるということです。
ASKが返されたキーについて処理をする場合は、ASKINGコマンドを発行したのち、リダイレクト先のノードに対してコマンドを発行する、というフローに従って処理を行う必要があります。
ASKINGの後に発行されるコマンドは同一コネクション上で、順序通りに行われることを保証しなければならないのでパイプラインを使っています。詳しくは https://valkey.io/topics/cluster-spec/#ask-redirection をご覧ください。

askがfalseの場合は、選択されたclusterNodeが保持しているClientを使ってコマンドの実行処理を行います。
いずれにせよエラーを吐かずに正常終了したらそこでprocessは終わりです。

そして、残りの処理は全てエラーを吐いた場合の対応です。

		if isReadOnly := isReadOnlyError(lastErr); isReadOnly || lastErr == pool.ErrClosed {
			if isReadOnly {
				c.state.LazyReload()
			}
			node = nil
			continue
		}

最初にReadOnlyErrorかどうか、あるいは接続プールから取得したコネクションが、何らかの理由で既に閉じられていたかどうかを判定しています。
ReadOnlyErrorは平たくいうとレプリカノードに向けてリードオンリーでないコマンドを発行してしまった時に出るエラーです(正確にはRedis側の設定次第でレプリカノードにリードオンリーでないコマンドを発行することは可能)。
レプリカノードに向けてリードオンリーでないコマンドを発行したということは、go-redis側でマスターノードだと思っていたものがレプリカノードに変わっているということを表しているのでクラスター構成を最新のものに合わせる必要があります。
したがってc.state.LazyReload()で保持しているクラスター構成情報の更新をしています。また、次のループでノードを再選択するよう、 node = nil をしています。

		// If slave is loading - pick another node.
		if c.opt.ReadOnly && isLoadingError(lastErr) {
			node.MarkAsFailing()
			node = nil
			continue
		}

この部分も似たような処理ですね。isLoadingErrorはインスタンスが起動中でまだコマンドを受け付けられない状態であるかどうかを判定しています。
この場合後からノードが使用可能になることが予想されるので、クラスター構成を更新する必要はないですが、かと言ってそのノードを直近で使用してもコマンドが実行できないのでnode.MarkAsFailing()で15秒間不健全扱いにし、ノード選択ロジックで極力選ばれないようにしています。

		var addr string
		moved, ask, addr = isMovedError(lastErr)
		if moved || ask {
			c.state.LazyReload()

			var err error
			node, err = c.nodes.GetOrCreate(addr)
			if err != nil {
				return err
			}
			continue
		}

エラー判定処理も終わりが近づいてきました。isMovedErrorはコマンドの応答にMOVEDあるいはASKが返ってきたかどうかを判定します。addrはリダイレクト先のアドレスです。
movedまたはaskがtrueの場合は今管理しているクラスター構成のスナップショットと実際のクラスター構成が異なるということを意味しているので、ここでもc.state.LazyReload()を走らせます。
また、リダイレクト先のノードを次回のループで使えばコマンドは成功するはずなので、それを使うために node, err = c.nodes.GetOrCreate(addr)clusterNodeの取得あるいは作成処理をしつつnodeに代入しています。

		if shouldRetry(lastErr, cmd.readTimeout() == nil) {
			// First retry the same node.
			if attempt == 0 {
				continue
			}

			// Second try another node.
			node.MarkAsFailing()
			node = nil
			continue
		}

shouldRetryはその名の通り対象のエラーについて、リトライをしても大丈夫かどうかを判定する処理です。
例えばlastErrcontext.DeadlineExceededだったりしたらfalseになります。
逆にcontext.DeadlineExceeded以外のタイムアウトエラーで、かつ実行したコマンドがBLPOPコマンドのように引数にtimeoutを設定するようなものだった場合、再実行するのではなくそのまま終了するのが望ましい挙動なのでshouldRetryはfalseになります。
また、attempt == 0、つまり初回のループの時はnode.MarkAsFailing()を実行しませんが2回目以降は実行するようになっています。

以上でprocessの説明が終わりました。
これでmaxRedirectsが設定されていればmaxRetryが-1でも問題ない理由がわかったのではないでしょうか。
つまりmaxRedirectによって回数が制御されるprocessという処理自体が元々指数バックオフによるリトライの仕組みを内包していたからです。

ClusterOptionsをちょっと詳しく解説

ここまでのコードリーディングによりClusterClientがどのような処理をしているのかわかりました。お付き合いいただきありがとうございます。最後に、今まで見てきたClusterClientの実装を踏まえてGoDocにあるClusterOptionsの公開フィールドについて見ていきましょう。

ReadOnly、RouteByLatency、RouteRandomly

ReadOnlyをtrue、あるいはRouteByLatencyRouteRandomlyのいずれかをtrueにした場合、clusterNodeの選択ロジックはClusterClient.slotReadOnlyNodeによって行われるのでした。

func (c *ClusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {
	if c.opt.RouteByLatency {
		return state.slotClosestNode(slot)
	}
	if c.opt.RouteRandomly {
		return state.slotRandomNode(slot)
	}
	return state.slotSlaveNode(slot)
}

コードを見ていただいたらわかる通り、

  • RouteByLatencyがtrueの場合
  • RouteRandomlyがtrueの場合
  • ReadOnlyだけがtrueの場合

とで挙動が異なります。slotSlaveNodeの処理はすでに紹介しました。では、slotRandomNodeのコードを見てみましょう(ソースコード)。

func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) == 0 {
		return c.nodes.Random()
	}
	if len(nodes) == 1 {
		return nodes[0], nil
	}
	randomNodes := rand.Perm(len(nodes))
	for _, idx := range randomNodes {
		if node := nodes[idx]; !node.Failing() {
			return node, nil
		}
	}
	return nodes[randomNodes[0]], nil
}

注目すべきところはrandomNodes := rand.Perm(len(nodes))以降の処理ですね。
nodes[0]にマスターノードに関する情報が入っている前提で実装されていることはこれまでにお伝えしてきましたが、slotRandomNodeの処理においてはインデックス0、すなわちマスターノードを特別扱いしていないのです。
すなわちRouteRandomlyをtrueにした場合、マスターノード含めて全ての対象となるノードの中からランダムにコマンドを発行するノードが選択されます。
同様にslotClosestNodeについてもマスターノードが選ばれるロジックになっています。

まとめると、レプリカが優先的に選択されるのはslotSlaveNodeが実行される「ReadOnlyだけがtrueの場合」だけなのです。
ドキュメントを見ると

	// Enables read-only commands on slave nodes.
	ReadOnly bool
	// Allows routing read-only commands to the closest master or slave node.
	// It automatically enables ReadOnly.
	RouteByLatency bool
	// Allows routing read-only commands to the random master or slave node.
	// It automatically enables ReadOnly.
	RouteRandomly bool

とあり、確かにRouteByLatencyRouteRandomlyがマスターノードにもコマンドを飛ばすことは読み取れるのですが、ReadOnlyだけをtrueにしたときにレプリカノードだけにコマンドを飛ばせるようにできることをこれだけから読み取るのはさすがに難しくないか…?と思いました。

ClusterSlots

newClusterStateの引数として渡す、どのスロット範囲をどのノードが対応しているか、ということを表すために使われているフィールドでした。これを設定すると、本来内部でCLUSTER SLOTSコマンドを実行してスロット情報を取得するところをカスタマイズできます。

	// Optional function that returns cluster slots information.
	// It is useful to manually create cluster of standalone Redis servers
	// and load-balance read/write operations between master and slaves.
	// It can use service like ZooKeeper to maintain configuration information
	// and Cluster.ReloadState to manually trigger state reloading.
	ClusterSlots func(context.Context) ([]ClusterSlot, error)

GoDocのコメントに書いてある通り、例えば今までの弊社のようなスタンドアロンのRedisインスタンス群で自前クラスターを組んでいるようなケースに対してもClusterClientを活用することができます。自前でノードの選択アルゴリズムを実装する必要がなくなるので便利ですね。また、Valkeyクラスター(あるいはRedisクラスター)を使った開発中のよくあるユースケースとして本番環境では実際のクラスターを使いたいけれど、ローカルの自動テストなどではValkey一台だけを立てたい、みたいな事例を散見します。すでに世の中に出ている記事などを拝見すると、ラッパーを作ったり、あるいはgo-redisの場合はUniversalClientを使うというケースが多いようです。UniversalClientを使えるのであれば使ってしまうのがてっとり早いとは思いますが、何らかの事情によりClusterClientのまま使いたいという場合にはClusterOptions.ClusterSlotsに以下のような関数を設定することによりClusterClientのまま扱うこともできちゃったりします。

func clusterSlots(ctx context.Context) ([]redis.ClusterSlot, error) {
	return []redis.ClusterSlot{
		{
			Start: 0,
			End: 16383, // スロットの最大値
			Nodes: []ClusterNode{{Addr: "127.0.0.1:6379"}},
		},
	}
}

この手法は実際go-redis自身のテストコードにも使われています(参考)。

MinRetryBackoff、MaxRetryBackoff

clusterNodeが保持するClientの初期化時にも使われるのですが、先に見たようにClusterClientprocessの中のループにおける指数バックオフにも使われます。具体的なコードは以下の通りです。

func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}

個人的にはClientの設定に使われている値をそのままClusterClientの内部でも使うということに違和感がありましたが、MinRetryBackoffなどを流用しても動作上も支障は確かにないのかもなーということで納得しました。

Addrs

CLUSTER SLOTSを呼び出すための初期シードとなるためのアドレスです。なので、クラスターの全ノードのアドレスを格納していなくても大丈夫ですが、指定したアドレスのノードがダウンしている可能性も鑑みて複数個設定するのが望ましいと思われます。

MaxRedirects

processでみましたね。最大のリダイレクトの回数、もといClusterClient.process内のループを最大何回繰り返すかを決めるための設定値です。

NewClient

clusterNodeが保持するClientを作成するために使うことができるフィールドです。例えば公式ガイドにあるように、インスタンスごとに認証情報が異なっている場合などに活用できます。NewClientフィールドを1から自作することはほぼなくて、基本的にはNewClient関数を内部で呼び出す前にOptionsの値を改変するために使われるケースが多いのではないでしょうか?弊社では自動テストを実行する際、Docker ComposeでValkeyクラスターを立ち上げているのですが、CLUSTER SLOTSコマンドで返ってくるアドレスをそのまま使ってしまうとホスト上で動いているGoアプリケーションから接続できなくなってしまうのでアドレスを変換する処理に使っています。

その他フィールド

すごい雑にまとめてしまいましたが、具体的には以下のフィールドです。

- ClientName
- Dialer
- OnConnect
- Protocol
- Username
- Password
- CredentialsProvider
- CredentialsProviderContext
- StreamingCredentialsProvider
- MaxRetries
- MinRetryBackoff
- MaxRetryBackoff
- DialTimeout
- ReadTimeout
- WriteTimeout 
- PoolFIFO
- PoolSize
- PoolTimeout
- MinIdleConns
- MaxIdleConns
- MaxActiveConns
- ConnMaxIdleTime
- ConnMaxLifetime
- ReadBufferSize
- WriteBufferSize
- TLSConfig
- DisableIndentity
- IdentitySuffix
- UnstableResp3
- ContextTimeoutEnabled

以上のフィールドはclusterNodeが保持するClientを生成する際にClientOptionsとして渡されるものです(ContextTimeoutEnabledについてはClusterClientの処理においても使われていますがトランザクションやパイプライン関係のものなのでもあるので今回は割愛します)。これがわかれば、例えばGoDoc内のClusterOptionsPoolSizeに関して、// applies per cluster node and not for the whole cluster というコメントがついている意味がすんなり理解できるのではないでしょうか。

おわりに

今回説明した内容以外にも、例えばクラスターの重要な挙動としてレプリカノード上でコマンドを実行するためには事前にREADONLYを発行しないといけずその辺の処理をどこでやっているかとか、他にも色々説明したいことはあるのですがそれを説明するには余白と僕の根性が足りず…orzただ今回解説した骨組みがわかっていれば、前よりは比較的楽にコードを辿れるのではないでしょうか?そうなっていただければ頑張って記事を書いた身としては幸いです。

なお、本記事はREALITYという会社として初めて技術記事をnoteではなくZennに投稿しようという試みのもと投稿された記事でもあります。
noteとはまた違った読者の方々にも届くと嬉しいなー。

REALITYでは現在サーバーエンジニアを募集しています。

https://hrmos.co/pages/1218800560317673472/jobs/0010170

Discussion