Cloud Runジョブの実行ステータスをモニタリングするツールを作ってみる【コード編】
はじめに
唐突ではありますが、皆さんCloud Runジョブは利用されていますか?
Cloud Run ジョブは、Google Cloudのマネージドサービス「Cloud Run」の一機能で、バッチ処理や非同期タスクを実行するためのものです。Cloud Runは、コンテナ化されたアプリケーションを自動的にスケーリングして実行できるサービスですが、ジョブはその中で、指定されたコンテナを特定のタスクが完了するまで実行するための仕組みです。
当社では、自動化装置・設備向けの特注部品や金型用部品、工具・消耗品を扱うECサイトを運用しており、検索のためのデータベース更新にCloud Runジョブを利用しています。
皆さんは、管理するジョブが増えるにつれ、以下のような不安を感じたことはありませんか?
- Cloud Runでジョブをデプロイしたけれど、ちゃんと実行されているか不安。
- 念のため確認しておきたいけれど、コンソール上で毎日確認するのは少し手間。
もちろんこうした不安や手間をなくすために、エラー実装の共通クラス化や、ログ出力の仕組みを検討されるとは思うのですが、念のために汎用的なモニタリング(監視)の仕組みは構築しておきたいかと思います。
今回は、上記のモニタリングを実装するための、前準備のコードについてご紹介させていただきます。
(参考)
Cloud Run とは
手順
① ジョブの実行結果の取得
まずはGoogleから提供されているドキュメントを参考に、指定のプロジェクトID、ロケーションの、実行が失敗しているジョブIDから、ジョブの実行結果(execution)を取得するコードを作成します。
from google.cloud import run_v2
project_id = 'test-project-id'
location = 'asia-northeast1'
job_id = 'test-job-001'
execution_id = 'test-job-001-xxxx'
client = run_v2.ExecutionsClient()
execution_name = f"projects/{project_id}/locations/{location}/jobs/{job_id}/executions/{execution_id}"
execution = client.get_execution(name=execution_name)
print(execution)
(参考)
Package Classes (0.10.10)
② executionの中身を確認
①で取得したexecutionの中身を確認します。長いため抜粋しますが、condition.type_が「Completed」の、condition.stateがジョブの実行判定に使えそうです。
ジョブが成功しているときは、「CONDITION_SUCCEEDED」、失敗しているときは「CONDITION_FAILED」と表示されます。
conditions {
type_: "Completed"
state: CONDITION_FAILED
last_transition_time {
seconds: xxxxxxxxxx
nanos: xxxxxxxxx
}
}
③ 試作コードの作成
①②の内容をもとに、プロジェクト内の各ジョブの、最新のジョブIDの実行結果を取得するpythonコードを作成します。
今回は試作ということで、printによる大雑把な出力ではありますが、それぞれの実行結果が一括で取得できています。
from google.cloud import run_v2
def get_all_jobs_latest_execution_status(project_id, location):
client = run_v2.JobsClient()
execution_client = run_v2.ExecutionsClient()
# プロジェクトとロケーション内の全てのジョブをリスト
parent = f"projects/{project_id}/locations/{location}"
jobs = client.list_jobs(parent=parent)
for job in jobs:
job_name = job.name.split('/')[-1]
print(f"Job Name: {job_name}")
# 最新の実行を取得
latest_execution = None
executions = execution_client.list_executions(parent=job.name)
for execution in executions:
if latest_execution is None or execution.create_time > latest_execution.create_time:
latest_execution = execution
if latest_execution:
execution_id = latest_execution.name.split('/')[-1]
print(f"Latest Execution ID: {execution_id}")
# 実行ステータスを取得
execution_name = latest_execution.name
execution = execution_client.get_execution(name=execution_name)
for condition in execution.conditions:
if condition.type_ == "Completed":
print(condition.state)
else:
print("No executions found for this job.")
if __name__ == "__main__":
project_id = 'test-project-id' # プロジェクト ID を指定
location = "asia-northeast1" # ジョブのロケーションを指定
get_all_jobs_latest_execution_status(project_id, location)
出力結果
Job Name: test-job-001
Latest Execution ID: test-job-001-xxxx
State.CONDITION_FAILED
Job Name: test-job-002
Latest Execution ID: test-job-002-xxxx
State.CONDITION_SUCCEEDED
Job Name: test-job-003
Latest Execution ID: test-job-003-xxxx
State.CONDITION_SUCCEEDED
おわりに
いかがでしょうか。
今回の記事では、Cloud Runジョブの実行ステータスをモニタリングするツールの、前段の部分についてご紹介させていただきました。
次回の記事では、日次でモニタリングできるように、③でご紹介したコードを改修し、Cloud Run関数(旧Cloud functions)とCloud Monitoringを併用した運用自動化、自動通知についてご紹介できればと思います。
余談
はじめにの部分で、ECサイトの運営について軽く触れさせていただきましたが、以下のように、当社のこれまでの検索の取り組みについて、ブログ外でもご紹介させていただいております。
是非こちらもご覧になってください!
「欲しい商品」を探せない要改善のECサイト、検索精度を高めるには?
BtoB 大企業の内製ベクトル検索エンジン: Vertex AI Vector Search 移行と高付加価値への挑戦
Discussion