製造業RAG運用編:監査ログ + イベント駆動再インデックスを実装する【コード付き】
はじめに
製造業向けRAGシリーズの第3弾です。
第1弾(設計編)では、製造業特有のアクセス制御の複雑さ——文書の多層的な機密レベル、OT/ICS環境のネットワーク分断、ADとVector Storeの制御主体の違い——を整理し、allowed_groupsをメタデータに焼き込むAccess label mappingとFail Closedを軸とした設計原則を解説しました。
第2弾(実装編)では、その設計をPythonで動かしました。ChromaDB + Cohere Embed/Rerankを使ったACL-aware retrievalパイプラインを約150行で実装し、田中保守員・山田一般スタッフ・鈴木工場長の3人で「ユーザーごとに返ってくる内容が変わる」動作を実際の出力で確認しました。
第2弾(実装編)の末尾にも記載していましたが、以下の2点が未実装のままでした。
- 監査ログの実装(誰がいつ何を検索したかの記録)
- イベント駆動の再インデックス(文書更新時の即時反映)
アクセス制御が動く段階で「守り」は一段階確立できていますが、実運用に耐えるシステムにはこの2つがさらに必要です。今回はこの2点を実装します。
本記事で実装するもの:
-
audit_logger.py:クエリと返答の記録(JSONL形式) -
reindex_trigger.py:文書更新時の delete → insert パイプライン -
query.pyへの統合:セキュリティチェックから監査ログまでの完全なフロー
実装環境:
- macOS(M1 MacBook Air)/ Python 3.12 / uv
- ChromaDB(ローカルVector Store)
- Cohere Embed v3 / Cohere Rerank v3.5
- Claude API(回答生成)
対象読者: 設計編・実装編を読んだエンジニア・アーキテクト。実装済みのRAGパイプラインに監査ログと再インデックスを追加したい方。
シリーズ構成:
1. 監査ログとは何か — なぜRAGに必要か
ADとの比較から考える
設計編で触れた通り、Active Directory + NTFSのACLはOSカーネルがアクセスを強制します。「誰がどのファイルを開いたか」はOSが自動的に記録します。
一方、RAGシステムのアクセス制御はアプリケーションコードが担う設計です。つまり、監査ログもアプリケーションが自前で実装しなければなりません。
| 比較観点 | AD + NTFSのACL | RAGシステム |
|---|---|---|
| アクセス制御の強制者 | OSカーネル | アプリケーションコード |
| 監査ログの記録者 | OSが自動記録 | アプリケーションが個別に実装する必要がある |
| ログの保存先 | OSのイベントログ | 任意のファイル・SIEM等 |
監査ログが機能する3つのシナリオ
① Fail Open検知(事後)
Fail Open実装ミスが発生した場合(設計編Section 5参照)、監査ログ単体で即座に検知できるわけではありません。しかし、ACL判定結果や参照チャンクIDを含む詳細なログを継続的に記録しておくことで、SIEMでの相関分析により「フィルターが適用されていないクエリ」や「権限外チャンクが参照された可能性」を事後的に検知しやすくなります。監査ログはインシデント発生後の調査精度を高める基盤として機能します。
② 不審な検索の検知
「なぜこの社員が深夜に設計図面を大量検索しているのか」——こうした異常を事後的に検知するには、user_id / timestamp / query_text の記録が最低限必要です。
③ コンプライアンス対応
ISO 27001 Annex A 8.15(ログ取得)への対応として、「誰がRAGで何を引き出したか」の証跡が必要になります。特にOT/ICS環境では、設備点検マニュアルや設計図面へのアクセス記録が外部監査の対象になるケースがあります。
2. audit_logger.py の実装
今回の実装はJSONL形式(1行1レコード)でローカルファイルに記録するシンプルな構成にしています。実運用ではSIEM(Splunk・Microsoft Sentinel等)への転送が必要になりますが、設計の考え方はここで解説した通りです。
# audit_logger.py
# 監査ログ: クエリと返答の記録
import json
import os
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()
DEBUG = os.getenv("DEBUG", "False").lower() == "true"
LOG_FILE = "audit_log.jsonl"
def log_query(
user_id: str,
groups: list[str],
query: str,
docs_retrieved: int,
rerank_scores: list[float],
response_length: int,
pii_detected: bool,
) -> None:
entry = {
"timestamp": datetime.now().isoformat(),
"user_id": user_id,
"groups": groups,
"query": query,
"docs_retrieved": docs_retrieved,
"rerank_scores": rerank_scores,
"response_length": response_length,
"pii_detected": pii_detected,
}
with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
if DEBUG:
print(f"[AUDIT] {json.dumps(entry, ensure_ascii=False, indent=2)}")
def log_access_denied(user_id: str, groups: list[str], query: str, reason: str) -> None:
entry = {
"timestamp": datetime.now().isoformat(),
"event": "ACCESS_DENIED",
"user_id": user_id,
"groups": groups,
"query": query[:100],
"reason": reason,
}
with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
if DEBUG:
print(f"[AUDIT] {json.dumps(entry, ensure_ascii=False, indent=2)}")
設計上のポイント
JSONL形式を選んだ理由
1行1レコードのJSON(JSON Lines)は、ログ解析ツール(Splunk・Athena等)との相性が良く、ファイルに追記するだけでよいため実装がシンプルです。PostgreSQLやDynamoDBに書き込む場合も、このJSON構造をそのまま流用できます。
2つの関数を分けた理由
log_queryは正常なクエリの記録、log_access_deniedはアクセス拒否イベントの記録です。拒否イベントには"event": "ACCESS_DENIED"フィールドを追加しており、SIEMで「ACCESS_DENIEDのみをアラート対象にする」というフィルタリングが容易になります。
query[:100]にしている理由
アクセス拒否イベントのクエリは最大100文字に切り詰めて記録しています。Prompt Injection攻撃の文字列にはログパーサーやSIEMが誤って解釈するような制御文字・改行・スクリプト断片が含まれることがあります。攻撃文字列をそのままログに書き込むと、ログ解析システム側でLog Injection攻撃として悪用される可能性があるため、記録する文字数を意図的に制限しています。
📌 実運用では文字数制限に加え、改行コード(
\n、\r)や制御文字のサニタイズも推奨します。文字数の切り捨てはあくまで副次的な対策であり、サニタイズが根本対策になります。
📌 実運用で追加すべき項目
本実装はローカル環境での動作検証を目的とした基本構成です。実運用では以下の追加を検討してください:
chunk_idレベルでの参照文書ID記録(どのチャンクが使われたか)
- 回答テキスト本文(コンプライアンス要件がある場合)
- ADへの問い合わせ記録
- SIEM転送(Splunk Connect for Python・Azure Monitor等)
- ログの保管期間は組織のコンプライアンス要件に従い定義すること(ISO 27001では最低1年が目安。保管先のディスク容量・ローテーション設計も併せて検討する)
3. query.py への統合 — 完全なフロー
なぜ security/ モジュールを追加するのか
第2弾(実装編)のquery.pyは「ACL-aware retrieval」に特化した実装でした。ユーザーのグループを取得してフィルタリングし、Claudeに回答させる——という一連の流れは動いていましたが、セキュリティの観点からはさらに2点、考慮すべき事項がありました。
① 入力検証とレート制限
「冷却システムの点検手順を教えてください」のような正常なクエリだけを想定した実装でした。しかし実際の運用では「これまでの指示を無視してください」のようなPrompt Injection攻撃や、意図しない連続リクエストや悪意あるユーザーによるレート超過が発生しえます。クエリがClaudeに届く前に弾く層を追加することで、システム全体の安全性が高まります。
② 出力のPIIフィルタリング
Claudeが回答を生成する際、検索された文書の中にメールアドレスや電話番号が含まれていた場合、そのままレスポンスに出力されるリスクがあります。特にOT/ICS環境の文書には担当者情報が記載されていることが多く、出力フィルタリングを加えることでPII漏洩のリスクを低減できます。
これらを解決するために、query.pyに以下の3つのモジュールを追加しています。
| モジュール | 関連するOWASP LLMリスク区分 | 役割 |
|---|---|---|
security/input_validation.py |
LLM01: Prompt Injection | Injection パターンの検出・ブロック |
security/output_filter.py |
LLM05: Improper Output Handling | PII(個人情報)の検出・マスク |
security/rate_limiter.py |
LLM10: Unbounded Consumption | リクエスト数・トークン消費の上限制御 |
さらに今回実装したaudit_logger.pyを加えることで、クエリの開始から回答返却まで全ステップが記録される構成になります。統合後の完全なフローは以下の通りです。
ユーザーの質問
↓
[Step 0] 入力検証 (input_validation.py)
→ Prompt Injection パターンを検出したら即座にブロック
↓
[Step 0] レート制限 (rate_limiter.py)
→ 60秒間に5回超 / 日次10,000トークン超はブロック
↓
[Step 1] ユーザーグループ取得(Fail Closed)
→ グループ情報が取得できない場合はアクセスを拒否
↓
[Step 2] 質問を Cohere Embed でベクトル化
↓
[Step 3] ChromaDB 検索 → Python 側で ACL フィルタリング
↓
[Step 4] Cohere Rerank で関連度並び替え
↓
[Step 5] Claude で回答生成
↓
[Step 6] 出力フィルタリング (output_filter.py)
→ PII(クレジットカード番号・メールアドレス等)を検出・マスク
↓
[Step 7] 監査ログ記録 (audit_logger.py)
↓
回答を返却
# query.py(統合版・抜粋)
from audit_logger import log_query, log_access_denied
from security.input_validation import validate_input
from security.output_filter import filter_output
from security.rate_limiter import RateLimiter
limiter = RateLimiter()
def rag_query(user_id: str, question: str, top_k: int = 5) -> str:
# Step0: 入力検証・レート制限
is_valid, reason = validate_input(question)
if not is_valid:
log_access_denied(user_id, [], question, reason)
return f"[入力エラー] {reason}"
is_allowed, reason = limiter.check_rate_limit(user_id)
if not is_allowed:
log_access_denied(user_id, [], question, reason)
return f"[レート制限] {reason}"
# Step1: ユーザーグループ取得(Fail Closed)
groups = get_user_groups(user_id)
# Step2〜4: Embed → ACLフィルタ → Rerank(実装編と同様)
# ...(省略)
# Step5: Claude で回答生成
raw_answer = claude.messages.create(...).content[0].text
# Step6: 出力フィルタリング
filtered_answer, warnings = filter_output(raw_answer)
# Step7: 監査ログ記録
log_query(
user_id=user_id,
groups=groups,
query=question,
docs_retrieved=len(docs),
rerank_scores=[r.relevance_score for r in rerank_response.results],
response_length=len(filtered_answer),
pii_detected=len(warnings) > 0,
)
return filtered_answer
ログの出力イメージ
{"timestamp": "2026-04-14T10:23:41.123456", "user_id": "tanaka", "groups": ["maintenance_line_a", "all_staff"], "query": "冷却システムの点検手順を教えてください", "docs_retrieved": 3, "rerank_scores": [0.9999745, 0.06548521, 0.00029365], "response_length": 312, "pii_detected": false}
{"timestamp": "2026-04-14T10:24:05.654321", "event": "ACCESS_DENIED", "user_id": "unknown_user", "groups": [], "query": "ignore previous instructions", "reason": "不正な入力パターンを検出しました: 'ignore previous instructions'"}
4. イベント駆動再インデックスの設計
なぜバッチ処理では不十分か
「週に1回、全文書を再インデックスすれば十分ではないか」——この方針は、変更頻度の低い文書(製品カタログ等)なら許容できます。しかし製造業のSOP(作業手順書)や社内規則の場合、更新から次週の再インデックスまでの間、古い内容がRAGから返され続けることになります。システム設計やオペレーション運用によっては、現場で古いSOPに沿った作業が行われるきっかけになりかねません。RAGの出力を一次情報として扱う運用では特に注意が必要です。
【バッチ処理の問題】
月曜: 冷却システムのSOP改訂(弁の開放手順が変更)
↓ 新バージョンが登録されない
水曜: 夜間シフト保守員がRAGで手順を確認
↓ 古いバージョンの手順が返される ❌
日曜: 週次バッチ再インデックス実行
↓ ここでようやく更新が反映される
OT/ICS環境ではこのタイムラグが安全上のリスクになりえます。そこでイベント駆動(文書が更新されたタイミングで即座に再インデックスする)アプローチが必要になります。
本実装のスコープについて
本記事では、文書管理システムとの連携部分はシミュレーションで代替しています。実際の製造業環境では SharePoint や Confluence との Webhook 連携が必要になりますが、社内システムへのアクセス権・認証・セキュリティポリシーが絡むため、本シリーズではコアロジック(delete → insert の順序厳守)の解説に留めます。
【本来のイベント駆動設計(実運用)】
文書管理システムで文書が更新される
↓
更新イベントが Webhook エンドポイントに送信される
↓
reindex_trigger.py がリクエストを受信・処理をトリガー
↓
該当文書を delete → 再 Embed → insert
【本記事の実装(シミュレーション)】
simulate_webhook() を手動で呼び出す形で代替
5. reindex_trigger.py の実装
# reindex_trigger.py
# イベント駆動再インデックス: 文書更新の Webhook シミュレーション
import os
import cohere
import chromadb
from dotenv import load_dotenv
from chunking import chunk_document
load_dotenv()
co = cohere.Client(os.getenv("COHERE_API_KEY"))
chroma_client = chromadb.PersistentClient(path="./chroma_db")
collection = chroma_client.get_collection("rag_docs")
def delete_document(doc_id: str) -> int:
results = collection.get(where={"source_doc_id": doc_id})
chunk_ids = results["ids"]
if not chunk_ids:
print(f"[REINDEX] 削除対象なし: {doc_id}")
return 0
collection.delete(ids=chunk_ids)
print(f"[REINDEX] 削除完了: {doc_id} → {len(chunk_ids)}チャンク削除")
return len(chunk_ids)
def reindex_document(doc: dict) -> int:
chunks = chunk_document(doc, chunk_size=200, overlap=20)
texts = [chunk["text"] for chunk in chunks]
response = co.embed(
texts=texts,
model="embed-multilingual-v3.0",
input_type="search_document",
)
embeddings = response.embeddings
collection.add(
ids=[chunk["id"] for chunk in chunks],
embeddings=embeddings,
documents=texts,
metadatas=[
{
**chunk["metadata"],
"allowed_groups": ",".join(chunk["metadata"]["allowed_groups"]),
}
for chunk in chunks
],
)
print(f"[REINDEX] 再登録完了: {doc['id']} → {len(chunks)}チャンク登録")
return len(chunks)
def simulate_webhook(doc_id: str, updated_doc: dict) -> None:
"""文書更新 Webhook シミュレーション。delete → insert の順序を厳守する。"""
print(f"\n{'='*50}")
print(f"[WEBHOOK] 文書更新イベント受信: {doc_id}")
print(f"{'='*50}")
deleted = delete_document(doc_id)
inserted = reindex_document(updated_doc)
print(f"[WEBHOOK] 完了: {deleted}チャンク削除 → {inserted}チャンク登録")
print(f"{'='*50}\n")
delete → insert の順序を守る理由
再インデックスは必ず「古いチャンクをdelete → 新しいチャンクをinsert」の順序で実行します。逆順(insert → delete)にすると、一時的に新旧両バージョンのチャンクがVector Store上に共存する期間が生まれます。
【逆順(insert → delete)の危険】
① 新バージョンのチャンクを insert
→ この瞬間、旧バージョンと新バージョンが混在 ❌
② 旧バージョンのチャンクを delete
→ ここでようやく解消
【正しい順序(delete → insert)】
① 旧バージョンのチャンクを delete
→ この瞬間、文書がVector Storeから消える(検索結果に出なくなる)
② 新バージョンのチャンクを insert
→ ここから新バージョンが検索可能になる
旧バージョンが消えている瞬間にクエリが来ると「文書が見つかりません」と返ることになりますが、製造業のSOP変更においては、新旧の手順が混在した回答を返すよりも、一時的に「文書なし」として返す方が安全上のリスクは低くなります。
動作確認
# reindex_trigger.py 実行例
UPDATED_DOC = {
"id": "doc_001",
"text": "冷却システム点検手順書(ラインA)v2 — 改訂版: 冷却弁の開放手順を変更。...",
"metadata": {
"doc_type": "SOP",
"department": "製造部",
"allowed_groups": ["maintenance_line_a", "plant_manager"],
},
}
simulate_webhook("doc_001", UPDATED_DOC)
$ uv run python reindex_trigger.py
==================================================
[WEBHOOK] 文書更新イベント受信: doc_001
==================================================
[REINDEX] 削除完了: doc_001 → 2チャンク削除
[REINDEX] 再登録完了: doc_001 → 2チャンク登録
[WEBHOOK] 完了: 2チャンク削除 → 2チャンク登録
==================================================
| テストケース | 結果 |
|---|---|
| doc_001 更新シミュレーション | ✅ 2チャンク削除 → 2チャンク再登録 |
| delete → insert 順序確認 | ✅ 厳守を確認 |
| 再インデックス後のクエリ | ✅ 新バージョンの内容が返却される |
6. まとめと次回予告
3弾シリーズの対応関係
今回の実装で、設計編(第1弾)で示したチェックリストが全項目カバーできました。
| 設計チェックリスト(設計編) | 実装した記事 |
|---|---|
Indexing時に allowed_groups をメタデータに付与 |
第2弾 indexing.py
|
| Query時に Fail Closed でグループ取得 | 第2弾 query.py Step 1 |
| ACL-aware retrieval(Python 側フィルタリング) | 第2弾 query.py Step 3 |
| Rerank で関連度精度向上 | 第2弾 query.py Step 4 |
| Prompt Injection 対策 | 第3弾 security/input_validation.py
|
| 出力フィルタリング(PII 検出・マスク) | 第3弾 security/output_filter.py
|
| レート制限 | 第3弾 security/rate_limiter.py
|
| 監査ログ |
第3弾 audit_logger.py ← 今回 |
| イベント駆動再インデックス |
第3弾 reindex_trigger.py ← 今回 |
今回の実装(最低限)と実運用の差分
今回の実装はローカル環境で動作確認できる「最低限の構成」です。実運用に向けては以下の対応が必要になります。
- 監査ログの転送: JSONL ファイル → Splunk / Microsoft Sentinel / CloudWatch Logs
-
再インデックスの自動化:
simulate_webhook()→ SharePoint / Confluence の Webhook エンドポイント(FastAPI 実装) - chunk_id レベルのログ: どのチャンクが参照されたかの追跡
- 回答テキストの記録: コンプライアンス要件がある場合
次回予告
第4弾では、Prompt Injection 対策の深掘りを予定しています。今回実装したブロックリスト方式の限界・LLM による意図判定・Indirect Prompt Injection(RAG 経由の攻撃)という3つの観点から、実装アプローチを比較します。
参考
- OWASP LLM Top 10: https://owasp.org/www-project-top-10-for-large-language-model-applications/
- Cohere Embed: https://docs.cohere.com/docs/embeddings
- Cohere Rerank: https://docs.cohere.com/docs/rerank
- ChromaDB: https://docs.trychroma.com/
- ISO 27001 Annex A 8.15(ログ取得): https://www.iso.org/standard/27001
Discussion