複数ベンダーのLLMログを集約した統計グラフをSlackに日次通知する(Cloudflare AI Gateway/Python)
はじめに
年あらため、Cloudflare AI Gatewayを使いはじめました。
エンドポイントを差し替えることで、複数ベンダーのLLMのログを統合できる代物です。
たとえば、以下のようなデータが取れます。
各リクエストの概要
時系列グラフ
ベンダーをまたがる集約ログは、異常検知や生成AIの利用促進のKPIになり、地味に重要です。筆者も似たようなプロキシをApps Script/Spreadsheetで自作したり、オープンソースのllm.reportを試したこともありました。[1]
しかし、
- レスポンスが遅くなる
- メンテが大変
- モデルやベンダーの変動
- インフラの管理
などが課題/面倒でした。かといって各ベンダーのdashboardを見たり、各ベンダーのAPIで自作するのもメンテが大変です。Cloudflare AI Gatewayはそれらを解決しつつ、現時点で機能もスリムなので、使いやすそうに見えます。
Cloudflare AI Gatewayの説明は、上記のscrapやscrapで紹介している先行記事に譲ることとし、この記事では以下を目指します。
この記事の目標
Cloudflare AI Gatewayのdashboardで見られるグラフをSlackで日次で通知したい。
すなわち、
をSlackに通知したい。
理由は単純に
- 毎日Cloudflare AI Gatewayのdashboardを見るのが面倒/忘れる
- チームメンバーの目に触れない
からです。
方針
以下を考えました。
- Cloudflare AI Gatewayの標準機能を探す
- 調べたが見つからなかった。(あれば教えてほしい。)
- Logpushを設定すればできそう。ただ、工数が気になるし、グラフは描画しないといけない。
-
Cloudflare Workers Browser Renderingを使う
- Pupeteerで外形監視できる。これでスクリーンショットを撮る。Cloudflareで完結。
- 人間の証明ができないのでログインできない。
結局、以下にしました。
- Cloudflare APIでログを取得
- グラフ描画
- Slack投稿
言語・デプロイ先は先行記事もあるので、Python/Firebase Functionsとしました。
実装
長くなってしまうので、ポイントを摘んで記録します。なお、筆者はそれほどPythonに書き慣れていませんので、優しい心でご参考ください。
1. Firebase Functionsの作成
Firebase FunctionsのPythonサポートは2023年から、と思ったより若いです。
基本的には公式ドキュメントや最近の記事を見て作っていきますが、TypeScriptより苦労しそうです。
Pythonのバージョン
以下でも書かれていますが、2025年1月時点でPython 3.13では動作せず、Python3.12でないとデプロイが上手くいきませんでした。(Did you forget to run 'python3.12 -m venv venv'?
)
pyenvでバージョンを切り替えて再度venv作成、installしたところ動作しました。
Schedulerの設定
スケジュール実行にするにはデコレータを使えばOK。
@scheduler_fn.on_schedule(
schedule="every day 17:00",
timezone="Asia/Tokyo",
region="asia-northeast1",
memory=options.MemoryOption.GB_1,
max_instances=1,
timeout_sec=60,
)
def daily_ai_gateway_logs(event: scheduler_fn.ScheduledEvent) -> None:
ローカル動作
ドキュメントではfirebase emulators:start
が紹介されていますが、スケジュール関数は使えないようです。
http関数としてテストするか、functions-framework
を使い、functions-framework --target <func_name> --debug
でも動作しました。(ただし、たんにfirebaseのデコレータが無視されてhttp関数になっているだけのように見え、良いやり方ではないかもしれません。)
コマンド例。
$python3.12 -m venv venv
$source venv/bin/activate
$cd functions
$pip install -r requirements.txt
$functions-framework --target daily_ai_gateway_logs --debug
matplotlib.use('Agg')
グラフ描写に使うmatplotlib
は、以下の記事にある通り、Backendの指定が必要でした。
指定しないとStarting the Matplotlib GUI outside the main thread may fail.
の警告が出て失敗しました。
メモリ
デフォルトの256だと足りません。1GBで安定します。
デプロイ
firebase.json
のcodebase
を設定のうえ、以下を実行。
$python3.12 -m venv venv
$source venv/bin/activate
$cd functions
$pip install -r requirements.txt
$firebase deploy --only functions
コード概観
ディレクトリーツリーは下記。
ai-gateway-notifier/
┣ functions/
┃ ┣ cf_utils/
┃ ┃ ┣ __init__.py
┃ ┃ ┣ constants.py
┃ ┃ ┣ logs.py
┃ ┃ ┣ slack.py
┃ ┃ ┣ utils.py
┃ ┃ ┗ visualizer.py
┃ ┣ .env
┃ ┣ .gitignore
┃ ┣ .python-version
┃ ┣ main.py
┃ ┗ requirements.txt
┣ .firebaserc
┣ .gitignore
┣ README.md
┗ firebase.json
requirements.txt
は下記です。(バージョンを吟味すべきですが、端折りました。)
firebase_functions
cloudflare
python-dotenv~=0.19.0
pytz~=2023.3
pandas
matplotlib
slack_sdk
Firebase関連の処理はエントリーポイントのmain.py
にあります。
最終的にmain.py
は以下のようになりました。
import os
from firebase_functions import scheduler_fn, logger, options
from firebase_admin import initialize_app
from dotenv import load_dotenv
from cf_utils import get_logs, analyze_logs, validate_required_env_vars
from cf_utils.slack import Slack
from cf_utils import constants
from datetime import datetime
# 環境変数の読み込み
load_dotenv()
# Firebase初期化
initialize_app()
@scheduler_fn.on_schedule(
schedule="every day 17:00",
timezone="Asia/Tokyo",
region="asia-northeast1",
memory=options.MemoryOption.GB_1,
max_instances=1,
timeout_sec=60,
)
def daily_ai_gateway_logs(event: scheduler_fn.ScheduledEvent) -> None:
"""
Cloudflare Log API よりログを取得し、matplotlib で分析結果をグラフ化、
Slack へ通知するFirebase Functions のエントリポイント。
"""
try:
# 必須環境変数の検証
validate_required_env_vars()
# 環境変数から Gateway ID と Account ID を取得
GATEWAY_ID = os.getenv("GATEWAY_ID")
ACCOUNT_ID = os.getenv("ACCOUNT_ID")
# ログの取得
logs = get_logs(
gateway_id=GATEWAY_ID,
account_id=ACCOUNT_ID,
hours=constants.LOG_HOURS,
per_page=constants.LOG_PER_PAGE,
)
logger.info(f"Retrieved {len(logs)} logs.")
if logs:
# サンプルログをデバッグ表示
logger.debug(f"Sample log entry: {logs[0].__dict__}")
# ログの分析
analysis_results = analyze_logs(logs)
logger.info(f"Analysis complete. Summary: {analysis_results.get('summary')}")
# Slackに通知 (失敗しても処理を続行)
try:
slack = Slack()
current_time = datetime.now().strftime("%Y-%m-%d %H:%M")
slack_title = f"AI Gateway Analytics - {current_time}"
slack.upload_graph(
graph_data=analysis_results["graph"],
title=slack_title,
summary=analysis_results["summary"],
)
logger.info(f"Slack notification sent: {slack_title}")
except Exception as e:
logger.warning(f"Failed to send Slack notification: {str(e)}")
return
except Exception as e:
# エラー時はメッセージとスタックトレースをログ出力しつつ、レスポンスにも返す
import traceback
logger.error(f"Error: {str(e)}")
logger.error(traceback.format_exc())
return
うち、get_logs
がCloudflareのAPI部分、analyze_logs
が解析・グラフ作成部分、Slack
クラスが、ファイルアップロード/投稿を担う部分です。
2. Cloudflare APIの処理
ライブラリ
認証
以下に説明されている通り、
- API Token
- API Key
の2種類に大別され、API Token
が推奨されています。
Tokenの取得からAI Gateway APIをcallするには、以下の記事が参考になります。
...が、Pythonライブラリを使うとうまくいきませんでした。Node.jsのライブラリでは同一のTokenで取得できたので、認証に問題はないはずですが、よくわかりません。結局APIキーでの認証としました。
コード例
APIの呼び出し部分を抽出します。
from cloudflare import Cloudflare
client = Cloudflare()
...
response = client.ai_gateway.logs.list(
id=gateway_id,
account_id=account_id,
direction="asc",
start_date=start_date,
end_date=end_date,
order_by="created_at",
page=page,
per_page=per_page,
)
client
は環境変数からデフォルト値を取ってきます。今回は、
- CLOUDFLARE_EMAIL
- CLOUDFLARE_API_KEY
を環境変数として設定しているため、引数を省いています。
戻り値はListで要素のサンプルは以下です。
LogListResponse(id='xxxxxxxxxxxx', cached=False, created_at=datetime.datetime(2025, 1, 6, 1, 4, 38), duration=453, model='', path='', provider='azure-openai', request='', response='', success=False, tokens_in=0, tokens_out=0, metadata=None, request_content_type='', request_type='provider', response_content_type='', status_code=403, step=0, updated_at='2025-01-06 01:04:38', event_id=None, model_type='', timings={'total': 453.22490498423576, 'latency': 451.5235419869423}, location={'region': 'Tokyo', 'colo': 'NRT'}, cost=0, custom_cost=False, feedback=None, score=None)
3. グラフの描画
生成AIの得意分野なので、お任せ。JSTへの変換、色付けなどもおこなっています。グラフはbytes
で返しています。
cf_utils/logs.py
import io
from typing import List, Dict, Any
import pandas as pd
import matplotlib
matplotlib.use("Agg") # GUI環境なしでも動作可能にする設定
import matplotlib.pyplot as plt
from matplotlib.dates import HourLocator, DateFormatter
from pytz import timezone
from firebase_functions import logger
from . import constants
def get_utc_time_range(hours: int = constants.LOG_HOURS):
"""
UTCの現在時刻を終端とした指定時間分のtime_rangeを返す。
時刻は整時(floor("h"))している。
Returns:
start_time (Timestamp): UTCの開始時刻
end_time (Timestamp): UTCの終了時刻
time_range (DatetimeIndex): 1時間ごとのUTCタイムレンジ
"""
end_time = pd.Timestamp.now(tz="UTC").floor("h")
start_time = (end_time - pd.Timedelta(hours=hours)).floor("h")
time_range = pd.date_range(start=start_time, end=end_time, freq="h", tz="UTC")
return start_time, end_time, time_range
def set_jst_xaxis(ax):
"""
Matplotlib Axes の X 軸を JST (Asia/Tokyo) でラベル表示する設定に変更する。
Args:
ax (Axes): Matplotlib の Axes オブジェクト
"""
jst = timezone("Asia/Tokyo")
# DateFormatterにタイムゾーンを渡すと、ラベル表示が指定のTZになる
date_formatter = DateFormatter("%m-%d %H:%M", tz=jst)
ax.xaxis.set_major_locator(HourLocator())
ax.xaxis.set_major_formatter(date_formatter)
plt.setp(ax.xaxis.get_majorticklabels(), rotation=45)
def prepare_log_data(logs) -> pd.DataFrame:
"""
LogListResponseオブジェクトをDataFrameに変換し、
直近24時間(デフォルト)のログデータにフィルタリングして返す。
"""
logger.debug("Converting raw logs into pandas DataFrame...")
data = []
for log in logs:
data.append(
{
"created_at": log.created_at,
"provider": log.provider,
"success": log.success,
"tokens_in": log.tokens_in,
"tokens_out": log.tokens_out,
"status_code": log.status_code,
"cost": log.cost,
}
)
df = pd.DataFrame(data)
# created_atをdatetime型に変換(UTCを付与)
df["created_at"] = pd.to_datetime(df["created_at"], utc=True)
# 現在時刻(UTC)と比較
utc_now = pd.Timestamp.now(tz="UTC")
start_time = utc_now - pd.Timedelta(hours=constants.LOG_HOURS)
# tz-aware同士で比較
filtered_df = df[(df["created_at"] >= start_time) & (df["created_at"] <= utc_now)]
logger.debug(
f"DataFrame created with {len(df)} total rows, filtered to {len(filtered_df)} rows."
)
return filtered_df
def calculate_summary(df: pd.DataFrame) -> Dict[str, Dict[str, float]]:
"""
期間内の集計値を計算し、プロバイダー別と全体の統計情報をまとめる。
Args:
df (pd.DataFrame): ログデータのDataFrame
Returns:
Dict: { provider名または'Total': { 各種メトリクス } }
"""
logger.debug("Calculating summary statistics for each provider and total.")
summary = {}
# プロバイダーごとの集計
for provider in df["provider"].unique():
provider_df = df[df["provider"] == provider]
summary[provider] = {
"Total Requests": len(provider_df),
"Total Tokens In": provider_df["tokens_in"].sum(),
"Total Tokens Out": provider_df["tokens_out"].sum(),
"Total Errors": (provider_df["status_code"] >= 400).sum(),
"Total Cost": provider_df["cost"].sum(),
}
# 全体の集計
summary["Total"] = {
"Total Requests": len(df),
"Total Tokens In": df["tokens_in"].sum(),
"Total Tokens Out": df["tokens_out"].sum(),
"Total Errors": (df["status_code"] >= 400).sum(),
"Total Cost": df["cost"].sum(),
}
logger.debug(f"Summary: {summary}")
return summary
def create_stacked_plots(df: pd.DataFrame, providers: List[str]) -> tuple:
"""
各種積み上げグラフの元データを作成する。
Args:
df (pd.DataFrame): ログデータ
providers (List[str]): プロバイダーリスト
Returns:
tuple:
(request_data, tokens_in_data, tokens_out_data, error_data, time_range)
- リクエスト数、トークン数、エラー数のデータと時間範囲
"""
logger.debug("Preparing data for stacked plots...")
# UTCタイムレンジを取得
_, _, time_range = get_utc_time_range(hours=constants.LOG_HOURS)
request_data = {}
tokens_in_data = {}
tokens_out_data = {}
error_data = {}
for provider in providers:
provider_df = df[df["provider"] == provider].copy()
provider_df.set_index("created_at", inplace=True)
# UTCのまま、1時間単位でresample
hourly = provider_df.resample("h", closed="right", label="right")
# リクエスト数
request_counts = hourly.size()
request_data[provider] = request_counts.reindex(time_range, fill_value=0)
# トークン数
tokens = hourly.agg({"tokens_in": "sum", "tokens_out": "sum"})
tokens = tokens.reindex(time_range, fill_value=0)
tokens_in_data[provider] = tokens["tokens_in"]
tokens_out_data[provider] = tokens["tokens_out"]
# エラー数
errors = hourly["status_code"].apply(
lambda x: (x >= 400).sum() if len(x) else 0
)
error_data[provider] = errors.reindex(time_range, fill_value=0)
logger.debug("Stacked plot data preparation complete.")
return request_data, tokens_in_data, tokens_out_data, error_data, time_range
def visualize_logs(logs) -> bytes:
"""
ログデータを可視化(積み上げグラフ)し、PNG 画像をバイナリ形式で返す。
Args:
logs: LogListResponseオブジェクトのリスト
Returns:
bytes: PNG形式の画像データ
"""
logger.debug("Visualizing logs...")
# データの準備
df = prepare_log_data(logs)
if df.empty:
logger.warning("No log data available for visualization (DataFrame is empty).")
raise ValueError("No log data available for visualization")
# グラフの作成
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(15, 12))
plt.subplots_adjust(hspace=0.3)
# データの準備
providers = sorted(df["provider"].unique())
request_data, tokens_in_data, tokens_out_data, error_data, time_range = (
create_stacked_plots(df, providers)
)
# リクエスト数(積み上げ)
def get_provider_color(provider: str) -> str:
if provider == "google-vertex-ai":
return "#CBC3E3"
elif provider == "google-ai-studio":
return "#B0E0E6"
elif provider == "azure-openai":
return "#FFDAB9"
else:
return "#D3D3D3"
provider_colors = {p: get_provider_color(p) for p in providers}
ax1.stackplot(
time_range,
[request_data[provider] for provider in providers],
labels=providers,
colors=[provider_colors[p] for p in providers],
)
# トークン数(積み上げ)
ax2.stackplot(
time_range,
[tokens_in_data[p] for p in providers],
labels=[f"{p} (in)" for p in providers],
colors=[provider_colors[p] for p in providers],
alpha=0.7,
)
ax2.stackplot(
time_range,
[tokens_out_data[p] for p in providers],
labels=[f"{p} (out)" for p in providers],
colors=[provider_colors[p] for p in providers],
alpha=0.3,
)
# エラー数(積み上げ)
ax3.stackplot(
time_range,
[error_data[p] for p in providers],
labels=providers,
colors=[provider_colors[p] for p in providers],
)
# 軸の装飾
for ax in [ax1, ax2, ax3]:
ax.grid(True, linestyle="--", alpha=0.7)
# ここは UTC の start_time, end_time で範囲を指定
ax.set_xlim(time_range[0], time_range[-1])
# JST表示設定
set_jst_xaxis(ax)
ax.legend()
# タイトルと軸ラベルの設定
ax1.set_title("Hourly Request Count by Provider (Stacked)")
ax1.set_ylabel("Number of Requests")
ax2.set_title("Hourly Token Usage by Provider (Stacked)")
ax2.set_ylabel("Number of Tokens")
ax3.set_title("Hourly Error Count by Provider (Stacked)")
ax3.set_ylabel("Number of Errors")
# グラフの表示設定
plt.tight_layout()
# バイトデータとしてPNGを保存
buffer = io.BytesIO()
plt.savefig(buffer, format="png", bbox_inches="tight", dpi=300)
plt.close()
logger.debug("Visualization complete, returning PNG byte data.")
return buffer.getvalue()
def analyze_logs(logs) -> Dict[str, Any]:
"""
ログデータを分析し、サマリーと可視化グラフを返す。
Args:
logs: LogListResponseオブジェクトのリスト
Returns:
Dict: { 'summary': 集計結果, 'graph': 画像バイトデータ }
"""
logger.debug("Analyzing logs (summary + visualization)...")
# データの準備
df = prepare_log_data(logs)
if df.empty:
logger.warning("No log data available for analysis (DataFrame is empty).")
return {"summary": {}, "graph": b""}
# 集計の計算
summary = calculate_summary(df)
# グラフの生成
graph_data = visualize_logs(logs)
logger.debug("Analysis complete, returning summary and graph data.")
return {"summary": summary, "graph": graph_data}
4. Slackの処理
Slackアプリの作成
長くなってしまうので省略します。User Tokenを使い、下記のScopeを設定、User OAuth Token
を取得します。
コード例
コードは生成AIで実装できます。ただし、File UploadはV2があるので注意です。クライアントライブラリを使った方が良いと思います。
環境変数には
- SLACK_CHANNEL_ID
- SLACK_BOT_TOKEN
が必要です。SLACK_CHANNEL_ID
は「チャンネル詳細」から取得できます。
cf_utils/slack.py
import os
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from typing import Optional, Dict, Any
class Slack:
def __init__(
self, channel_id: Optional[str] = None, access_token: Optional[str] = None
):
"""Initialize Slack client with credentials from env vars or parameters"""
self.channel_id = channel_id or os.getenv("SLACK_CHANNEL_ID")
self.access_token = access_token or os.getenv("SLACK_BOT_TOKEN")
if not self.channel_id or not self.access_token:
raise ValueError("SLACK_CHANNEL_ID and SLACK_BOT_TOKEN must be provided")
self.client = WebClient(token=self.access_token)
def _format_summary(self, summary: Dict[str, Any]) -> str:
"""
summaryデータを整形されたメッセージに変換
Args:
summary: 分析結果のサマリー辞書
Returns:
str: 整形されたメッセージ
"""
message = "📊 *Cloudflare AI Gateway 日次レポート*\n\n"
# プロバイダーごとの集計
for provider, stats in summary.items():
if provider != "Total":
message += f"*{provider}*\n"
message += f"• リクエスト数: {stats['Total Requests']:,}\n"
message += f"• Input Tokens: {stats['Total Tokens In']:,}\n"
message += f"• Output Tokens: {stats['Total Tokens Out']:,}\n"
message += f"• エラー数: {stats['Total Errors']:,}\n"
message += f"• コスト: ${stats['Total Cost']:.4f}\n\n"
# 合計値
total = summary["Total"]
message += "*合計*\n"
message += f"• 総リクエスト数: {total['Total Requests']:,}\n"
message += f"• 総Input Tokens: {total['Total Tokens In']:,}\n"
message += f"• 総Output Tokens: {total['Total Tokens Out']:,}\n"
message += f"• 総エラー数: {total['Total Errors']:,}\n"
message += f"• 総コスト: ${total['Total Cost']:.4f}\n"
success_rate = (
(
(total["Total Requests"] - total["Total Errors"])
/ total["Total Requests"]
* 100
)
if total["Total Requests"] > 0
else 0
)
message += f"• 成功率: {success_rate:.1f}%\n"
return message
def upload_graph(
self, graph_data: bytes, title: str, summary: Dict[str, Any]
) -> dict:
"""
グラフデータとサマリー情報をSlackにアップロード
Args:
graph_data: Binary graph data (PNG format)
title: Title for the graph image
summary: Analysis summary data
Returns:
dict: Slack API response
"""
try:
# サマリーメッセージの作成
message = self._format_summary(summary)
# グラフとメッセージを一つのメッセージとしてアップロード
response = self.client.files_upload_v2(
channels=self.channel_id,
initial_comment=message,
title=title,
filename=f"{title}.png",
file=graph_data,
)
if not response["ok"]:
raise ValueError(f"Slack API error: Upload failed")
return response
except SlackApiError as e:
print(f"Got an error: {e.response['error']}")
print(f"Received a response status_code: {e.response.status_code}")
raise Exception(f"Failed to upload to Slack: {str(e)}")
5. 動作検証
ベストなやり方ではなさそうですが、下記で一応テストできます。
ローカルの場合
$python3.12 -m venv venv
$source venv/bin/activate
$cd functions
$pip install -r requirements.txt
$functions-framework --target daily_ai_gateway_logs --debug
コンソールに表示のURLにアクセス。
Cloud Functions
Cloud Schedulerから強制実行。
Slack
上記いずれかで実行し、Slackに以下のようなメッセージがポストされれば成功です。
グラフはwindow時間ごとの平均をとったりはしていないので、凸凹しています。
おわりに
フロー自体は単純なので、2時間もあれば作れると思って着手しましたが、
- PythonでのFirebase Functionsの開発
- Cloudflare APIの認証
- API Tokenでの認証がNode.jsではうまくいくのにPythonだと動作しない
で足を取られました。加えて、今回扱ったCloudflare AI Gateway / Cloudflare API / Python on Firebase Functions はサービスが若いこともあり、いずれも参考記事が少なかったです。
2025年にもなると、複数ベンダーのLLMがあちこちの環境で動く状況が当たり前になると思います。その前提として今回のような統合ログの定期的な監視・評価は重要だと考えています。本記事で実装したPythonコードはこなれないところがありますが、開発の流れなど、参考になれば幸いです。[2]
-
llm.report のリポジトリは、いま見ると
llm.report is no longer actively maintained. This project was unable to find a sustainable business model
と書かれていました。 ↩︎ -
ソースコードは公開しませんが、リクエストがあれば共有します。 ↩︎
Discussion