🌸

Go + Kubernetesを使ったバッチ処理

2022/05/29に公開

概要

Webサービスを運用する際に、何らかの形で

・定期的に、大量のデータ処理を一括で行いたい

というユースケースは多いと思う。

今回はKubernetesのCronJobを使用してバッチ処理基盤を構築する方法を説明する。

尚、言語はGoとする。

使用するGoのライブラリ

アプリ側の実装

ディレクトリ構成

cmd
  |_ batch
    |_ main.go
pkg
  |_ batch
    |_ cmd
       |_ user
         |_ cmd.go
    |_ job
       |_ user
         |_ job.go
    |_ registry
       |_ registry.go
  |_ util
    |_ closure
      |_ util.go
build
  |_ batch
    |_ Dockerfile

cmd/batch/main.go (サーバー起動処理)

メインとなる、サーバー起動処理を書いていく。

Closeが必要な外部リソースのClientやインスタンスは、ここでNewしておく。

cmd/batch/main.go
type config struct {
   mySQLAddr                      string
   mySQLUser                      string
   mySQLPassword                  string
   mySQLDB                        string
}

func main() {
   // MySQLとか外部リソースの環境変数を受け取る
  config := &config{
	mySQLAddr:                      os.Getenv("MYSQL_ADDR"),
	mySQLUser:                      os.Getenv("MYSQL_USER"),
	mySQLPassword:                  os.Getenv("MYSQL_PASSWORD"),
	mySQLDB:                        os.Getenv("MYSQL_DB"),
  }
  defer func() {
    p := recover()
    if p == nil && err == nil {
       return
    }
    
    // Podを異常終了にするためExitCodeを指定
    os.Exit(1)
   }()
  
  // Close処理をまとめて行う
  closeListener := &closure.CloseListener{}
  defer closeListener.Close()
  
  // cobra.Commandを生成
  rootCmd, err := newRootCmd(ctx, config, closeListener, slackNotifier)
  if err != nil {
    return err
  }

  // 実行
  if err := rootCmd.ExecuteContext(ctx); err != nil {
    return err
  }
}

func newRootCmd(ctx context.Context, c *Config, closeListener *closure.CloseListener) (*cobra.Command, error) {
   
    dataSource := fmt.Sprintf("%s:%s@tcp(%s)/%s?parseTime=true&loc=Local",c.mySQLUser, c.mySQLPassword, c.mySQLAddress, c.mySQLDatabase)
    db, err := sql.Open("mysql", dataSource)
    if err != nil {
	return nil, err
    }
    closeListener.Add(func() {
      if err := db.Close(); err != nil {
	return nil, err
    }
   })
   
   // DI
   cmd, err := registry.Register(db *sql.DB)
   if err != nil {
     return nil, err
   }
   
   return cmd, nil
}

pkg/util/closure/util.go (クローズ処理)

*sql.DBなどCloseを行う必要があるClient用にUtil関数を作っておく。
Clientがどんどん増えていった時にオススメ。

pkg/uti/clouse/util.go
package closure

type CloseListener struct {
  fs []func()
}

func (cl *CloseListener) Add(f func()) {
  cl.fs = append(cl.fs, f)
}

func (cl *CloseListener) Close() {
  // 追加された逆順でCloseする
  for i := len(cl.fs) - 1; i >= 0; i-- {
     f := cl.fs[i]
     f()
  }
  cl.fs = nil
}

pkg/batch/registry/registry.go (DI)

DIをまとめて書いていく。
DIツールには有名なDigを使用する。

Provideで必要なオブジェクトを構築し、最後にInvokeで実行したい関数を実行する。
DIが増えてくるとNewするのが面倒になってくるのでおすすめ。

pkg/batch/registry/registry.go
var JobSet = []interface{}{
  userjob.NewJob,
}

func Register(adminDB *sql.DB) (*cobra.Command, error) {
  container := dig.New()

  // 外部リソース
  if err := providesContainer(container, []interface{}{
     func() *sql.DB {
	return db
     },
   }); err != nil {
     return nil, err
  }

   // Jobs
   if err := providesContainer(container, JobSet); err != nil {
      return nil, err
   }

   rootCmd := &cobra.Command{
     Use:   "batch",
     Short: "batch for server",
     Run: func(cmd *cobra.Command, args []string) {
	if len(args) == 0 {
	   cmd.HelpFunc()(cmd, args)
	}
     },
   }
   
   if err := container.Invoke(func(
     userJob user.Job,
   ) error {
     rootCmd.AddCommand(
      user.NewCmd(userJob),
     )
     return nil
   }); err != nil {
     return nil, err
   }

   return rootCmd, nil
}



func providesContainer(c *dig.Container, containers []interface{}) error {
   for _, container := range containers {
      err := c.Provide(container)
      if err != nil {
	 return err
       }
   }
   return nil
}

pkg/batch/job/user/job.go (実際に行う処理を書いてく)

pkg/batch/job/user/job.go
type Job interface {
    UpdateUser(ctx context.Context) error
    DeleteUser(ctx context.Context) error
}

type job struct {
    db *sql.DB
}

func NewJob(db *sql.DB) Job {
   return &job{
     db: db,
   }
}

func (j *job) UpdateUser(ctx context.Context) error {
  // 実行したいバッチの処理を書く
  
  return nil
}

func (j *job) DeleteUser(ctx context.Context) error {
  // 実行したいバッチの処理を書く
  
  return nil
}

