🦾

バッチ処理系の刷新とArgo Workflow移行

2021/12/07に公開約13,100字

これはPTAアドベントカレンダーの7日目の記事です。

5年間運用されてきたバッチ処理系を刷新し、Argo Workflowを用いたバッチ処理系に移行したのでその紹介記事です。


背景

GKE上でバッチ処理のワークロードを実行しており、ワークフローエンジンとしてDigdagを採用していました。ユースケースとしては定期実行のバッチ処理、ETL、機械学習等。
Digdagを用いたワークフロー定義はシンプルかつ運用に必要な機能を提供してくれています。実際のワークフロー内部の処理としては、ワークフローの各タスクにおいては基本的にはロジックは持たずKubernetes Jobの実行のみを行います。そのためにDigdagとKubernetes Job間で協調動作するための仕組みが独自で用意されていました。このようなバッチ処理系が約5年程運用されてきました。

この仕組で今まで元気に動いてはいたのですが次のような課題、改善の余地がありました。

  • Kubernetes JobとDigdagが協調動作するための仕組みが不安定
    • 週に1,2回アラートが上がっていた
    • Digdag Deployment上にあるk8s job manifestやk8s clusterへの接続設定はジョブ間で共有されている、あるジョブの実行が他のジョブに影響を与えうる
  • Digdagの1ステップの処理内容が本来やりたい処理以外の処理で複雑になっている
    • ①最新のKubernetes job manifestをGitHubからダウンロード②job manifest内のマクロ展開③Kubernetes Clusterに接続④Kubernetes jobの実行⑤実行したjobの終了を待機⑥終了ステータスを取得、終了ステータスに応じて次のステップを実行
  • 瞬断に耐えられるワークフローエンジンが欲しい。GKEクラスタのアップグレード作業がしやすくなる
  • 鍵を使った認証・認可からRBAC認可にする
  • digファイルがデプロイ環境ごとに複製されている
    • また、Kubernetes manifestは環境別に、所定のディレクトリにYAMLマニュフェストを配置することでそれがdigdagから取得、利用される仕組みとなっている
    • Twelve-Factor Appでいうところの「単一のコードベースと複数のデプロイ」のような状態ではない [1][2]
  • Kubernetes manifestの環境別の差分はJinja2テンプレートエンジンを使用して管理
    • 長年の運用で肥大化し、ビルドの実行時間が伸びてきていた
  • digdagワークフロー上でマクロ展開されるまで実際に動かすk8s manifestがわからない
    • シンタックスチェックなどの静的解析ができない
    • そのため、プラクティスとしてdigdag実行時のマクロ展開を避けるようになる、結果実行時パラメタの一部が異なるがほぼ同じようなk8s job manifestが多数作られる
  • ログが分散。digdagサーバ上のワークフロー実行時ログとKubernetes Jobsのログ
  • リトライ方法が複雑。例えば、アプリケーション内部で現在時刻を取得しているなど

このとき、Digdagのワークフロー定義ファイルは約300コあり、そのうち本番環境向けのdigファイルは約100コ。Kubernetes JobのYAMLマニュフェストのコード行数は計約31,000行という規模感でした

新しいバッチ処理系

アーキテクチャ

改善点、工夫

