🥭

Cloud Run ジョブを利用してバッチ処理の機構を作る

2024/07/05に公開

はじめに

先日からお仕事で、お客さんがサービスの効果を知ることができるような分析機能を搭載するべく、その機能をチームでもりもり開発しているのですが、その際のバッチ処理の設計や、利用した Cloud Run ジョブについてお話しようと思います。

Cloud Run ジョブの利用は私たちの開発組織では初めての試みとなりました。これまでのバッチ系処理っぽいものは、サーバとして利用している Cloud Run サービスにバッチ用エンドポイントを生やして開発するのが普段の実装手段でした。

本記事では、私たちのチームが今回開発に携わった機能概要をさらっとご紹介した上で、

  • Cloud Run ジョブの概要
  • Cloud Run ジョブの選定理由(Cloud Run サービス(REST API を生やす手法)との違いを中心に)
  • バッチ処理の全体設計や悩みポイント
  • 運用上のハマりポイント
    などを中心に、全体感をざっくりとお話していきます。

Cloud Run ジョブが気になる方や、バッチ処理の設計についてご興味がある方は、ぜひ読んでみてもらえると嬉しいです。

開発機能概要

まず前段で、私たちがどんなモノづくりをしているかについてですが、非正規雇用向けに採用を中心とした角度からお客さんのお悩みを解決していくサービスを日々開発しています。お客さんはサービスを利用して、求職者に対してさまざまなアプローチができます。

https://casting-one.jp/castingone/

今回私たちのチームが開発を行ったのは、お客さんが行ったアプローチの効果測定や、サービスに蓄積されている求職者に関する資産を見える化する、分析の機構です。イメージ的には Google アナリティクスに近い(まだあんなにすごい感じには仕上がっていませんが……!)、さまざまなサービスのデータをお客さんにお見せする装いで、期間や対象をお客さんが指定すると、前日までに関する効果や資産を見てもらうことができます。

この前日まで、というのが大きなポイントで、深夜にすべてのお客さんのその日までのデータを一気に分析用データに変換することが必要でした。そこで、バッチ処理が必要になったというわけです。

Cloud Run ジョブを技術選定した理由

前述の通り、これまでもバッチっぽい処理を行う開発は、他でもありました。その際は、主要サーバー(CRUD を中心としたリクエストも受け付けるようなもの)にバッチ用エンドポイントを生やす形で開発をしていました。

今回においては、深夜の一括バッチ実行ということで、処理するデータ量は他のバッチ系処理に比べてもある程度多いと、最初に要件を聞いた段階から考えました。また、サービスやお客さんの利用数が拡大するとともに、分析用データに変換する物量も増えていくことは容易に想像がつきます。

そこで、まず従来通り Cloud Run サービスを利用するのに心配になった点が、タイムアウト問題でした。
Cloud Run  サービスのタイムアウトは 1 時間のため、この期間に終われるかが気がかりだった形です。

https://cloud.google.com/run/docs/configuring/request-timeout?hl=ja

※結論から言いますとこのタイムアウト問題は全くの杞憂でした。現在は、1 時間は全くかからずに処理が終わっています。

このタイムアウト問題を解消できそうだとして白羽の矢が立ったのが、Cloud Run ジョブです。
タイムアウト最大設定時間は 24 時間になっており、名前の通り圧倒的に一括バッチ処理向けだなと感じました。

https://cloud.google.com/run/docs/configuring/task-timeout?hl=ja

ここで、Cloud Run ジョブを利用するのはどうか、という話になり、従来の方法と比べたメリット・デメリットなどを洗い出すことになりました。

Cloud Run ジョブと Cloud Run  サービスを比べたメリット・デメリットの整理を行ったところ、主にメリットは 3 つありました。それが以下です。

  • タイムアウトが長いため、ボリュームの大きい処理向き
  • ジョブのため 1 つの処理に特化しており、REST API のように他の全く異なる種類のリクエストとリソースを食い合わない
  • スケールアップ調整がしやすい

1 つ目は先ほど紹介したため、2 つ目,3 つ目についてお話をします。

2 つ目については前述の通り、現在の当社の場合はバッチ系エンドポイントが通常 REST API リクエストを受け付けるメインサーバに相乗りしているため、バッチ系処理と通常処理の両方を合わせて、Cloud Run のリソースはインスタンス全体が 1 つになっています。万が一今回の分析バッチを行っている時間帯になんらかの理由で別のリクエストが増加した場合、リソースが分散してしまう可能性がありました。Cloud Run ジョブは実行の都度インスタンスが立ち上がるため、他のリソース状態に左右されない独自の処理機構を作れるのは大きなメリットだと思いました。

3 つ目は 2 つ目のリソースと同じような話なのですが、たとえば分析バッチがメモリを多く利用する処理があったとして、既存の Cloud Run サービスのメモリ量では足りない場合、他の REST API の処理には十分に余裕があるにも関わらずスケールアップを検討する必要があります。ここをジョブとして独立させることにより、Cloud Run ジョブの利用メモリに最適化させてスケールアップに臨むことができます(実際の導入時にメモリの最適化処理を行えたので、その際にも改めてとても良いなと思いました)。

