【Cloud Run Jobs】定期実行で排他制御を実現する
はじめに
こんにちは! テラーノベルでサーバーサイドを担当している@yuhasです。
最近は業務でCloud RunやCloud Run Jobsを使うことが多いのですが、その中でCloud Run Jobsの定期実行での排他制御を実現しました。これは色々と工夫が必要なことで、そのやり方を紹介します。
定期実行での排他制御とはつまり、毎分の定期実行を行いつつも、同時に複数の実行が行われないようにするためにはどう制御するか、というような話になります。
Cloud Run JobsとCloud Schedulerについて
Cloud Run Jobsはコンテナの実行環境です。コンテナのイメージを渡してあげるだけで簡単に実行することができます。実行間隔や実行時間についてはさまざまな制御を行うことができますが、Cloud Schedulerを利用するのが簡単です。Cloud Run Jobsの「トリガー」タブからCloud Schedulerの設定をすることができ、毎分実行するであるとか毎日0時に実行するという設定を行うことができます。
排他制御を行いたいモチベーション
そこで、実行間隔よりも長くなる(可能性のある)プロセスを実行するとどうなるでしょうか?
たとえば、以下のGoプロセスをコンテナ化し、Cloud Run Jobsで毎分実行するようにしたとしましょう。(ビルドやDockerfileについては省略)
package main
import (
"time"
)
func main() {
longProcess()
}
func longProcess() {
time.Sleep(90 * time.Second)
return
}
すると、90秒かかる実行を60秒ごとに行うことになり、複数の実行が同時に行われることになります。
要件によっては、このような複数の実行が被ることを許容できる場合もありますが、今回私の扱った業務では、毎分の実行をしつつも、複数の実行が同時に行われないようにしなければなりませんでした。
つまりこういう処理(排他制御)を実現したかったのです。
- 10:05:00 実行1が開始
- 10:05:30 30秒で実行1が終了
- 10:06:00 実行2が開始
- 10:07:00 実行3を開始したいが、実行2が終わってないので、実行3をスキップ
- 10:07:10 ちょっと長くなってしまい、70秒で実行2が終了
- 10:08:00 実行4が開始
・・・
ちなみに、今回なぜこうしたかったかというと、各実行でデータベースに対して書き込みを行っていて、複数の実行が同時に行われると同一のデータを複数回書き込んでしまう可能性があったためです。
どうやって排他制御を実現するか
Cloud Run JobsやCloud Schedulerのレイヤーだけではこの排他制御を実現することはできないようです。コンテナ内部で実行するプロセスの内部で排他制御を実現する必要があります。
今回、Cloud Run Admin APIをコンテナ内部のプロセスから呼び出すことで、排他制御を実現しました。
先ほどのコードを以下のようにしてみましょう。
package main
import (
"fmt"
"log"
"os"
"strings"
"time"
"google.golang.org/api/run/v1"
)
func main() {
if isOtherExecutionRunning, err := isOtherExecutionRunning(); err != nil {
log.Fatal(err)
} else if isOtherExecutionRunning {
log.Println("other execution is running")
return
}
longProcess()
}
func isOtherExecutionRunning() (bool, error) {
// Cloud Run Admin APIのクライアントを作成する。
s, err := run.NewService(ctx)
if err != nil {
return false, err
}
// Cloud Run Admin APIのExecution List APIを呼び出す。
// Jobの実行(すでに実行完了したものも含む)を一覧で返す。
// 同一プロジェクトのCloud Run Jobsの実行について、開始時刻の降順でソートされて返される。
// プロジェクトの規模にもよるが、100件ほど取得すれば良い。
// https://cloud.google.com/run/docs/reference/rest/v1/namespaces.executions/list
hostName := "asia-northeast1-run.googleapis.com"
projectID := "your-project-id"
listExecutionLimit := 100
s.BasePath = fmt.Sprintf("https://%s/", hostName)
resp, err := s.Namespaces.Executions.List(fmt.Sprintf("namespaces/%s", projectID)).Limit(listExecutionLimit).Do()
if err != nil {
return false, err
}
// 現在の実行名を取得する。Cloud Run Jobsで実行する場合に取得できる環境変数。
// 例: hogehoge-12345-abc
// hogehogeがJobの名前(prefix)である。そこに"-"をつけ、それ以降は実行ごとにランダムな文字列が付与される。
executionName := os.Getenv("CLOUD_RUN_EXECUTION")
// Jobの名前のこと
executionPrefix := "hogehoge"
// レスポンスのExecutionの配列をみていく。現在の実行と同一のJobで実行中のものがあるかどうかを見る。
for _, v := range resp.Items {
// CompletionTimeが空文字列でない場合は、実行完了したものであるから、次を見る。
if v.Status.CompletionTime != "" {
continue
}
name := v.Metadata.Name
// 現在の実行名が実行中としてレスポンスに含まれるのは当然のことであるので、次を見る。
if name == executionName {
continue
}
prefix := fmt.Sprintf("%s-", executionPrefix)
// "hogehoge-12346-abd"や"hogehoge-12347-abe"のような実行名があれば、同一Jobの別実行が実行中ということになり、trueを返す。
// ここの判定方法はいろんなやり方があると思う。
if strings.HasPrefix(name, prefix) && strings.Count(name, "-") == strings.Count(prefix, "-") {
return true
}
}
return false
}
func longProcess() {
time.Sleep(90 * time.Second)
return
}
コードのコメントに書いた通りなのですが、こちらでも簡単に説明します。
まず、Cloud Run Admin APIのExecution List APIを叩きます。
すると、指定したプロジェクトにおけるCloud Run Jobsの「実行」の一覧を取得できます。この中に、現在実行中のJobと同一Jobの別実行が存在すれば、自身の実行を終了させれば良いわけです。より噛み砕いていえば、Jobの名前が「"hogehoge"」で、現在の実行名が「"hogehoge-12345-abc"」だったら、「"hogehoge-12346-abd"」や「"hogehoge-12347-abe"」といった名前の実行が実行中ということを確認できれば、自身の実行を終了させるというわけです。
この一覧の取得は、実行開始時刻の降順でソートされて返されます。その中から、
- 実行が完了していないもの
- 自身の実行名と同一ではないもの
- 自身の実行名と同一のPrefixを持っているもの
をすべて満たす実行があれば、それが同一Jobの別実行になります。そのようなものがあれば、自身の実行を終了させます。これで排他制御を実現できます。
まとめ
排他制御は、定期実行を扱う上で欲しくなる要件の一つだと思います。
それをCloud Run JobsやCloud Schedulerのレイヤーのみで実現できたら嬉しいですが、それは難しそうです。ここで示したように、アプリケーションの中で実装をしてあげる必要があります。
Discussion