🌸
Go + Kubernetesを使ったバッチ処理
概要
Webサービスを運用する際に、何らかの形で
・定期的に、大量のデータ処理を一括で行いたい
というユースケースは多いと思う。
今回はKubernetesのCronJobを使用してバッチ処理基盤を構築する方法を説明する。
尚、言語はGoとする。
使用するGoのライブラリ
-
spf13/cobra
→ CLI用のライブラリ
-
uber-go/dig
→ DI用のライブラリ
アプリ側の実装
ディレクトリ構成
cmd
|_ batch
|_ main.go
pkg
|_ batch
|_ cmd
|_ user
|_ cmd.go
|_ job
|_ user
|_ job.go
|_ registry
|_ registry.go
|_ util
|_ closure
|_ util.go
build
|_ batch
|_ Dockerfile
cmd/batch/main.go (サーバー起動処理)
メインとなる、サーバー起動処理を書いていく。
Closeが必要な外部リソースのClientやインスタンスは、ここでNewしておく。
cmd/batch/main.go
type config struct {
mySQLAddr string
mySQLUser string
mySQLPassword string
mySQLDB string
}
func main() {
// MySQLとか外部リソースの環境変数を受け取る
config := &config{
mySQLAddr: os.Getenv("MYSQL_ADDR"),
mySQLUser: os.Getenv("MYSQL_USER"),
mySQLPassword: os.Getenv("MYSQL_PASSWORD"),
mySQLDB: os.Getenv("MYSQL_DB"),
}
defer func() {
p := recover()
if p == nil && err == nil {
return
}
// Podを異常終了にするためExitCodeを指定
os.Exit(1)
}()
// Close処理をまとめて行う
closeListener := &closure.CloseListener{}
defer closeListener.Close()
// cobra.Commandを生成
rootCmd, err := newRootCmd(ctx, config, closeListener, slackNotifier)
if err != nil {
return err
}
// 実行
if err := rootCmd.ExecuteContext(ctx); err != nil {
return err
}
}
func newRootCmd(ctx context.Context, c *Config, closeListener *closure.CloseListener) (*cobra.Command, error) {
dataSource := fmt.Sprintf("%s:%s@tcp(%s)/%s?parseTime=true&loc=Local",c.mySQLUser, c.mySQLPassword, c.mySQLAddress, c.mySQLDatabase)
db, err := sql.Open("mysql", dataSource)
if err != nil {
return nil, err
}
closeListener.Add(func() {
if err := db.Close(); err != nil {
return nil, err
}
})
// DI
cmd, err := registry.Register(db *sql.DB)
if err != nil {
return nil, err
}
return cmd, nil
}
pkg/util/closure/util.go (クローズ処理)
*sql.DBなどCloseを行う必要があるClient用にUtil関数を作っておく。
Clientがどんどん増えていった時にオススメ。
pkg/uti/clouse/util.go
package closure
type CloseListener struct {
fs []func()
}
func (cl *CloseListener) Add(f func()) {
cl.fs = append(cl.fs, f)
}
func (cl *CloseListener) Close() {
// 追加された逆順でCloseする
for i := len(cl.fs) - 1; i >= 0; i-- {
f := cl.fs[i]
f()
}
cl.fs = nil
}
pkg/batch/registry/registry.go (DI)
DIをまとめて書いていく。
DIツールには有名なDigを使用する。
Provideで必要なオブジェクトを構築し、最後にInvokeで実行したい関数を実行する。
DIが増えてくるとNewするのが面倒になってくるのでおすすめ。
pkg/batch/registry/registry.go
var JobSet = []interface{}{
userjob.NewJob,
}
func Register(adminDB *sql.DB) (*cobra.Command, error) {
container := dig.New()
// 外部リソース
if err := providesContainer(container, []interface{}{
func() *sql.DB {
return db
},
}); err != nil {
return nil, err
}
// Jobs
if err := providesContainer(container, JobSet); err != nil {
return nil, err
}
rootCmd := &cobra.Command{
Use: "batch",
Short: "batch for server",
Run: func(cmd *cobra.Command, args []string) {
if len(args) == 0 {
cmd.HelpFunc()(cmd, args)
}
},
}
if err := container.Invoke(func(
userJob user.Job,
) error {
rootCmd.AddCommand(
user.NewCmd(userJob),
)
return nil
}); err != nil {
return nil, err
}
return rootCmd, nil
}
func providesContainer(c *dig.Container, containers []interface{}) error {
for _, container := range containers {
err := c.Provide(container)
if err != nil {
return err
}
}
return nil
}
pkg/batch/job/user/job.go (実際に行う処理を書いてく)
pkg/batch/job/user/job.go
type Job interface {
UpdateUser(ctx context.Context) error
DeleteUser(ctx context.Context) error
}
type job struct {
db *sql.DB
}
func NewJob(db *sql.DB) Job {
return &job{
db: db,
}
}
func (j *job) UpdateUser(ctx context.Context) error {
// 実行したいバッチの処理を書く
return nil
}
func (j *job) DeleteUser(ctx context.Context) error {
// 実行したいバッチの処理を書く
return nil
}
pkg/batch/cmd/user/cmd.go (Jobに対応するコマンドを記述)
userという1つのドメインに、subCmdとしてupdateとdelete、、など色々なユースケースのJobを追加していく。
pkg/batch/cmd/user/cmd.go
func NewCmd(j user.Job) *cobra.Command {
cmd := &cobra.Command{
Use: "user",
Short: "user command",
}
subCmds := []*cobra.Command{
{
Use: "update",
Short: "ユーザーを更新する",
RunE: func(cmd *cobra.Command, args []string) error {
if err := j.UpdateUser(cmd.Context()); err != nil {
return err
}
return nil
},
},
{
Use: "delete",
Short: "ユーザーを削除する",
RunE: func(cmd *cobra.Command, args []string) error {
if err := j.DeleteUser(cmd.Context()); err != nil {
return err
}
return nil
},
},
}
cmd.AddCommand(subCmds...)
return cmd
}
build/batch/Dockerfile
イメージを軽くしたいので、マルチステージビルドしとく。
# ===== build go binary =====
FROM golang:1.17.8-alpine as go-builder
WORKDIR /go/src/github.com/xxx/yyy-server
COPY go.mod .
COPY go.sum .
RUN go mod download
COPY cmd cmd
COPY pkg pkg
RUN go build -o batch cmd/batch/main.go
# ==== build docker image ====
FROM alpine
RUN apk update && apk add --no-cache ca-certificates bash tzdata && \
cp /usr/share/zoneinfo/Asia/Tokyo /etc/localtime && \
apk del tzdata
COPY /go/src/github.com/xxx/yyy-server /batch
Kubernetes側の実装
ディレクトリ構成
k8sのパッケージマネージャとしてはHelmを使用する。
batch
|_ templates
|_ cronjob.yaml
|_ values.yaml
|_ Chart.yaml
batch/values.yaml
環境ごとにvalues-dev.yaml、values-stg.yamlなど作ってもOK。
batch/values.yaml
# 環境名
env: dev
# 共通の値をセット
common:
# 成功した履歴を残す件数
successfulJobsHistoryLimit: 1
# 失敗した履歴を残す件数
failedJobsHistoryLimit: 2
# ImagePullPolicy
imagePullPolicy: IfNotPresent
# 同時実行
concurrencyPolicy: Forbid
# 参照するImageのタグ
imageTag: v1.0.0
# 各Batch情報
# scheduleは世界標準時で記述する
# 分、時、日、月、曜日
batches:
- name: user-update
# 毎朝2時30分に実行(世界標準だと17時)
schedule: "30 17 * * *"
args:
- "user"
- "update"
resources:
requests:
cpu: 10m
memory: 128Mi
completions: 1
parallelism: 1
startingDeadlineSeconds: 60
- name: user-delete
# 毎朝3時30分に実行(世界標準だと18時)
schedule: "30 18 * * *"
args:
- "user"
- "delete"
resources:
requests:
cpu: 10m
memory: 128Mi
backoffLimit: 3
# 2並列で走らせる
completions: 2
parallelism: 2
startingDeadlineSeconds: 60
batch/templates/cronjob.yaml
実際に適用するcronjobのマニフェストファイル。
common → 全Jobに適用する値を定義。
batches → 各Job固有の値を定義。
という形にしてる。
batch/templates/cronjob.yaml
{{- range $i, $batch := $.Values.batches -}}
{{ $app := printf "%s-batch-%s" $.Values.env $batch.name }}
{{- if ne $i 0 }}
---
{{- end }}
apiVersion: batch/v1beta1
kind: CronJob
metadata:
name: {{ $app }}
labels:
app: {{ $app }}
namespace: batch
spec:
successfulJobsHistoryLimit: {{ $.Values.common.successfulJobsHistoryLimit }}
failedJobsHistoryLimit: {{ $.Values.common.failedJobsHistoryLimit }}
schedule: {{ $batch.schedule | quote }}
concurrencyPolicy: {{ $.Values.common.concurrencyPolicy }}
startingDeadlineSeconds: {{ $.Values.common.startingDeadlineSeconds }}
jobTemplate:
spec:
backoffLimit: {{ $batch.backoffLimit }}
completions: {{ $batch.completions }}
parallelism: {{ $batch.parallelism }}
template:
spec:
restartPolicy: Never
containers:
- name: batch
image: "xxx/xxx/github.com/yyy/batch:{{ $.Values.common.imageTag }}"
imagePullPolicy: {{ $.Values.common.imagePullPolicy }}
command:
- "/batch"
args:
{{- with $batch.args -}}
{{- toYaml . | nindent 12 }}
{{- end }}
resources:
{{- with $batch.resources }}
{{- toYaml . | nindent 14 }}
{{- end }}
env:
# MYSQL
- name: MYSQL_ADDR
value: "{{ $.Values.mysql.addr }}"
- name: MYSQL_USER
value: "{{ $.Values.mysql.user }}"
- name: MYSQL_PASSWORD
value: "{{ $.Values.mysql.password }}"
- name: MYSQL_DB
value: "{{ $.Values.mysql.db }}"
{{- end -}}
Discussion