💭
Golangでk8s Deploymentを再起動させる
やること
- client-goを使って複数のDeploymentを同時に再起動させる
Golang
Deploymentの取得
- Pod内であれば
rest.InClusterConfig()
でPodのServiceAccountを使用するconfigを取得できる -
clientset.AppsV1().Deployments(namespace).Get(ctx, deploymentName, metav1.GetOptions{})
でDeploymentを取得
- NamespaceとDeploymentの名前が必要
k8s.go
package main
import (
	"context"
	"errors"
	"fmt"
	"os"
	"strings"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)
// newK8sClientset builds a Kubernetes clientset from the in-cluster
// configuration, i.e. the ServiceAccount token mounted into the Pod.
// It only works when the process runs inside a cluster.
func newK8sClientset() (*kubernetes.Clientset, error) {
	config, err := rest.InClusterConfig()
	if err != nil {
		// Typically means we are not running inside a Pod.
		return nil, fmt.Errorf("loading in-cluster config: %w", err)
	}
	return kubernetes.NewForConfig(config)
}
// deploymentClient bundles the API clientset with the namespace and the
// set of Deployments this process restarts.
type deploymentClient struct {
	namespace   string                // namespace all target Deployments live in
	clientset   *kubernetes.Clientset // client used for Get/Update/Watch calls
	deployments []*appsv1.Deployment  // Deployments resolved from DEPLOYMENT_NAMES
}
// newDeploymentClient resolves the target Deployments from the NAMESPACE_NAME
// and DEPLOYMENT_NAMES (comma-separated) environment variables and returns a
// client that can restart them. It fails fast if any name cannot be fetched.
func newDeploymentClient(ctx context.Context, clientset *kubernetes.Clientset) (*deploymentClient, error) {
	namespace := os.Getenv("NAMESPACE_NAME")
	if namespace == "" {
		return nil, errors.New("NAMESPACE_NAME is empty")
	}
	deploymentNames := os.Getenv("DEPLOYMENT_NAMES")
	if deploymentNames == "" {
		return nil, errors.New("DEPLOYMENT_NAMES is empty")
	}
	names := strings.Split(deploymentNames, ",")
	deployments := make([]*appsv1.Deployment, 0, len(names))
	for _, name := range names {
		// Abort before restarting anything if one of the names is wrong.
		deployment, err := clientset.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			// Return nil (not a partially-populated struct) on failure,
			// per the (T, error) convention.
			return nil, fmt.Errorf("getting deployment %s: %w", name, err)
		}
		deployments = append(deployments, deployment)
	}
	return &deploymentClient{
		namespace:   namespace,
		clientset:   clientset,
		deployments: deployments,
	}, nil
}
Deploymentの再起動
- https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/kubectl/pkg/polymorphichelpers/objectrestarter.go
- RestartのAPIはないため、
kubectl rollout restart
と同様にRestartedAtを現在時間に変更してUpdateする
k8s.go
func (d *deploymentClient) restart(ctx context.Context, deployment *appsv1.Deployment) error {
if deployment.Spec.Template.ObjectMeta.Annotations == nil {
deployment.Spec.Template.ObjectMeta.Annotations = make(map[string]string)
}
deployment.Spec.Template.ObjectMeta.Annotations["kubectl.kubernetes.io/restartedAt"] = time.Now().Format(time.RFC3339)
_, err := d.clientset.AppsV1().Deployments(d.namespace).Update(ctx, deployment, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("Restart failed: %w\n", err)
}
return d.watch(ctx, deployment.ObjectMeta.Name) # 後述
}
Deploymentの状態監視
- 再起動が完了したことを確認するためにDeploymentのConditionがNewReplicaSetAvailableになるまで待つ
- WatchはGetやUpdateと違いLabelSelectorでDeploymentを指定する
k8s.go
const (
	// maxMonitoringWaitTime bounds the server-side watch, in seconds; when it
	// elapses the watch channel is closed and the rollout is treated as timed out.
	maxMonitoringWaitTime int64 = 30
)
func (d *deploymentClient) watch(ctx context.Context, deploymentName string) error {
watch, err := d.clientset.AppsV1().Deployments(d.namespace).Watch(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("app.kubernetes.io/name=%s", deploymentName),
TimeoutSeconds: &[]int64{maxMonitoringWaitTime}[0], // https://go.dev/ref/spec#Address_operators
})
if err != nil {
return fmt.Errorf("Watch failed: %w\n", err)
}
resultChan := watch.ResultChan()
for {
# イベントが発火する度にchannelを通じてイベントの情報を受け取ることができる
event, ok := <-resultChan
if !ok {
return errors.New("Monitoring timeout")
}
object, ok := event.Object.(*appsv1.Deployment)
if !ok {
return errors.New("Type assertion from object to deployment failed")
}
for _, condition := range object.Status.Conditions {
if condition.Type == "Progressing" && condition.Reason == "NewReplicaSetAvailable" {
watch.Stop()
return nil
}
}
}
}
複数同時
- restart()をgoroutineで行う
- 全処理の完了はerrgroupで確認する
- waitgroupではエラーのハンドリングが出来ないため
- https://pkg.go.dev/golang.org/x/sync/errgroup
main.go
package main
import (
"context"
"golang.org/x/sync/errgroup"
"log"
)
// init configures the global logger to prefix every line with the full
// file path and line number of the call site.
func init() {
	log.SetFlags(log.Llongfile)
}
// main restarts every Deployment listed in DEPLOYMENT_NAMES concurrently and
// exits non-zero if any restart (or its rollout watch) fails.
func main() {
	ctx := context.Background()
	clientset, err := newK8sClientset()
	if err != nil {
		log.Fatalf("%v\n", err)
	}
	// Renamed from `deploymentClient` to avoid shadowing the type of the
	// same name.
	client, err := newDeploymentClient(ctx, clientset)
	if err != nil {
		log.Fatalf("%v\n", err)
	}
	// errgroup (rather than sync.WaitGroup) propagates the first error, and
	// WithContext cancels the remaining restarts once one of them fails.
	eg, ctx := errgroup.WithContext(ctx)
	for _, deployment := range client.deployments {
		deployment := deployment // per-iteration copy; required before Go 1.22
		eg.Go(func() error {
			return client.restart(ctx, deployment)
		})
	}
	if err := eg.Wait(); err != nil {
		log.Fatalf("%v\n", err)
	}
}
Dockerfile
# Build stage: compile a static binary so it can run on a distroless base.
FROM golang:1.21-bookworm AS builder
WORKDIR /app
# Copy module files first so the dependency download layer is cached.
COPY ./src/go.mod ./
COPY ./src/go.sum ./
RUN go mod download
COPY ./src ./
RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -ldflags "-s -w" -o main

