図を描きながら理解するK8s sample-controller
目的
sample-controllerを読みながら、Custom Controllerを実装するときのポイントを理解していくのが目的です。
「カスタムリソース/カスタムコントローラとは?」という方は下記公式docを読むと良いです。
sample-controllerの機能
ソースコードを読む前にsample-controllerでどのような機能が実現されるのかをみてみましょう。
ざっくりですが下記機能が実装されているようです。
- CRDsの
Fooから、Deploymentを作成する- コンテナは
nginx:latestを使用する
- コンテナは
-
Fooの.spec.replicasを変更すると、対応するDeploymentの.spec.replicasも変更される
下記のようなYAMLを適用するとexample-foo Deploymetが作成され、最終的に1つ(replicas: 1)のNginx Podが起動します。
apiVersion: samplecontroller.k8s.io/v1alpha1
kind: Foo
metadata:
name: example-foo
spec:
deploymentName: example-foo
replicas: 1
コードを深堀する前に、想像で「こういう構成かな?」という図を描いてみます。

FooとDeploymentの操作を行うため、Kubernetes API Serverへ接続するための何かしらのクライアントライブラリを利用してResoruceを取得してそうです。
Kubernetes ClientはAPI Serverにアクセスし、得られたデータをFoo/Deploymentの処理を行う部分に渡し、具体的な処理が行われそうです。
.spec.replicasの差分があれば再度Kubernetes Clientを利用してResourceの更新もやりそうですね。
コードリーディングしつつ、上記図をブラッシュアップしていこうと思います。
コードリーディング
コードの構成
リポジトリ直下の
- main.go
- controller.go
で実装されているため、上記2つのファイルを主にみていこうと思います。
main.go
安全な停止処理(signals.SetupSignalHandler())
main.goのはじめの方で下記関数が呼ばれています。
コメントにあるように、SIGTERMやSIGINTが送られたきたときの考慮がされています。
make(chan os.Signal, 2)の部分ですが、1度目のシグナル受け取りではcancel()でgracefullyに停止を試み、
2度目のシグナル受け取り時にはos.Exit(1)で強制的に停止させているようです。
また、SetupSignalHandler()は1度だけ呼び出されることを意図しているため、それをonlyOneSignalHander channelで制御しています。
複数回よばれたときclose(onlyOneSignalHander)がpanicを起こすので開発者が気づける仕組みです。
API Serverの接続情報の設定
Deploymentのアクセスで利用するクライアントと、Fooのアクセスで利用するクライアントをここでしています。
kubeInformerFactoryとexampleInformerFactoryがありますが、このInformerがCustom Controllerを実装する上で非常に重要な要素となっています。
現時点では「API Serverとやり取りするためのもの」とみておきます。
はじめに想像で描いていた図を少し更新してみます。
変更点は「Kubernetes Client」の部分を「Informer」にしています。

Controllerの作成
用意した接続情報やInformerからControllerを作成しています。
具体的な処理は後述のcontroller.goで確認しましょう。
Informerの起動
Informerを起動させています。
具体的な処理は後述のcontroller.goで確認しましょう。
Controllerの実行
Controllerを実行しています。
具体的な処理は後述のcontroller.goで確認しましょう。
controller.go
Controllerの作成(func NewController())
Event処理
機能要件ではないですが、Custom Controllerがどのような処理をしていたのかを記録するためのEvent準備がここで設定されています。
なくてもFoo/Deploymentの操作には影響はないですが、運用を見据えると必要な仕組みです。
キューの設定
API Serverから取得したFooを一時的に保持するためのキューを作るのですが、その設定がされています。
RateLimiterとあるように、制限をかけるようにしているようです。1つずつ見てみましょう。
NewTypedItemExponentialFailureRateLimiter
ここで設定されている2つの引数ですが、下記を意味しています。
| 引数 | 意味 | 設定値 |
|---|---|---|
baseDelay time.Duration |
アイテム処理失敗時の待機時間に使用される遅延時間。 baseDelay*2^<num-failures>で計算される。 |
5*time.Millisecond |
maxDelay time.Duration |
最大待機時間 | 1000*time.Second |
上記の設定により、繰り返し処理に失敗するようなFooは徐々に待ち時間が増えることになります。
仮に待ち時間がなかった場合、特定のキューが短時間で繰り返し処理されることになりコントローラに負荷がかかることが予想できます。
処理ができなかった場合などの障害系を考慮した設定だと思います。
TypedBucketRateLimiter
こちらですが、rate.NewLimiterの引数をみてみましょう。
| 引数 | 意味 | 設定値 |
|---|---|---|
r Limit |
1秒あたりに処理可能なアイテム数 | rate.Limit(50) |
b int |
一時的な高負荷時のバースト数 | 300 |
障害系のためのNewTypedItemExponentialFailureRateLimiterにたいして、こちらは安定化のための制御です。
sample-controllerではFooの内容からDeploymentの操作を行うため、ここで処理速度の制限をすることでAPI Serverに負荷をかけすぎないような仕組みにしているのだと思います。
controllerの作成
Controllerの実体がここで作成されます。各項目についてみていきましょう。
| 項目 | 説明 |
|---|---|
| kubeclientset | Kubernetesがもともと持つ各種Resourceへアクセスするクライアント |
| sampleclientset | sample-controllerのCRDs Resourceへアクセスするクライアント |
| deploymentsLister | deploymentInformerがもつDeploymentのキャッシュへアクセスするためのもの |
| deploymentsSynced | deploymentInformerのキャッシュが同期できたかのフラグ |
| foosLister | fooInformerがもつFooのキャッシュへアクセスするためのもの |
| fooSynced | fooInformerのキャッシュが同期できたかのフラグ |
| workqueue |
Fooを一時的に保存するキュー |
| recorder |
Events Resource登録用のもの |
WorkQueueとLister、clientsetも図に追加しましょう。
recorderは機能には影響がない部分なので割愛します。

