🦉

k8sのコントローラはどのようにリソース変更イベントを伝搬しているのか

2023/05/10に公開

はじめに

本記事はKubernetesのコントローラの解説記事のPart3になる。
解説記事では、下記3パートに分けてKubernetesのコントローラについて解説している。

  1. 各コントローラはどのようにリソースを更新しているのか
  2. どのようにリソースの変更を検知しているのか
  3. どのようにリソースの変更を各コントローラに送信しているのか

Part1,2では、「各コントローラはどのようにリソースを更新しているのか」、「どのようにリソースの変更を検知しているのか」を解説している。もしこれらをまだ読んでいない&コントローラの全体像を把握したい方がいれば、そちらを先に読むことをお勧めする。

https://zenn.dev/x_color/articles/cfcb3e46ce0ec5

https://zenn.dev/x_color/articles/90798c7ab4c6e5

それでは、「どのようにリソースの変更を各コントローラに送信しているのか」について解説していく。

※解説している処理・コードはKubernetes v1.27.0のものとなる。

イベント検知後のイベント送信のしくみ

イベントの送信はSharedIndexInformerが行っている。
前回の記事で解説したとおりSharedIndexInformerは下記を担うコンポーネントとなっている。

  • 各リソースの状態変更検知
  • 状態のキャッシュ
  • 各コントローラへイベントを送信

リソースの状態変更検知については前回の記事で解説しているので、本記事ではキャッシュとイベント送信について解説する。

SharedIndexInformerは上記責務を担うために下記コンポーネントを起動する。

  • controller
    • Reflector
    • processLoop
  • sharedProcessor

(processLoopは正確にはコンポーネントとは言えないが、わかりやすさのため記載)

この中で、イベント送信に関わるコンポーネントはprocessLoopとsharedProcessorとなっている。本記事ではこれら2つについて解説していく。(Reflectorについては前回の記事を参照してほしい)

processLoop()はcontrollerによって起動される処理となっており、Reflectorによって検出されたリソース変更イベントをsharedProcessorへ送信する役割を担っている。
また、sharedProcessorは、受け取ったイベントを各コントローラへ送信する役目を担っている。

sharedProcessorへのイベント送信のしくみ

先程記載した通り、sharedProcessorへのイベント送信はcontroller.processLoop()が行っている。このメソッドはcontrollerによってReflectorと並列して起動され、下記処理を行う。

  1. 変更イベントをキューから取得する
  2. 変更イベントのイベントタイプを判定する
  3. 過去のリソース状態をキャッシュから取得する
  4. キャッシュを最新の状態で更新する
  5. イベントタイプに応じたイベント通知をsharedProcessorに送信する

ここからは上記処理の実装を追っていこう。
controller.processLoop()は、キューから変更イベントを取得する。

https://github.com/kubernetes/kubernetes/blob/1b4df30b3cdfeaba6024e81e559a6cd09a089d65/staging/src/k8s.io/client-go/tools/cache/controller.go#L190-L203

このイベントはsharedIndexInformer.HandleDeltas()を経由し、processDeltas()へ渡される。

https://github.com/kubernetes/kubernetes/blob/1b4df30b3cdfeaba6024e81e559a6cd09a089d65/staging/src/k8s.io/client-go/tools/cache/controller.go#L440-L472