Argo Workflowの導入による改善点、工夫したポイントとしては、

  • 構成管理はkubectl + Kustomizeを採用
    • kubectl + Kustomizeを使いArgo Workflowマニュフェストの構成管理をします。ArgoCliがあるのですが試してみたところ運用に耐えられず見送りました。というのも、CronWorkflowの更新をしたい時には当時だとCronWorkflowのリプレースの操作をするしかなく、それがクリティカルな問題でした[3] 。リプレース中は削除されてるCronWorkflowが発火することはもちろんありませんし、削除したタイミングでそのCronWorkflowのヒストリまで消えてしまいます
    • また、ArgoCDを用いたGitOpsを近い将来に見据えていたのでそれとの相性もよいということで🙆‍♂️
  • Argo WebUIはCloud IAPをかましてGoogleアカウント認証
  • Workflow ControllerはHA構成にする
  • Workflow Controllerのクラッシュリカバリの設定をする
  • Cost最適化のプラクティスに習う
  • ResourceTemplateは使わない
    • CronWorkflow内でKubernetes Resourceを定義することができるので現行のKubernetes Jobマニュフェストをそのまま使うことができ移行自体はしやすくはなる(かもしれない)のですがこれは使用しないことにしました。理由としては、①意味が異なるものをネストしたYAMLの扱いにくさ(エディタでの編集も苦労する、パラメタリレーの仕事も減らしたい)②ワークフロー定義がArgoの責任範囲で完結できるようにするため(1つのワークフロー定義を見ればワークフローの全貌が把握できる、シンタックスチェックができる等)③Argo WorkflowがPodSpecを拡張した機能が非常に便利だから④責務が分散しうるのを避ける(リトライ、タイムアウト等の制御が1箇所に)⑤自分が最初に採った方法がチームのベースとなる、ベストプラクティスをこの時点で見つけたい。今後新しいジョブが作られる前に。そのためにも最初から特殊な構成にはしない。
  • WorkflowDefaultsを使ってワークフロー間で共通する設定の記述を減らす、定義漏れ時のリスクを減らす
  • RBAC認可
  • 各ワークフローごとにREADMEを作成
    • 今回の移行のためにジョブのサービス的な役割や、処理内容、保守運用のヒント、リカバリ方法等を調べたのでそれを記載
    • エラー時のSlack通知にはそのリカバリ手順、リトライ手順のリンクを載せる
  • 柔軟なリトライを可能にする
    • エラーとなったジョブの途中からの再開、最初から再実行、過去日時等を指定してのジョブの再実行が可能
    • そのためにコンテナの振る舞いに関する入力はWorkflowから渡す、Workflow起動時に渡すことができるようにした。例えば、アプリケーションコード内で現在時刻を取得し使わない
  • 本番環境にリリース前にステージング環境、開発環境で動作確認できること

参考: ワークフロー実行時のWebUIの図

参考サンプル: コードベース

ℹ️

ディレクトリ構成

/argo
├── Makefile
├── README.md
└── cron ... CronWorkflowリソースを定義する
│    └── sample-hourly-job
│    │    ├── README.md
│    │    └── base
│    │    │   ├── kustomization.yaml
│    │    │   └── workflow.yaml
│    │    └── overlays
│    │        └── dev
│    │        │   ├── kustomization.yaml
│    │        │   ├── suspend.yaml
│    │        │   └── params.yaml
│    │        └── stg
│    │        │   ├── kustomization.yaml
│    │        │   └── params.yaml
│    │        └── prd
│    │            ├── kustomization.yaml
│    │            └── params.yaml
│    └── ...
└── templates ... WorkflowTemplateリソースを定義する
    └── slack-notify.yaml
    └── ...

cron/sample-hourly-job/base/kustomization.yaml

apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
  - workflow.yaml

cron/sample-hourly-job/base/workflow.yaml

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: sample-hourly-job
  namespace: argo
