改ざん耐性ログを最小実装する:ハッシュ鎖・検証・リカバリ

TL;DR
- append-only + ハッシュ鎖で「消せない・すり替えられない」を作る。各レコードのcontent_hashと直前のparent_hashを連結してchain_hashを更新。
- 検証は決定論シリアライズ(JSON整形の癖を排除)+LAGで再計算。ギャップ/分岐/重複は自動検出できる。
- 運用はスナップショット(checkpoint)+キー管理+リカバリ手順をセットで。監査はソースエクスポート×ハッシュ照合で担保する。
スキーマ(最小)
audit_log
column type note
audit_id STRING substr(chain_hash,0,16)など
seq INT64 単調増加(重複・欠番検出用)
parent_hash STRING 直前レコードのchain_hash
content_hash STRING 決定論シリアライズのSHA-256
chain_hash STRING SHA256(parent_hash + content_hash)
at TIMESTAMP 付与時刻(UTC)
by STRING 実行主体(svc名/ユーザーID)
hint STRING 任意メモ(エントリIDなど)
checkpoints
| as_of_seq INT64 | chain_hash STRING | snapshot_uri STRING | at TIMESTAMP |
Python:書き込み(append-only)
audit_log_writer.py
import json, time, hashlib, os
from typing import BinaryIO, Dict
def canonical_json(d: Dict) -> str:
# key順・小数点・空白を固定して決定論に
return json.dumps(d, ensure_ascii=False, separators=(",", ":"), sort_keys=True)
def sha256_hex(s: str) -> str:
return hashlib.sha256(s.encode("utf-8")).hexdigest()
class AuditWriter:
def init(self, stream: BinaryIO, starting_parent: str = "0"*64):
self.stream = stream
self.parent = starting_parent
self.seq = 0
def append(self, entry: Dict, actor: str, hint: str = "") -> Dict:
self.seq += 1
payload = {
"seq": self.seq,
"entry": entry, # raw(後の検証は entry→content_hash で行う)
}
c = sha256_hex(canonical_json(payload))
chain = sha256_hex(self.parent + c)
row = {
"audit_id": chain[:16],
"seq": self.seq,
"parent_hash": self.parent,
"content_hash": c,
"chain_hash": chain,
"at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"by": actor,
"hint": hint
}
self.stream.write((json.dumps(row, ensure_ascii=False) + "\n").encode("utf-8"))
self.stream.flush()
self.parent = chain
return row
if name == "main":
import sys
w = AuditWriter(sys.stdout.buffer)
# デモ書き込み
w.append({"type": "journal_entry", "id": "JE-001", "amount": 150}, actor="etl", hint="JE-001")
w.append({"type": "rule_change", "id": "RC-101", "delta": {"add": "usdc_in_v2"}}, actor="ops", hint="RC-101")
免責:本稿は一般的情報の提供を目的とし、法務・会計・税務の助言には該当しません。
Discussion