BigQueryの大きいクエリを検知してSlackにアラートを送る
はじめに
BigQueryはクエリが読み込むデータが大きいほど、お金がかかります。
なかには 150万円溶かしたひともいるとか・・・
社内の状況として、BigQueryユーザ当たりの一日の利用上限の設定は こちら ですでに実装済みでした。
しかし、上限内で大きなクエリが流れる事象が頻発していました。
これに対して上限を引き下げると、本当に使いたい時に使えなくなりそれはそれでいまいち。
そのため今回の実装で、「さっきのクエリ大きかったよ」とユーザにアラートを飛ばすことで費用を気にする習慣を作ってもらいたく、実装しました。
要件
大きいクエリが流れたらSlackに通知する を具体化すると以下。
①トリガのタイミング
クエリ発行時に検知する。クエリの実行自体は許容する。
②トリガ条件条件
クエリのスキャン対象が0.3TB以上でトリガ。
特定のサービスアカウントは通知の対象外にする。
③送信するデータ
ユーザ名(メールアドレスの@より左)
TBの単位でクエリの容量を送る。有効数字は2桁。
④送信先
Slack
全体像
BigQuery:クエリの発行。ログの生成。
Cloud Logging:ログの有効化。
Pub/Sub:メッセージのキューイング。
Cloud Function:関数を実行。(python)
開発手順
0 BigQuery
BigQueryはデフォルトでログが吐き出されています。
なので特に気にしなくて良いです。
今回はその自動で吐き出されているログを使います。
1 Cloud Logging・Pub/Sub
ここでは、pub/subにログをルーティングします。
1-1 ログルーター > シンクを作成
1-2 適当にログルーティングの名前をつけつつ、pub/subトピックを作成。
1-3 シンクに含めるログの選択は以下のコードを記載
ここで機能要件の「0.3TB以上でトリガ」フィルタができます。
しかし、pythonの方が可読性が高い上に、pub/sub自体料金がそんなに高くないので、selectしたものは全件pub/subにキューイングされるようにします。
resource.type="bigquery_resource"
protoPayload.methodName="jobservice.jobcompleted"
1-4 全体像はこんな感じ
2 Slack用のwebhook
2-1 webhookを作成
こちら から作ります。
ログインしてうんたらかんたらしたらURLが発行できます。
3 Cloud Function
3-1 Cloud Function > ファンクションを作成
3-2 基本
環境:第2世代
関数名:適当につける
リージョン:asia-northeast1 (東京)
3-3 トリガー
1で作成したpub/subをトリガに設定します。
3-4 コードの記述
ランタイム:python 3.9(確実に動かしたかったので、このバージョンにしています。)
main.py (pub/subから送られてきたデータを加工しつつslackに送信)
import base64
import json
import requests
# Triggered from a message on a Cloud Pub/Sub topic.
def hello_pubsub(event, context):
pubsub_message = base64.b64decode(event["data"])
message_data = json.loads(pubsub_message)
email = message_data['protoPayload']['authenticationInfo']['principalEmail']
user = email.split('@')[0]
job_statistics = message_data['protoPayload']['serviceData']['jobCompletedEvent']['job']['jobStatistics']
query_bytes_billed = int(job_statistics['totalBilledBytes'])
terabytes_value = query_bytes_billed / (2**40) # バイトをテラバイトに変換
rounded_terabytes = round(terabytes_value, 2) # 結果を有効数字2桁で丸める
# 1TB = 1024^4 bytes
if email != "{除外したいサービスアカウントのメールアドレス}" and rounded_terabytes >= 0.3:
slack_message = f"えぇ〜?先ほどの {user} さんのクエリは {rounded_terabytes}TB だったのかい〜?"
webhook_url = '{webhookのURL}'
requests.post(webhook_url, json={'text': slack_message})
requirements.txt (インストール対象)
functions-framework==3.*
requests==2.*
3-5 デプロイ
デプロイを押します。
4 Slack
4-1 テスト
先ほどのcloud functionのif分岐を弱めたのちに(0.000001TB以上でトリガなど。)BigQueryでクエリをしてみて、データが飛ぶのを確認します。
こんな感じでslackに通知がきたらゲームクリアです。今回はマスオさんにしてみました。
備考:その他に考えた基盤
①BigQuery → monitoring → Slack
ノーコードで構成可能。1時間で1TBスキャンがあった時など、BigQuery全体の利用状況でアラートを飛ばすことができる。ユーザ名が抽出できないため「特定のサービスアカウントは通知の対象外にする」という機能要件を満たさず。
②BigQuery → 割り当てと上限 → monitoring → Outlook → power automate → Slack
ローコードで構成可能。ユーザ一日あたりのクエリした容量を閾値にアラートを飛ばすことができる。特定のユーザという情報がメール上にないため、「特定のサービスアカウントは通知の対象外にする」という機能要件を満たさず。
参考文献
・本実装した基盤のアウトライン
ChatGPT
・BigQuery データポリシー監査ログの有効化
・備考ー①の基盤を作るときに参考にした記事
・Cloud Functionの書き方
・pub/subのインプット
・requirementの使い方
・requestのバージョン
Discussion