💬

BigQueryの大きいクエリを検知してSlackにアラートを送る

2024/03/28に公開

はじめに

BigQueryはクエリが読み込むデータが大きいほど、お金がかかります。
なかには 150万円溶かしたひともいるとか・・・

社内の状況として、BigQueryユーザ当たりの一日の利用上限の設定は こちら ですでに実装済みでした。
しかし、上限内で大きなクエリが流れる事象が頻発していました。
これに対して上限を引き下げると、本当に使いたい時に使えなくなりそれはそれでいまいち。

そのため今回の実装で、「さっきのクエリ大きかったよ」とユーザにアラートを飛ばすことで費用を気にする習慣を作ってもらいたく、実装しました。

要件

大きいクエリが流れたらSlackに通知する を具体化すると以下。

①トリガのタイミング
 クエリ発行時に検知する。クエリの実行自体は許容する。
②トリガ条件条件
 クエリのスキャン対象が0.3TB以上でトリガ。
 特定のサービスアカウントは通知の対象外にする。
③送信するデータ
 ユーザ名(メールアドレスの@より左)
 TBの単位でクエリの容量を送る。有効数字は2桁。
④送信先
 Slack

全体像

BigQuery:クエリの発行。ログの生成。
Cloud Logging:ログの有効化。
Pub/Sub:メッセージのキューイング。
Cloud Function:関数を実行。(python)

開発手順

0 BigQuery

BigQueryはデフォルトでログが吐き出されています。
なので特に気にしなくて良いです。
今回はその自動で吐き出されているログを使います。

1 Cloud Logging・Pub/Sub

ここでは、pub/subにログをルーティングします。

1-1 ログルーター > シンクを作成

1-2 適当にログルーティングの名前をつけつつ、pub/subトピックを作成。

1-3 シンクに含めるログの選択は以下のコードを記載

ここで機能要件の「0.3TB以上でトリガ」フィルタができます。
しかし、pythonの方が可読性が高い上に、pub/sub自体料金がそんなに高くないので、selectしたものは全件pub/subにキューイングされるようにします。

resource.type="bigquery_resource"
protoPayload.methodName="jobservice.jobcompleted"

1-4 全体像はこんな感じ

2 Slack用のwebhook

2-1 webhookを作成

こちら から作ります。
ログインしてうんたらかんたらしたらURLが発行できます。

3 Cloud Function

3-1 Cloud Function > ファンクションを作成

3-2 基本

環境:第2世代
関数名:適当につける
リージョン:asia-northeast1 (東京)
image.png

3-3 トリガー

1で作成したpub/subをトリガに設定します。

3-4 コードの記述

ランタイム:python 3.9(確実に動かしたかったので、このバージョンにしています。)

main.py (pub/subから送られてきたデータを加工しつつslackに送信)
import base64
import json
import requests

# Triggered from a message on a Cloud Pub/Sub topic.
def hello_pubsub(event, context):
    pubsub_message = base64.b64decode(event["data"])
    message_data = json.loads(pubsub_message)

    email = message_data['protoPayload']['authenticationInfo']['principalEmail']
    user = email.split('@')[0]

    job_statistics = message_data['protoPayload']['serviceData']['jobCompletedEvent']['job']['jobStatistics']
    query_bytes_billed = int(job_statistics['totalBilledBytes'])

    terabytes_value = query_bytes_billed / (2**40)  # バイトをテラバイトに変換
    rounded_terabytes = round(terabytes_value, 2)  # 結果を有効数字2桁で丸める

    # 1TB = 1024^4 bytes
    if email != "{除外したいサービスアカウントのメールアドレス}" and rounded_terabytes >= 0.3:
        slack_message = f"えぇ〜?先ほどの {user} さんのクエリは {rounded_terabytes}TB だったのかい〜?"
        webhook_url = '{webhookのURL}'
        requests.post(webhook_url, json={'text': slack_message})
requirements.txt (インストール対象)
functions-framework==3.*
requests==2.*

3-5 デプロイ

デプロイを押します。

4 Slack

4-1 テスト
先ほどのcloud functionのif分岐を弱めたのちに(0.000001TB以上でトリガなど。)BigQueryでクエリをしてみて、データが飛ぶのを確認します。
こんな感じでslackに通知がきたらゲームクリアです。今回はマスオさんにしてみました。

備考:その他に考えた基盤

①BigQuery → monitoring → Slack
ノーコードで構成可能。1時間で1TBスキャンがあった時など、BigQuery全体の利用状況でアラートを飛ばすことができる。ユーザ名が抽出できないため「特定のサービスアカウントは通知の対象外にする」という機能要件を満たさず。

②BigQuery → 割り当てと上限 → monitoring → Outlook → power automate → Slack
ローコードで構成可能。ユーザ一日あたりのクエリした容量を閾値にアラートを飛ばすことができる。特定のユーザという情報がメール上にないため、「特定のサービスアカウントは通知の対象外にする」という機能要件を満たさず。

参考文献

・本実装した基盤のアウトライン
ChatGPT
https://chat.openai.com/share/e4938978-ee85-4c1c-9823-b9d1f24d356d

・BigQuery データポリシー監査ログの有効化
https://cloud.google.com/bigquery/docs/column-data-masking-audit-logging?hl=ja

・備考ー①の基盤を作るときに参考にした記事
https://dev.classmethod.jp/articles/bigquery-query-usage-monitoring/

・Cloud Functionの書き方
https://www.softbank.jp/biz/blog/cloud-technology/articles/202301/deploy-a-slack-using-cf/

・pub/subのインプット
https://qiita.com/k-akie/items/7e40739b2f226508571a

・requirementの使い方
https://envader.plus/course/8/scenario/1073

・requestのバージョン
https://pypi.org/project/requests/

Discussion