🐶

Herokuのログを取得し、整形する

2025/02/14に公開

はじめに

現在監視している Heroku 環境にて、最近はリクエスト数が増えたので、監視用のバッチのためのログ取得処理を実装したので、以下に記します。

ログ取得

Heroku のログは、Heroku API を使用して取得することができます。
本プログラムでは、error_code と dyno に関心があったので、それらを取得するようにしました。
取得したログは、扱いやすいように pydantic モデルに変換しています。

log_get.py
from typing import Dict, Optional, List
import requests
from datetime import datetime
from pydantic import BaseModel, Field
import re

class HerokuLogEntry(BaseModel):
    timestamp: Optional[datetime] = Field(
        default=None,
        description="ログのタイムスタンプ(例:Jan 20 20:58:14)"
    )
    app_name: Optional[str] = Field(
        default=None,
        description="Herokuアプリケーション名"
    )
    error_code: Optional[str] = Field(
        default=None,
        description="エラーコード(例:H12, H10)"
    )
    description: Optional[str] = Field(
        default=None,
        description="エラーの説明文"
    )
    method: Optional[str] = Field(
        default=None,
        description="HTTPメソッド(GET, POST等)"
    )
    path: Optional[str] = Field(
        default=None,
        description="リクエストパス"
    )
    host: Optional[str] = Field(
        default=None,
        description="ホスト名"
    )
    request_id: Optional[str] = Field(
        default=None,
        description="リクエストを一意に識別するID"
    )
    forwarded_for: Optional[str] = Field(
        default=None,
        alias="fwd",
        description="X-Forwarded-Forヘッダーの値(クライアントのIPアドレス)"
    )
    dyno: Optional[str] = Field(
        default=None,
        description="処理を実行したDyno(例:web.1, worker.1)"
    )
    connect_time: Optional[str] = Field(
        default=None,
        description="接続時間(ミリ秒)"
    )
    service_time: Optional[str] = Field(
        default=None,
        description="サービス実行時間(ミリ秒)"
    )
    status: Optional[int] = Field(
        default=None,
        description="HTTPステータスコード"
    )
    bytes_transferred: Optional[int] = Field(
        default=None,
        alias="bytes",
        description="転送されたバイト数"
    )
    protocol: Optional[str] = Field(
        default=None,
        description="使用されたプロトコル(http, https)"
    )


def format_log_line(log_line: str) -> Optional[HerokuLogEntry]:
    """ログ行を整形する関数

    Args:
        log_line (str): 整形前のログ行

    Returns:
        Optional[HerokuLogEntry]: 整形後のログ情報。パースできない場合はNone
    """
    # スペースで分割して基本情報を取得
    parts = log_line.strip().split()
    if len(parts) < 4:
        return None

    # 基本的なログ情報を初期化
    log_dict = {}

    # タイムスタンプを解析
    try:
        timestamp_str = parts[0]
        log_dict["timestamp"] = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
    except Exception:
        log_dict["timestamp"] = None

    try:
        log_dict["app_name"] = parts[1]
    except Exception:
        log_dict["app_name"] = None

    # 残りのkey=valueペアを解析
    for part in parts[3:]:
        if "=" in part:
            try:
                key, value = part.split("=", 1)
                # クォートを削除
                if value.startswith('"') and value.endswith('"'):
                    value = value[1:-1]

                # 特定のフィールドの型変換
                if key == "status":
                    try:
                        value = int(value)
                    except ValueError:
                        value = None
                elif key == "bytes":
                    try:
                        value = int(value)
                    except ValueError:
                        value = None
                elif key == "code":
                    log_dict["error_code"] = value
                elif key == "desc":
                    log_dict["description"] = value
                else:
                    log_dict[key] = value
            except Exception:
                continue

    # Pydanticモデルでバリデーション
    try:
        log_entry = HerokuLogEntry(**log_dict)
        return log_entry
    except Exception:
        return None


def get_heroku_logs(
    api_key: str,
    app_name: str,
    dyno: str = "router",
    source: str = "heroku",
    lines: int = 10
) -> List[HerokuLogEntry]:
    """Herokuのログを取得する関数

    Args:
        api_key (str): HerokuのAPIキー
        app_name (str): Herokuアプリケーション名
        dyno (str, optional): 取得するログのdyno. デフォルトはrouter.
        source (str, optional): 取得するログのソース. デフォルトはheroku.
        lines (int, optional): 取得するログの行数. デフォルトは10.

    Returns:
        List[HerokuLogEntry]: 整形されたログのリスト
    """
    if not api_key:
        return []

    if not app_name:
        return []

    headers = {
        'Accept': 'application/vnd.heroku+json; version=3',
        'Authorization': f'Bearer {api_key}'
    }

    try:
        response = requests.post(
            f'https://api.heroku.com/apps/{app_name}/log-sessions',
            headers=headers,
            json={
                "dyno": dyno,
                "lines": lines,
                "source": source,
            }
        )
        response.raise_for_status()

        log_session: Dict = response.json()
        log_url = log_session.get('logplex_url')

        if log_url:
            logs = requests.get(log_url)
            return [format_log_line(log) for log in logs.text.split("\n") if log]
        else:
            return []

    except Exception as e:
        return []

def get_timeout_error_log_count(logs: List[HerokuLogEntry]) -> int:
    error_logs = [log for log in logs if log.error_code == "H12"]
    return len(error_logs)

def get_dyno_count(logs: List[HerokuLogEntry]) -> int:
    # log.dynoより、web dynoを表すweb.1やweb.2などからDynoの数を取得。2があると、dynoが2つあることになる。
    # 正規表現で取得
    dyno_types = set([log.dyno for log in logs if re.match(r'web\.\d+', log.dyno)])
    dyno_count = len(dyno_types)
    return dyno_count

おわりに

バッチ用には別途プログラムを作成しますが、一旦簡単なログ収集を実装しました。
ログ管理サービスから取得した方が良いですが、今回は少しマイルドな実装とし、Heroku 単体での処理としました。
ほぼ Cursor からのコード生成であるため、コードの可読性は低いですが、そのうち改善したいと思います。

Discussion