spec:
  schedule: "00 * * * *"
  timezone: "Asia/Tokyo"
  concurrencyPolicy: "Forbid"
  successfulJobsHistoryLimit: 5
  failedJobsHistoryLimit: 5
  startingDeadlineSeconds: 60
  workflowSpec:
    entrypoint: main
    onExit: exit-handler
    arguments:
      parameters:
        - name: env
        - name: image
        - name: target_date
        - name: cloudsql_instance_credentials
        - name: envoy_config
        - name: slack_channel
    volumes:
      - name: cloudsql-instance-credentials
        secret:
          secretName: "{{workflow.parameters.cloudsql_instance_credentials}}"
      - name: envoy
        configMap:
          name: "{{workflow.parameters.envoy_config}}"
    templates:
      - name: main
        steps:
          - - name: target-date
              inline:
                script:
                  image: asia.gcr.io/my-project/sample/debian
                  command: [bash]
                  source: |
                    DEFAULT_TARGET_DATE=`date +"%Y-%m-%d" --date "{{workflow.scheduledTime}}"`
                    TARGET_DATE="{{workflow.parameters.target_date}}"
                    [ -z "$TARGET_DATE" ] && TARGET_DATE=$DEFAULT_TARGET_DATE
                    echo $TARGET_DATE
                  env:
                    - name: TZ
                      value: Asia/Tokyo

          - - name: sample-hourly-job-main
              inline:
                timeout: 58m
                container:
                  image: "{{workflow.parameters.image}}"
                  args:
                    - --env={{workflow.parameters.env}}
                    - --target-date={{steps.target-date.outputs.result}}
                  env:
                    - name: TZ
                      value: Asia/Tokyo
                  resources:
                    limits:
                      cpu: 1000m
                      memory: 1Gi
                    requests:
                      cpu: 1000m
                      memory: 1Gi
                sidecars:
                  - image: gcr.io/cloudsql-docker/gce-proxy:1.23.1-alpine
                    command: [/cloud_sql_proxy]
                    args:
                      - --dir=/tmp
                      - -instances={{workflow.parameters.cloudsql_instance}}=tcp:3306
		      - -credential_file=/secrets/cloudsql/credentials.json
		    volumeMounts:
                      - name: cloudsql-instance-credentials
                        mountPath: /secrets/cloudsql
                        readOnly: true
                  - image: envoyproxy/envoy:v1.20.1
                    command: ["envoy", "-c", "/etc/envoy/envoy.yaml"]
                    ports:
                      - containerPort: 10000
                        name: envoy-sidecar
                      - containerPort: 10001
                        name: envoy-admin
                    volumeMounts:
                      - name: envoy
                        mountPath: /etc/envoy
			readOnly: true
                  retryStargegy:
                    limit: 3
                    rettyPolicy: "Always"
                    backoff:
                      duration: "5s"
                      factor: 2

      - name: exit-handler
        steps:
          - - name: slack-notify-failed
              when: "{{workflow.status}} != Succeeded"
              templateRef:
                name: slack-notify
                template: slack-notify-template
              arguments:
                parameters:
                  - name: channel
                    value: "{{=jsonpath(workflow.parameters.slack_channel, '$.alert')}}"
                  - name: env
                    value: "{{workflow.parameters.env}}"
                  - name: status
                    value: "{{workflow.status}}"
                  - name: message
                    value: "{{workflow.name}} jobが失敗しました。エラー内容を確認しジョブの再実行をしてください。<https://github.com/my-org/my-repo/argo/cron/sample-hourly-job/README.md#リカバリ方法|リカバリ方法>"
                  - name: failures
                    value: "{{workflow.failures}}"

cron/sample-hourly-job/overlays/dev/kustomization.yaml

apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
  - ../../base
patchesStrategicMerge:
  - params.yaml

cron/sample-hourly-job/overlays/dev/params.yaml

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: sample-hourly-job
  namespace: argo
spec:
  workflowSpec:
    arguments:
      parameters:
        - name: env
          value: dev
        - name: image
          value: asia.gcr.io/my-project/argo-sample-hourly-job:latest
        - name: target_date
          value: ""  # リトライのために日時を指定して実行する用のオプション。通常は空でおk.空の場合は現在日時が使われます
        - name: cloudsql_instance
          value: my-project:us-east1:dev-argo-sample-db
        - name: cloudsql_instance_credentials
          value: cloudsql-instance-credentials
        - name: slack_channel
          value: '{"alert": "#dev-my-project-alert", "success": "#dev-my-project-notice"}'

cron/sample-hourly-job/overlays/prd/kustomization.yaml

apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
  - ../../base
patchesStrategicMerge:
  - params.yaml

cron/sample-hourly-job/overlays/prd/params.yaml

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: sample-hourly-job
  namespace: argo
spec:
  workflowSpec:
    arguments:
      parameters:
        - name: env
          value: prd
        - name: image
          value: asia.gcr.io/my-project/argo-sample-hourly-job:v0.0.1
        - name: target_date
          value: ""  # リトライのために日時を指定して実行する用のオプション。通常は空でおk.空の場合は現在日時が使われます
        - name: cloudsql_instance
          value: my-project:us-east1:prd-argo-sample-db
        - name: cloudsql_instance_credentials
          value: cloudsql-instance-credentials
        - name: slack_channel
          value: '{"alert": "#prd-my-project-alert", "success": "#prd-my-project-notice"}'