この関数は、渡された変更イベントのタイプを判定し、タイプに応じたイベント通知をsharedProcessorに送信する。(通知の送信時に利用されるメソッドであるOnAdd()などはsharedIndexInformerのメソッドとして定義されている

また、送信前にキャッシュから過去のリソース状態を取得し、それも合わせて送信している。取得後にはキャッシュを最新状態へ更新している。

https://github.com/kubernetes/kubernetes/blob/1b4df30b3cdfeaba6024e81e559a6cd09a089d65/staging/src/k8s.io/client-go/tools/cache/controller.go#L453-L456

ちなみにこのキャッシュはSharedIndexInformerの生成時に生成されている。(実態はcache構造体

ここまでの処理により、リソースの状態変更が検出され、イベントとしてsharedProcessorへ送信される。

コントローラへのイベント送信のしくみ

sharedProcessorの起動

SharedIndexInformerはcontrollerと並列してsharedProcessorを起動する。これはイベントを各コントローラへ送信する役目を担っている。

https://github.com/kubernetes/kubernetes/blob/1b4df30b3cdfeaba6024e81e559a6cd09a089d65/staging/src/k8s.io/client-go/tools/cache/shared_informer.go#L496

sharedProcessorは起動時に各コントローラ用のイベントリスナであるprocessListenerを起動する。

https://github.com/kubernetes/kubernetes/blob/1b4df30b3cdfeaba6024e81e559a6cd09a089d65/staging/src/k8s.io/client-go/tools/cache/shared_informer.go#L796-L799

これらイベントリスナは各コントローラの生成時に登録される。
具体的には、Deployment用コントローラの場合、コントローラの生成時にAddEventListener()が呼び出される。

https://github.com/kubernetes/kubernetes/blob/1b4df30b3cdfeaba6024e81e559a6cd09a089d65/pkg/controller/deployment/deployment_controller.go#L115-L126

AddEventListener()は内部でprocessListenerを生成しsharedProcessorに登録している。

https://github.com/kubernetes/kubernetes/blob/1b4df30b3cdfeaba6024e81e559a6cd09a089d65/staging/src/k8s.io/client-go/tools/cache/shared_informer.go#L604-L609

イベントの送信

controllerから受け取ったイベントは下記流れで各コントローラに送信される。

  1. controllerからイベントを受け取る
  2. 各sharedListenerへイベントを送信する
  3. sharedListenerはイベントをチャネルに格納する
  4. 定期的にチャネルに格納されたイベントを取得する
  5. イベントに応じたコントローラのイベントハンドラを実行

controllerはイベント通知をsharedProcessorへ送信する際にsharedProcessor.distribute()を利用する。

https://github.com/kubernetes/kubernetes/blob/1b4df30b3cdfeaba6024e81e559a6cd09a089d65/staging/src/k8s.io/client-go/tools/cache/shared_informer.go#L774-L790

この関数が各種SharedListenerへイベントを送信している。送信されたイベントはそのままsharedListenerが管理しているチャネルへ送られる

このチャネルに送られたイベントは、sharedProcessorが起動時に起動しているsharedListener.run()sharedListener.pop()によって利用される。

https://github.com/kubernetes/kubernetes/blob/1b4df30b3cdfeaba6024e81e559a6cd09a089d65/staging/src/k8s.io/client-go/tools/cache/shared_informer.go#L796-L799

これらは下記役割を担っている。

  • sharedListener.pop(): チャネルに送られたイベントを取得し、run()側が利用できるようにイベントを送信する
  • sharedListener.run(): pop()経由で送られてきたイベントを取得し、適切なイベントハンドラを実行する

sharedListener.pop()は複雑なことをやっているように見える。

https://github.com/kubernetes/kubernetes/blob/1b4df30b3cdfeaba6024e81e559a6cd09a089d65/staging/src/k8s.io/client-go/tools/cache/shared_informer.go#L931-L959

これは、不要な場合は処理をスキップするなどの最適化を行っているため。すごく単純化すると処理としては下記と同等。

func pop(){
    for {
        p.nextCh <-p.addCh:
    }
}

sharedListener.run()は下記となっている。定期的にsharedListener.pop()経由で流れてくるイベント有無を確認し、あればイベントに適したイベントハンドラを呼び出している。

https://github.com/kubernetes/kubernetes/blob/1b4df30b3cdfeaba6024e81e559a6cd09a089d65/staging/src/k8s.io/client-go/tools/cache/shared_informer.go#L961-L986

このイベントハンドラは各コントローラが登録しているものになるので、これが呼び出された時点でコントローラにイベントが伝搬したことになる。

これでようやく、Part1の記事で解説したリソースの更新処理などが呼び出される。

おわりに

これまで3つの記事に分けて、Kubernetesのコントローラのしくみを解説してきた。
再掲になるが、もしほかの記事を読んでいない方がいれば、読んでいただけるとうれしい。

https://zenn.dev/x_color/articles/cfcb3e46ce0ec5

https://zenn.dev/x_color/articles/90798c7ab4c6e5

Kubernetesは、さまざまなコンポーネントが連携することで全体がうまく動作する設計となっている。1つの機能を理解するだけでも多くのコンポーネントを理解しなければならないので読み解くのには時間がかかった。
しかし、goroutineやチャネル、キューなどを巧みに利用して実装しているので、読んで勉強になった所も多い。また、時間のあるときにほかのコンポーネントの実装を読んでみたい。

Discussion