🐔

SharedIndexInformer は何が Shared で何が Index なのか?

2024/12/23に公開

はじめに

client-goSharedIndexInformer をぼちぼち触っていてずっと何が Shared で何が Index なのか分からなくてモヤモヤしていたのだが、最近ようやく分かった(気がする)ので改めて調べてメモ。

なお、この内容は client-go v0.32.0 をベースに、controller-runtime に関しては v0.19.3 をベースに調査している。

結論

  • 単一のインスタンス(オブジェクト)に複数のイベントハンドラを登録できる(つまり informer のインスタンスを共有できる)から Shared
  • 単一インスタンスを簡単に共有できるようにするためにファクトリオブジェクトが用意されている。
  • informer が保持しているオブジェクトキャッシュに、検索用の索引が付いているので Index。(普通は namespace の索引が付いている)
  • Shared じゃない informer もある。ただし、名前は Informer ではなく Controller

複数のイベントハンドラを登録できる

当たり前だと思っていたが、そもそも複数のイベントハンドラを登録できること自体が Shared たる所以のようだ[1]

複数イベントハンドラ.go
package main

import (
    "context"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
)

func multi(ctx context.Context, client kubernetes.Interface,
    handlers ...cache.ResourceEventHandler) {
    // ファクトリを作成する(factory は informers.SharedInformerFactory 型)
    factory := informers.NewSharedInformerFactoryWithOptions(client, 0)

    // informer を作成する(informer は cache.SharedIndexInformer 型)
    informer := factory.Core().V1().Pods().Informer()

    // ファクトリ配下の全ての informer を開始する(10 秒で停止させる)
    ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
    defer cancel()
    factory.Start(ctx.Done())

    // 全てのイベントハンドラを追加する
    for _, handler := range handlers {
        informer.AddEventHandler(handler)
    }

    // ファクトリ配下の全ての informer が停止するまで待つ
    factory.Shutdown()
}

当たり前のように informer 開始してからイベントハンドラを登録しているが、これが正しく動くのも Shared の利点のようだ。(どんなタイミングで登録しても、ちゃんと最初に全てのオブジェクトに対して OnAddisInInitialList=true で呼ばれる)

また、1 つのイベントハンドラの処理が遅いからと言ってその他のイベントハンドラの呼び出されるタイミングが遅くなると言ったこともないようだ。よくできてるな。だからと言ってイベントハンドラを長時間ブロックしてしまうとそのイベントハンドラの次の呼び出しは遅れる可能性があるのでお勧めしないが。

resync の意味

ところで、引数に指定してある 0 はデフォルトの resync 間隔を指定するものだが、今まで resync の意味を間違って考えていた。

resync の間隔でサーバから一覧を取得してイベントハンドラにイベントを送ってくれるのかと思っていたが[2]、全然違った。

resync の間隔でinformer 内部に持っているオブジェクトのキャッシュに基づいてイベントハンドラにイベントを送ってくれるだけだった。

そんなん分からんよ、と思ったら SharedInformerAddEventHandlerWithResyncPeriod メソッドの説明のところにこんな風に書いてあった。

The resync operation consists of delivering to the handler an update notification for every object in the informer's local cache; it does not add any interactions with the authoritative storage.

日本語訳 by Google こんにゃく
再同期操作は、インフォーマーのローカル キャッシュ内のすべてのオブジェクトの更新通知をハンドラーに配信することから構成されます。権限のあるストレージとのやり取りは追加されません。

う~ん、確かにそれっぽい事は書いてある。でも知らなければ「権限のあるストレージ」って何やねんってなるな[3]。てかそこまでドキュメント読み込まないよね?[4]

てか、そもそも AddEventHandlerWithResyncPeriod ってのがあってイベントハンドラ毎に resync 間隔指定できることすら気付いてなかった…。確かに NewSharedInformerFactoryWithOptions の引数の名前には defaultResync って「デフォルト」が付いてるんだけど、これは informer の resync 間隔のデフォルトって意味であって、イベントハンドラの resync 間隔のデフォルトって意味じゃないよね?

あれ、じゃあ resync ってイベントハンドラ毎に起こるもんなん?ってことは、イベントハンドラ毎に指定したらそれぞれその間隔で resync されんの?と思ったら、これも NewSharedIndexInformerWithOptions のドキュメントに書いてあった。いやいや、普通そんな関数使わなくない?[5]