Informerの設定
main.goで出てきたInformerですが、このInformerのAddEventHandlerを使うことで対象Resourceの作成/更新/削除が行われたときに処理を実行できます。
Foo Resourceに対する処理の登録
Foo Resourceが新たに作成された場合(Add)と更新があった場合(Update)はenqueueFooを呼び出しキューへの登録を行います。
削除された場合(Delete)の挙動は実装されていません。
多分ですが、あるFooが削除されたとき、対応するDeploymentは.metadata.ownerReferencesの情報をもとに自動で削除されるためではないかと思います。
Deployment Resourceに対する処理の登録
こちらはDeployment Resourceに対するイベント登録で、新規作成/更新/削除に処理(handleObject)が登録されています。
handleObject()
handleObject()ですが、いくつかの分岐があり"ならでは"の考慮がされています。
こちらも一つずつ見ていきます。
UpdateFuncのResourceVersionの比較
「UpdateFuncは更新があった際に呼び出される」と理解していたのですが、下記コードを読むとnewDepl.ResourceVersionとoldDepl.ResourceVersionの比較がされています。
ここが同じになることがありうるのか、という点についてですが、コメントにあるように「Periodic resync`を有効にしていると、Informerがもつキャッシュに対してUpdate Eventを発行させるようです。
実はその設定もmain.goの中でされていました。
30秒ごとにInformerがもつキャッシュにUpdateFuncを実行させて、更新イベントを拾えなかった場合のフォローをしています。
DeletedFinalStateUnknownの考慮
client-goの内部の話になりますが、InformerはAPI Serverから取得した情報をキャッシュとして保持しています。
一時的なネットワーク切断等、Informerが持つキャッシュに保持される前にResourceの作成と削除が行われてしまった場合の考慮がここでされています。
ちなみにtombstoneという変数名は「墓石」を意味しています。
このような状況は普通に起こりうることのようで、kubernetes/ingress-nginxの下記issueでも「cache.DeletedFinalStateUnknownを取り扱う必要がある」とコメントされています。
正常系である場合は次の処理になります。
クラスタ内に存在するDeploymentですが、Fooが管理していないものは対象外としてreturnし、Fooが管理するDeploymentの場合は対応するFooをenqueueFoo()で登録しています。
一旦ここまでで図の再整理をしてみましょう。
Informerに対するAddFunc/UpdateFunc/DeleteFuncを追加しています。
-
FooとDeploymentのInformerを作る - それぞれの
InformerにAddFunc/UpdateFunc/DeleteFuncで処理を登録する。 - キューを用意し、
Fooを投入する。

ここまででAPI ServerへのFooとDeploymentの取得処理と、キューへの登録ができるようになりました。
この後はキューからFooを取得し処理をしていけば良さそうです。
図にあるDeploymentLister/kubeclientset/sampleclientsetはあとあとでてきます。
Controllerの実行(func (c *Controller) Run(ctx context.Context, workers int))
Informerが持つキャッシュの初回同期待ち
Informerの起動直後はInformerが内部で保持するキャッシュの同期がされていない状態であるため、その同期が完了するまでの待機処理が書かれています。
キューからデータを取得して処理するワーカの起動
キューの書き込みはInformerに登録したAddFunc/UpdateFunc/DeleteFuncが担っているのに対して、書き込まれたFooをキューから取得して処理をするワーカが必要です。
そのワーカの起動がここで行われています。
workersはmain.goの下記で指定された2が入っており、2つのワーカがキューからFooを取得して処理するような仕組みにしているようです。
キューからのデータ取得
processNextWorkItem()でキューからのデータの取得と処理が行われています。
キューからの取得ですが、下記が主な処理になっています。
| メソッド名 | 内容 |
|---|---|
Get() |
キューからFooを取得する。 |
Done(objRef) |
処理の成功/失敗に関わらず、objRefの処理が終わったことをdeferで伝える |
Forget(objRef) |
処理が成功し、objRefをキューから除く |
AddRateLimited(ObjRef) |
処理が失敗したため、RateLimitつきでキューに再投入する |
業務ロジック(syncHandler)
syncHandler()がsample-controllerの核となる業務ロジックが実装されている部分です。
foosListerというものが使われていますが、先述したようにこれはInformerが内部で保持するキャッシュにアクセスするためのものです。
objectRef.NamespaceとobjectRef.Nameで絞り込んでFooを取得しています。
foosListerと同じくdeploymentsListerを使って処理対象のFooに対応するDeploymentを取得しています。
まずはFooが存在するが、対応するDeploymentがまだ存在しない場合の処理が実装されています。
Create()でAPI Serverに新たなDeploymentの情報を渡していますね。
次はFooが存在し、かつ対応するDeploymentも存在する場合です。
Fooの.spec.replicasとDeloymentの.spec.replicasを比較して、必要に応じてDeplomentのレプリカ数を変更する必要があります。
最後にはFooの.statusの更新をして業務ロジックの実装は完了となります。
非機能ではありますが、Eventの登録も最後にしていました。
キューからデータを読み取り、FooとDeploymentの処理をする部分についても図に整理しましょう。

以上です。
sample-controllerリポジトリの概要図と比較してみる
ここまで、コードリーディングを進めつつ図を更新しました。
実はsample-controllerのリポジトリにはclient-goとCustom Controllerの各種役割が描かれた概要図があります。その概要図と今回作成した図を見比べてみましょう。

| sample-controllerリポジトリの図 | 今回作成した図 |
|---|---|
| Resource Event Handlers |
fooInformer/deploymentInformerに登録した各種処理 |
| Informer Reference |
fooInformer/deploymentInformer
|
| Indexer Reference |
foosLister/deploymentsLister ※Listerは内部でIndexerをみている |
| Workqueue |
Fooを格納するWorkQueue |
大まかな流れは同じのようです。
client-goの部分は今回は深堀していませんが、ReflectorもResourceを監視する上で重要なコンポーネントとなります。
sample-controllerから読み取れたポイント
sample-controllerを一通り読んでみて、どのように設計すべきかのポイントが見えてきたのでまとめてみます。
ただ、これは主観的な部分もあるため「かならずこうすべき」というものではないことを先に述べておきます。
API Serverに負荷をかけない
K8sならではのポイントはこれでしょうか。
Kubernetesを構成する要素の中でもAPI Serverは特に重要なものです。特定のCustom Controllerが負荷をかけてしまいクラスタ全体が不安定になっては目も当てられません。
sample-controllerではRateLimitをかけたキューを使うことで正常系/異常系の速度調整がされています。
また、今回は深堀しませんでしたがInformerにもAPI Serverへの負荷をかけないようにする機能が備わっています。
分解点を作って処理をわける
InformerのAddFunc/UpdateFunc/DeleteFuncにsyncHandler()までの処理を詰め込むのも可能ですが、負荷のかかる処理を書いてしまった場合にそこで処理が止まってしまう可能性もあるでしょう。
そういった点も考慮し、キューなどを挟んで「API Serverから必要なデータを取得して溜める」処理と「溜めたデータを読み取って、API Serverに書き込みリクエストを送る」処理とで分けているのではないかと思います。
運用を考える(独自のEventを登録する)
「Custom Controllerの中で何が起きていたのか」の理解の手助けとしてEventを登録しておくのも運用を見据えると大切だと思います。
ログの出力でもよいかもしれませんが、Kubernetesのエコシステムの中で動くものなのでEvent登録は便利だなと思いました。
宿題
下記は宿題として残しておきます。
- client-goの内部の仕組み
- 今回は
Reflector/DeltaFIFO/Informer/Indexerへはあまり深入りしませんでした -
SharedIndexInformerなどの機能も深堀していきたいです
- 今回は
- Custom Controllerのリーダ選出
- sample-controllerは単体で動かしますが、冗長構成にしたときにはどのような考慮が必要になるでしょうか
- client-goにそういった機能はありそうです
- カスタムリソースの作り方、関連のGoコードの生成
- 今回はCRDsはすでに提供されており、
sampleclientsetなどのコードもすでに生成されていました
- 今回はCRDsはすでに提供されており、
参考リンク
Discussion