👓

Google Cloud Datastreamを死活監視してみた

2023/05/08に公開

背景

データパイプラインの一環でDatastreamを運用している中で、ボトルネックがあることに気づき、死活監視をすることにしました。これを記事としてまとめていこうと思います。

ここでいうボトルネックとは以下の点です。

  • リソースであるCloudSQLへの通信障害等により、その間のデータを連携することができなくなる
  • Datastream自体の障害や、誤って停止してしまった場合に気付けない
  • Datastreamはシングルリージョンしか設定できない

方法

構成

構成は非常にシンプルです。

  • CloudFunctionsには、DatastreamのステータスをチェックしてSlackに通知する処理を実装します。メモリは128MBで充分です。
  • CloudSchedulerが5分おきに対象のCloudFunctionsを実行するよう設定します。
  • SlackのIncoming Webhookは事前に設定しておきます。

サンプルコード(CloudFunctions)

Terraform

main.tf

resource "google_cloudfunctions_function" "alive_monitoring" {
  name                  = "<Your-function-name>"
  region                = "<Your-region>"
  runtime               = "python310"
  available_memory_mb   = 128
  source_archive_bucket = "<Your-bucket>"
  source_archive_object = "<Your-object>"
  trigger_http          = true
  entry_point           = "alive_monitoring"
  service_account_email = "<Your-service-account-email>"
  timeout               = 540

  environment_variables = {
    GCP_PROJECT = "<Your-project>"
    SLACK_WEBHOOK_URL = "<Your-slack-webhook-url>"
    SLACK_CHANNEL = "<Your-slack-channel>"
    SLACK_MENTION     = "<!subteam^${Your-group-id}>"
    STREAM_LOCATION = "<Your-stream-location>"
    STREAM_NAME = "<Your-stream-name>"
  }
}

SLACK_WEBHOOK_URLは事前に設定したものを使います。

SLACK_CHANNELは先頭の#は不要です。

SLACK_MENTIONの書き方はこちらを参考にしてみてください。
ユーザー、グループ、channel等にメンションできます。

ちなみに、グループIDはSlackAPIを利用して取得することができるのですが、筆者は面倒だったのでブラウザ版Slack上で開発者ツールを開いて、グループメンションをクリック時のPOSTメッセージから見つけました。

Python

requirements.txt
google-api-python-client
main.py
import os
import requests
import json
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

SLACK_WEBHOOK_URL = os.environ['SLACK_WEBHOOK_URL']
SLACK_CHANNEL = os.environ['SLACK_CHANNEL']
SLACK_MENTION = os.environ['SLACK_MENTION']
PROJECT_ID = os.environ['GCP_PROJECT']

def alive_monitoring(request):
  alive_monitoring_datastream()

  return 'status check completed.', 200

def alive_monitoring_datastream():
  locations = os.environ['STREAM_LOCATION']
  stream_name = os.environ['STREAM_NAME']

  try:
    datastream = build('datastream', 'v1alpha1')

    # Get the stream
    stream_path = f'projects/{PROJECT_ID}/locations/{locations}/streams/{stream_name}'
    response = datastream.projects().locations().streams().get(name=stream_path).execute()

    # Check the stream's state
    stream_state = response['state']
    if stream_state not in {'RUNNING', 'CREATING', 'UPDATING'}:
      print(f"Stream '{stream_name}' is in an unexpected state: {stream_state}")

      message = (
        f"⚠️ An issue was detected with Datastream. \n"
        f"Stream '{stream_name}' is in an unexpected state: {stream_state} \n"
        "Please check the logs and investigate the problem."
      )
      send_slack_notification(message)

  except HttpError as error:
    print(f"An error occurred: {error}")
    message = (
      f"⚠️ An issue was detected with Datastream. \n"
      "A system failure may have occurred or the stream may have been deleted.\n"
      "Please check the logs and investigate the problem."
    )
    send_slack_notification(message)

    raise

def send_slack_notification(message):
  payload = {
    "channel": SLACK_CHANNEL,
    "text": "{} {}".format(SLACK_MENTION, message),
  }
  requests.post(SLACK_WEBHOOK_URL, json.dumps(payload), headers={'Content-Type': 'application/json'})

このコードの仕様は次の通りです。

  • DatastreamのステータスがRUNNING CREATING UPDATINGでない場合は、ステータスが"実行中"でない旨の通知をします。参照元リソース(例えばCloudSQL)からストリーミングできないことによるERRORステータス、誤って停止をしてしまった、等が該当します。
  • Datastreamのステータスが確認できない(= exceptブロック)場合は、Datastreamに障害が発生してる可能性がある旨の通知をします。Datastreamのリソース自体を削除した場合もこのエラーとなります。

Datastreamのステータスチェックは、googleapiclientを使用しています。

まとめ

Datastreamは比較的新しいサービスの為、CloudMonitoringのMetricsではリソース自体のステータスは見ることができませんでした。
今回は5分おきに死活監視をしてますが、もう少しリアルタイムで通知できる仕組みが作れると良いですね。


株式会社クロスビットでは、デスクレスワーカーのためのHR管理プラットフォームを開発しています。
一緒に開発を行ってくれる各ポジションのエンジニアを募集中です。

https://x-bit.co.jp/recruit/

https://herp.careers/v1/xbit

https://note.com/xbit_recruit/

クロスビットテックブログ

Discussion