The created informer will not do resyncs if options.ResyncPeriod is zero. Otherwise: for each handler that with a non-zero requested resync period, whether added before or after the informer starts, the nominal resync period is the requested resync period rounded up to a multiple of the informer's resync checking period. Such an informer's resync checking period is established when the informer starts running, and is the maximum of (a) the minimum of the resync periods requested before the informer starts and the options.ResyncPeriod given here and (b) the constant minimumResyncPeriod defined in this file.

これも Google こんにゃくで訳そうとしたらちょっと訳が微妙だった。DeepL でも微妙だったので、もしかしたら原文が微妙なのかもしれない。

  1. options.ResyncPeriod が 0 のハンドラは resync しない
  2. options.ResyncPeriod が 0 以外のハンドラは、options.ResyncPeriod の値を「resync チェック間隔」の整数倍に切り上げた間隔で resync する
  3. 「resync チェック間隔」というのは以下のいずれかの大きい方
    (a) informer 開始前に追加されたハンドラに指定された options.ResyncPeriod の最小値
    (b) ソースファイルの定数 minimumResyncPeriod(現状 1 秒)

なるほどね?[6]

なお、ここには書かれていないがそもそも informer の resync 間隔が 0 の場合はイベントハンドラ追加時に resync 間隔を指定しても resync されない。

あと、これは普通にドキュメントに書かれているが、AddEventHandler を使った場合の resync 間隔は informer の resync 間隔になる。

インスタンスを共有しやすくするためにファクトリオブジェクトがある

ファクトリオブジェクトがあるって言うかファクトリオブジェクトを使う方がメジャーな気がするが[7]、同じファクトリからであれば何度 informer を取得しても同じインスタンスが返される。

インスタンス共有.go
package main

import (
    "context"
    "fmt"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
)

func share(_ context.Context, client kubernetes.Interface) {
    // ファクトリを作成する(factory は informers.SharedInformerFactory 型)
    factory := informers.NewSharedInformerFactoryWithOptions(client, 0)

    // informer を作成する(informer1 は cache.SharedIndexInformer 型)
    informer1 := factory.Core().V1().Pods().Informer()

    // もう一つ informer を作成する(informer2 も cache.SharedIndexInformer 型)
    informer2 := factory.Core().V1().Pods().Informer()

    // インスタンスを比較してみる
    fmt.Println(informer1 == informer2) // true が表示される
}

前述のようにイベントハンドラを複数追加できるので、基本的にはインスタンスを共有しても問題は発生しない。(もちろん個別に削除もできる)

むしろ、informer は前述のように内部にオブジェクト全体をキャッシュしているので、複数のインスタンスを作成するとクライアントのメモリをムダに圧迫する。また、Watch のためにサーバへの接続も保持し続けるので、サーバの負荷も増える。

可能であれば factory を使用するなどしてインスタンスを共有した方が良さそうだ。

ファクトリ作成時のオプション

ファクトリ作成は NewSharedInformerFactory と言う関数もあるが、こちらは単にオプション引数が指定できないだけなので、常に NewSharedInformerFactoryWithOptions を使っておけば問題ない[8]。指定できるオプションは SharedInformerOption 型で、informers パッケージに Withなんちゃら って言う名前で 4 つほどある。

  1. WithCustomResyncConfig:対象のリソース型毎に resync 時間を設定する
  2. WithTweakListOptions:サーバからリソースを取得する際に使用する ListOptions を変更する関数を指定する
  3. WithNamespace:リソース一覧の対象 namespace を指定する
  4. WithTransform:イベントハンドラに渡す際にオブジェクトの変換を行う関数を指定する

みんな大好き[9] Functional Option Pattern なので一覧性に乏しいがドキュメントが正しければこれで全部だ。

ちなみに、WithTweakListOptionsLabelSelectorFieldSelector を設定したり、WithNamespace で特定の namespace を設定したりして取得範囲を絞れば、サーバとの通信量もキャッシュ容量も減らすことができる。

一方で、WithTransform でオブジェクトをスリム化するとキャッシュ容量を減らすことはできるが、当たり前だがサーバとの通信量を減らすことはできない。metadata.managedFields とか不要な事が多いわりにデカいので[10]、サーバ側で削除してくれると嬉しいんだが…

なお、これらのオプションは WithCustomResyncConfig を除いて全てのリソース型の informer に効いてくるので注意が必要だ。リソース型毎に異なる指定をしたければ、ファクトリ自体をそれぞれ個別に作成するしかない[11]

informer のちょっと変わった作り方(GenericInformer 経由)