一方で Cloud Run サービスを従来通り利用するメリットは、なんといっても実装容易性でした。

既にある仕組みを利用するということでそのメリットは大きいように思えましたが、同じ Cloud Run という仲間である以上、万が一 Cloud Run ジョブでうまく行かなかったとしたら、処理を従来の Cloud Run サービスに合わせて移植すれば良さそうだったため、試しやすそうというひと押しもあり、導入してみることを決定しました。

結論、分析バッチをそれ単体で安定して稼働するために、Cloud Run ジョブに独立させる、という選択はとても良いように感じました。

開発体験の良さ

そんなこんなで初導入だったのですが、Cloud Run サービスを普段から利用している私たちにとって、Cloud Run ジョブのチャレンジはそんなに難易度の高いものではありませんでした。同じ Cloud Run サービスということで、ドキュメントの見方も仕組みもインフラ付近の痒いところに手が届かない……的な細かい設定も非常に似ており、むしろ開発体験はとても良かったです。

Cloud Run ジョブの開発体験の良さについても、いくつか感じたところがあったので以下に残してみます。

  • コマンド化と相性が良い
  • 実行が容易
  • Google Cloud Scheduler との相性が良い

Cloud Run ジョブにはコマンドや引数を設定することができます。
Go はコマンドラインツールを比較的簡単に作ることができるため、相性が良いように感じました。

また、分析バッチは深夜の一括実行とは別に、万が一不具合が起きた時の再実行や開発中のテスト実行などに備えて、集計日時と集計範囲を指定して実行する……といった利用方法を検討していたため、デフォルトは一括実行にした上で、コマンドによって範囲指定して処理を実行する形に変えるといった使い方が可能でした。

CLI 作成が簡単にできる Cobra を利用して、イメージとしては以下のようなエントリーコードになっています。

https://github.com/spf13/cobra

main.go
func main() {
	cnf := config.Get()

	// 省略

	cmdRoot := cmd.NewCmdRoot()
	if err := cmdRoot.New().ExecuteContext(ctx); err != nil {
		logger.ErrorWithContext(ctx, err.Error())
		os.Exit(1)
	}
}
root.go
package cmd

import (
	"fmt"

	pkgerrors "github.com/pkg/errors"
	"github.com/spf13/cobra"
)

type CmdRoot struct {
	UseCase *application.UseCase
}

func NewCmdRoot() *CmdRoot {
	return &CmdRoot{
		UseCase: application.NewUseCase(),
	}
}

type target struct {
	ID   string
	Date string
}

var t target

func (r *CmdRoot) New() *cobra.Command {
	cmd := &cobra.Command{
		Use: "app",
	}

	allCmd := &cobra.Command{
		Use: "all",
		RunE: func(cmd *cobra.Command, args []string) (err error) {
			err = r.UseCase.BatchAll(ctx)
			if err != nil {
				return fmt.Errorf("❌ BatchAll Error: %w", err)
			}
			return nil
		},
	}
	cmd.AddCommand(allCmd)

	targetCmd := &cobra.Command{
		Use: "target",
		RunE: func(cmd *cobra.Command, args []string) (err error) {
			if t.ID == "" || t.Date == "" {
				return pkgerrors.New("ID または Date が指定されていません")
			}

			err = r.UseCase.BatchTarget(ctx, t.ID, t.Date)
			if err != nil {
				return fmt.Errorf("❌ BatchTarget Error: %w", err)
			}

			return nil
		},
	}
	return cmd
}

実行は以下のように、ローカル開発と Cloud Run ジョブ上とで、似た形で行うことができます。普段は REST API を叩く必要があるため、なんだかんだめちゃくちゃ色々楽だなと思いました。

# Localの実行
$ go run main.go tenants
$ go run main.go target --IDs=1 --Date=20240703

# Cloud Run ジョブの実行
$ gcloud run jobs execute [ジョブ名] --region="asia-northeast1" --project="[プロジェクト名]"
$ gcloud run jobs execute [ジョブ名] --region="asia-northeast1" --project="[プロジェクト名]" --args="target,--ID=1,--Date=20240703"

決まった時間になんらかの処理を行う、というのがジョブ機構の定石だと思いますが、それは GCP ももちろんそのように考えてもらっているらしく、Google Cloud Scheduler との連携方法が公式にきちんと載っており、つなぎ込みが容易だったのも嬉しいポイントでした。

https://cloud.google.com/run/docs/execute/jobs-on-schedule?hl=ja#terraform

システム全体設計

今回の分析バッチ処理について、最初に完成システムは以下のようになっていると良さそうということでまとめていました。

  • パフォーマンス
    • 深夜 xx 時〜yy 時までにすべての対象テナントの分析バッチ処理が終了していること
    • アクセスするデータベースは他サーバからアクセスされるものと同一であること
    • テナント数の増加やデータ量増加に伴い全集計処理期間が遅くなった際に検知できること
  • 拡張性
    • パフォーマンスが厳しくなった場合に備えること
  • 運用
    • テスト実行や不具合確認対応のため任意の日時で 1 テナントだけ実行する手段があること

