💬

BigQuery上のデータを定期的に集計して表形式でSlack通知する

2022/12/26に公開

こんにちは、スペースマーケットでバックエンドエンジニアをしております、Miotavaです。

「BigQueryに溜めているデータを日次で集計して、表形式でまとめたデータをSlackへ毎日投稿する」ことを実施しましたので、簡単ではありますがやり方をまとめます。

※ 上記により、プロダクトマネージャやマーケタ含めてSlackにいる人であれば受動的にデータを見られて気づきを得られる、そんな「エンジニア発信」で「全社横断的にプロダクト改善していける」仕組みを推進している途中です!

準備

  • Slack Webhook URL
    • Slack側設定は本記事では割愛します。

構成概要

  • DB:BigQuery
  • スケジューラ:Google Scheduler
  • スケジューラとデータ加工・投稿処理のつなぎ:Cloud Pub/Sub
  • データ加工・投稿処理:Google Cloud Functions

今回使用するBigQueryのデータと集計SQL

テーブル定義

CREATE TABLE
  test.action_logs (user_id INT64, action_type STRING, actioned_at TIMESTAMP)
PARTITION BY
  DATE_TRUNC(actioned_at, MONTH);

ここでは、単純に「誰が」「いつ」「何をした」といった行動ログを集計対象データとして扱うこととします。

データ

INSERT INTO `test.action_logs` (user_id, action_type, actioned_at)
VALUES
  (1, "click_button", "2022-12-06 05:00:00 UTC"),
  (2, "click_button", "2022-12-06 05:00:00 UTC"),
  (2, "show_detail", "2022-12-06 05:00:00 UTC"),
  (1, "click_button", "2022-12-07 05:00:00 UTC"),
  (1, "show_detail", "2022-12-07 05:00:00 UTC"),
  (2, "send_message", "2022-12-07 05:00:00 UTC"),
  (2, "show_detail", "2022-12-07 05:00:00 UTC"),
  (2, "send_message", "2022-12-07 05:00:00 UTC"),
  (3, "send_message", "2022-12-07 05:00:00 UTC");

集計SQL例

ここでは、対象の日にアクション数が多い順トップ3を出してみます。

SELECT
  action_type, count(*) as action_count
FROM
  `test.action_logs`
WHERE
  DATE(actioned_at, 'Asia/Tokyo') = '2022-12-07'
GROUP BY
  action_type
ORDER BY
  action_count DESC
LIMIT
  3;

後ほど上記集計SQL内の日付や抽出数を外部から渡せるようにしていきます。

Cloud Functions の設定

1. 「構成」の設定項目

  • トリガータイプを「Cloud Pub/Sub」にしてトピックを作成(トピックIDは今回は topic-test-notify-slack とした)
  • ランタイム環境変数に以下変数追加
    • 名前1:SLACK_WEBHOOK_URL
    • 値1:(SlackのWebhook URLを入力ください)
  • あとはデフォルトでOK
    • Cloud Functions上で重めの処理を扱う場合は適宜メモリなど調整ください

2. 「コード」の設定項目

  • ランタイムを「Python3.10」に変更
  • エントリポイントに「main」と入れる
  • 以下コードを貼り付け
main.py
import requests, json
import base64
from google.cloud import bigquery
from datetime import datetime, timedelta, timezone
from tabulate import tabulate
import os
import ast


def past_date_string(days_ago, tz):
    nowadays = datetime.now(tz)
    past_date = nowadays - timedelta(days_ago)
    return past_date.strftime("%Y-%m-%d")


def prepare_slack_message(title, df):
    """
    集計データをSlack投稿に適した形へ整形
    """
    if len(df) == 0:
        return f"*{title}*```該当データなし```"
    else:
        # tabulateパッケージによる表のフォーマットを行う
        df_tab = tabulate(
            [list(row) for row in df.values],
            headers=list(df.columns),
            tablefmt="rst",
            numalign="right",
        )
        return f"*{title}*```{df_tab}```"