ファクトリを使った informer の作り方には、下記のような方法もある。

GenericInformer.go
package main

import (
    "context"
    "time"

    corev1 "k8s.io/api/core/v1"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
)

func generic(ctx context.Context, client kubernetes.Interface,
    handler cache.ResourceEventHandler) {
    // ファクトリを作成する(factory は informers.SharedInformerFactory 型)
    factory := informers.NewSharedInformerFactoryWithOptions(client, 0)

    // generic informer を作成する(genericInformer は informers.GenericInformer 型)
    genericInformer, err := factory.ForResource(
        corev1.SchemeGroupVersion.WithResource("pods"),
    )
    utilruntime.Must(err)

    // informer を取得する(informer は cache.SharedIndexInformer)
    informer := genericInformer.Informer()

    // ファクトリ配下の全ての informer を開始する(10 秒で停止させる)
    ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
    defer cancel()
    factory.Start(ctx.Done())

    // イベントハンドラを追加する
    informer.AddEventHandler(handler)

    // ファクトリ配下の全ての informer が停止するまで待つ
    factory.Shutdown()
}

あんまりメジャーじゃない気がするが[12]ForResource メソッドを使うと GenericInformer 型のインスタンスが作成される。コイツだとエラーが返って来る可能性があるのでメソッドチェーン出来ないが[13]、普通の作り方で言うとコイツが factory.Core().V1().Pods() で返って来るモノに対応する。ちなみに普通のヤツは PodInformer とか言う型のようだ[14]

で、その後は普通と作り方と同様にコイツの Informer メソッドを呼び出すと SharedIndexInformer 型のインスタンスを取得することが出来る。

こんなの使って何かいいことあるの?と思わないでもないが、ForResource メソッドの引数型は schema.GroupVersionResource なので、gvr のスライスを作っておいていろんな informer を一括で作って使う、とかやろうとすると便利なのかもしれない。

一括作成.go
gvrs := []schema.GroupVersionResource{
    corev1.SchemeGroupVersion.WithResource("pods"),
    appsv1.SchemeGroupVersion.WithResource("deployments"),
    networkingv1.SchemeGroupVersion.WithResource("networkpolicies"),
}

informers := make(
    map[schema.GroupVersionResource]cache.SharedIndexInformer,
    len(gvrs),
)
for _, gvr := range gvrs {
    gi, _ := factory.ForResource(gvr)
    informers[gvr] = gi.Informer()
}

つまり Core().V1().Pods() とかハードコードせずに使える、と言う意味で Generic なのかな?と。知らんけど。

ちなみに、名前に Generic とか付いてるしコイツなら非標準のリソースの informer も作れるんじゃないかと淡い期待を抱いたりもしたが、残念ながらそんなことはなかった[15]。ぱっと見作れそうな雰囲気を醸し出しているくせに…[16]

ところで、さっきから "pods" とか "deployments" とか "networkpolicies" とかいった文字列リテラルが出てきていてるのだが、こいつらって client-go とか api のどっかに定義されていないもんなのだろうか?[17]正直文字列リテラルで直接指定するのは気持ちが悪い。

てか、そもそも KindResource を別の名前って言うか単数形と複数形にするのは何なん?そこに多大な労力をかけて[18]頑なに単数形と複数形を使い分ける必要ある?ヤツらのこだわりが日本語話者のオレには理解できない[19]

controller-runtime にもファクトリがある

controller-runtime を使っていると、Manager (より正確には Cluster) の GetCache メソッドで取得できるキャッシュクライアント ctrlcache.Cache でも informer を作れることに気付く。そしてこのキャッシュも同じようなファクトリになっているので、何度 informer を取得しても同じインスタンスが返される。

controller-runtime.go
package main

import (
    "context"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    ctrl "sigs.k8s.io/controller-runtime"
    ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
)

func controller(ctx context.Context, mgr ctrl.Manager) {
    // キャッシュクライアントを取得する(cacheClient は ctrlcache.Cache 型)
    var cacheClient ctrlcache.Cache = mgr.GetCache()

    // informer を作成する(informer1 は ctrlcache.Informer 型)
    informer1, err := cacheClient.GetInformer(ctx, &corev1.Pod{})
    utilruntime.Must(err)

    // もう一つ informer を作成する(informer2 も ctrlcache.Informer 型)
    informer2, err := cacheClient.GetInformer(ctx, &corev1.Pod{})
    utilruntime.Must(err)

    // インスタンスを比較してみる
    fmt.Println(informer1 == informer2)     // true が表示される
}