※サービスはテナントという単位で各々分析に必要な集計を行うため、テナントという表現が登場しています。

こんな感じで今回のシステムにおいて重要そうな観点を整理しつつ実際に開発を進めた上で、今回私たちのチームが Cloud Run ジョブを起点に作成した分析バッチの機構は以下の通りです。

※運用観点については、すでに説明した通り、コマンドを利用することで実現させることができているので、以下の説明では省略しています。

設計において考慮したのは以下 2 点です。

  • 再試行を加味した設計
  • 処理時間と DB 並行アクセス上限を加味した並行処理の設計

まず再試行を加味した設計についてですが、前提 Cloud Run ジョブには再試行回数を定めることができます。

https://cloud.google.com/run/docs/configuring/max-retries?hl=ja

万が一あるテナントの処理がそのテナント起因でなんらかのエラーになりジョブ全体が失敗で終了した場合、もう一度最初から全テナントを集計しないようにするべく、履歴ログのテーブルをデータベースに作成しておき、管理するようにしました。たとえば A〜E までのテナントが分析バッチ対象の際に、A・B・C が処理を終了してすでに終了フラグを書き込んでいた場合、再実行の際には D・E の処理のみを行わせるようにした形です。

再試行時にすでに無事に処理を終えたテナントをスキップすることで、全体の処理に無駄が出ないように設計した形です。

また、テナントによる分析集計項目は大枠いくつか存在するのですが、更新トランザクションは 1 つにまとめることにより、「テナントの集計は全くないか、すべて完了しているか」のどちらかにするようにしました。テナント単位では途中までの集計は破棄されることになりますが、途中から処理が再開されることで起きる不具合を防止する方を選んでいます(この部分における処理効率は捨てた形になりました)。

履歴ログテーブルの作成は主に再試行時の考慮が大きいですが、このデータがあると日々の処理開始・処理終了時間が記録されていくので、パフォーマンスの観測的にも何か役に立つかもしれないと思っています。

続いてジョブを設計する時に悩みのタネになりがちな並行処理についてですが、今回は 1 つのジョブ実行において、アプリケーションコードの中で指定した上限並列数ずつ実行する(goroutine)、最低限の並行処理としました。

Cloud Run ジョブには、ジョブ実行そのものを並列で動かす方法も存在していましたが、この方法を取る場合はどう並行処理を管理するかを含めて、より設計が複雑になってきます。このようにはせず、今回最低限の並行処理に止めた理由は、現段階ではそれほどデータを処理する量に対してパフォーマンス(処理時間の制限)が厳しくなかったためです(データの増加具合やバッチ処理量の増加等の変数を加味しても、1 年以上は大規模な並列処理を設計しなくても耐えられそうという肌感)。

処理するデータ量が多く時間制限がシビアであればあるほど並行処理は時間短縮に有用になりますが、その代わり内部構造が複雑になりがちです。拡張性という観点において、最初はパフォーマンスが厳しくなった時に移行できるような設計にしたいと考えていましたが、直近全く困らない形なのであれば一旦据え置きにしてシンプルにスタートするのが良い、という判断にした形です。

このあたりの判断は、私たちのサービスが非常に大規模で安定しているフェーズというよりは、まだテナント数に伸び代を持つ時期で、どんどん機能数を足していっているフェーズ、ということも一因としてあったと思います。

と偉そうな感じで言ってしまいましたが、私はチームで話し合った際に、並行処理は完璧に拡張性のあるものを設計したくそのドラフトを出した側の人間なのですが、冷静なチームの方々によって「too much だよね……」みたいなフィードバックをもらいました(冷静な方のご意見は大切)。😇

ちなみに、普段メインで利用している REST API サーバと同じデータベースにアクセスすることからも、あまりジョブのインスタンスを増やしたりしてリソースが分散した場合は、データベースの同時アクセス数の方も気にしなければならないことに気づいた、というのもあります。

おわりに

ということで、振り返ってみると最終的な全体設計はオーソドックスな形に落ち着いたと思っています。
しかし、Cloud Run ジョブの検証から全体設計、導入まで色々とプロセスを考えるのは、1 つとても学びになりました。

何度も言っているかもしれませんが、Cloud Run サービスと Cloud Run ジョブは非常に似ているところがあり、どちらかの開発に慣れている場合は非常にとっつきやすかったなというのが感想です。

一方で提供がサービス/ジョブと切り分かれている以上は色々と細かいところで違いもあり、特に開発終盤の監視回りを整えていた際には、よくハマった記憶があります。取れるメトリクスが違ったり、トレースの設定回りで少しサービスと異なる設定をしなければならなかった……などなど。

Cloud Run ジョブのお話から、Cloud Run ジョブを利用してチームで開発した分析バッチの設計概要まで、全体的に長々とお話してしまいましたが、ここまでお読みいただきありがとうございました! 今回の記事が、何かのためになっていたら嬉しいです。

いつもの

https://www.wantedly.com/projects/1130967

https://www.wantedly.com/projects/768663

https://www.wantedly.com/projects/1244229

Discussion