Tips:
IDEで書くときは次の設定をしておくとvalidateできて便利です。

https://argoproj.github.io/argo-workflows/ide-setup/

デプロイ方法

# シンタックスチェック
$ kustomize build argo/cron/sample-hourly-job/overlays/dev | argo lint -
# plan
$ kustomize build argo/cron/sample-hourly-job/overlays/dev | kubectl diff -f -
# apply
$ kustomize build argo/cron/sample-hourly-job/overlays/dev | kubectl apply -f -

リトライ方法

ケース1: 再実行する場合

Argo WebUIでCronWorkflowの詳細ページを開き、「SUBMIT」を実行

ケース2: エラーとなったジョブの続きから再開したい場合

Argo WebUIでエラーとなったWorkflowを開き、「RETRY」を実行

ケース3: 過去日時を指定し再実行する場合

argocliから対象日時を指定し再実行することが可能です。

例:

$ argo submit -n argo --from cronwf/sample-daily-jobs --parameter target_date=2021-12-07

結果

  • ワークフロー定義のコード行数が82%削減。1ジョブあたり平均1600行だったものが平均300行に減った
  • 週1,2回発生していたバッチ処理系のエラーが解消された
  • 1リポジトリ内のArgo Workflow定義で完結する。Digdag実行するまでどういうKubernetes Job manifestが実行されるかわからなかったものが、事前にわかるようになった
    • ワークフローの可読性が向上!したと思う。ワークフロー定義のシンタックスチェックも可能に
    • CI/CDがしやすくなり続くGitOpsにつながる
  • 実行時間が改善。ものによっては80%の短縮。1m40sかかっていた処理が20sに
    • 1ステップ内で毎回やっていたgit pull, kubernetes clusterへの接続等が不要になったため
  • External Secretsを使う等のセキュリティ対策が進んだ
  • 約100こあったワークフローは整理すると50こ程度にまで削減できた
  • 移行にかかった時間としては、ものにもよるが1ジョブあたり0.5~15営ほどかかりました
    • 入社して1ヶ月頃から始めた仕事だったので既存のバッチ処理群について知っていることが何もなかったのでひたすら調査にかかる時間が多かったです
  • 運用、管理プロセスがシンプルに。管理するリポジトリが一つ減り、使うツールも減った。
  • 移行作業による障害はなかった
RAS(I)S 結果
Reliability
(信頼性)
💮 バッチ処理系の基盤が安定。週1,2回発生していたエラーが解消された。
Availability
(可用性)
💮 Workflow ControllerのHA構成。GKEクラスタのアップグレードもしやすくなった。実行時間の改善、ものによっては80%の短縮、1m40sかかっていたものが20sに。自動化されたリトライ
Serviceability
(保守性)
💮 コードの重複の排除と独立性が高まった。ワークフローのコード行数が82%削減、1ワークフローあたり平均1600行だったものが平均300行に。管理が必要なリポジトリ、ツールが減った。各ワークフローごとにREADMEを記載、リトライ方法やリカバリ手順を記載。本番環境リリース前に変更差分が確認できる、ステージング環境等での動作確認が可能。ワークフロー定義のシンタックスチェックが可能。柔軟なリトライ方法を用意。ログの集約。実行状況の可視化
Security
(安全性/気密性)
💮 RBAC認可。External Secretsの導入。Cloud IAP

以上、こちらはPTAアドベントカレンダーの7日目の記事でした。

脚注
  1. The Twelve-Factor App https://12factor.net/ja/codebase ↩︎

  2. Beyond the Twelve-Factor App https://tanzu.vmware.com/content/blog/beyond-the-twelve-factor-app ↩︎

  3. argoproj/argo-workflows issue https://github.com/argoproj/argo-workflows/issues/5464 ↩︎

Discussion

ログインするとコメントできます