pkg/batch/cmd/user/cmd.go (Jobに対応するコマンドを記述)

userという1つのドメインに、subCmdとしてupdateとdelete、、など色々なユースケースのJobを追加していく。

pkg/batch/cmd/user/cmd.go
func NewCmd(j user.Job) *cobra.Command {
    cmd := &cobra.Command{
        Use:   "user",
        Short: "user command",
    }

   subCmds := []*cobra.Command{
         {
	Use:   "update",
	Short: "ユーザーを更新する",
	RunE: func(cmd *cobra.Command, args []string) error {
	       if err := j.UpdateUser(cmd.Context()); err != nil {
		return err
	        }
	        return nil
	},
         },
         {
	Use:   "delete",
	Short: "ユーザーを削除する",
	RunE: func(cmd *cobra.Command, args []string) error {
	       if err := j.DeleteUser(cmd.Context()); err != nil {
		return err
	        }
	        return nil
	},
         },
     }

    cmd.AddCommand(subCmds...)
 
    return cmd
}

build/batch/Dockerfile

イメージを軽くしたいので、マルチステージビルドしとく。

# ===== build go binary =====
FROM golang:1.17.8-alpine as go-builder

WORKDIR /go/src/github.com/xxx/yyy-server

COPY go.mod .
COPY go.sum .
RUN go mod download

COPY cmd cmd
COPY pkg pkg

RUN go build -o batch cmd/batch/main.go

# ==== build docker image ====
FROM alpine

RUN apk update && apk add --no-cache ca-certificates bash tzdata && \
    cp /usr/share/zoneinfo/Asia/Tokyo /etc/localtime && \
    apk del tzdata

COPY --from=go-builder /go/src/github.com/xxx/yyy-server /batch

Kubernetes側の実装

ディレクトリ構成

k8sのパッケージマネージャとしてはHelmを使用する。

batch
  |_ templates
    |_ cronjob.yaml
  |_ values.yaml
  |_ Chart.yaml

batch/values.yaml

環境ごとにvalues-dev.yaml、values-stg.yamlなど作ってもOK。

batch/values.yaml
# 環境名
env: dev

# 共通の値をセット
common:
  # 成功した履歴を残す件数
  successfulJobsHistoryLimit: 1
  # 失敗した履歴を残す件数
  failedJobsHistoryLimit: 2
  # ImagePullPolicy
  imagePullPolicy: IfNotPresent
  # 同時実行
  concurrencyPolicy: Forbid
  # 参照するImageのタグ
  imageTag: v1.0.0

# 各Batch情報
# scheduleは世界標準時で記述する
# 分、時、日、月、曜日
batches:
- name: user-update
  # 毎朝2時30分に実行(世界標準だと17時)
  schedule: "30 17 * * *"
  args:
    - "user"
    - "update"
  resources:
    requests:
      cpu: 10m
      memory: 128Mi
  completions: 1
  parallelism: 1
  startingDeadlineSeconds: 60
- name: user-delete
  # 毎朝3時30分に実行(世界標準だと18時)
  schedule: "30 18 * * *"
  args:
    - "user"
    - "delete"
  resources:
    requests:
      cpu: 10m
      memory: 128Mi
  backoffLimit: 3
  # 2並列で走らせる
  completions: 2
  parallelism: 2
  startingDeadlineSeconds: 60

batch/templates/cronjob.yaml

実際に適用するcronjobのマニフェストファイル。

common → 全Jobに適用する値を定義。
batches → 各Job固有の値を定義。

という形にしてる。

batch/templates/cronjob.yaml
{{- range $i, $batch := $.Values.batches -}}
{{ $app := printf "%s-batch-%s" $.Values.env $batch.name }}
{{- if ne $i 0 }}
---
{{- end }}
apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: {{ $app }}
  labels:
    app: {{ $app }}
  namespace: batch
spec:
  successfulJobsHistoryLimit: {{ $.Values.common.successfulJobsHistoryLimit }}
  failedJobsHistoryLimit: {{ $.Values.common.failedJobsHistoryLimit }}
  schedule: {{ $batch.schedule | quote }}
  concurrencyPolicy: {{ $.Values.common.concurrencyPolicy }}
  startingDeadlineSeconds: {{ $.Values.common.startingDeadlineSeconds }}
  jobTemplate:
    spec:
      backoffLimit: {{ $batch.backoffLimit }}
      completions: {{ $batch.completions }}
      parallelism: {{ $batch.parallelism }}
      template:
        spec:
          restartPolicy: Never
          containers:
          - name: batch
            image: "xxx/xxx/github.com/yyy/batch:{{ $.Values.common.imageTag }}"
            imagePullPolicy: {{ $.Values.common.imagePullPolicy }}
            command:
            - "/batch"
            args:
              {{- with $batch.args -}}
              {{- toYaml . | nindent 12 }}
              {{- end }}
            resources:
              {{- with $batch.resources }}
              {{- toYaml . | nindent 14 }}
              {{- end }}
            env:
              # MYSQL
              - name: MYSQL_ADDR
                value: "{{ $.Values.mysql.addr }}"
              - name: MYSQL_USER
                value: "{{ $.Values.mysql.user }}"
              - name: MYSQL_PASSWORD
                value: "{{ $.Values.mysql.password }}"
              - name: MYSQL_DB
                value: "{{ $.Values.mysql.db }}"

{{- end -}}

Discussion