【意図乖離検出;連載 第9回】AWS上でのサーバーレス実装 - 設計から実践へ
AWS上でのサーバーレス実装 - 設計から実践へ
前回のふりかえり
前回は、AIマルチエージェント技術と民主型投票アーキテクチャについて説明しました。複数のLLMが協調して意図乖離を検証する仕組みです。
今回は、より具体的な実装の話をします。IDDをAWS上でどのように実装するのか。サーバーレスアーキテクチャの採用理由、使用するサービス、設計パターン。Pythonのコード例も交えながら、詳しく見ていきます。
ただし、この記事は単なる技術選定の解説ではありません。IDD自身が掲げる「Why(なぜそうしたか)」と「Not(なぜ他を選ばなかったか)」を明示する原則を、IDD自身の実装ドキュメントにも適用します。設計判断の「Not」を書くことで、将来この構成を変更する際の判断基準が明確になります。これがIDDの思想を体現する設計ドキュメントの書き方です。
なぜサーバーレスか——Why/Notで語る設計判断
では早速、最初の大きな設計判断から。各設計判断について、選定理由だけでなく、不採用の理由とその判断が覆る条件(reconsider_when) も記述していきます。
IDDワークロードの特性
IDDのワークロードは、以下の特性を持ちます。
イベント駆動
ドキュメントの更新、PRの作成、CI/CDパイプラインの実行など、特定のイベントに応じて処理が発生します。
バースト的
コミットが集中する時間帯(午前中や夕方)と、ほとんど処理がない時間帯(夜間や週末)があります。
CPU集約的(LLM呼び出し待ち)
LLM APIの呼び出し待ちが処理時間の大部分を占めます。実際の計算処理は軽量です。
ステートレス
各検証リクエストは独立しており、前後の処理に依存しません。
これらの特性は、サーバーレスアーキテクチャと非常に相性が良いです。
サーバーレスの利点
| 利点 | IDDへの適用 |
|---|---|
| 自動スケーリング | バースト的なワークロードに対応 |
| 従量課金 | 処理がない時間帯はコストゼロ |
| 運用負荷軽減 | サーバー管理不要 |
| 高可用性 | AWSがインフラの可用性を保証 |
| イベント統合 | S3、EventBridge等とネイティブ連携 |
Not: なぜコンテナ(ECS/EKS/Fargate)でないのか
サーバーレスの利点だけを語るのではIDDらしくありません。なぜ他の選択肢を採用しなかったのかを明記します。
| 候補 | 不採用の理由 |
|---|---|
| ECS on Fargate | IDDの各処理は独立したイベント駆動であり、常時起動のコンテナは不要。LLM API呼び出し待ちが処理時間の大部分を占めるため、コンテナを「待機」させるのはコスト効率が悪い |
| EKS(Kubernetes) | IDDはオーケストレーションをStep Functionsで実現しており、Kubernetesの複雑なクラスタ管理は運用負荷に見合わない。IDDの開発チーム(想定: 2-5名)にKubernetes運用の専門知識を要求するのは現実的でない |
| EC2(常時起動) | バースト的ワークロードに対し、ピーク時に合わせたインスタンスを常時確保するのはコスト面で論外。オートスケーリングも可能だが、Lambda + Step Functionsの方がはるかにシンプル |
| AWS App Runner | Webリクエスト/レスポンス型のワークロードに最適化されており、IDDのような非同期イベント駆動処理には不向き |
サーバーレスの課題と対策
サーバーレスには課題もあります。IDDでは、以下のように対策しています。
コールドスタート
- 課題: Lambda関数の初回起動に数秒かかる
- 対策: Provisioned Concurrencyを重要な関数に適用、軽量なランタイム(Python)の使用
15分の実行時間制限
- 課題: Lambda関数は最大15分で終了する
- 対策: Step Functionsで長時間ワークフローを管理、処理を細かく分割
ステートレスの制約
- 課題: Lambda関数間で状態を共有できない
- 対策: DynamoDB、S3、Step Functionsの入出力で状態を管理
デバッグの難しさ
- 課題: 分散システムのデバッグは困難
- 対策: X-Rayによるトレーシング、CloudWatch Logsの構造化ログ
アーキテクチャ概要
全体像
IDDのAWSアーキテクチャは、以下の主要コンポーネントで構成されます。
主要サービスの役割
| サービス | 役割 | 選定理由 |
|---|---|---|
| S3 | 成果物ストレージ | 大容量、低コスト、イベント連携 |
| Lambda | 処理実行 | サーバーレス、自動スケール |
| Step Functions | ワークフロー管理 | 状態管理、並列実行(Map State)、可視化 |
| Bedrock | LLM API(Claude, Titan) | マネージド、複数モデル対応、VPC内アクセス |
| 外部LLM API | GPT-5.2, Gemini 3 Pro | 民主型投票の多様性確保(プロバイダ分散) |
| DynamoDB | メタデータ + キャッシュ + 投票結果 | 高速、スケーラブル、サーバーレス |
| OpenSearch Serverless | ベクトル検索 | 自動スケール、10億規模対応、GPU高速化 |
| AppConfig | 設定管理 | 動的設定変更、Feature Flag |
| EventBridge | イベント管理 | スケジューリング、イベントルーティング |
| SQS / DLQ | 非同期処理 + 障害退避 | 民主型投票の障害時キュー退避 |
| X-Ray + CloudWatch | 可観測性 | 分散トレーシング、投票メトリクス監視 |
主要サービス選定の「Not」
IDDらしく、各サービスの「なぜ他を選ばなかったか」を明記します。
| サービス | 候補だったもの | 不採用の理由 |
|---|---|---|
| Step Functions | SWF(Simple Workflow)、自前のオーケストレーター | SWFはレガシー。自前実装はリトライ、エラー処理、可視化を一から作る必要があり、IDDのコアバリューでない部分に工数を取られる |
| DynamoDB | Aurora Serverless、ElastiCache | RDBはJOINが不要なIDDのアクセスパターンにオーバースペック。ElastiCacheは永続性が保証されず、投票結果やメタデータの保存先として不適切 |
| Bedrock | 外部API直接呼び出しのみ | VPC内からの安全なアクセス、IAMによる権限管理、複数モデルの統一的なAPIが必要。ただし、民主型投票ではプロバイダ多様性が必要なため、外部APIも併用する(これが「Bedrockだけにしなかった理由」でもある) |
| AppConfig | Parameter Store、S3上のJSON | Parameter Storeは設定変更の段階的デプロイ(Feature Flag的な使い方)に弱い。S3は動的な設定変更のポーリングが非効率 |
IDDの各概念とAWSサービスの対応
第7回・第8回で説明したIDDの各概念が、AWS上でどのサービスに対応するかを整理します。
意図の4要素モデル(Intent Quadrant)
| 4要素 | AWS上での実現 | 詳細 |
|---|---|---|
| What(何を) | S3 + Bedrock | MarkdownドキュメントからBedrock(Claude)が自動抽出。結果はS3に構造化保存 |
| Why(なぜ) | S3 + Bedrock + OpenSearch Serverless | 最も重要な要素。抽出後にベクトル化しOpenSearch Serverlessで類似意図を検索可能に |
| How(どう) | S3 + Bedrock | 設計判断・トレードオフをBedrockで抽出。ADRとの紐付けはDynamoDBで管理 |
| Not(なぜ他を選ばなかったか) | S3 + Bedrock | 却下理由をBedrockが抽出。reconsider_when条件はEventBridgeで定期チェック可能 |
3層トレーサビリティモデル
| 層 | 役割 | AWSサービス | 実装方法 |
|---|---|---|---|
| L1: 構造トレース | 成果物間の対応関係 | DynamoDB | シングルテーブル設計でID間リンクを管理。GSI(Global Secondary Index)で逆引き |
| L2: 意図トレース | リンクの理由・制約 | DynamoDB + S3 | DynamoDBにリンクメタデータ、S3に詳細な意図記述を保存 |
| L3: 意図検証 | 意図の保持を検証 | Step Functions + Bedrock + 外部LLM | カスケード処理 → 民主型投票ワークフローで自動検証 |
3層プロジェクト階層モデル
| 層 | AWSサービス | 実装方法 |
|---|---|---|
| L0: Organization |
S3(プレフィックス階層) + DynamoDB(PK: ORG#xxx) |
組織ポリシーはS3の_intent/配下、メタデータはDynamoDB |
| L1: Project |
S3(サブプレフィックス) + DynamoDB(SK: PROJECT#xxx) |
継承ルールはLambdaで制約チェック時に上位を自動読み込み |
| L2: SubProject |
S3(サブプレフィックス) + DynamoDB(SK: SUBPROJECT#xxx) |
制約の厳格化のみ許可するバリデーションをLambdaで実装 |
継承ルールの例:
L0 (Organization): 「全APIはOAuth 2.0で認証すること」(MUST)
↓ 継承(緩和不可)
L1 (Project): 「OAuth 2.0 + PKCE拡張を使用すること」(MUST、厳格化)
↓ 継承(緩和不可)
L2 (SubProject): 「トークン有効期限は15分以内」(MUST、さらに厳格化)
民主型投票アーキテクチャ
| 概念 | AWSサービス | 実装方法 |
|---|---|---|
| 並列分析(独立性保証) | Step Functions Map State |
MaxConcurrency指定で3つのLLMに同時リクエスト。各Lambdaは隔離された入力のみ受け取る |
| 投票集計 | Lambda(Aggregator) | Map Stateの出力を集約し、多数決・信頼度計算・不一致分析を実行 |
| カスケード処理 | Step Functions Choice State | Level 1(軽量)→ Level 2(標準)→ Level 3(民主型投票)の段階的エスカレーション |
| 議論フェーズ | Step Functions ループ | Round 1(初期分析)→ Round 2(相互批評)→ Round 3(最終判断)をイテレーション |
| Bedrock呼び出し | Bedrock InvokeModel | Claude Sonnet/Opus をVPC内からダイレクト呼び出し |
| 外部LLM呼び出し | Lambda + Secrets Manager | GPT-5.2, Gemini 3 ProのAPIキーをSecrets Managerで管理、Lambdaから呼び出し |
| 障害時フォールバック | SQS Dead Letter Queue | 全LLM障害時にDLQに退避。CloudWatch Alarmで通知 |
| 少数意見の記録 | DynamoDB |
VALIDATION#プレフィックスで検証結果と少数意見を永続化 |
| 信頼度キャリブレーション | DynamoDB + Lambda | 過去の正解率をDynamoDBに蓄積、Lambda関数でPlatt Scaling補正を適用 |
| 投票メトリクス | CloudWatch Custom Metrics | 一致率、少数意見率、棄権率をカスタムメトリクスとして送信 |
データモデル
S3: 成果物ストレージ
成果物は、3層プロジェクト階層に従ってS3に保存されます。このディレクトリ構造自体が、組織のルール/ガードレールの継承構造を表現しています。
s3://idd-artifacts-prod/
├── acme-corp/ # L0: Organization
│ ├── _intent/
│ │ ├── security-policy.md # 組織全体のセキュリティ方針
│ │ └── coding-standards.md # 組織全体のコーディング規約
│ ├── _config/
│ │ └── organization.yaml # 組織レベルの設定・制約定義
│ │
│ ├── ec-platform/ # L1: Project
│ │ ├── _intent/
│ │ │ └── api-guidelines.md # プロジェクト固有のAPI方針(組織方針を継承+厳格化)
│ │ ├── _config/
│ │ │ └── project.yaml # プロジェクトレベルの設定(組織設定を継承)
│ │ │
│ │ └── auth-service/ # L2: SubProject
│ │ ├── _intent/
│ │ │ └── auth-policy.md # サービス固有の認証方針(上位を継承+厳格化)
│ │ ├── _config/
│ │ │ └── subproject.yaml # サービスレベルの設定(上位設定を継承)
│ │ ├── requirements/
│ │ │ └── REQ-001.md
│ │ └── design/
│ │ └── DES-001.md
継承の仕組み:
各層の_intent/ディレクトリには、その層で定義されたルール・ガードレール・意図が格納されます。検証時には、L0 → L1 → L2の順に上位から読み込まれ、すべての制約が累積的に適用されます。
DynamoDB: シングルテーブル設計
DynamoDBは、シングルテーブル設計を採用しています。シングルテーブル設計とは、RDBのように用途別にテーブルを分けるのではなく、1つのテーブルにPK/SKの命名規則を工夫して複数のエンティティを格納する手法です。Rick Houlihan氏が提唱するDynamoDBのベストプラクティスであり、JOINが使えないDynamoDBで関連データを効率的に取得するために用います。
テーブル構造
| PK | SK | 属性 |
|---|---|---|
ORG#acme |
META |
組織メタデータ |
ORG#acme |
PROJECT#ec-platform |
プロジェクトメタデータ |
ORG#acme#PRJ#ec-platform |
SUBPROJECT#auth |
サブプロジェクトメタデータ |
ORG#acme#PRJ#ec-platform#SUB#auth |
ARTIFACT#REQ-001 |
成果物メタデータ |
ORG#acme#PRJ#ec-platform#SUB#auth |
VALIDATION#2024-01-15T10:30:00Z |
検証結果 |
ATOMIC_COUNTER |
acme-ec-auth-REQ |
ID採番用カウンター |
アクセスパターン
| パターン | PK | SK | 操作 |
|---|---|---|---|
| 組織一覧取得 | ORG#* |
META |
Scan with filter |
| プロジェクト一覧 | ORG#acme |
begins_with(PROJECT#) |
Query |
| サブプロジェクト一覧 | ORG#acme#PRJ#ec-platform |
begins_with(SUBPROJECT#) |
Query |
| 成果物一覧 | ORG#acme#PRJ#ec-platform#SUB#auth |
begins_with(ARTIFACT#) |
Query |
| 検証履歴取得 | ORG#acme#PRJ#ec-platform#SUB#auth |
begins_with(VALIDATION#) |
Query |
OpenSearch Serverless: ベクトル検索
意図のセマンティック検索には、OpenSearch Serverless(Vector Search Collection)を採用します。
Why: IDDの検証では、新しい意図が「過去に定義された類似の意図と矛盾していないか」をチェックする必要がある。これは単なるキーワード検索では実現できず、意味的な類似度を高速に計算するベクトル検索が不可欠である。
なぜAurora pgvectorではなくOpenSearch Serverlessか
当初はAurora Serverless v2 + pgvectorを検討しましたが、IDDのワークロード特性とデータ量の増大を考慮し、OpenSearch Serverlessを選定しました。
| 比較項目 | Aurora pgvector | OpenSearch Serverless | IDDでの評価 |
|---|---|---|---|
| スケーリング | 垂直 + 水平(Read Replica) | 自動水平スケール | ◎ データ増大に追従 |
| 最大ベクトル規模 | 数百万件(HNSWで実用的) | 10億件超(GPU高速化対応) | ◎ 組織横断で安心 |
| インデックス再構築 | IVFFlatは手動再構築が必要 | 自動最適化(2025年追加) | ◎ 運用負荷ゼロ |
| クエリ性能 | 中程度(pgvector 0.8.0で改善) | ミリ秒レベル(k-NN/ANN) | ◎ リアルタイム検索 |
| フィルタ付き検索 | pgvector 0.8.0で大幅改善 | ネイティブサポート | ◯ どちらも可 |
| サーバーレス | Aurora Serverless v2(最小ACU課金あり) | 完全サーバーレス(OCU単位) | ◯ どちらもサーバーレス |
| コスト(最小) | 〜$50/月(最小ACU) | 〜$260/月(最小4 OCU) | △ OpenSearchのほうが高い |
| Bedrock統合 | Bedrock Knowledge Bases対応 | Bedrock Knowledge Bases対応 | ◯ どちらも対応 |
| 全文検索 | PostgreSQL FTS | Luceneベースの高度な全文検索 | ◎ ハイブリッド検索が強力 |
選定理由のまとめ:
- スケーラビリティ: IDDは組織横断で意図を蓄積するため、データ量は時間とともに増大する。pgvectorは数百万件を超えるとインデックス再構築や性能劣化が懸念されるが、OpenSearch Serverlessは10億件規模まで自動スケールする
- ハイブリッド検索: 意図の検索では「ベクトル類似度 + テキストフィルタ + メタデータフィルタ」の組み合わせが頻繁に発生する。OpenSearchはこのハイブリッド検索がネイティブに強い
- 運用負荷: インデックスの自動最適化(2025年追加)により、チューニング不要で最適な検索性能を維持できる
- GPU高速化: 大量のベクトル登録時、GPUアクセラレーションで従来比10倍速(2025年追加)
インデックス設計
{
"settings": {
"index": {
"knn": true,
"knn.algo_param.ef_search": 512,
"number_of_shards": 2,
"number_of_replicas": 0
}
},
"mappings": {
"properties": {
"id": { "type": "keyword" },
"org_id": { "type": "keyword" },
"project_id": { "type": "keyword" },
"subproject_id": { "type": "keyword" },
"artifact_type": { "type": "keyword" },
"content": {
"type": "text",
"analyzer": "kuromoji"
},
"intent_why": { "type": "text", "analyzer": "kuromoji" },
"intent_what": { "type": "text", "analyzer": "kuromoji" },
"intent_how": { "type": "text", "analyzer": "kuromoji" },
"intent_not": { "type": "text", "analyzer": "kuromoji" },
"embedding": {
"type": "knn_vector",
"dimension": 1536,
"method": {
"name": "hnsw",
"space_type": "cosinesimil",
"engine": "faiss",
"parameters": {
"ef_construction": 256,
"m": 16
}
}
},
"constraints": {
"type": "nested",
"properties": {
"text": { "type": "text" },
"level": { "type": "keyword" }
}
},
"created_at": { "type": "date" },
"updated_at": { "type": "date" }
}
}
}
ポイント:
-
kuromojiアナライザーで日本語テキストの全文検索に対応 - HNSWアルゴリズム + Faissエンジンで高速なANN検索
-
nested型でMUST/SHOULD制約を構造化してフィルタ可能
ハイブリッド検索クエリ
IDDでは、ベクトル類似度 + テキスト + メタデータフィルタを組み合わせたハイブリッド検索が重要です。
class IntentSearchClient:
"""OpenSearch Serverlessを使った意図検索"""
def __init__(self, opensearch_client, embedding_generator):
self.client = opensearch_client
self.embedder = embedding_generator
self.index_name = "intent-embeddings"
def hybrid_search(
self,
query_text: str,
org_id: str,
project_id: str = None,
min_score: float = 0.7,
limit: int = 10
) -> list:
"""ベクトル類似度 + テキスト + メタデータのハイブリッド検索"""
# クエリテキストをベクトル化
query_embedding = self.embedder.generate_embedding(query_text)
# ハイブリッドクエリの構築
query = {
"size": limit,
"query": {
"bool": {
"must": [
# ベクトル類似度検索(k-NN)
{
"knn": {
"embedding": {
"vector": query_embedding,
"k": limit * 2
}
}
}
],
"filter": [
# メタデータフィルタ: 組織ID
{"term": {"org_id": org_id}}
],
"should": [
# テキスト類似度(ブースト)
{
"multi_match": {
"query": query_text,
"fields": [
"intent_why^3",
"intent_what^2",
"content"
],
"type": "best_fields"
}
}
]
}
},
"min_score": min_score
}
# プロジェクトIDが指定されている場合
if project_id:
query["query"]["bool"]["filter"].append(
{"term": {"project_id": project_id}}
)
response = self.client.search(
index=self.index_name,
body=query
)
return [
{
"id": hit["_source"]["id"],
"content": hit["_source"]["content"],
"intent_why": hit["_source"].get("intent_why"),
"score": hit["_score"],
"artifact_type": hit["_source"]["artifact_type"]
}
for hit in response["hits"]["hits"]
]
def find_contradicting_intents(
self,
intent_text: str,
org_id: str,
project_id: str
) -> list:
"""矛盾する可能性のある意図を検索
意図乖離検証で使用: 新しい意図が既存の意図と矛盾していないかチェック
"""
# 高い類似度だが異なるartifact_typeの意図を探す
similar = self.hybrid_search(
query_text=intent_text,
org_id=org_id,
project_id=project_id,
min_score=0.6,
limit=20
)
# 類似度が中程度(0.6-0.85)のものは矛盾の可能性が高い
# 高すぎる(>0.85)場合は同じ意図の別表現の可能性
return [
item for item in similar
if 0.6 <= item["score"] <= 0.85
]
Lambda関数の設計
関数の分割方針
Lambda関数は、単一責任の原則に従って分割しています。ドキュメント処理と意図乖離検証、それぞれのワークフローに対応する関数群です。
ドキュメント処理ワークフロー
| 関数名 | 責任 | トリガー |
|---|---|---|
idd-trigger |
S3イベント受信、ワークフロー起動 | S3 Event / EventBridge |
idd-parse |
ドキュメント解析、構造抽出 | Step Functions |
idd-extract-intent |
LLMによる意図抽出(Why/What/How/Not) | Step Functions |
idd-embed |
ベクトル埋め込み生成 + OpenSearch登録 | Step Functions |
idd-store-result |
DynamoDBへのメタデータ保存 | Step Functions |
意図乖離検証ワークフロー
| 関数名 | 責任 | トリガー |
|---|---|---|
idd-build-context |
プロジェクトコンテキスト構築 + 関連意図検索 | Step Functions |
idd-cascade-level1 |
Level 1 軽量チェック(Haiku) | Step Functions |
idd-cascade-level2 |
Level 2 標準分析(Sonnet) | Step Functions |
idd-voting-analysis |
民主型投票の個別分析(プロバイダ抽象化) | Step Functions Map State |
idd-aggregate-votes |
投票集計・多数決・少数意見記録 | Step Functions |
idd-emit-metrics |
CloudWatch Custom Metricsへの投票メトリクス送信 | Step Functions |
共通 / API
| 関数名 | 責任 | トリガー |
|---|---|---|
idd-search |
類似意図のハイブリッド検索 | API Gateway |
idd-error-handler |
エラーハンドリング + DLQ送信 | Step Functions Catch |
共通パターン
各Lambda関数で使用する共通パターンを、Pythonコードで示します。
Pydanticによる入出力スキーマ定義
Why: LLMの出力は本質的に非構造化テキストであり、そのまま後続処理に渡すとパースエラーや型不一致が頻発する。Pydanticによるスキーマ定義で、LLM出力を確実に構造化データに変換し、バリデーションを自動化する。
Not: 素のdict型やdataclassを使わなかった理由は、Pydanticが提供するmodel_validate()によるJSON→オブジェクト変換とバリデーションが一体化しているため。TypedDictは実行時バリデーションがない。dataclassは型変換が自動でない。
from pydantic import BaseModel, Field
from typing import Optional, List, Literal
from datetime import datetime
class Intent(BaseModel):
"""意図の構造化データ(Why/What/How/Not の4要素)"""
why: str = Field(description="なぜこの要素が必要か")
what: str = Field(description="何を実現するか")
how: Optional[str] = Field(default=None, description="どう実現するか")
not_chosen: Optional[str] = Field(default=None, description="なぜ他を選ばなかったか")
class Constraint(BaseModel):
"""制約条件"""
text: str = Field(description="制約の内容")
level: Literal["MUST", "SHOULD", "MAY"] = Field(description="制約レベル")
verified: bool = Field(default=False, description="検証済みかどうか")
source_section: str = Field(description="抽出元のセクション名")
class TraceLink(BaseModel):
"""トレースリンク"""
target_id: str = Field(description="リンク先のID")
link_type: Literal["upstream", "downstream", "related"]
description: Optional[str] = Field(default=None, description="リンクの説明")
class ValidationIssue(BaseModel):
"""検証で発見された問題"""
severity: Literal["critical", "warning", "info"]
category: str = Field(description="問題のカテゴリ")
description: str = Field(description="問題の説明")
location: Optional[str] = Field(default=None, description="問題の発生箇所")
suggestion: Optional[str] = Field(default=None, description="修正の提案")
class LLMParserOutput(BaseModel):
"""LLMパーサーの出力スキーマ"""
artifact_id: str = Field(description="成果物ID(新規生成または既存)")
artifact_type: Literal["requirement", "design", "implementation", "test", "intent"]
title: str = Field(description="成果物タイトル")
summary: str = Field(description="概要(1-2文)")
intent: Intent = Field(description="抽出された意図")
constraints: List[Constraint] = Field(default_factory=list, description="制約条件リスト")
trace_links: List[TraceLink] = Field(default_factory=list, description="トレースリンクリスト")
validation_issues: List[ValidationIssue] = Field(default_factory=list, description="検証問題リスト")
confidence_score: float = Field(ge=0.0, le=1.0, description="信頼度スコア")
LLMプロセッサー
import hashlib
import json
from typing import Optional, Type, TypeVar
from pydantic import BaseModel
import boto3
from datetime import datetime
T = TypeVar('T', bound=BaseModel)
class LLMProcessor:
"""LLM処理の共通クラス"""
def __init__(
self,
bedrock_client,
dynamodb_table,
model_id: str = "anthropic.claude-sonnet-4-5-20250929-v1:0",
cache_enabled: bool = True
):
self.bedrock = bedrock_client
self.table = dynamodb_table
self.model_id = model_id
self.cache_enabled = cache_enabled
def _compute_hash(self, content: str, system_prompt: str) -> str:
"""入力のハッシュを計算"""
combined = f"{content}|{system_prompt}|{self.model_id}"
return hashlib.sha256(combined.encode()).hexdigest()[:16]
def _check_cache(self, content_hash: str) -> Optional[dict]:
"""キャッシュをチェック"""
if not self.cache_enabled:
return None
try:
response = self.table.get_item(
Key={
'PK': f'CACHE#{content_hash}',
'SK': 'RESULT'
}
)
item = response.get('Item')
if item:
return json.loads(item['result'])
except Exception:
pass
return None
def _save_cache(self, content_hash: str, result: dict, ttl_hours: int = 24):
"""結果をキャッシュに保存"""
if not self.cache_enabled:
return
import time
ttl = int(time.time()) + (ttl_hours * 3600)
try:
self.table.put_item(
Item={
'PK': f'CACHE#{content_hash}',
'SK': 'RESULT',
'result': json.dumps(result),
'ttl': ttl,
'created_at': datetime.utcnow().isoformat()
}
)
except Exception:
pass
def process(
self,
content: str,
system_prompt: str,
output_schema: Type[T],
max_tokens: int = 4096
) -> T:
"""LLMで処理し、構造化された出力を返す"""
# キャッシュチェック
content_hash = self._compute_hash(content, system_prompt)
cached = self._check_cache(content_hash)
if cached:
return output_schema.model_validate(cached)
# LLM呼び出し
response = self.bedrock.invoke_model(
modelId=self.model_id,
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"system": system_prompt,
"messages": [
{
"role": "user",
"content": content
}
]
})
)
response_body = json.loads(response['body'].read())
result_text = response_body['content'][0]['text']
# JSON抽出
result_json = self._extract_json(result_text)
# バリデーション
parsed_result = output_schema.model_validate(result_json)
# キャッシュ保存
self._save_cache(content_hash, result_json)
return parsed_result
def _extract_json(self, text: str) -> dict:
"""テキストからJSONを抽出"""
import re
# コードブロック内のJSONを探す
json_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', text)
if json_match:
return json.loads(json_match.group(1))
# 直接JSONとしてパース
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# { から } までを抽出
json_match = re.search(r'\{[\s\S]*\}', text)
if json_match:
return json.loads(json_match.group())
raise ValueError("JSONの抽出に失敗しました")
プロジェクトコンテキストの構築(階層継承付き)
このクラスが、3層階層の「ルール/ガードレールの継承」を実現する中核コンポーネントです。検証対象のサブプロジェクトに対し、必ず上位層(Organization → Project)の意図・制約を累積的に読み込み、すべてのルールが適用されるコンテキストを構築します。
import yaml
from dataclasses import dataclass, field
@dataclass
class InheritedConstraint:
"""継承された制約"""
text: str
level: str # MUST / SHOULD / MAY
source_layer: str # L0_org / L1_project / L2_subproject
source_file: str
@dataclass
class HierarchicalContext:
"""階層的に継承されたコンテキスト"""
org_intent: str = ""
project_intent: str = ""
subproject_intent: str = ""
inherited_constraints: list = field(default_factory=list)
effective_constraints: list = field(default_factory=list) # 最終的に有効な制約
class ProjectContextBuilder:
"""プロジェクトコンテキストを構築するクラス
Why: 組織の階層構造に従い、上位層のルール/ガードレールを
下位層に自動継承するため。検証時に「組織のセキュリティポリシーに
違反していないか」を自動チェックするには、上位の制約を
漏れなく読み込む必要がある。
Not: 各層の制約を個別に管理しフラットに結合する方法は採用しなかった。
フラット結合では「どの層で定めた制約か」が失われ、
緩和禁止ルールの検証ができなくなるため。
"""
def __init__(self, s3_client, bucket_name: str):
self.s3 = s3_client
self.bucket = bucket_name
def build_context(
self,
org_id: str,
project_id: Optional[str] = None,
subproject_id: Optional[str] = None
) -> HierarchicalContext:
"""階層に応じたコンテキストを構築
重要: L0 → L1 → L2の順に読み込み、上位の制約は必ず継承される。
下位層で上位の制約を緩和することは許可されない(厳格化のみ可)。
"""
ctx = HierarchicalContext()
all_constraints = []
# L0: Organization level(組織全体のルール = 全プロジェクトに適用)
org_intent = self._read_intent(f"{org_id}/_intent/")
org_config = self._read_config(f"{org_id}/_config/organization.yaml")
if org_intent:
ctx.org_intent = org_intent
if org_config:
org_constraints = self._extract_constraints(org_config, "L0_org")
all_constraints.extend(org_constraints)
# L1: Project level(プロジェクト固有のルール = 組織ルールを継承+厳格化)
if project_id:
project_intent = self._read_intent(f"{org_id}/{project_id}/_intent/")
project_config = self._read_config(
f"{org_id}/{project_id}/_config/project.yaml"
)
if project_intent:
ctx.project_intent = project_intent
if project_config:
project_constraints = self._extract_constraints(
project_config, "L1_project"
)
# 上位制約との整合性チェック(緩和されていないか検証)
self._validate_no_relaxation(all_constraints, project_constraints)
all_constraints.extend(project_constraints)
# L2: SubProject level(サービス固有のルール = 上位ルールを継承+厳格化)
if project_id and subproject_id:
subproject_intent = self._read_intent(
f"{org_id}/{project_id}/{subproject_id}/_intent/"
)
subproject_config = self._read_config(
f"{org_id}/{project_id}/{subproject_id}/_config/subproject.yaml"
)
if subproject_intent:
ctx.subproject_intent = subproject_intent
if subproject_config:
sub_constraints = self._extract_constraints(
subproject_config, "L2_subproject"
)
self._validate_no_relaxation(all_constraints, sub_constraints)
all_constraints.extend(sub_constraints)
ctx.inherited_constraints = all_constraints
ctx.effective_constraints = all_constraints # 全層の制約が有効
return ctx
def _validate_no_relaxation(
self,
parent_constraints: list,
child_constraints: list
) -> None:
"""下位層が上位層の制約を緩和していないか検証
例: 上位でMUSTだった制約を下位でSHOULDに変更 → 違反
例: 上位でMUSTだった制約を下位でより厳しいMUSTに変更 → 許可
"""
LEVEL_ORDER = {"MUST": 3, "SHOULD": 2, "MAY": 1}
parent_by_key = {}
for c in parent_constraints:
# 制約テキストの正規化してキーとする
key = c.text.strip().lower()
parent_by_key[key] = c
for child in child_constraints:
key = child.text.strip().lower()
if key in parent_by_key:
parent = parent_by_key[key]
parent_level = LEVEL_ORDER.get(parent.level, 0)
child_level = LEVEL_ORDER.get(child.level, 0)
if child_level < parent_level:
raise ValueError(
f"制約の緩和は許可されていません: "
f"'{child.text}' を {parent.level}→{child.level} に "
f"緩和しようとしています "
f"(定義元: {parent.source_layer})"
)
def _read_intent(self, prefix: str) -> Optional[str]:
"""指定されたプレフィックス配下の意図ファイルを読み込む"""
try:
response = self.s3.list_objects_v2(
Bucket=self.bucket,
Prefix=prefix
)
intent_contents = []
for obj in response.get('Contents', []):
if obj['Key'].endswith('.md'):
content = self.s3.get_object(
Bucket=self.bucket,
Key=obj['Key']
)['Body'].read().decode('utf-8')
intent_contents.append(content)
return "\n---\n".join(intent_contents) if intent_contents else None
except Exception:
return None
def _read_config(self, key: str) -> Optional[dict]:
"""設定ファイル(YAML)を読み込む"""
try:
content = self.s3.get_object(
Bucket=self.bucket,
Key=key
)['Body'].read().decode('utf-8')
return yaml.safe_load(content)
except Exception:
return None
def _extract_constraints(
self, config: dict, source_layer: str
) -> list:
"""設定ファイルから制約を抽出"""
constraints = []
for c in config.get('constraints', []):
constraints.append(InheritedConstraint(
text=c['text'],
level=c['level'],
source_layer=source_layer,
source_file=config.get('source_file', 'unknown')
))
return constraints
def build_context_text(self, ctx: HierarchicalContext) -> str:
"""LLMに渡すためのテキスト形式に変換"""
parts = []
if ctx.org_intent:
parts.append(f"## 組織レベルの意図・制約(全プロジェクトに適用)\n{ctx.org_intent}")
if ctx.project_intent:
parts.append(f"## プロジェクトレベルの意図・制約(組織ルールを継承)\n{ctx.project_intent}")
if ctx.subproject_intent:
parts.append(f"## サブプロジェクトレベルの意図・制約(上位ルールを継承)\n{ctx.subproject_intent}")
if ctx.effective_constraints:
constraint_text = "\n".join([
f"- [{c.level}] {c.text} (定義元: {c.source_layer})"
for c in ctx.effective_constraints
])
parts.append(f"## 有効な制約一覧(全層の累積)\n{constraint_text}")
return "\n\n".join(parts)
アトミックカウンターによるID採番
class AtomicIDGenerator:
"""アトミックカウンターによるID採番"""
def __init__(self, dynamodb_table):
self.table = dynamodb_table
def generate_id(
self,
org_id: str,
project_id: str,
subproject_id: str,
artifact_type: str
) -> str:
"""一意のIDを生成
形式: {org_prefix}-{project_prefix}-{subproject_prefix}-{TYPE}-{連番}
例: acme-ec-auth-REQ-001
"""
# プレフィックス構築
counter_key = f"{org_id}-{project_id}-{subproject_id}-{artifact_type}"
# アトミックインクリメント
response = self.table.update_item(
Key={
'PK': 'ATOMIC_COUNTER',
'SK': counter_key
},
UpdateExpression='SET #cnt = if_not_exists(#cnt, :zero) + :inc',
ExpressionAttributeNames={'#cnt': 'counter'},
ExpressionAttributeValues={':zero': 0, ':inc': 1},
ReturnValues='UPDATED_NEW'
)
counter = response['Attributes']['counter']
# 型プレフィックスの取得
type_prefix = {
'requirement': 'REQ',
'design': 'DES',
'implementation': 'IMP',
'test': 'TST'
}.get(artifact_type, 'DOC')
return f"{org_id}-{project_id}-{subproject_id}-{type_prefix}-{counter:03d}"
Step Functionsワークフロー
IDDには2つの主要なワークフローがあります。
- ドキュメント処理ワークフロー: ドキュメント登録 → 意図抽出 → ベクトル保存
- 意図乖離検証ワークフロー: カスケードチェック → 民主型投票 → 結果保存
まず、ドキュメント処理ワークフローを見た後、意図乖離検証ワークフローの詳細(民主型投票の実装を含む)は次のセクションで説明します。
ドキュメント処理ワークフロー定義
Step Functionsを使用して、ドキュメント登録時の処理フローを管理します。
{
"Comment": "IDD Document Processing Workflow",
"StartAt": "ParseDocument",
"States": {
"ParseDocument": {
"Type": "Task",
"Resource": "${ParseLambdaArn}",
"ResultPath": "$.parseResult",
"Retry": [
{
"ErrorEquals": ["Lambda.ServiceException", "Lambda.TooManyRequestsException"],
"IntervalSeconds": 2,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.error",
"Next": "HandleError"
}
],
"Next": "ExtractIntent"
},
"ExtractIntent": {
"Type": "Task",
"Resource": "${ExtractIntentLambdaArn}",
"ResultPath": "$.extractResult",
"Retry": [
{
"ErrorEquals": ["LLMThrottlingException"],
"IntervalSeconds": 5,
"MaxAttempts": 5,
"BackoffRate": 2.0
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.error",
"Next": "HandleError"
}
],
"Next": "CheckValidationRequired"
},
"CheckValidationRequired": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.extractResult.requiresValidation",
"BooleanEquals": true,
"Next": "ValidateIntent"
}
],
"Default": "GenerateEmbedding"
},
"ValidateIntent": {
"Type": "Task",
"Resource": "${ValidateLambdaArn}",
"ResultPath": "$.validationResult",
"Next": "GenerateEmbedding"
},
"GenerateEmbedding": {
"Type": "Task",
"Resource": "${EmbedLambdaArn}",
"ResultPath": "$.embeddingResult",
"Next": "StoreResult"
},
"StoreResult": {
"Type": "Task",
"Resource": "${StoreResultLambdaArn}",
"End": true
},
"HandleError": {
"Type": "Task",
"Resource": "${ErrorHandlerLambdaArn}",
"End": true
}
}
}
ワークフローの可視化
Step Functionsのコンソールでは、ワークフローの実行状況をリアルタイムで可視化できます。これは、デバッグや監視において非常に有用です。
ドキュメント処理ワークフロー:
意図乖離検証ワークフロー:
意図乖離検証ワークフロー
ドキュメント処理ワークフローが「登録」のフローだとすれば、意図乖離検証ワークフローは「検証」のフローです。ここでは、第8回で説明したカスケード処理と民主型投票をAWS上でどう実装するかを、具体的に見ていきます。
カスケード → 民主型投票の全体フロー
Step Functions定義: カスケード検証
{
"Comment": "IDD Intent Drift Validation - Cascade + Democratic Voting",
"StartAt": "BuildContext",
"States": {
"BuildContext": {
"Type": "Task",
"Resource": "${BuildContextLambdaArn}",
"ResultPath": "$.context",
"Next": "SearchRelatedIntents"
},
"SearchRelatedIntents": {
"Type": "Task",
"Resource": "${SearchIntentLambdaArn}",
"ResultPath": "$.relatedIntents",
"Comment": "OpenSearch Serverlessで関連意図を検索",
"Next": "Level1LightCheck"
},
"Level1LightCheck": {
"Type": "Task",
"Resource": "${CascadeLevel1LambdaArn}",
"ResultPath": "$.level1Result",
"Comment": "Haiku 4.5で軽量チェック",
"Retry": [
{
"ErrorEquals": ["LLMThrottlingException"],
"IntervalSeconds": 3,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
],
"Next": "CheckLevel1Confidence"
},
"CheckLevel1Confidence": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.level1Result.confidence_score",
"NumericGreaterThanOrEquals": 0.8,
"Next": "StoreResult"
}
],
"Default": "Level2StandardAnalysis"
},
"Level2StandardAnalysis": {
"Type": "Task",
"Resource": "${CascadeLevel2LambdaArn}",
"ResultPath": "$.level2Result",
"Comment": "Sonnet 4.5で標準分析",
"Next": "CheckLevel2Confidence"
},
"CheckLevel2Confidence": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.level2Result.confidence_score",
"NumericGreaterThanOrEquals": 0.8,
"Next": "StoreResult"
}
],
"Default": "Level3DemocraticVoting"
},
"Level3DemocraticVoting": {
"Type": "Map",
"ItemsPath": "$.votingConfig.llmConfigs",
"MaxConcurrency": 3,
"Iterator": {
"StartAt": "InvokeLLM",
"States": {
"InvokeLLM": {
"Type": "Task",
"Resource": "${VotingAnalysisLambdaArn}",
"TimeoutSeconds": 60,
"Retry": [
{
"ErrorEquals": ["States.TaskFailed"],
"IntervalSeconds": 5,
"MaxAttempts": 2,
"BackoffRate": 2.0
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.error",
"Next": "MarkAbstention"
}
],
"End": true
},
"MarkAbstention": {
"Type": "Pass",
"Result": {
"status": "abstention",
"confidence_score": 0.0,
"reason": "LLM invocation failed"
},
"End": true
}
}
},
"ResultPath": "$.votingResults",
"Next": "AggregateVotes"
},
"AggregateVotes": {
"Type": "Task",
"Resource": "${AggregateVotesLambdaArn}",
"ResultPath": "$.finalResult",
"Next": "CheckConsensus"
},
"CheckConsensus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.finalResult.mode",
"StringEquals": "deferred",
"Next": "SendToDLQ"
},
{
"Variable": "$.finalResult.status",
"StringEquals": "needs_review",
"Next": "RequestHumanReview"
}
],
"Default": "StoreResult"
},
"RequestHumanReview": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage",
"Parameters": {
"QueueUrl": "${HumanReviewQueueUrl}",
"MessageBody.$": "States.JsonToString($.finalResult)"
},
"Next": "StoreResult"
},
"SendToDLQ": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage",
"Parameters": {
"QueueUrl": "${DeadLetterQueueUrl}",
"MessageBody.$": "States.JsonToString($)"
},
"End": true
},
"StoreResult": {
"Type": "Task",
"Resource": "${StoreValidationResultLambdaArn}",
"Next": "EmitMetrics"
},
"EmitMetrics": {
"Type": "Task",
"Resource": "${EmitMetricsLambdaArn}",
"End": true
}
}
}
民主型投票のLambda実装
ここが最も重要な部分です。Step FunctionsのMap Stateから呼び出される各Lambda関数が、異なるLLMプロバイダに対して隔離されたリクエストを送信します。
投票分析Lambda(プロバイダ抽象化)
import json
import boto3
import os
from typing import Optional
from pydantic import BaseModel, Field
from datetime import datetime
class VotingAnalysisInput(BaseModel):
"""Map Stateから渡される各LLM用の入力"""
llm_provider: str = Field(description="bedrock / openai / google")
llm_model: str = Field(description="モデルID")
perspective: str = Field(description="分析観点の指示")
intent: dict
artifact: dict
context: dict
# 注意: 他のLLMの結果は一切含まない(独立性保証)
class VotingAnalysisOutput(BaseModel):
"""各LLMの分析結果"""
llm_provider: str
llm_model: str
status: str = Field(description="passed / warning / failed")
confidence_score: float = Field(ge=0.0, le=1.0)
rationale: str
issues: list = Field(default_factory=list)
analyzed_at: str
# プロバイダごとのクライアント
class BedrockProvider:
"""Bedrock経由のLLM呼び出し"""
def __init__(self):
self.client = boto3.client('bedrock-runtime')
def invoke(self, model_id: str, prompt: str, max_tokens: int = 4096) -> str:
response = self.client.invoke_model(
modelId=model_id,
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"temperature": 0.1,
"system": "あなたは意図乖離検証のエキスパートです。",
"messages": [{"role": "user", "content": prompt}]
})
)
body = json.loads(response['body'].read())
return body['content'][0]['text']
class OpenAIProvider:
"""OpenAI API経由のLLM呼び出し"""
def __init__(self):
# Secrets ManagerからAPIキーを取得
secrets = boto3.client('secretsmanager')
secret = json.loads(
secrets.get_secret_value(
SecretId=os.environ['OPENAI_SECRET_ARN']
)['SecretString']
)
self.api_key = secret['api_key']
def invoke(self, model_id: str, prompt: str, max_tokens: int = 4096) -> str:
import urllib.request
req = urllib.request.Request(
'https://api.openai.com/v1/chat/completions',
data=json.dumps({
"model": model_id,
"messages": [
{"role": "system", "content": "あなたは意図乖離検証のエキスパートです。"},
{"role": "user", "content": prompt}
],
"max_tokens": max_tokens,
"temperature": 0.1
}).encode(),
headers={
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
)
with urllib.request.urlopen(req, timeout=45) as resp:
body = json.loads(resp.read())
return body['choices'][0]['message']['content']
class GoogleProvider:
"""Google Gemini API経由のLLM呼び出し"""
def __init__(self):
secrets = boto3.client('secretsmanager')
secret = json.loads(
secrets.get_secret_value(
SecretId=os.environ['GOOGLE_SECRET_ARN']
)['SecretString']
)
self.api_key = secret['api_key']
def invoke(self, model_id: str, prompt: str, max_tokens: int = 4096) -> str:
import urllib.request
url = f'https://generativelanguage.googleapis.com/v1/models/{model_id}:generateContent?key={self.api_key}'
req = urllib.request.Request(
url,
data=json.dumps({
"contents": [{"parts": [{"text": prompt}]}],
"generationConfig": {
"maxOutputTokens": max_tokens,
"temperature": 0.1
}
}).encode(),
headers={'Content-Type': 'application/json'}
)
with urllib.request.urlopen(req, timeout=45) as resp:
body = json.loads(resp.read())
return body['candidates'][0]['content']['parts'][0]['text']
# プロバイダファクトリ
PROVIDERS = {
'bedrock': BedrockProvider,
'openai': OpenAIProvider,
'google': GoogleProvider
}
def handler(event, context):
"""民主型投票の個別分析Lambda
Step Functions Map Stateから呼び出される。
各LLMプロバイダに隔離されたリクエストを送信し、
構造化された分析結果を返す。
"""
input_data = VotingAnalysisInput.model_validate(event)
# プロバイダの初期化
provider = PROVIDERS[input_data.llm_provider]()
# 分析プロンプトの構築(観点別の指示を含む)
prompt = f"""
{input_data.perspective}
## 検証対象
### 意図(Why/What/How/Not)
{json.dumps(input_data.intent, ensure_ascii=False, indent=2)}
### 対応する成果物
{json.dumps(input_data.artifact, ensure_ascii=False, indent=2)}
### プロジェクトコンテキスト
{json.dumps(input_data.context, ensure_ascii=False, indent=2)}
## 出力形式
以下のJSON形式で出力してください:
{{
"status": "passed" | "warning" | "failed",
"confidence_score": 0.0〜1.0,
"rationale": "判断の根拠(具体的に)",
"issues": [
{{
"severity": "critical" | "warning" | "info",
"description": "問題の説明",
"suggestion": "修正提案"
}}
]
}}
"""
# LLM呼び出し
response_text = provider.invoke(
model_id=input_data.llm_model,
prompt=prompt
)
# JSON抽出とバリデーション
import re
json_match = re.search(r'\{[\s\S]*\}', response_text)
if not json_match:
raise ValueError("LLM出力からJSONを抽出できませんでした")
result = json.loads(json_match.group())
return VotingAnalysisOutput(
llm_provider=input_data.llm_provider,
llm_model=input_data.llm_model,
status=result['status'],
confidence_score=result['confidence_score'],
rationale=result['rationale'],
issues=result.get('issues', []),
analyzed_at=datetime.utcnow().isoformat()
).model_dump()
投票集計Lambda
from collections import Counter
from typing import List
from pydantic import BaseModel
class AggregateVotesOutput(BaseModel):
"""投票集計の最終結果"""
status: str
confidence_score: float
mode: str # normal / degraded / deferred
consensus: str
majority_opinion: dict = None
dissenting_opinions: list = Field(default_factory=list)
abstentions: list = Field(default_factory=list)
all_results: list = Field(default_factory=list)
metadata: dict = Field(default_factory=dict)
def handler(event, context):
"""投票集計: Map Stateの出力を集約して最終判定"""
voting_results = event.get('votingResults', [])
min_required_votes = event.get('votingConfig', {}).get('min_votes', 2)
# 有効票と棄権を分離
valid_votes = []
abstentions = []
for result in voting_results:
if result.get('status') == 'abstention':
abstentions.append({
'llm_provider': result.get('llm_provider', 'unknown'),
'reason': result.get('reason', 'unknown')
})
else:
valid_votes.append(result)
# 全LLM障害
if len(valid_votes) == 0:
return AggregateVotesOutput(
status="deferred",
confidence_score=0.0,
mode="deferred",
consensus="0/3 - all failed",
abstentions=abstentions,
metadata={"reason": "全LLMが応答不能"}
).model_dump()
# 縮退モード: 1票のみ
if len(valid_votes) < min_required_votes:
vote = valid_votes[0]
return AggregateVotesOutput(
status=vote['status'],
confidence_score=max(0.0, vote['confidence_score'] - 0.3),
mode="degraded",
consensus=f"1/{len(voting_results)} - degraded mode",
all_results=valid_votes,
abstentions=abstentions,
metadata={"warning": "縮退モード: 信頼度に0.3のペナルティ適用"}
).model_dump()
# 通常モード: 多数決
status_counts = Counter(v['status'] for v in valid_votes)
# 三者三様チェック
if len(status_counts) == len(valid_votes) and len(valid_votes) >= 3:
return AggregateVotesOutput(
status="needs_review",
confidence_score=0.0,
mode="normal",
consensus=f"三者三様 - 人間レビュー必要",
all_results=valid_votes,
metadata={"reason": "全LLMの判断が異なる"}
).model_dump()
# 多数決
majority_status = status_counts.most_common(1)[0][0]
majority_votes = [v for v in valid_votes if v['status'] == majority_status]
dissenting_votes = [v for v in valid_votes if v['status'] != majority_status]
# 信頼度: 多数派の平均
avg_confidence = sum(v['confidence_score'] for v in majority_votes) / len(majority_votes)
return AggregateVotesOutput(
status=majority_status,
confidence_score=round(avg_confidence, 3),
mode="normal",
consensus=f"{len(majority_votes)}/{len(valid_votes)} agreed on {majority_status}",
majority_opinion={
"status": majority_status,
"llms": [v['llm_provider'] for v in majority_votes],
"rationale": majority_votes[0]['rationale']
},
dissenting_opinions=[
{
"llm_provider": v['llm_provider'],
"status": v['status'],
"rationale": v['rationale'],
"confidence": v['confidence_score']
}
for v in dissenting_votes
],
abstentions=abstentions,
all_results=valid_votes,
metadata={
"voting_timestamp": datetime.utcnow().isoformat(),
"total_llms": len(voting_results),
"valid_votes": len(valid_votes),
"abstention_count": len(abstentions)
}
).model_dump()
議論フェーズの実装(オプション)
MUST制約の検証など、高精度が求められる場合は「議論フェーズ」を組み込みます。これはStep Functionsのループで実現します。
def build_debate_round2_prompt(
initial_analysis: dict,
other_analyses: list,
perspective: str
) -> str:
"""議論フェーズRound 2のプロンプト構築
Round 1の結果を参照し、自分の分析を再評価する。
独立性を「意図的に緩和」する唯一のフェーズ。
"""
other_text = "\n\n".join([
f"### エキスパート {i+1} ({a['llm_provider']})\n"
f"判定: {a['status']} (信頼度: {a['confidence_score']})\n"
f"根拠: {a['rationale']}"
for i, a in enumerate(other_analyses)
])
return f"""
{perspective}
## あなたの初期分析(Round 1)
判定: {initial_analysis['status']}
信頼度: {initial_analysis['confidence_score']}
根拠: {initial_analysis['rationale']}
## 他のエキスパートの分析
{other_text}
## 指示
他のエキスパートの分析を参考に、あなたの初期分析を再評価してください。
1. 他のエキスパートが指摘した点で、あなたが見落としていたものはありますか?
2. 他のエキスパートの分析に同意できない点はありますか?その理由は?
3. 再評価の結果、あなたの判断は変わりますか?
最終的な判断を、根拠とともにJSON形式で出力してください。
"""
主要フローのシーケンス図
ここまでの内容を、時間軸に沿ったシーケンス図で整理します。
フロー1: ドキュメント登録 → 意図抽出 → ベクトル保存
フロー2: 意図乖離検証(カスケード → 民主型投票)
フロー3: 議論フェーズ付き民主型投票
Bedrockの活用
モデルの使い分け
IDDでは、タスクに応じてBedrockの異なるモデルを使い分けます。
| タスク | モデル | 理由 |
|---|---|---|
| 意図抽出 | Claude Sonnet 4.5 | バランスの良い性能とコスト |
| 深い分析 | Claude Opus 4.6 | 複雑な判断が必要な場合 |
| 軽量チェック | Claude Haiku 4.5 | 高速・低コスト |
| ベクトル埋め込み | Titan Embeddings v2 | 1536次元、多言語対応 |
カスケード処理の実装
Why: 全検証を高精度モデル(Opus)で実行すると、1件あたり$0.58、月$580のLLMコストが発生する。カスケード処理により、大部分をHaiku($0.01/件)やSonnet($0.06/件)で処理し、本当に判断が難しいケースのみOpusにエスカレーションすることで約90%のコスト削減を実現する。
Not: 「最初から全件Opusで処理する」アプローチを採用しなかった理由は、コストだけでなくレイテンシもある。Opusの応答時間はHaikuの5-10倍であり、開発者へのフィードバック速度が著しく低下する。また、「全件を軽量モデルだけで処理する」アプローチも採用しなかった——MUST制約の検証など高精度が求められるケースでは、軽量モデルの判断では信頼性が不十分なためである。
class CascadeLLMProcessor:
"""カスケード処理によるコスト最適化"""
MODELS = {
'light': 'anthropic.claude-haiku-4-5-20251001-v1:0',
'standard': 'anthropic.claude-sonnet-4-5-20250929-v1:0',
'deep': 'anthropic.claude-opus-4-6-v1'
}
def __init__(self, bedrock_client, dynamodb_table):
self.processors = {
level: LLMProcessor(
bedrock_client,
dynamodb_table,
model_id=model_id
)
for level, model_id in self.MODELS.items()
}
def process_with_cascade(
self,
content: str,
system_prompt: str,
output_schema: Type[T],
confidence_threshold: float = 0.8
) -> T:
"""カスケード処理で最適なモデルを選択"""
# Level 1: 軽量チェック
light_result = self.processors['light'].process(
content, system_prompt, output_schema
)
if light_result.confidence_score >= confidence_threshold:
return light_result
# Level 2: 標準分析
standard_result = self.processors['standard'].process(
content, system_prompt, output_schema
)
if standard_result.confidence_score >= confidence_threshold:
return standard_result
# Level 3: 深い分析
return self.processors['deep'].process(
content, system_prompt, output_schema
)
ベクトル埋め込みの生成とOpenSearch Serverlessへの保存
class EmbeddingGenerator:
"""Amazon Titan Embeddings v2を使用したベクトル生成"""
MODEL_ID = "amazon.titan-embed-text-v2:0"
def __init__(self, bedrock_client):
self.bedrock = bedrock_client
def generate_embedding(self, text: str) -> List[float]:
"""テキストをベクトルに変換"""
# テキストの前処理(最大8000トークン)
processed_text = text[:32000] # 大まかな文字数制限
response = self.bedrock.invoke_model(
modelId=self.MODEL_ID,
body=json.dumps({
"inputText": processed_text,
"dimensions": 1536,
"normalize": True
})
)
response_body = json.loads(response['body'].read())
return response_body['embedding']
def generate_batch_embeddings(
self,
texts: List[str]
) -> List[List[float]]:
"""バッチでベクトル生成"""
return [self.generate_embedding(text) for text in texts]
class IntentIndexer:
"""意図をOpenSearch Serverlessに登録"""
def __init__(self, opensearch_client, embedding_generator):
self.client = opensearch_client
self.embedder = embedding_generator
self.index_name = "intent-embeddings"
def index_intent(self, parsed_result: LLMParserOutput, org_id: str,
project_id: str, subproject_id: str) -> dict:
"""抽出された意図をベクトル化してインデックスに登録"""
# Why要素を中心にベクトル化(最も重要な要素)
embed_text = f"{parsed_result.intent.why}\n{parsed_result.intent.what}"
if parsed_result.intent.how:
embed_text += f"\n{parsed_result.intent.how}"
embedding = self.embedder.generate_embedding(embed_text)
document = {
"id": parsed_result.artifact_id,
"org_id": org_id,
"project_id": project_id,
"subproject_id": subproject_id,
"artifact_type": parsed_result.artifact_type,
"content": parsed_result.summary,
"intent_why": parsed_result.intent.why,
"intent_what": parsed_result.intent.what,
"intent_how": parsed_result.intent.how,
"intent_not": parsed_result.intent.not_chosen,
"embedding": embedding,
"constraints": [
{"text": c.text, "level": c.level}
for c in parsed_result.constraints
],
"created_at": datetime.utcnow().isoformat(),
"updated_at": datetime.utcnow().isoformat()
}
return self.client.index(
index=self.index_name,
id=parsed_result.artifact_id,
body=document
)
設定管理(AppConfig)
動的設定の管理
Why: IDDの運用では、「信頼度閾値を0.8から0.7に一時的に下げたい」「民主型投票を一時的に無効にしたい」といった設定変更が、デプロイなしで必要になる。AppConfigにより、Lambda関数の再デプロイなしで設定をリアルタイム変更できる。
Not: 環境変数やハードコードを採用しなかった理由は、変更のたびにLambdaの再デプロイが必要になり、設定変更の反映に数分のダウンタイムが発生するため。また、S3上のJSONファイルを定期ポーリングする方式も検討したが、AppConfigの方が段階的ロールアウト(Feature Flag的な運用)に対応しており、設定変更のリスクを低減できる。
AppConfigを使用して、実行時に設定を変更できるようにしています。
import boto3
from functools import lru_cache
import json
class AppConfigClient:
"""AppConfigから設定を取得"""
def __init__(
self,
application: str,
environment: str,
configuration: str
):
self.client = boto3.client('appconfigdata')
self.application = application
self.environment = environment
self.configuration = configuration
self._token = None
def start_session(self):
"""設定セッションを開始"""
response = self.client.start_configuration_session(
ApplicationIdentifier=self.application,
EnvironmentIdentifier=self.environment,
ConfigurationProfileIdentifier=self.configuration
)
self._token = response['InitialConfigurationToken']
def get_config(self) -> dict:
"""最新の設定を取得"""
if not self._token:
self.start_session()
response = self.client.get_latest_configuration(
ConfigurationToken=self._token
)
self._token = response['NextPollConfigurationToken']
if response['Configuration']:
return json.loads(response['Configuration'].read())
return {}
# 設定例
config = {
"llm": {
"default_model": "anthropic.claude-sonnet-4-5-20250929-v1:0",
"max_tokens": 4096,
"temperature": 0.1
},
"validation": {
"confidence_threshold": 0.8,
"require_unanimous": false,
"min_votes": 2
},
"caching": {
"enabled": true,
"ttl_hours": 24
},
"feature_flags": {
"multi_agent_voting": true,
"cascade_processing": true
}
}
コスト最適化
Why: なぜコスト最適化を設計段階で考えるのか
IDDは「便利だが高コスト」では普及しません。開発チームが月額数百ドル程度で運用できることが、採用の前提条件です。特にLLM利用料は従量課金であり、設計判断(カスケード処理、キャッシング)がコストに直結します。
Not: なぜ「コスト度外視で最高精度」を目指さないのか
全件をOpus + 5 LLMで検証すれば精度は最大化できるが、月額$2,000を超え、中小チームには導入不可能になる。IDDの価値は「完璧な検証」ではなく「実用的なコストで十分な精度の検証を継続的に回すこと」にある。
想定コスト
以下は、中規模プロジェクト(月1,000件の検証)での想定コストです。カスケード処理により、大部分はLevel 1/2で処理が完了し、民主型投票(Level 3)に到達するのは全体の約5〜10%を想定しています。
基盤サービス
| サービス | 使用量 | 月額概算 |
|---|---|---|
| Lambda | 100万リクエスト、10万GB秒 | $2 |
| Step Functions | 15万状態遷移 | $4 |
| DynamoDB | 10GB、100万RCU/WCU | $10 |
| OpenSearch Serverless | 4 OCU(最小構成) | $260 |
| S3 | 10GB | $0.3 |
| SQS | 10万メッセージ | $0.1 |
| 小計(基盤) | 約$276 |
LLM利用料(カスケード処理込み)
| 処理 | 件数(想定) | 1件あたり | 月額概算 |
|---|---|---|---|
| Level 1(Haiku 4.5) | 1,000件 | $0.01 | $10 |
| Level 2(Sonnet 4.5) | 250件(25%エスカレーション) | $0.06 | $15 |
| Level 3 民主型投票 | 50件(5%エスカレーション) | $0.58 | $29 |
| Titan Embeddings | 1,000件 | $0.001 | $1 |
| 小計(LLM) | 約$55 |
| 合計: 約$331/月 |
|---|
コスト削減のポイント
1. カスケード処理(最大の削減効果)
全件民主型投票なら月$580のLLMコストが、カスケード処理で$55に。約90%削減。
2. キャッシング
同じ入力に対する再処理を避ける。DynamoDBのTTLで自動削除。特に意図抽出はキャッシュヒット率が高い。
3. ハッシュベースの差分検出
変更のないファイルは再処理しない。S3のETagとDynamoDBのハッシュを比較。
4. OpenSearch Serverlessの段階的導入
PoC → Aurora pgvector、本番成長期 → OpenSearch Serverless。データ量に応じた最適選択。
5. 予算認識型スケジューリング
第8回で説明したCostAwareVotingSchedulerにより、日次予算に応じて検証モードを動的に切り替え。
まとめ: この記事自体がIDDの実践である
今回は、AWS上でのサーバーレス実装について詳しく説明しました。しかし、技術選定の表面的な説明にとどまらず、IDD自身が掲げる「Why/Notを明示する」原則を、この記事自体で実践しました。
振り返ると、この記事で明示したNotは以下の通りです:
| 設計判断 | 選んだもの | 選ばなかったもの(Not) |
|---|---|---|
| コンピュート | Lambda(サーバーレス) | ECS/EKS/Fargate/EC2 |
| プロジェクト階層 | 3層(Org→Project→SubProject) | フラット構造、2層、4層以上 |
| 成果物ストレージ | S3(プレフィックス階層) | Git、Confluence、DynamoDB直接保存 |
| メタデータDB | DynamoDB(シングルテーブル) | Aurora、マルチテーブル |
| ベクトル検索 | OpenSearch Serverless | Aurora pgvector |
| オーケストレーション | Step Functions | SWF、自前実装 |
| LLMプロバイダ | 3社併用(Bedrock + 外部API) | 単一プロバイダ、5社以上 |
| コスト戦略 | カスケード処理 | 全件高精度モデル、全件軽量モデル |
設計ドキュメントにNotを書くことで、将来の開発者が「なぜこの構成なのか」だけでなく「なぜ他の構成でないのか」を理解でき、状況が変わったときに適切に再判断できます。 これが、IDDが「意図の乖離」を防ぐための、最も基本的な実践です。
次回予告
次回は、少し哲学的な話に戻ります。「意図を理解する」とはどういうことか。現象学、解釈学、言語ゲームといった哲学的概念を借りながら、意図乖離という問題の深層を探っていきます。
📘 IDD連載ナビゲーション
◀️ 前回: 第8回: AIマルチエージェントと集合知
▶️ 次回: 第10回: 哲学的な問い
Discussion