👓
Google Cloud Datastreamを死活監視してみた
背景
データパイプラインの一環でDatastreamを運用している中で、ボトルネックがあることに気づき、死活監視をすることにしました。これを記事としてまとめていこうと思います。
ここでいうボトルネックとは以下の点です。
- リソースであるCloudSQLへの通信障害等により、その間のデータを連携することができなくなる
- Datastream自体の障害や、誤って停止してしまった場合に気付けない
- Datastreamはシングルリージョンしか設定できない
方法
構成
構成は非常にシンプルです。
- CloudFunctionsには、DatastreamのステータスをチェックしてSlackに通知する処理を実装します。メモリは128MBで充分です。
- CloudSchedulerが5分おきに対象のCloudFunctionsを実行するよう設定します。
- SlackのIncoming Webhookは事前に設定しておきます。
サンプルコード(CloudFunctions)
Terraform
main.tf
resource "google_cloudfunctions_function" "alive_monitoring" {
name = "<Your-function-name>"
region = "<Your-region>"
runtime = "python310"
available_memory_mb = 128
source_archive_bucket = "<Your-bucket>"
source_archive_object = "<Your-object>"
trigger_http = true
entry_point = "alive_monitoring"
service_account_email = "<Your-service-account-email>"
timeout = 540
environment_variables = {
GCP_PROJECT = "<Your-project>"
SLACK_WEBHOOK_URL = "<Your-slack-webhook-url>"
SLACK_CHANNEL = "<Your-slack-channel>"
SLACK_MENTION = "<!subteam^${Your-group-id}>"
STREAM_LOCATION = "<Your-stream-location>"
STREAM_NAME = "<Your-stream-name>"
}
}
SLACK_WEBHOOK_URLは事前に設定したものを使います。
SLACK_CHANNELは先頭の#
は不要です。
SLACK_MENTIONの書き方はこちらを参考にしてみてください。
ユーザー、グループ、channel等にメンションできます。
ちなみに、グループIDはSlackAPIを利用して取得することができるのですが、筆者は面倒だったのでブラウザ版Slack上で開発者ツールを開いて、グループメンションをクリック時のPOSTメッセージから見つけました。
Python
requirements.txt
google-api-python-client
main.py
import os
import requests
import json
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
SLACK_WEBHOOK_URL = os.environ['SLACK_WEBHOOK_URL']
SLACK_CHANNEL = os.environ['SLACK_CHANNEL']
SLACK_MENTION = os.environ['SLACK_MENTION']
PROJECT_ID = os.environ['GCP_PROJECT']
def alive_monitoring(request):
alive_monitoring_datastream()
return 'status check completed.', 200
def alive_monitoring_datastream():
locations = os.environ['STREAM_LOCATION']
stream_name = os.environ['STREAM_NAME']
try:
datastream = build('datastream', 'v1alpha1')
# Get the stream
stream_path = f'projects/{PROJECT_ID}/locations/{locations}/streams/{stream_name}'
response = datastream.projects().locations().streams().get(name=stream_path).execute()
# Check the stream's state
stream_state = response['state']
if stream_state not in {'RUNNING', 'CREATING', 'UPDATING'}:
print(f"Stream '{stream_name}' is in an unexpected state: {stream_state}")
message = (
f"⚠️ An issue was detected with Datastream. \n"
f"Stream '{stream_name}' is in an unexpected state: {stream_state} \n"
"Please check the logs and investigate the problem."
)
send_slack_notification(message)
except HttpError as error:
print(f"An error occurred: {error}")
message = (
f"⚠️ An issue was detected with Datastream. \n"
"A system failure may have occurred or the stream may have been deleted.\n"
"Please check the logs and investigate the problem."
)
send_slack_notification(message)
raise
def send_slack_notification(message):
payload = {
"channel": SLACK_CHANNEL,
"text": "{} {}".format(SLACK_MENTION, message),
}
requests.post(SLACK_WEBHOOK_URL, json.dumps(payload), headers={'Content-Type': 'application/json'})
このコードの仕様は次の通りです。
- Datastreamのステータスが
RUNNING
CREATING
UPDATING
でない場合は、ステータスが"実行中"でない旨の通知をします。参照元リソース(例えばCloudSQL)からストリーミングできないことによるERROR
ステータス、誤って停止をしてしまった、等が該当します。 - Datastreamのステータスが確認できない(= exceptブロック)場合は、Datastreamに障害が発生してる可能性がある旨の通知をします。Datastreamのリソース自体を削除した場合もこのエラーとなります。
Datastreamのステータスチェックは、googleapiclientを使用しています。
まとめ
Datastreamは比較的新しいサービスの為、CloudMonitoringのMetricsではリソース自体のステータスは見ることができませんでした。
今回は5分おきに死活監視をしてますが、もう少しリアルタイムで通知できる仕組みが作れると良いですね。
株式会社クロスビットでは、デスクレスワーカーのためのHR管理プラットフォームを開発しています。
一緒に開発を行ってくれる各ポジションのエンジニアを募集中です。
Discussion