def get_top_action_types_from_bigquery(bq, target_date, limit):
    """
    BigQueryから集計データ取得
    """

    query = f"""
      SELECT
        action_type, COUNT(*) AS action_count
      FROM
        `test.action_logs`
      WHERE
        DATE(actioned_at, 'Asia/Tokyo') = '{target_date}'
      GROUP BY
        action_type
      ORDER BY
        action_count DESC
      LIMIT
        {limit};
    """

    return bq.query(query)


def main(event, context):
    """
    Cloud Functionsのエントリポイント
    """

    bq = bigquery.Client(os.environ.get("GCP_PROJECT"))
    SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL")

    # Pub/Subから受け取るデータはbase64エンコードされているのでデコードする
    # さらに上記でデコードされたデータは文字列となっているので、astモジュールで辞書型に変換する
    pubsub_data = ast.literal_eval(base64.b64decode(event["data"]).decode("utf-8"))
    days_ago = int(pubsub_data["days_ago"])
    limit = int(pubsub_data["limit"])

    # ここでは説明のため、集計対象日として固定値の日付文字列を設定(前述で格納したデータに合わせた)
    past_date = "2022-12-07"
    # 参考までに、自分は以下のようにn日前の日付を文字列で取得できるようにしています
    # (テストでPub/Sub引数変えながら見るため)
    # JST = timezone(timedelta(hours=+9), "JST")
    # past_date = past_date_string(days_ago, JST)

    query_output = get_top_action_types_from_bigquery(bq, past_date, limit)

    formatted_message = prepare_slack_message(
        title=f"最もされたアクショントップ{limit}", df=query_output.to_dataframe()
    )

    requests.post(
        SLACK_WEBHOOK_URL,
        data=json.dumps({"text": formatted_message, "username": f"{past_date} の集計結果"}),
    )
requirements.txt
google-api-python-client
google-cloud-bigquery
pandas
db-dtypes
tabulate

ポイント

  • 集計データの表フォーマットについて
    • tabulateパッケージを利用することによりDataFrameデータを表形式で出力しています。
    • 日本語を含む表データについてはSlack上で綺麗に整形できませんでした
      • インラインコードにより当幅フォントで表示しようとしたが、Slackにて展開される際に日本語データが入るとアルファベットや半角記号と日本語の表示幅が異なりカラム幅を揃えることができなかった。
      • そのため、日本語を含むカラムが一つだけなのであれば最後のカラムにしておくことをお勧めします。
        • そうすれば、最悪はみ出ても汚く見えにくい
  • Cloud Functions 内のランタイム環境変数にセットした変数の値は os.environ.get で取得することが可能です。
    • これを利用して、本番用とテスト用とでPythonスクリプトを共通で使い回し、環境変数の値だけを変えたCloud Functions関数を別々で用意することで、それぞれ別の(本番用とテスト用の)SlackチャンネルのWebhook URLを指定してあげることができます。

Cloud Scheduler の設定

  • 頻度:cron形式で指定してください(毎日朝9時に実行したい場合は 0 9 * * * )
  • タイムゾーン:日本標準時
  • ターゲットタイプ:Pub/Sub
  • Cloud Pub/Subトピック:Cloud Functionsで作成したトピックを選択
  • メッセージ本文: {"days_ago": 1, "limit": 3}

完了

うまくいけばこれで、指定した時間にSlackへ以下のように通知されるはずです!

最後に

スペースマーケットでは、一緒にサービスを成長させていく仲間を探しています。
サービスに興味がある、技術的なチャレンジをしつつ事業を成長させてみたい方など是非是非お話しさせてください!

https://www.wantedly.com/projects/1061116

https://www.wantedly.com/projects/1113570

https://www.wantedly.com/projects/1113544

スペースマーケット Engineer Blog

Discussion