🦉

Claude Code の HooksはCatch Allしてひとつのscriptで処理したらいいのでは?

に公開

https://docs.anthropic.com/en/docs/claude-code/hooks

こんな感じで macherが空の設定を書いて、ひとつのscriptで処理したら1箇所にまとまっていいのでは?
というのを思いついたので投稿しておく

{
  "hooks": {
    "PreToolUse": [
      {
        "matcher": "",
        "hooks": [
          {
            "type": "command",
            "command": "CLAUDE_HOOK_TYPE=PreToolUse /path/to/unified_hook_processor.py"
          }
        ]
      }
    ],
~/.claude/settings.json
{
  "hooks": {
    "PreToolUse": [
      {
        "matcher": "",
        "hooks": [
          {
            "type": "command",
            "command": "CLAUDE_HOOK_TYPE=PreToolUse /path/to/unified_hook_processor.py"
          }
        ]
      }
    ],
    "PostToolUse": [
      {
        "matcher": "",
        "hooks": [
          {
            "type": "command", 
            "command": "CLAUDE_HOOK_TYPE=PostToolUse /path/to/unified_hook_processor.py"
          }
        ]
      }
    ],
    "Stop": [
      {
        "matcher": "",
        "hooks": [
          {
            "type": "command",
            "command": "CLAUDE_HOOK_TYPE=Stop /path/to/unified_hook_processor.py"
          }
        ]
      }
    ],
    "Notification": [
      {
        "matcher": "",
        "hooks": [
          {
            "type": "command",
            "command": "CLAUDE_HOOK_TYPE=Notification /path/to/unified_hook_processor.py"
          }
        ]
      }
    ]
  }
}
unified_hook_processor.py
#!/usr/bin/env python3
"""
Claude Code 統一フックプロセッサー
全てのフックイベント(PreToolUse, PostToolUse, Stop, Notification)を処理
"""
import json
import sys
import os
import subprocess
from pathlib import Path
from typing import Dict, Any, Optional, List, Set

class ClaudeCodeHookProcessor:
    def __init__(self):
        self.project_root = self._find_project_root()
        
        # 危険なパス・操作のパターン
        self.dangerous_paths = {
            '/etc/', '/sys/', '/proc/', '/dev/', '/root/',
            '/usr/bin/', '/usr/sbin/', '/bin/', '/sbin/'
        }
        
        # 本番系のファイル・ディレクトリ
        self.production_paths = {
            'production/', 'prod/', '.env.production', 
            'docker-compose.prod.yml', 'Dockerfile.prod'
        }
        
        # 危険なコマンドパターン  
        self.dangerous_commands = {
            'rm -rf /', 'sudo rm', 'chmod 777', 'chown root',
            'dd if=', '> /dev/', 'mkfs.', 'fdisk', 'parted'
        }
        
        # フォーマット対象拡張子
        self.format_extensions = {
            '.js', '.jsx', '.ts', '.tsx', '.py', '.go', '.rs', '.java'
        }

    def _find_project_root(self) -> Path:
        """プロジェクトルートを探す"""
        current = Path.cwd()
        for parent in [current] + list(current.parents):
            if any((parent / marker).exists() for marker in 
                   ['.git', 'package.json', 'pyproject.toml', 'go.mod', 'Cargo.toml']):
                return parent
        return current

    def process_hook(self, hook_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """メインのフック処理ロジック"""
        
        # フックタイプを環境変数から判定(設定で渡す)
        hook_type = os.environ.get('CLAUDE_HOOK_TYPE', self._detect_hook_type(hook_data))
        
        if hook_type == 'PreToolUse':
            return self._handle_pre_tool_use(hook_data)
        elif hook_type == 'PostToolUse':
            return self._handle_post_tool_use(hook_data)
        elif hook_type == 'Stop':
            return self._handle_stop(hook_data)
        elif hook_type == 'Notification':
            return self._handle_notification(hook_data)
        
        return None

    def _detect_hook_type(self, hook_data: Dict[str, Any]) -> str:
        """入力データからフックタイプを推測"""
        if 'tool_name' in hook_data and 'tool_response' not in hook_data:
            return 'PreToolUse'
        elif 'tool_name' in hook_data and 'tool_response' in hook_data:
            return 'PostToolUse'
        elif 'message' in hook_data and 'title' in hook_data:
            return 'Notification'
        elif 'transcript_path' in hook_data:
            return 'Stop'
        return 'Unknown'

    def _handle_pre_tool_use(self, hook_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """PreToolUse: 危険な操作のブロック"""
        tool_name = hook_data.get('tool_name', '')
        tool_input = hook_data.get('tool_input', {})
        
        # 1. 危険なファイルパスチェック
        file_path = (tool_input.get('file_path') or 
                    tool_input.get('path') or 
                    tool_input.get('filename') or '')
        
        if file_path:
            # システムディレクトリへの書き込み
            if any(dangerous in file_path for dangerous in self.dangerous_paths):
                return {
                    "decision": "block",
                    "reason": f"システムディレクトリ '{file_path}' への変更は禁止されています"
                }
            
            # 本番環境ファイル
            if any(prod in file_path for prod in self.production_paths):
                return {
                    "decision": "block", 
                    "reason": f"本番環境ファイル '{file_path}' への変更は禁止されています"
                }
            
            # パストラバーサル攻撃
            if '..' in file_path or file_path.startswith('/'):
                return {
                    "decision": "block",
                    "reason": f"不正なパス '{file_path}' が検出されました"
                }

        # 2. 危険なBashコマンドチェック
        if tool_name == 'Bash':
            command = tool_input.get('command', '')
            if any(dangerous in command for dangerous in self.dangerous_commands):
                return {
                    "decision": "block",
                    "reason": f"危険なコマンド '{command}' の実行は禁止されています"
                }

        # 3. 安全な操作の自動承認
        if tool_name in ['Read', 'Glob', 'Grep']:
            return {
                "decision": "approve",
                "reason": "読み取り専用操作のため自動承認"
            }
        
        # プロジェクト内のソースコード変更
        if (tool_name in ['Write', 'Edit', 'MultiEdit'] and 
            file_path and 
            file_path.startswith('./src/') and 
            Path(file_path).suffix in self.format_extensions):
            return {
                "decision": "approve", 
                "reason": "ソースコードディレクトリの安全な変更"
            }

        return None  # デフォルトの権限フローに従う

    def _handle_post_tool_use(self, hook_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """PostToolUse: 品質チェック"""
        tool_name = hook_data.get('tool_name', '')
        tool_input = hook_data.get('tool_input', {})
        tool_response = hook_data.get('tool_response', {})
        
        # ファイル変更操作のみ処理
        if tool_name not in ['Write', 'Edit', 'MultiEdit']:
            return None
            
        file_path = (tool_input.get('file_path') or 
                    tool_input.get('path') or 
                    tool_input.get('filename'))
        
        if not file_path or not Path(file_path).exists():
            return None

        issues = []
        
        # 1. 構文チェック
        syntax_issues = self._check_syntax(file_path)
        issues.extend(syntax_issues)
        
        # 2. リンターチェック  
        lint_issues = self._check_linting(file_path)
        issues.extend(lint_issues)
        
        # 3. テストチェック(関連テストがある場合)
        test_issues = self._check_tests(file_path)
        issues.extend(test_issues)
        
        if issues:
            return {
                "decision": "block",
                "reason": f"品質チェックで問題が検出されました:\n" + "\n".join(f"• {issue}" for issue in issues)
            }
        
        return None

    def _handle_stop(self, hook_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Stop: 全体品質チェック"""
        transcript_path = hook_data.get('transcript_path', '')
        
        # 無限ループ防止
        if hook_data.get('stop_hook_active'):
            return None
        
        # 1. コード変更があったかチェック
        if not self._has_code_modifications(transcript_path):
            # コード変更なし → フォーマットスキップ
            return {"suppressOutput": True}
        
        # 2. 必須要件チェック
        missing_requirements = self._check_requirements()
        if missing_requirements:
            return {
                "decision": "block",
                "reason": f"必須要件が不足しています:\n" + "\n".join(f"• {req}" for req in missing_requirements)
            }
        
        # 3. 全体テストチェック
        if not self._run_full_tests():
            return {
                "decision": "block", 
                "reason": "テストが失敗しています。全てのテストが通るまで修正を続けてください。"
            }
        
        # 4. フォーマット実行
        self._run_formatting()
        
        print("✅ 全ての品質チェックが完了しました")
        return None

    def _handle_notification(self, hook_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Notification: カスタム通知"""
        message = hook_data.get('message', '')
        
        # 重要な通知のみカスタム処理
        if any(keyword in message.lower() for keyword in ['error', 'failed', 'block']):
            self._send_custom_notification(message)
        
        return {"suppressOutput": True}  # デフォルト通知を抑制

    # ユーティリティメソッド
    
    def _check_syntax(self, file_path: str) -> List[str]:
        """構文チェック"""
        issues = []
        suffix = Path(file_path).suffix
        
        try:
            if suffix == '.py':
                result = subprocess.run(['python', '-m', 'py_compile', file_path], 
                                      capture_output=True, text=True)
                if result.returncode != 0:
                    issues.append(f"Python構文エラー: {result.stderr.strip()}")
            
            elif suffix in ['.js', '.jsx', '.ts', '.tsx']:
                # TypeScript/ESLint チェック
                result = subprocess.run(['npx', 'tsc', '--noEmit', file_path], 
                                      capture_output=True, text=True)
                if result.returncode != 0:
                    issues.append(f"TypeScript型エラー: {result.stderr.strip()}")
                    
        except FileNotFoundError:
            pass  # ツールがインストールされていない場合はスキップ
            
        return issues

    def _check_linting(self, file_path: str) -> List[str]:
        """リンターチェック"""
        issues = []
        suffix = Path(file_path).suffix
        
        try:
            if suffix == '.py':
                result = subprocess.run(['flake8', file_path], 
                                      capture_output=True, text=True)
                if result.returncode != 0:
                    issues.append(f"Flake8警告: {result.stdout.strip()}")
            
            elif suffix in ['.js', '.jsx', '.ts', '.tsx']:
                result = subprocess.run(['npx', 'eslint', file_path], 
                                      capture_output=True, text=True)
                if result.returncode != 0:
                    issues.append(f"ESLint警告: {result.stdout.strip()}")
                    
        except FileNotFoundError:
            pass
            
        return issues

    def _check_tests(self, file_path: str) -> List[str]:
        """関連テストの実行"""
        issues = []
        
        # テストファイルの推測
        test_patterns = [
            file_path.replace('.py', '_test.py'),
            file_path.replace('.py', '.test.py'), 
            file_path.replace('.js', '.test.js'),
            file_path.replace('.ts', '.test.ts'),
        ]
        
        for test_file in test_patterns:
            if Path(test_file).exists():
                try:
                    if test_file.endswith('.py'):
                        result = subprocess.run(['python', '-m', 'pytest', test_file, '-v'], 
                                              capture_output=True, text=True)
                    else:
                        result = subprocess.run(['npm', 'test', '--', test_file], 
                                              capture_output=True, text=True)
                    
                    if result.returncode != 0:
                        issues.append(f"テスト失敗 ({test_file}): {result.stdout.strip()}")
                        
                except FileNotFoundError:
                    pass
                break
                
        return issues

    def _has_code_modifications(self, transcript_path: str) -> bool:
        """コード修正があったかチェック"""
        try:
            with open(transcript_path, 'r') as f:
                for line in f:
                    try:
                        entry = json.loads(line.strip())
                        if (entry.get('type') == 'tool_use' and 
                            entry.get('name') in ['Write', 'Edit', 'MultiEdit']):
                            return True
                    except json.JSONDecodeError:
                        continue
        except Exception:
            pass
        return False

    def _check_requirements(self) -> List[str]:
        """必須要件チェック"""
        missing = []
        
        # 必須ファイル
        required_files = ['README.md']  # プロジェクトに応じて調整
        for file in required_files:
            if not (self.project_root / file).exists():
                missing.append(f"必須ファイル: {file}")
        
        # package.json のスクリプト
        package_json = self.project_root / 'package.json'
        if package_json.exists():
            try:
                with open(package_json) as f:
                    data = json.load(f)
                    scripts = data.get('scripts', {})
                    required_scripts = ['test', 'build']  # 必要に応じて調整
                    for script in required_scripts:
                        if script not in scripts:
                            missing.append(f"package.json script: {script}")
            except Exception:
                pass
        
        return missing

    def _run_full_tests(self) -> bool:
        """全体テスト実行"""
        try:
            # Node.js プロジェクト
            if (self.project_root / 'package.json').exists():
                result = subprocess.run(['npm', 'test'], 
                                      capture_output=True, text=True, cwd=self.project_root)
                return result.returncode == 0
            
            # Python プロジェクト
            if (self.project_root / 'pyproject.toml').exists():
                result = subprocess.run(['python', '-m', 'pytest'], 
                                      capture_output=True, text=True, cwd=self.project_root)
                return result.returncode == 0
                
        except Exception:
            pass
        return True  # テストが設定されていない場合は通す

    def _run_formatting(self):
        """フォーマット実行"""
        try:
            if (self.project_root / 'package.json').exists():
                subprocess.run(['npm', 'run', 'format'], 
                             capture_output=True, cwd=self.project_root)
                print("✅ フォーマット完了")
        except Exception:
            pass

    def _send_custom_notification(self, message: str):
        """カスタム通知送信"""
        # Slack, Discord, メール等に送信
        # 実装は環境に応じて調整
        print(f"🚨 重要な通知: {message}")

def main():
    try:
        # stdin から JSON データを読み取り
        hook_data = json.load(sys.stdin)
        
        # プロセッサーを初期化して実行
        processor = ClaudeCodeHookProcessor()
        result = processor.process_hook(hook_data)
        
        # 結果を出力
        if result:
            print(json.dumps(result))
        
        sys.exit(0)
        
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON input: {e}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

if __name__ == "__main__":
    main()

Discussion