しかし、なんでかは良く分からないが、これで返って来る informer は SharedIndexInformer 型ではなく controller-runtime 独自の ctrlcache.Informer 型である。SharedIndexInformer 型より使用できるメソッドも少ないので、混在して使うと混乱するかもしれない。

ちなみに、少なくともいまのところ実体は同一なので、SharedIndexInformer に型アサーションすれば成功する。だったら戻り値型をわざわざ違う型にしたのはなんでなんだぜ…[20]

controller-runtime では informer が勝手に起動される

controller-runtime では client-go の普通のクライアントである kubernetes.Interface(あるいは実体の*kubernetes.Clientset)ではなく、Manager(より正確には Cluster)の GetClient メソッドで取得できる controller-runtime 独自の ctrlclient.Client を使うことが多いと思う。

その場合、GetList メソッドを使うと対象のリソース型に対応する informer が勝手に起動され、メソッドの結果はその informer キャッシュから返される。これは、ctrlclient.Client じゃなくて ctrlcache.CacheGetList メソッドを使っても同じだ。

このあたりの挙動には注意が必要だが、回避する方法もいろいろあるようなので気力があればちょっと気合いを入れて調べてみたい。と思ったが、「真面目に理解するcontroller-runtime Cache」と言う記事にわりと細かい内容までしっかり書かれていた。もっと早く知りたかった…

ファクトリを使わなくても作れる

ファクトリを使う方がメジャーな気がするが(しつこい)、ファクトリを使わなくても作れる。

ファクトリ未使用.go
package main

import (
    "context"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    informerscorev1 "k8s.io/client-go/informers/core/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
)

func direct(ctx context.Context, client kubernetes.Interface,
    handler cache.ResourceEventHandler) {
    // informer を作成する(informer は cache.SharedIndexInformer 型)
    informer := informerscorev1.NewPodInformer(
        client, metav1.NamespaceAll, 0,
        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
    )

    // イベントハンドラを追加する
    informer.AddEventHandler(handler)

    // informer を開始する(10 秒で停止させる)
    ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
    defer cancel()
    informer.Run(ctx.Done())
}

リソース型毎にパッケージや関数が異なるのでちょっとめんどくさいが、自分でインスタンスを引き回すとかであればこっちでもいいかもしれない。

最後の cache.Indexers{...} とかいう変な(?)引数については後述。

ちなみに、ここでは NewPodInformer と言う関数を使っているが、ほぼ同じようなNewFilteredPodInformer と言う関数もあって、こちらは上の方に出てきた WithTweakListOptions に渡すものと同じものが引数の最後に追加されている。

実はファクトリの informer 作成メソッドはテンプレートメソッドになっていて、最終的にはこの NewFilteredPodInformer が呼びだされるのだ。そして、渡される引数はファクトリ作成時に指定されたものだった。どおりで NewSharedInformerFactoryWithOptions と引数が似通っているわけだ。

なお、informer.Run はファクトリの Start と違って引数のチャネルがクローズされるまで返ってこないので注意が必要だ。普通は別 goroutine で実行することになるだろう。

オブジェクトキャッシュに、検索用の索引が付いてる

何度か触れたように、informer は内部にオブジェクトキャッシュを持っている。この内部のキャッシュは GetStore メソッド、あるいは GetIndexer メソッドで簡単に取得できる。

キャッシュ取得.go
// informer 内部のキャッシュを取得する(cache.Store インタフェース)
store := informer.GetStore()

// informer 内部のキャッシュを取得する(cache.Indexer インタフェース)
indexer := informer.GetIndexer()

見ての通り、戻り値型はそれぞれ Store、あるいは Indexer だ。

どちらでも取得できるモノの実体は同じなのだが、じゃあ何で 2 つもあるかと言うと GetStoreSharedInformer のメソッド、GetIndexerSharedIndexInformer のメソッド、となっているからだ。そして、Indexer の方は Store に対して索引関連のメソッドが追加されたモノとなっているのだ。

期せずして Index の付いてない SharedInformer が出てきたが、つまり、SharedIndexInformerIndex たる所以は、この Indexer にある、ということだったのだ[21]

Store は何気に便利

そもそもこのキャッシュはあんまりメジャーじゃない気がするが[22]、実は何気にお手軽で便利だったりする。よく使いそうなものの使い方を以下に示す。