# Runtime stage: minimal nonroot image containing only the binary.
FROM gcr.io/distroless/static-debian12:nonroot
WORKDIR /app
# Fix: the binary lives in the builder stage, so COPY needs --from=builder;
# without it, COPY looks in the build context and the build fails.
COPY --from=builder /app/main /app/main
CMD ["/app/main"]
Helm chart
- Deploymentに対してGet, Watch, UpdateしているのでClusterRoleで権限を付与する
---
# ServiceAccount the restarter Pod runs as; bound to the ClusterRole below.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: {{ .Release.Name }}
  namespace: {{ .Release.Namespace }}
  labels:
{{- include "labels" . | indent 2 }}
  annotations:
    # For EKS: IAM role assumed via IRSA (IAM Roles for Service Accounts).
    eks.amazonaws.com/role-arn: {{ .Values.serviceAccountArn }}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
# ClusterRole is cluster-scoped, so metadata.namespace is meaningless and has
# been removed (the API server ignores it on cluster-scoped resources).
metadata:
  name: {{ .Release.Name }}
  labels:
{{- include "labels" . | indent 2 }}
rules:
  # The app only needs to get, watch, and update Deployments.
  - apiGroups: ["apps"]
    resources:
      - deployments
    verbs: ["get", "watch", "update"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
# ClusterRoleBinding is cluster-scoped: metadata.namespace removed (ignored
# by the API server). The subject's namespace below IS required, because the
# ServiceAccount itself is a namespaced resource.
metadata:
  name: {{ .Release.Name }}
  labels:
{{- include "labels" . | indent 2 }}
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: {{ .Release.Name }}
subjects:
  - kind: ServiceAccount
    name: {{ .Release.Name }}
    namespace: {{ .Release.Namespace }}
Discussion