KubernetesのCustom Controllerの仕組みをコードレベルで理解する
はじめに
kubebuilderを使ってCustom Controllerを作ってみたのですが、内部的な仕組みが全く理解できなかったのでコードを読みながら理解していきたいと思います。
Custom Controllerのアーキテクチャ
Kubernetesが公開しているsample-controllerのリポジトリには以下の図が公開されています。
Ref: https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md
簡易的なController
今回はIndexerを提供しない、NewInformerを中心にコードを読んでいきたいと思います。以下にサンプルコードを置いておきます。
watchlist := cache.NewListWatchFromClient(
clientset.CoreV1().RESTClient(),
string(v1.ResourcePods),
v1.NamespaceAll,
fields.Everything(),
)
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
_, controller := cache.NewInformer(watchlist, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
fmt.Println(key) // default/nginx-cd55c47f5-hrgsw
queue.Add(key) // workqueueにenqueueする
fmt.Println(queue.Get()) // default/nginx-cd55c47f5-hrgsw false
}
},
})
stopCh := make(chan struct{})
defer close(stopCh)
go controller.Run(stopCh)
select {}
Controllerの起動
Controllerは起動時に以下の2点を行います。
-
Reflectorが起動時はList APIで既存リソースの情報をDelta FIFO QueueにEnqueueする。その後は、Watch APIで該当リソースのイベントを検知し、Delta FIFO QueueにEnqueueする。
-
Delta FIFO QueueにEnqueueされたら、オブジェクトをPopしてResourceEventHandler関数を実行する(この時、一般的なCustom Controllerはworkqueueにenqueueする)
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
// List & Watch -> Enqueue to Delta FIFO Queue
wg.StartWithChannel(stopCh, r.Run)
// Dequeue -> Resource Event Handlers
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
Watch & List
ListAndWatchでは、オブジェクトの取得と変更を監視します。
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
err := r.list(stopCh)
if err != nil {
return err
}
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup()
}()
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
for {
select {
case <-stopCh:
return nil
default:
}
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options := metav1.ListOptions{
ResourceVersion: r.LastSyncResourceVersion(),
TimeoutSeconds: &timeoutSeconds,
AllowWatchBookmarks: true,
}
start := r.clock.Now()
w, err := r.listerWatcher.Watch(options)
if err != nil {
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
<-r.initConnBackoffManager.Backoff().C()
continue
}
return err
}
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)
retry.After(err)
if err != nil {
if err != errorStopRequested {
switch {
case isExpiredError(err):
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
case apierrors.IsTooManyRequests(err):
klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName)
<-r.initConnBackoffManager.Backoff().C()
continue
case apierrors.IsInternalError(err) && retry.ShouldRetry():
klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.expectedTypeName, err)
continue
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
}
}
return nil
}
}
}
Delta FIFO Queue
リソースの変更を変更を検知するとDelta FIFO QueueへEnqueueします。例えば、リソースが作成されたことを検知した場合はDeltaType = "Added"
を付与してEnqueueします。
func (f *DeltaFIFO) Add(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Added, obj)
}
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
oldDeltas := f.items[id]
newDeltas := append(oldDeltas, Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else {
if oldDeltas == nil {
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
return nil
}
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
f.items[id] = newDeltas
return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
}
return nil
}
Informer
一方、InformerはDelta FIFO QueueへEnqueueされるのを待ちます。
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
Delta FIFO QueueへEnqueueされるとオブジェクトをPopし、ResourceEventHandler関数を実行します。
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
if f.closed {
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
depth := len(f.queue)
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
continue
}
delete(f.items, id)
if depth > 10 {
trace := utiltrace.New("DeltaFIFO Pop Process",
utiltrace.Field{Key: "ID", Value: id},
utiltrace.Field{Key: "Depth", Value: depth},
utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
defer trace.LogIfLong(100 * time.Millisecond)
}
// ResourceEventHandler関数を実行する
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
}
Resource Event Handlers
ResourceEventHandlerではEnqueueされたオブジェクトが作成、更新、削除かどうかを判断して実行します。
func processDeltas(
handler ResourceEventHandler,
clientState Store,
transformer TransformFunc,
deltas Deltas,
) error {
for _, d := range deltas {
obj := d.Object
if transformer != nil {
var err error
obj, err = transformer(obj)
if err != nil {
return err
}
}
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil {
return err
}
handler.OnUpdate(old, obj)
} else {
if err := clientState.Add(obj); err != nil {
return err
}
handler.OnAdd(obj)
}
case Deleted:
if err := clientState.Delete(obj); err != nil {
return err
}
handler.OnDelete(obj)
}
}
return nil
}
おわりに
Kubernetes公式のsample-controllerとclient-goを読み込むことでCustom Controllerが内部でどのように動いているか理解することができました。OSSのコードを読むことは大事ですね!
Discussion