キャッシュを使う.go
// キャッシュにある全オブジェクトを取得する
var items []any = store.List()

// キャッシュにある全オブジェクトのキーを取得する
var keys []string = store.ListKeys()

// キャッシュから指定したキーを持つオブジェクトを取得する
var item any
var exists bool
var err error
item, exists, err = store.GetByKey("default/nginx")

ジェネリクスが出来る前からあるのでオブジェクトの型が any なのが玉に瑕だが、informer で使ってる分には特定のリソース型以外は入ってこないのでカジュアルにチェック無しで型アサーションしてしまってもいいんじゃないかな?知らんけど。

ちなみに、更新系のメソッド(AddUpdateDeleteReplace)もあるが、informer から取得したヤツで使ってはいけない。下手をするとクラスタが壊れるので。ウソです。クラスタは壊れないが informer は挙動不審になる。

Indexer で索引を使う

せっかくなので、Indexer で索引も使ってみる。ここで言う「索引」はデータベースシステムで使われているような索引(インデックス)とだいたい同じようなモノだと思っておけばよい。

Indexer では複数の索引を付けることができるので、それぞれの索引に名前を付けて区別することになっている。

ファクトリで普通に作った informer の保持するキャッシュの場合、デフォルトで namespace の索引が付いている。informer 自体の動作に namespace の索引が必要な訳ではないのだが、なぜかもれなく付いてくる。

Indexer で追加されているメソッドでよく使いそうなものの使い方を、デフォルトで付いてる namespace の索引の例で以下に示す。

キャッシュの索引を使う.go
// キャッシュから指定した namespace の全オブジェクトを取得する
var items []any
var err error
items, err = indexer.Index(cache.NamespaceIndex, "default")

// キャッシュから指定した namespace の全オブジェクトのキーを取得する
var keys []string
keys, err = indexer.IndexKeys(cache.NamespaceIndex, "default")

// キャッシュにある全ての namespace を取得する
var namespaces []string = indexer.ListIndexFuncValues(cache.NamespaceIndex)

最初の引数 cache.NamespaceIndex がデフォルトで付いてる namespace 用索引の名前(実体は "namespace" と言う単なる文字列)だ。ファクトリがなくても作れるにあった変な引数 cache.Indexers{...} は、ファクトリで作った時と同じように namespace 用索引を付けるためのモノだ。

ファクトリを使わずに自前で作る場合には無くてもいいのだが、client-go のいくつかの関数・メソッドでごくまれに勝手に使われてたりするので、自信が無ければ付けておいた方がいいかしもれない。

なお、最後の ListIndexFuncValues で返って来る値には注意が必要だ。これで返って来るのはあくまでもキャッシュに存在するオブジェクトから算出した値の一覧なので、たとえば上記の例ではキャッシュに存在する Pod の所属する namespace の一覧が取得できるだけであって、実際の namespace 一覧が取得できるわけじゃない。

Indexer に索引を追加する

IndexerAddIndexers メソッドで後から索引を追加することもできる。

ノード名を索引として追加する例を以下に示す。

索引を追加.go
informer.GetIndexer().AddIndexers(cache.Indexers{
    "nodename": func(obj any) ([]string, error) {
        pod, ok := obj.(*corev1.Pod)
        if !ok {
            return nil, fmt.Errorf("*corev1.Pod 型じゃないよ!!!1!:%T", obj)
        }
        return []string{pod.Spec.NodeName}, nil
    },
})

見れば分かるかもしれないが、索引は「索引の名前」(この例の場合 "nodename")と「オブジェクトから索引の値を算出する関数」(func... の部分)のペアである。

当初不思議に思ったのが、関数の戻り値型が []string であることで、つまりオブジェクトと索引の値は「多」対「多」の関係になるということだ。これは普通のデータベースの索引とはちょっと違うと思うが、まぁ Indexer はそういうもんだと言う事で。

たとえばラベルのキーなんかを索引にするのが例になるだろうか?

ラベルのキー.go
informer.GetIndexer().AddIndexers(cache.Indexers{
    "labelkeys": func(obj any) ([]string, error) {
        pod, ok := obj.(*corev1.Pod)
        if !ok {
            return nil, fmt.Errorf("*corev1.Pod 型じゃないってばよ!!!1!:%T", obj)
        }
        slices.Collect(maps.Keys(pod.Labels)), nil
    },
})

1 つの Pod が複数のラベルキーを持つことは普通にあるし、複数の Pod が同じラベルキーを持つことも普通にあるので、意外といい例じゃない?[23]

ちなみに、以前は informer が開始されてからは索引を追加できなかったが、1.30(client-go のバージョンで言うと 0.30)からできるようになったようだ。

IndexerIndexers

Indexer と言う型は「Store インタフェースに索引関連のメソッドを追加したインタフェース」なのに、Indexers と言う型は「『索引の名前』と『オブジェクトから索引の値を算出する関数』の map」なのは、質の悪い嫌がらせなんじゃないかと思う[24]

Lister ってのもある

ところで、factory で informer を作成する際のトドメのメソッドは Informer だが、これと並んで Lister と言うメソッドもある。コイツを使うと名前でオブジェクトを取得できたりラベルセレクタ(labels.Selector)で検索できたりするのだが、正直使ったことなかったし何なら Lister メソッドの存在すら記憶になかった[25]

で、実はコイツは informer の内部キャッシュ、つまり Indexer を対象に処理するものだった。そうだったのか…

そう考えると何気に便利なのかもしれない。

Lister の使い方

Lister の使い方を示す例をざっくり作ってみた。これで何となく分かると思う。

lister.go
package main

import (
    "context"
    "fmt"

    "k8s.io/apimachinery/pkg/labels"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    listerscorev1 "k8s.io/client-go/listers/core/v1"
)

func lister(ctx context.Context, client kubernetes.Interface) {
    // ファクトリを作成する(factory は informers.SharedInformerFactory 型)
    factory := informers.NewSharedInformerFactoryWithOptions(client, 0)

    // lister を作成する(lister は listerscorev1.PodLister 型)
    var lister listerscorev1.PodLister = factory.Core().V1().Pods().Lister()

    // ファクトリ配下の全ての informer を開始する
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    factory.Start(ctx.Done())

    // 全ての informer が sync するまで待つ
    for key, isSynced := range factory.WaitForCacheSync(ctx.Done()) {
        if !isSynced {
            panic(fmt.Errorf("%v が sync できなかった", key))
        }
    }

    // ラベルセレクタを作成する(selector は labels.Selector 型)
    selector, err := labels.Parse("tier=control-plane")
    utilruntime.Must(err)

    // ラベルセレクタで検索する
    pods, err := lister.List(selector)
    utilruntime.Must(err)
    fmt.Printf("list の結果(全 namespace):%s\n", selector)
    for _, pod := range pods {
        fmt.Printf("%v/%v\n", pod.Namespace, pod.Name)
    }

    // kube-system 配下の lister を取得する
    // (nsLister は listercorev1.PodNamespaceLister 型)
    nsLister := lister.Pods("kube-system")

    // 名前で取得する
    pod, err := nsLister.Get("kube-apiserver-kind-control-plane")
    utilruntime.Must(err)
    fmt.Printf("Get の結果:%v/%v\n", pod.Namespace, pod.Name)

    // namespace 配下をラベルセレクタで検索する
    nsPods, err := lister.List(selector)
    utilruntime.Must(err)
    fmt.Printf("list の結果(kube-system):%s\n", selector)
    for _, pod := range nsPods {
        fmt.Printf("%v/%v\n", pod.Namespace, pod.Name)
    }

    // ファクトリ配下の全ての informer を停止させる
    cancel()
    factory.Shutdown()

ちゃんと戻り値型が *corev1.Pod とかなのが地味にいいよね?

ちなみに、そもそも informer を開始してないとキャッシュの中身が空なので機能しないし、開始してても sync が終わってない(SharedInformerHasSynced メソッドが true になってない)とキャッシュの中身が中途半端なのでおかしなことになる。

Lister は informer から自力で作れる

上記ではファクトリから Lister を取得したが、informer さえあれば(より正確には indexer さえあれば)Lister を自作できる。

informerからListerを作る.go
lister := listerscorev1.NewPodLister(informer.GetIndexer())

ちなみに、namespace で絞ったヤツ(PodNamespaceLister とか)の List メソッドは、基となる indexer に namespace の索引が付いてないと処理が遅くなる。付いてなくても死なずに遅くなるだけなのはいいのか悪いのか…

GenericLister とかいうのもある

ファクトリから informer を作成する方法に GenericInformer ってのがあったが、これにも Lister メソッドってのがあって、それを使うと GenericLister と言う Lister の仲間が作成される。また、indexer から自前で作成することもできる(NewGenericLister

どのへんが Generic かと言うと、戻り値型が *corev1.Pod とかじゃなくて runtime.Object になっているあたりと、NewGenericLister 関数のパッケージが各リソース型それぞれの階層じゃなくて k8s.io/client-go/tools/cache にあるあたりかな?[26]

が、いずれにせよあんまり使い道は無いかもしれない。と言うのも runtime.Object だとイマイチ使い勝手が良くないのと、Generic じゃない Lister も実体は ResourceIndexer であり、ジェネリクスを使って作られているので、そこにある New 関数で作れるから。

ただし、ジェネリクスと言う事は実行時に型を選択することはできないので、真に動的につくるんであれば GenericLister に頼ってみるのもいいのかもしれない。

Shared じゃない Informer もある(Controller

これもあんまりメジャーじゃない気がするが[27]Shared じゃない Informer もある。NewInformerWithOptions 関数を使えば作れるのだが引数がちょっと面倒だ。

Sharedじゃないinformer.go
package main

import (
    "context"
    "fmt"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
)

func informer(ctx context.Context, client kubernetes.Interface,
    handler cache.ResourceEventHandler) {
    // 10 秒でタイムアウトするコンテキストを作っておく
    ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
    defer cancel()

    // informer を作成する(store は cache.Store、controller は cache.Controller 型)
    store, controller := cache.NewInformerWithOptions(cache.InformerOptions{
        ListerWatcher: &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                return client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                return client.CoreV1().Pods(metav1.NamespaceAll).Watch(ctx, options)
            },
        },
        ObjectType: &corev1.Pod{},
        Indexers:   cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
        Handler:    handler,
    })

    // informer を開始する(10 秒で停止させる)
    controller.Run(ctx.Done())

    // namespace の一覧を出力する
    indexer := store.(cache.Indexer)
    fmt.Println(indexer.ListIndexFuncValues(cache.NamespaceIndex))
}

引数は InformerOptions 一つだけだ。

InformerOptions の要素

InformerOptions で指定してある各要素は以下の通りだ。

ListerWatcher

NewInformerWithOptions がメンドクサイ最大の要因は ListerWatcher で、型はまんま cache.ListerWatcher インタフェースだがこれがかなり鬱陶しい。が、よくよく見れば大したことはしていなくて、名前の通り List をする関数と Watch をする関数を定義しているだけだ。

実は NewListWatchFromClient とか NewFilteredListWatchFromClient とか言う関数もあってもうちょっとだけお手軽に ListerWatcher のオブジェクトを作れないわけではない。

お手軽ListerWatcher.go
// お手軽 ListerWatcher その1
lw1 := cache.NewListWatchFromClient(
    client.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.Everything(),
)

// お手軽 ListerWatcher その2
lw2 := cache.NewFilteredListWatchFromClient(
    client.CoreV1().RESTClient(), "pods", metav1.NamespaceAll,
    func(options *metav1.ListOptions) {},
)

だが、この方法で作った ListerWatcher は Timeout の設定が微妙だったり Protocol Buffers 使ってくれなかったり、更にはここでもまた "pods" とか使わなきゃいけないので、自前で定義した方が良さそうな気がする。鬱陶しくはあるけどそんなにムズい内容でもないしね。

ところで、何で List と Watch の 2 つが必要かと言えば List で一覧を取得してから Watch でその後の変更を監視する、とやっているからだ。これは kubectl コマンドでも --watch-only を指定すると変更差分だけが出てきて最初の一覧は出てこないことからもそれなりに知られているんじゃないかと思う。

しかし、最近の kubernetes には Watch だけで最初の一覧取得とその後の変更監視をできるような機能が追加されていて、client-go でも環境変数を KUBE_FEATURE_WatchListClient=1 のように設定しておくとその機能を使うことが出来る。その場合、ListFuncnil でも動く[28]

完全に余談だが、KUBE_FEATURE_WatchListClient=1 を設定すると List も内部で Watch を使用して取得するようになる。その方がサーバ側のメモリ負荷が高くならないらしい。

ObjectType

ObjectType は Watch の結果で送られてくるイベントの対象オブジェクトが指定された型かどうかをチェックするためのもので、指定しなくても通常は問題ない。問題無いが、コイツを指定すると万一違うオブジェクトが送られてきた場合にエラーを吐いてイベントを無視してくれるので、念のため指定しておいても良さそうだ。

なお、List の結果の方はチェックしてくれない。何となく片手落ちな気がしないでもないが、何か理由でもあるのだろうか?

Indexer

Indexers はいつものヤツなので、これも必要なければ指定しなくても問題ない。

Handler

最後が Handler だが、これがココにあるのが Shared じゃない理由だ。つまり Shared じゃない informer はハンドラを最初に指定する必要があって、後から追加とかできない。もちろん AddEventHandler メソッドも無い。

戻り値型が Indexer じゃなくて Store

NewInformerWithOptionsController 作ると、漏れなく Store が付いてくる。付いてくるのはいいんだがこれが Indexer じゃなくて Store になってる。実体は一緒なので型アサーションすればいいっちゃあいいんだが、ちょっとどうなのよ、と思わなくもない。

実は NewIndexerInformer とか NewTransformingIndexerInformer とか言う関数もあるんだが、コイツら軒並み Deprecated に指定されててあんまり使いたくない感じだ。

NewIndexInformerWithOptions 作っておいてくれんかなぁ…

SharedIndexInformer の陰には Controller がいる

これは完全に余談だが、調べてみたら実は SharedIndexInformerController のイベントハンドラだった。

何を言ってるのかわからね~と思うがおれも何をしてるのかわからなかった。頭がどうにかなりそうだった。

つまり、SharedIndexInformerController から見てイベントハンドラとして振舞っていて、SharedIndexInformer はそのイベントハンドラが呼ばれると自分に登録されている外部のイベントハンドラたちを呼びまくってる、と言う事だ。SharedIndexInformer がイベントを送られた時にごにょごにょやってくれてるおかげで我々はイベントハンドラの追加削除をフリーダムにできるわけだ。

SharedIndexInformer すげぇな[29]

が、逆に言えばイベントハンドラの追加削除を無秩序にやる必要が無ければ、Controllerを直接使った方が効率的ではあるわけだ。

なるほどね~

終わりに

SharedIndexInformerSharedIndex の理由を調べてみたら思いのほか深くまで見る羽目になったが、それ相応の成果は得られたように思う。

この記事が、オレのように informer の深みにハマった皆さんがの助けになれば幸いである。

それでは、良い client-go ライフを!

脚注
  1. もしかしたらオレの思い違いかもしれない。 ↩︎

  2. こうだと思ってた人おらん?オレだけ? ↩︎

  3. 多分これがサーバの事だと思うんだけど… ↩︎

  4. 読み込まないのはオレだけかもしれない… ↩︎

  5. 個人の感想です。 ↩︎

  6. 良く分かってない時の顔。 ↩︎

  7. 個人の感想です。 ↩︎

  8. 名前が長いのでオプションを指定しないのであれば NewSharedInformerFactory でもよいが。てか、オプション指定しなけりゃいいだけなので分けて用意する必要なくね?歴史的経緯かな? ↩︎

  9. オレは嫌いだが。 ↩︎

  10. コイツ Protocol Buffers でもフィールド名とかがそのまま入って来るのでちっさくならないくてウザい… ↩︎

  11. 実は同じファクトリ使って無理やり変える方法も無いわけではないが… ↩︎

  12. 気のせいかもしれない。 ↩︎

  13. こういうのを愚直に if err != nil でチェックしていくのが Go らしい書き方なのだろうが、チリョクもワンリョクも足りないので正直鬱陶しいと感じてしまう… ↩︎

  14. PodInformerGenericInformerInformer と付いてるくせに informer じゃないワナ。名前がややこし過ぎませんかね。 ↩︎

  15. ソースを見ると分かるが gvr による巨大な switch 文だった ↩︎

  16. 個人の感想です。 ↩︎

  17. 実は "pods" なら "k8s.io/api/core/v1" 配下に ResourcePods とかを見つけることはできたが、たまたま一緒なだけで何か用途が違うっぽいようなので使わなかった… ↩︎

  18. 個人の感想です。 ↩︎

  19. 主語がデカい。 ↩︎

  20. 多分 controller-runtime にコントロールされてるから下手なメソッド触んじゃねぇぞ、と言う意思表明なんじゃないかと思う。知らんけど。 ↩︎

  21. 多分。 ↩︎

  22. 気のせいかもしれない。 ↩︎

  23. 自画自賛 ↩︎

  24. 個人の感想です。 ↩︎

  25. ぶっちゃけコイツ影薄くないですかね? ↩︎

  26. もしかしたらオレの思い違いかもしれない。 ↩︎

  27. 気のせいかもしれない。 ↩︎

  28. お勧めはしないが… ↩︎

  29. Controller だって十分すごい。 ↩︎

Discussion