☁️

【意図乖離検出;連載 第9回】AWS上でのサーバーレス実装 - 設計から実践へ

に公開

AWS上でのサーバーレス実装 - 設計から実践へ

前回のふりかえり

前回は、AIマルチエージェント技術と民主型投票アーキテクチャについて説明しました。複数のLLMが協調して意図乖離を検証する仕組みです。

今回は、より具体的な実装の話をします。IDDをAWS上でどのように実装するのか。サーバーレスアーキテクチャの採用理由、使用するサービス、設計パターン。Pythonのコード例も交えながら、詳しく見ていきます。

ただし、この記事は単なる技術選定の解説ではありません。IDD自身が掲げる「Why(なぜそうしたか)」と「Not(なぜ他を選ばなかったか)」を明示する原則を、IDD自身の実装ドキュメントにも適用します。設計判断の「Not」を書くことで、将来この構成を変更する際の判断基準が明確になります。これがIDDの思想を体現する設計ドキュメントの書き方です。

なぜサーバーレスか——Why/Notで語る設計判断

では早速、最初の大きな設計判断から。各設計判断について、選定理由だけでなく、不採用の理由とその判断が覆る条件(reconsider_when) も記述していきます。

IDDワークロードの特性

IDDのワークロードは、以下の特性を持ちます。

イベント駆動
ドキュメントの更新、PRの作成、CI/CDパイプラインの実行など、特定のイベントに応じて処理が発生します。

バースト的
コミットが集中する時間帯(午前中や夕方)と、ほとんど処理がない時間帯(夜間や週末)があります。

CPU集約的(LLM呼び出し待ち)
LLM APIの呼び出し待ちが処理時間の大部分を占めます。実際の計算処理は軽量です。

ステートレス
各検証リクエストは独立しており、前後の処理に依存しません。

これらの特性は、サーバーレスアーキテクチャと非常に相性が良いです。

サーバーレスの利点

利点 IDDへの適用
自動スケーリング バースト的なワークロードに対応
従量課金 処理がない時間帯はコストゼロ
運用負荷軽減 サーバー管理不要
高可用性 AWSがインフラの可用性を保証
イベント統合 S3、EventBridge等とネイティブ連携

Not: なぜコンテナ(ECS/EKS/Fargate)でないのか

サーバーレスの利点だけを語るのではIDDらしくありません。なぜ他の選択肢を採用しなかったのかを明記します。

候補 不採用の理由
ECS on Fargate IDDの各処理は独立したイベント駆動であり、常時起動のコンテナは不要。LLM API呼び出し待ちが処理時間の大部分を占めるため、コンテナを「待機」させるのはコスト効率が悪い
EKS(Kubernetes) IDDはオーケストレーションをStep Functionsで実現しており、Kubernetesの複雑なクラスタ管理は運用負荷に見合わない。IDDの開発チーム(想定: 2-5名)にKubernetes運用の専門知識を要求するのは現実的でない
EC2(常時起動) バースト的ワークロードに対し、ピーク時に合わせたインスタンスを常時確保するのはコスト面で論外。オートスケーリングも可能だが、Lambda + Step Functionsの方がはるかにシンプル
AWS App Runner Webリクエスト/レスポンス型のワークロードに最適化されており、IDDのような非同期イベント駆動処理には不向き

サーバーレスの課題と対策

サーバーレスには課題もあります。IDDでは、以下のように対策しています。

コールドスタート

  • 課題: Lambda関数の初回起動に数秒かかる
  • 対策: Provisioned Concurrencyを重要な関数に適用、軽量なランタイム(Python)の使用

15分の実行時間制限

  • 課題: Lambda関数は最大15分で終了する
  • 対策: Step Functionsで長時間ワークフローを管理、処理を細かく分割

ステートレスの制約

  • 課題: Lambda関数間で状態を共有できない
  • 対策: DynamoDB、S3、Step Functionsの入出力で状態を管理

デバッグの難しさ

  • 課題: 分散システムのデバッグは困難
  • 対策: X-Rayによるトレーシング、CloudWatch Logsの構造化ログ

アーキテクチャ概要

全体像

IDDのAWSアーキテクチャは、以下の主要コンポーネントで構成されます。

主要サービスの役割

サービス 役割 選定理由
S3 成果物ストレージ 大容量、低コスト、イベント連携
Lambda 処理実行 サーバーレス、自動スケール
Step Functions ワークフロー管理 状態管理、並列実行(Map State)、可視化
Bedrock LLM API(Claude, Titan) マネージド、複数モデル対応、VPC内アクセス
外部LLM API GPT-5.2, Gemini 3 Pro 民主型投票の多様性確保(プロバイダ分散)
DynamoDB メタデータ + キャッシュ + 投票結果 高速、スケーラブル、サーバーレス
OpenSearch Serverless ベクトル検索 自動スケール、10億規模対応、GPU高速化
AppConfig 設定管理 動的設定変更、Feature Flag
EventBridge イベント管理 スケジューリング、イベントルーティング
SQS / DLQ 非同期処理 + 障害退避 民主型投票の障害時キュー退避
X-Ray + CloudWatch 可観測性 分散トレーシング、投票メトリクス監視

主要サービス選定の「Not」

IDDらしく、各サービスの「なぜ他を選ばなかったか」を明記します。

サービス 候補だったもの 不採用の理由
Step Functions SWF(Simple Workflow)、自前のオーケストレーター SWFはレガシー。自前実装はリトライ、エラー処理、可視化を一から作る必要があり、IDDのコアバリューでない部分に工数を取られる
DynamoDB Aurora Serverless、ElastiCache RDBはJOINが不要なIDDのアクセスパターンにオーバースペック。ElastiCacheは永続性が保証されず、投票結果やメタデータの保存先として不適切
Bedrock 外部API直接呼び出しのみ VPC内からの安全なアクセス、IAMによる権限管理、複数モデルの統一的なAPIが必要。ただし、民主型投票ではプロバイダ多様性が必要なため、外部APIも併用する(これが「Bedrockだけにしなかった理由」でもある)
AppConfig Parameter Store、S3上のJSON Parameter Storeは設定変更の段階的デプロイ(Feature Flag的な使い方)に弱い。S3は動的な設定変更のポーリングが非効率

IDDの各概念とAWSサービスの対応

第7回・第8回で説明したIDDの各概念が、AWS上でどのサービスに対応するかを整理します。

意図の4要素モデル(Intent Quadrant)

4要素 AWS上での実現 詳細
What(何を) S3 + Bedrock MarkdownドキュメントからBedrock(Claude)が自動抽出。結果はS3に構造化保存
Why(なぜ) S3 + Bedrock + OpenSearch Serverless 最も重要な要素。抽出後にベクトル化しOpenSearch Serverlessで類似意図を検索可能に
How(どう) S3 + Bedrock 設計判断・トレードオフをBedrockで抽出。ADRとの紐付けはDynamoDBで管理
Not(なぜ他を選ばなかったか) S3 + Bedrock 却下理由をBedrockが抽出。reconsider_when条件はEventBridgeで定期チェック可能

3層トレーサビリティモデル

役割 AWSサービス 実装方法
L1: 構造トレース 成果物間の対応関係 DynamoDB シングルテーブル設計でID間リンクを管理。GSI(Global Secondary Index)で逆引き
L2: 意図トレース リンクの理由・制約 DynamoDB + S3 DynamoDBにリンクメタデータ、S3に詳細な意図記述を保存
L3: 意図検証 意図の保持を検証 Step Functions + Bedrock + 外部LLM カスケード処理 → 民主型投票ワークフローで自動検証

3層プロジェクト階層モデル

AWSサービス 実装方法
L0: Organization S3(プレフィックス階層) + DynamoDB(PK: ORG#xxx 組織ポリシーはS3の_intent/配下、メタデータはDynamoDB
L1: Project S3(サブプレフィックス) + DynamoDB(SK: PROJECT#xxx 継承ルールはLambdaで制約チェック時に上位を自動読み込み
L2: SubProject S3(サブプレフィックス) + DynamoDB(SK: SUBPROJECT#xxx 制約の厳格化のみ許可するバリデーションをLambdaで実装

継承ルールの例:

L0 (Organization): 「全APIはOAuth 2.0で認証すること」(MUST)
  ↓ 継承(緩和不可)
L1 (Project): 「OAuth 2.0 + PKCE拡張を使用すること」(MUST、厳格化)
  ↓ 継承(緩和不可)
L2 (SubProject): 「トークン有効期限は15分以内」(MUST、さらに厳格化)

民主型投票アーキテクチャ

概念 AWSサービス 実装方法
並列分析(独立性保証) Step Functions Map State MaxConcurrency指定で3つのLLMに同時リクエスト。各Lambdaは隔離された入力のみ受け取る
投票集計 Lambda(Aggregator) Map Stateの出力を集約し、多数決・信頼度計算・不一致分析を実行
カスケード処理 Step Functions Choice State Level 1(軽量)→ Level 2(標準)→ Level 3(民主型投票)の段階的エスカレーション
議論フェーズ Step Functions ループ Round 1(初期分析)→ Round 2(相互批評)→ Round 3(最終判断)をイテレーション
Bedrock呼び出し Bedrock InvokeModel Claude Sonnet/Opus をVPC内からダイレクト呼び出し
外部LLM呼び出し Lambda + Secrets Manager GPT-5.2, Gemini 3 ProのAPIキーをSecrets Managerで管理、Lambdaから呼び出し
障害時フォールバック SQS Dead Letter Queue 全LLM障害時にDLQに退避。CloudWatch Alarmで通知
少数意見の記録 DynamoDB VALIDATION#プレフィックスで検証結果と少数意見を永続化
信頼度キャリブレーション DynamoDB + Lambda 過去の正解率をDynamoDBに蓄積、Lambda関数でPlatt Scaling補正を適用
投票メトリクス CloudWatch Custom Metrics 一致率、少数意見率、棄権率をカスタムメトリクスとして送信

データモデル

S3: 成果物ストレージ

成果物は、3層プロジェクト階層に従ってS3に保存されます。このディレクトリ構造自体が、組織のルール/ガードレールの継承構造を表現しています。

s3://idd-artifacts-prod/
├── acme-corp/                          # L0: Organization
│   ├── _intent/
│   │   ├── security-policy.md          # 組織全体のセキュリティ方針
│   │   └── coding-standards.md         # 組織全体のコーディング規約
│   ├── _config/
│   │   └── organization.yaml           # 組織レベルの設定・制約定義
│   │
│   ├── ec-platform/                    # L1: Project
│   │   ├── _intent/
│   │   │   └── api-guidelines.md       # プロジェクト固有のAPI方針(組織方針を継承+厳格化)
│   │   ├── _config/
│   │   │   └── project.yaml            # プロジェクトレベルの設定(組織設定を継承)
│   │   │
│   │   └── auth-service/               # L2: SubProject
│   │       ├── _intent/
│   │       │   └── auth-policy.md      # サービス固有の認証方針(上位を継承+厳格化)
│   │       ├── _config/
│   │       │   └── subproject.yaml     # サービスレベルの設定(上位設定を継承)
│   │       ├── requirements/
│   │       │   └── REQ-001.md
│   │       └── design/
│   │           └── DES-001.md

継承の仕組み:

各層の_intent/ディレクトリには、その層で定義されたルール・ガードレール・意図が格納されます。検証時には、L0 → L1 → L2の順に上位から読み込まれ、すべての制約が累積的に適用されます。

DynamoDB: シングルテーブル設計

DynamoDBは、シングルテーブル設計を採用しています。シングルテーブル設計とは、RDBのように用途別にテーブルを分けるのではなく、1つのテーブルにPK/SKの命名規則を工夫して複数のエンティティを格納する手法です。Rick Houlihan氏が提唱するDynamoDBのベストプラクティスであり、JOINが使えないDynamoDBで関連データを効率的に取得するために用います。

テーブル構造

PK SK 属性
ORG#acme META 組織メタデータ
ORG#acme PROJECT#ec-platform プロジェクトメタデータ
ORG#acme#PRJ#ec-platform SUBPROJECT#auth サブプロジェクトメタデータ
ORG#acme#PRJ#ec-platform#SUB#auth ARTIFACT#REQ-001 成果物メタデータ
ORG#acme#PRJ#ec-platform#SUB#auth VALIDATION#2024-01-15T10:30:00Z 検証結果
ATOMIC_COUNTER acme-ec-auth-REQ ID採番用カウンター

アクセスパターン

パターン PK SK 操作
組織一覧取得 ORG#* META Scan with filter
プロジェクト一覧 ORG#acme begins_with(PROJECT#) Query
サブプロジェクト一覧 ORG#acme#PRJ#ec-platform begins_with(SUBPROJECT#) Query
成果物一覧 ORG#acme#PRJ#ec-platform#SUB#auth begins_with(ARTIFACT#) Query
検証履歴取得 ORG#acme#PRJ#ec-platform#SUB#auth begins_with(VALIDATION#) Query

OpenSearch Serverless: ベクトル検索

意図のセマンティック検索には、OpenSearch Serverless(Vector Search Collection)を採用します。

Why: IDDの検証では、新しい意図が「過去に定義された類似の意図と矛盾していないか」をチェックする必要がある。これは単なるキーワード検索では実現できず、意味的な類似度を高速に計算するベクトル検索が不可欠である。

なぜAurora pgvectorではなくOpenSearch Serverlessか

当初はAurora Serverless v2 + pgvectorを検討しましたが、IDDのワークロード特性とデータ量の増大を考慮し、OpenSearch Serverlessを選定しました。

比較項目 Aurora pgvector OpenSearch Serverless IDDでの評価
スケーリング 垂直 + 水平(Read Replica) 自動水平スケール ◎ データ増大に追従
最大ベクトル規模 数百万件(HNSWで実用的) 10億件超(GPU高速化対応) ◎ 組織横断で安心
インデックス再構築 IVFFlatは手動再構築が必要 自動最適化(2025年追加) ◎ 運用負荷ゼロ
クエリ性能 中程度(pgvector 0.8.0で改善) ミリ秒レベル(k-NN/ANN) ◎ リアルタイム検索
フィルタ付き検索 pgvector 0.8.0で大幅改善 ネイティブサポート ◯ どちらも可
サーバーレス Aurora Serverless v2(最小ACU課金あり) 完全サーバーレス(OCU単位) ◯ どちらもサーバーレス
コスト(最小) 〜$50/月(最小ACU) 〜$260/月(最小4 OCU) △ OpenSearchのほうが高い
Bedrock統合 Bedrock Knowledge Bases対応 Bedrock Knowledge Bases対応 ◯ どちらも対応
全文検索 PostgreSQL FTS Luceneベースの高度な全文検索 ◎ ハイブリッド検索が強力

選定理由のまとめ:

  1. スケーラビリティ: IDDは組織横断で意図を蓄積するため、データ量は時間とともに増大する。pgvectorは数百万件を超えるとインデックス再構築や性能劣化が懸念されるが、OpenSearch Serverlessは10億件規模まで自動スケールする
  2. ハイブリッド検索: 意図の検索では「ベクトル類似度 + テキストフィルタ + メタデータフィルタ」の組み合わせが頻繁に発生する。OpenSearchはこのハイブリッド検索がネイティブに強い
  3. 運用負荷: インデックスの自動最適化(2025年追加)により、チューニング不要で最適な検索性能を維持できる
  4. GPU高速化: 大量のベクトル登録時、GPUアクセラレーションで従来比10倍速(2025年追加)

インデックス設計

{
  "settings": {
    "index": {
      "knn": true,
      "knn.algo_param.ef_search": 512,
      "number_of_shards": 2,
      "number_of_replicas": 0
    }
  },
  "mappings": {
    "properties": {
      "id": { "type": "keyword" },
      "org_id": { "type": "keyword" },
      "project_id": { "type": "keyword" },
      "subproject_id": { "type": "keyword" },
      "artifact_type": { "type": "keyword" },
      "content": {
        "type": "text",
        "analyzer": "kuromoji"
      },
      "intent_why": { "type": "text", "analyzer": "kuromoji" },
      "intent_what": { "type": "text", "analyzer": "kuromoji" },
      "intent_how": { "type": "text", "analyzer": "kuromoji" },
      "intent_not": { "type": "text", "analyzer": "kuromoji" },
      "embedding": {
        "type": "knn_vector",
        "dimension": 1536,
        "method": {
          "name": "hnsw",
          "space_type": "cosinesimil",
          "engine": "faiss",
          "parameters": {
            "ef_construction": 256,
            "m": 16
          }
        }
      },
      "constraints": {
        "type": "nested",
        "properties": {
          "text": { "type": "text" },
          "level": { "type": "keyword" }
        }
      },
      "created_at": { "type": "date" },
      "updated_at": { "type": "date" }
    }
  }
}

ポイント:

  • kuromojiアナライザーで日本語テキストの全文検索に対応
  • HNSWアルゴリズム + Faissエンジンで高速なANN検索
  • nested型でMUST/SHOULD制約を構造化してフィルタ可能

ハイブリッド検索クエリ

IDDでは、ベクトル類似度 + テキスト + メタデータフィルタを組み合わせたハイブリッド検索が重要です。

class IntentSearchClient:
    """OpenSearch Serverlessを使った意図検索"""
    
    def __init__(self, opensearch_client, embedding_generator):
        self.client = opensearch_client
        self.embedder = embedding_generator
        self.index_name = "intent-embeddings"
    
    def hybrid_search(
        self,
        query_text: str,
        org_id: str,
        project_id: str = None,
        min_score: float = 0.7,
        limit: int = 10
    ) -> list:
        """ベクトル類似度 + テキスト + メタデータのハイブリッド検索"""
        
        # クエリテキストをベクトル化
        query_embedding = self.embedder.generate_embedding(query_text)
        
        # ハイブリッドクエリの構築
        query = {
            "size": limit,
            "query": {
                "bool": {
                    "must": [
                        # ベクトル類似度検索(k-NN)
                        {
                            "knn": {
                                "embedding": {
                                    "vector": query_embedding,
                                    "k": limit * 2
                                }
                            }
                        }
                    ],
                    "filter": [
                        # メタデータフィルタ: 組織ID
                        {"term": {"org_id": org_id}}
                    ],
                    "should": [
                        # テキスト類似度(ブースト)
                        {
                            "multi_match": {
                                "query": query_text,
                                "fields": [
                                    "intent_why^3",
                                    "intent_what^2",
                                    "content"
                                ],
                                "type": "best_fields"
                            }
                        }
                    ]
                }
            },
            "min_score": min_score
        }
        
        # プロジェクトIDが指定されている場合
        if project_id:
            query["query"]["bool"]["filter"].append(
                {"term": {"project_id": project_id}}
            )
        
        response = self.client.search(
            index=self.index_name,
            body=query
        )
        
        return [
            {
                "id": hit["_source"]["id"],
                "content": hit["_source"]["content"],
                "intent_why": hit["_source"].get("intent_why"),
                "score": hit["_score"],
                "artifact_type": hit["_source"]["artifact_type"]
            }
            for hit in response["hits"]["hits"]
        ]
    
    def find_contradicting_intents(
        self,
        intent_text: str,
        org_id: str,
        project_id: str
    ) -> list:
        """矛盾する可能性のある意図を検索
        
        意図乖離検証で使用: 新しい意図が既存の意図と矛盾していないかチェック
        """
        # 高い類似度だが異なるartifact_typeの意図を探す
        similar = self.hybrid_search(
            query_text=intent_text,
            org_id=org_id,
            project_id=project_id,
            min_score=0.6,
            limit=20
        )
        
        # 類似度が中程度(0.6-0.85)のものは矛盾の可能性が高い
        # 高すぎる(>0.85)場合は同じ意図の別表現の可能性
        return [
            item for item in similar
            if 0.6 <= item["score"] <= 0.85
        ]

Lambda関数の設計

関数の分割方針

Lambda関数は、単一責任の原則に従って分割しています。ドキュメント処理と意図乖離検証、それぞれのワークフローに対応する関数群です。

ドキュメント処理ワークフロー

関数名 責任 トリガー
idd-trigger S3イベント受信、ワークフロー起動 S3 Event / EventBridge
idd-parse ドキュメント解析、構造抽出 Step Functions
idd-extract-intent LLMによる意図抽出(Why/What/How/Not) Step Functions
idd-embed ベクトル埋め込み生成 + OpenSearch登録 Step Functions
idd-store-result DynamoDBへのメタデータ保存 Step Functions

意図乖離検証ワークフロー

関数名 責任 トリガー
idd-build-context プロジェクトコンテキスト構築 + 関連意図検索 Step Functions
idd-cascade-level1 Level 1 軽量チェック(Haiku) Step Functions
idd-cascade-level2 Level 2 標準分析(Sonnet) Step Functions
idd-voting-analysis 民主型投票の個別分析(プロバイダ抽象化) Step Functions Map State
idd-aggregate-votes 投票集計・多数決・少数意見記録 Step Functions
idd-emit-metrics CloudWatch Custom Metricsへの投票メトリクス送信 Step Functions

共通 / API

関数名 責任 トリガー
idd-search 類似意図のハイブリッド検索 API Gateway
idd-error-handler エラーハンドリング + DLQ送信 Step Functions Catch

共通パターン

各Lambda関数で使用する共通パターンを、Pythonコードで示します。

Pydanticによる入出力スキーマ定義

Why: LLMの出力は本質的に非構造化テキストであり、そのまま後続処理に渡すとパースエラーや型不一致が頻発する。Pydanticによるスキーマ定義で、LLM出力を確実に構造化データに変換し、バリデーションを自動化する。

Not: 素のdict型やdataclassを使わなかった理由は、Pydanticが提供するmodel_validate()によるJSON→オブジェクト変換とバリデーションが一体化しているため。TypedDictは実行時バリデーションがない。dataclassは型変換が自動でない。

from pydantic import BaseModel, Field
from typing import Optional, List, Literal
from datetime import datetime

class Intent(BaseModel):
    """意図の構造化データ(Why/What/How/Not の4要素)"""
    why: str = Field(description="なぜこの要素が必要か")
    what: str = Field(description="何を実現するか")
    how: Optional[str] = Field(default=None, description="どう実現するか")
    not_chosen: Optional[str] = Field(default=None, description="なぜ他を選ばなかったか")

class Constraint(BaseModel):
    """制約条件"""
    text: str = Field(description="制約の内容")
    level: Literal["MUST", "SHOULD", "MAY"] = Field(description="制約レベル")
    verified: bool = Field(default=False, description="検証済みかどうか")
    source_section: str = Field(description="抽出元のセクション名")

class TraceLink(BaseModel):
    """トレースリンク"""
    target_id: str = Field(description="リンク先のID")
    link_type: Literal["upstream", "downstream", "related"]
    description: Optional[str] = Field(default=None, description="リンクの説明")

class ValidationIssue(BaseModel):
    """検証で発見された問題"""
    severity: Literal["critical", "warning", "info"]
    category: str = Field(description="問題のカテゴリ")
    description: str = Field(description="問題の説明")
    location: Optional[str] = Field(default=None, description="問題の発生箇所")
    suggestion: Optional[str] = Field(default=None, description="修正の提案")

class LLMParserOutput(BaseModel):
    """LLMパーサーの出力スキーマ"""
    artifact_id: str = Field(description="成果物ID(新規生成または既存)")
    artifact_type: Literal["requirement", "design", "implementation", "test", "intent"]
    title: str = Field(description="成果物タイトル")
    summary: str = Field(description="概要(1-2文)")
    intent: Intent = Field(description="抽出された意図")
    constraints: List[Constraint] = Field(default_factory=list, description="制約条件リスト")
    trace_links: List[TraceLink] = Field(default_factory=list, description="トレースリンクリスト")
    validation_issues: List[ValidationIssue] = Field(default_factory=list, description="検証問題リスト")
    confidence_score: float = Field(ge=0.0, le=1.0, description="信頼度スコア")

LLMプロセッサー

import hashlib
import json
from typing import Optional, Type, TypeVar
from pydantic import BaseModel
import boto3
from datetime import datetime

T = TypeVar('T', bound=BaseModel)

class LLMProcessor:
    """LLM処理の共通クラス"""
    
    def __init__(
        self,
        bedrock_client,
        dynamodb_table,
        model_id: str = "anthropic.claude-sonnet-4-5-20250929-v1:0",
        cache_enabled: bool = True
    ):
        self.bedrock = bedrock_client
        self.table = dynamodb_table
        self.model_id = model_id
        self.cache_enabled = cache_enabled
    
    def _compute_hash(self, content: str, system_prompt: str) -> str:
        """入力のハッシュを計算"""
        combined = f"{content}|{system_prompt}|{self.model_id}"
        return hashlib.sha256(combined.encode()).hexdigest()[:16]
    
    def _check_cache(self, content_hash: str) -> Optional[dict]:
        """キャッシュをチェック"""
        if not self.cache_enabled:
            return None
        
        try:
            response = self.table.get_item(
                Key={
                    'PK': f'CACHE#{content_hash}',
                    'SK': 'RESULT'
                }
            )
            item = response.get('Item')
            if item:
                return json.loads(item['result'])
        except Exception:
            pass
        return None
    
    def _save_cache(self, content_hash: str, result: dict, ttl_hours: int = 24):
        """結果をキャッシュに保存"""
        if not self.cache_enabled:
            return
        
        import time
        ttl = int(time.time()) + (ttl_hours * 3600)
        
        try:
            self.table.put_item(
                Item={
                    'PK': f'CACHE#{content_hash}',
                    'SK': 'RESULT',
                    'result': json.dumps(result),
                    'ttl': ttl,
                    'created_at': datetime.utcnow().isoformat()
                }
            )
        except Exception:
            pass
    
    def process(
        self,
        content: str,
        system_prompt: str,
        output_schema: Type[T],
        max_tokens: int = 4096
    ) -> T:
        """LLMで処理し、構造化された出力を返す"""
        
        # キャッシュチェック
        content_hash = self._compute_hash(content, system_prompt)
        cached = self._check_cache(content_hash)
        if cached:
            return output_schema.model_validate(cached)
        
        # LLM呼び出し
        response = self.bedrock.invoke_model(
            modelId=self.model_id,
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": max_tokens,
                "system": system_prompt,
                "messages": [
                    {
                        "role": "user",
                        "content": content
                    }
                ]
            })
        )
        
        response_body = json.loads(response['body'].read())
        result_text = response_body['content'][0]['text']
        
        # JSON抽出
        result_json = self._extract_json(result_text)
        
        # バリデーション
        parsed_result = output_schema.model_validate(result_json)
        
        # キャッシュ保存
        self._save_cache(content_hash, result_json)
        
        return parsed_result
    
    def _extract_json(self, text: str) -> dict:
        """テキストからJSONを抽出"""
        import re
        
        # コードブロック内のJSONを探す
        json_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', text)
        if json_match:
            return json.loads(json_match.group(1))
        
        # 直接JSONとしてパース
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            pass
        
        # { から } までを抽出
        json_match = re.search(r'\{[\s\S]*\}', text)
        if json_match:
            return json.loads(json_match.group())
        
        raise ValueError("JSONの抽出に失敗しました")

プロジェクトコンテキストの構築(階層継承付き)

このクラスが、3層階層の「ルール/ガードレールの継承」を実現する中核コンポーネントです。検証対象のサブプロジェクトに対し、必ず上位層(Organization → Project)の意図・制約を累積的に読み込み、すべてのルールが適用されるコンテキストを構築します。

import yaml
from dataclasses import dataclass, field

@dataclass
class InheritedConstraint:
    """継承された制約"""
    text: str
    level: str  # MUST / SHOULD / MAY
    source_layer: str  # L0_org / L1_project / L2_subproject
    source_file: str

@dataclass
class HierarchicalContext:
    """階層的に継承されたコンテキスト"""
    org_intent: str = ""
    project_intent: str = ""
    subproject_intent: str = ""
    inherited_constraints: list = field(default_factory=list)
    effective_constraints: list = field(default_factory=list)  # 最終的に有効な制約


class ProjectContextBuilder:
    """プロジェクトコンテキストを構築するクラス
    
    Why: 組織の階層構造に従い、上位層のルール/ガードレールを
         下位層に自動継承するため。検証時に「組織のセキュリティポリシーに
         違反していないか」を自動チェックするには、上位の制約を
         漏れなく読み込む必要がある。
    
    Not: 各層の制約を個別に管理しフラットに結合する方法は採用しなかった。
         フラット結合では「どの層で定めた制約か」が失われ、
         緩和禁止ルールの検証ができなくなるため。
    """
    
    def __init__(self, s3_client, bucket_name: str):
        self.s3 = s3_client
        self.bucket = bucket_name
    
    def build_context(
        self,
        org_id: str,
        project_id: Optional[str] = None,
        subproject_id: Optional[str] = None
    ) -> HierarchicalContext:
        """階層に応じたコンテキストを構築
        
        重要: L0 → L1 → L2の順に読み込み、上位の制約は必ず継承される。
        下位層で上位の制約を緩和することは許可されない(厳格化のみ可)。
        """
        ctx = HierarchicalContext()
        all_constraints = []
        
        # L0: Organization level(組織全体のルール = 全プロジェクトに適用)
        org_intent = self._read_intent(f"{org_id}/_intent/")
        org_config = self._read_config(f"{org_id}/_config/organization.yaml")
        if org_intent:
            ctx.org_intent = org_intent
        if org_config:
            org_constraints = self._extract_constraints(org_config, "L0_org")
            all_constraints.extend(org_constraints)
        
        # L1: Project level(プロジェクト固有のルール = 組織ルールを継承+厳格化)
        if project_id:
            project_intent = self._read_intent(f"{org_id}/{project_id}/_intent/")
            project_config = self._read_config(
                f"{org_id}/{project_id}/_config/project.yaml"
            )
            if project_intent:
                ctx.project_intent = project_intent
            if project_config:
                project_constraints = self._extract_constraints(
                    project_config, "L1_project"
                )
                # 上位制約との整合性チェック(緩和されていないか検証)
                self._validate_no_relaxation(all_constraints, project_constraints)
                all_constraints.extend(project_constraints)
        
        # L2: SubProject level(サービス固有のルール = 上位ルールを継承+厳格化)
        if project_id and subproject_id:
            subproject_intent = self._read_intent(
                f"{org_id}/{project_id}/{subproject_id}/_intent/"
            )
            subproject_config = self._read_config(
                f"{org_id}/{project_id}/{subproject_id}/_config/subproject.yaml"
            )
            if subproject_intent:
                ctx.subproject_intent = subproject_intent
            if subproject_config:
                sub_constraints = self._extract_constraints(
                    subproject_config, "L2_subproject"
                )
                self._validate_no_relaxation(all_constraints, sub_constraints)
                all_constraints.extend(sub_constraints)
        
        ctx.inherited_constraints = all_constraints
        ctx.effective_constraints = all_constraints  # 全層の制約が有効
        
        return ctx
    
    def _validate_no_relaxation(
        self,
        parent_constraints: list,
        child_constraints: list
    ) -> None:
        """下位層が上位層の制約を緩和していないか検証
        
        例: 上位でMUSTだった制約を下位でSHOULDに変更 → 違反
        例: 上位でMUSTだった制約を下位でより厳しいMUSTに変更 → 許可
        """
        LEVEL_ORDER = {"MUST": 3, "SHOULD": 2, "MAY": 1}
        
        parent_by_key = {}
        for c in parent_constraints:
            # 制約テキストの正規化してキーとする
            key = c.text.strip().lower()
            parent_by_key[key] = c
        
        for child in child_constraints:
            key = child.text.strip().lower()
            if key in parent_by_key:
                parent = parent_by_key[key]
                parent_level = LEVEL_ORDER.get(parent.level, 0)
                child_level = LEVEL_ORDER.get(child.level, 0)
                
                if child_level < parent_level:
                    raise ValueError(
                        f"制約の緩和は許可されていません: "
                        f"'{child.text}' を {parent.level}{child.level} に "
                        f"緩和しようとしています "
                        f"(定義元: {parent.source_layer})"
                    )
    
    def _read_intent(self, prefix: str) -> Optional[str]:
        """指定されたプレフィックス配下の意図ファイルを読み込む"""
        try:
            response = self.s3.list_objects_v2(
                Bucket=self.bucket,
                Prefix=prefix
            )
            
            intent_contents = []
            for obj in response.get('Contents', []):
                if obj['Key'].endswith('.md'):
                    content = self.s3.get_object(
                        Bucket=self.bucket,
                        Key=obj['Key']
                    )['Body'].read().decode('utf-8')
                    intent_contents.append(content)
            
            return "\n---\n".join(intent_contents) if intent_contents else None
        except Exception:
            return None
    
    def _read_config(self, key: str) -> Optional[dict]:
        """設定ファイル(YAML)を読み込む"""
        try:
            content = self.s3.get_object(
                Bucket=self.bucket,
                Key=key
            )['Body'].read().decode('utf-8')
            return yaml.safe_load(content)
        except Exception:
            return None
    
    def _extract_constraints(
        self, config: dict, source_layer: str
    ) -> list:
        """設定ファイルから制約を抽出"""
        constraints = []
        for c in config.get('constraints', []):
            constraints.append(InheritedConstraint(
                text=c['text'],
                level=c['level'],
                source_layer=source_layer,
                source_file=config.get('source_file', 'unknown')
            ))
        return constraints
    
    def build_context_text(self, ctx: HierarchicalContext) -> str:
        """LLMに渡すためのテキスト形式に変換"""
        parts = []
        
        if ctx.org_intent:
            parts.append(f"## 組織レベルの意図・制約(全プロジェクトに適用)\n{ctx.org_intent}")
        
        if ctx.project_intent:
            parts.append(f"## プロジェクトレベルの意図・制約(組織ルールを継承)\n{ctx.project_intent}")
        
        if ctx.subproject_intent:
            parts.append(f"## サブプロジェクトレベルの意図・制約(上位ルールを継承)\n{ctx.subproject_intent}")
        
        if ctx.effective_constraints:
            constraint_text = "\n".join([
                f"- [{c.level}] {c.text} (定義元: {c.source_layer})"
                for c in ctx.effective_constraints
            ])
            parts.append(f"## 有効な制約一覧(全層の累積)\n{constraint_text}")
        
        return "\n\n".join(parts)

アトミックカウンターによるID採番

class AtomicIDGenerator:
    """アトミックカウンターによるID採番"""
    
    def __init__(self, dynamodb_table):
        self.table = dynamodb_table
    
    def generate_id(
        self,
        org_id: str,
        project_id: str,
        subproject_id: str,
        artifact_type: str
    ) -> str:
        """一意のIDを生成
        
        形式: {org_prefix}-{project_prefix}-{subproject_prefix}-{TYPE}-{連番}
        例: acme-ec-auth-REQ-001
        """
        # プレフィックス構築
        counter_key = f"{org_id}-{project_id}-{subproject_id}-{artifact_type}"
        
        # アトミックインクリメント
        response = self.table.update_item(
            Key={
                'PK': 'ATOMIC_COUNTER',
                'SK': counter_key
            },
            UpdateExpression='SET #cnt = if_not_exists(#cnt, :zero) + :inc',
            ExpressionAttributeNames={'#cnt': 'counter'},
            ExpressionAttributeValues={':zero': 0, ':inc': 1},
            ReturnValues='UPDATED_NEW'
        )
        
        counter = response['Attributes']['counter']
        
        # 型プレフィックスの取得
        type_prefix = {
            'requirement': 'REQ',
            'design': 'DES',
            'implementation': 'IMP',
            'test': 'TST'
        }.get(artifact_type, 'DOC')
        
        return f"{org_id}-{project_id}-{subproject_id}-{type_prefix}-{counter:03d}"

Step Functionsワークフロー

IDDには2つの主要なワークフローがあります。

  1. ドキュメント処理ワークフロー: ドキュメント登録 → 意図抽出 → ベクトル保存
  2. 意図乖離検証ワークフロー: カスケードチェック → 民主型投票 → 結果保存

まず、ドキュメント処理ワークフローを見た後、意図乖離検証ワークフローの詳細(民主型投票の実装を含む)は次のセクションで説明します。

ドキュメント処理ワークフロー定義

Step Functionsを使用して、ドキュメント登録時の処理フローを管理します。

{
  "Comment": "IDD Document Processing Workflow",
  "StartAt": "ParseDocument",
  "States": {
    "ParseDocument": {
      "Type": "Task",
      "Resource": "${ParseLambdaArn}",
      "ResultPath": "$.parseResult",
      "Retry": [
        {
          "ErrorEquals": ["Lambda.ServiceException", "Lambda.TooManyRequestsException"],
          "IntervalSeconds": 2,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "ResultPath": "$.error",
          "Next": "HandleError"
        }
      ],
      "Next": "ExtractIntent"
    },
    
    "ExtractIntent": {
      "Type": "Task",
      "Resource": "${ExtractIntentLambdaArn}",
      "ResultPath": "$.extractResult",
      "Retry": [
        {
          "ErrorEquals": ["LLMThrottlingException"],
          "IntervalSeconds": 5,
          "MaxAttempts": 5,
          "BackoffRate": 2.0
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "ResultPath": "$.error",
          "Next": "HandleError"
        }
      ],
      "Next": "CheckValidationRequired"
    },
    
    "CheckValidationRequired": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.extractResult.requiresValidation",
          "BooleanEquals": true,
          "Next": "ValidateIntent"
        }
      ],
      "Default": "GenerateEmbedding"
    },
    
    "ValidateIntent": {
      "Type": "Task",
      "Resource": "${ValidateLambdaArn}",
      "ResultPath": "$.validationResult",
      "Next": "GenerateEmbedding"
    },
    
    "GenerateEmbedding": {
      "Type": "Task",
      "Resource": "${EmbedLambdaArn}",
      "ResultPath": "$.embeddingResult",
      "Next": "StoreResult"
    },
    
    "StoreResult": {
      "Type": "Task",
      "Resource": "${StoreResultLambdaArn}",
      "End": true
    },
    
    "HandleError": {
      "Type": "Task",
      "Resource": "${ErrorHandlerLambdaArn}",
      "End": true
    }
  }
}

ワークフローの可視化

Step Functionsのコンソールでは、ワークフローの実行状況をリアルタイムで可視化できます。これは、デバッグや監視において非常に有用です。

ドキュメント処理ワークフロー:

意図乖離検証ワークフロー:

意図乖離検証ワークフロー

ドキュメント処理ワークフローが「登録」のフローだとすれば、意図乖離検証ワークフローは「検証」のフローです。ここでは、第8回で説明したカスケード処理と民主型投票をAWS上でどう実装するかを、具体的に見ていきます。

カスケード → 民主型投票の全体フロー

Step Functions定義: カスケード検証

{
  "Comment": "IDD Intent Drift Validation - Cascade + Democratic Voting",
  "StartAt": "BuildContext",
  "States": {
    "BuildContext": {
      "Type": "Task",
      "Resource": "${BuildContextLambdaArn}",
      "ResultPath": "$.context",
      "Next": "SearchRelatedIntents"
    },
    
    "SearchRelatedIntents": {
      "Type": "Task",
      "Resource": "${SearchIntentLambdaArn}",
      "ResultPath": "$.relatedIntents",
      "Comment": "OpenSearch Serverlessで関連意図を検索",
      "Next": "Level1LightCheck"
    },
    
    "Level1LightCheck": {
      "Type": "Task",
      "Resource": "${CascadeLevel1LambdaArn}",
      "ResultPath": "$.level1Result",
      "Comment": "Haiku 4.5で軽量チェック",
      "Retry": [
        {
          "ErrorEquals": ["LLMThrottlingException"],
          "IntervalSeconds": 3,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ],
      "Next": "CheckLevel1Confidence"
    },
    
    "CheckLevel1Confidence": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.level1Result.confidence_score",
          "NumericGreaterThanOrEquals": 0.8,
          "Next": "StoreResult"
        }
      ],
      "Default": "Level2StandardAnalysis"
    },
    
    "Level2StandardAnalysis": {
      "Type": "Task",
      "Resource": "${CascadeLevel2LambdaArn}",
      "ResultPath": "$.level2Result",
      "Comment": "Sonnet 4.5で標準分析",
      "Next": "CheckLevel2Confidence"
    },
    
    "CheckLevel2Confidence": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.level2Result.confidence_score",
          "NumericGreaterThanOrEquals": 0.8,
          "Next": "StoreResult"
        }
      ],
      "Default": "Level3DemocraticVoting"
    },
    
    "Level3DemocraticVoting": {
      "Type": "Map",
      "ItemsPath": "$.votingConfig.llmConfigs",
      "MaxConcurrency": 3,
      "Iterator": {
        "StartAt": "InvokeLLM",
        "States": {
          "InvokeLLM": {
            "Type": "Task",
            "Resource": "${VotingAnalysisLambdaArn}",
            "TimeoutSeconds": 60,
            "Retry": [
              {
                "ErrorEquals": ["States.TaskFailed"],
                "IntervalSeconds": 5,
                "MaxAttempts": 2,
                "BackoffRate": 2.0
              }
            ],
            "Catch": [
              {
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.error",
                "Next": "MarkAbstention"
              }
            ],
            "End": true
          },
          "MarkAbstention": {
            "Type": "Pass",
            "Result": {
              "status": "abstention",
              "confidence_score": 0.0,
              "reason": "LLM invocation failed"
            },
            "End": true
          }
        }
      },
      "ResultPath": "$.votingResults",
      "Next": "AggregateVotes"
    },
    
    "AggregateVotes": {
      "Type": "Task",
      "Resource": "${AggregateVotesLambdaArn}",
      "ResultPath": "$.finalResult",
      "Next": "CheckConsensus"
    },
    
    "CheckConsensus": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.finalResult.mode",
          "StringEquals": "deferred",
          "Next": "SendToDLQ"
        },
        {
          "Variable": "$.finalResult.status",
          "StringEquals": "needs_review",
          "Next": "RequestHumanReview"
        }
      ],
      "Default": "StoreResult"
    },
    
    "RequestHumanReview": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage",
      "Parameters": {
        "QueueUrl": "${HumanReviewQueueUrl}",
        "MessageBody.$": "States.JsonToString($.finalResult)"
      },
      "Next": "StoreResult"
    },
    
    "SendToDLQ": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage",
      "Parameters": {
        "QueueUrl": "${DeadLetterQueueUrl}",
        "MessageBody.$": "States.JsonToString($)"
      },
      "End": true
    },
    
    "StoreResult": {
      "Type": "Task",
      "Resource": "${StoreValidationResultLambdaArn}",
      "Next": "EmitMetrics"
    },
    
    "EmitMetrics": {
      "Type": "Task",
      "Resource": "${EmitMetricsLambdaArn}",
      "End": true
    }
  }
}

民主型投票のLambda実装

ここが最も重要な部分です。Step FunctionsのMap Stateから呼び出される各Lambda関数が、異なるLLMプロバイダに対して隔離されたリクエストを送信します。

投票分析Lambda(プロバイダ抽象化)

import json
import boto3
import os
from typing import Optional
from pydantic import BaseModel, Field
from datetime import datetime

class VotingAnalysisInput(BaseModel):
    """Map Stateから渡される各LLM用の入力"""
    llm_provider: str = Field(description="bedrock / openai / google")
    llm_model: str = Field(description="モデルID")
    perspective: str = Field(description="分析観点の指示")
    intent: dict
    artifact: dict
    context: dict
    # 注意: 他のLLMの結果は一切含まない(独立性保証)

class VotingAnalysisOutput(BaseModel):
    """各LLMの分析結果"""
    llm_provider: str
    llm_model: str
    status: str = Field(description="passed / warning / failed")
    confidence_score: float = Field(ge=0.0, le=1.0)
    rationale: str
    issues: list = Field(default_factory=list)
    analyzed_at: str

# プロバイダごとのクライアント
class BedrockProvider:
    """Bedrock経由のLLM呼び出し"""
    
    def __init__(self):
        self.client = boto3.client('bedrock-runtime')
    
    def invoke(self, model_id: str, prompt: str, max_tokens: int = 4096) -> str:
        response = self.client.invoke_model(
            modelId=model_id,
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": max_tokens,
                "temperature": 0.1,
                "system": "あなたは意図乖離検証のエキスパートです。",
                "messages": [{"role": "user", "content": prompt}]
            })
        )
        body = json.loads(response['body'].read())
        return body['content'][0]['text']

class OpenAIProvider:
    """OpenAI API経由のLLM呼び出し"""
    
    def __init__(self):
        # Secrets ManagerからAPIキーを取得
        secrets = boto3.client('secretsmanager')
        secret = json.loads(
            secrets.get_secret_value(
                SecretId=os.environ['OPENAI_SECRET_ARN']
            )['SecretString']
        )
        self.api_key = secret['api_key']
    
    def invoke(self, model_id: str, prompt: str, max_tokens: int = 4096) -> str:
        import urllib.request
        
        req = urllib.request.Request(
            'https://api.openai.com/v1/chat/completions',
            data=json.dumps({
                "model": model_id,
                "messages": [
                    {"role": "system", "content": "あなたは意図乖離検証のエキスパートです。"},
                    {"role": "user", "content": prompt}
                ],
                "max_tokens": max_tokens,
                "temperature": 0.1
            }).encode(),
            headers={
                'Authorization': f'Bearer {self.api_key}',
                'Content-Type': 'application/json'
            }
        )
        
        with urllib.request.urlopen(req, timeout=45) as resp:
            body = json.loads(resp.read())
            return body['choices'][0]['message']['content']

class GoogleProvider:
    """Google Gemini API経由のLLM呼び出し"""
    
    def __init__(self):
        secrets = boto3.client('secretsmanager')
        secret = json.loads(
            secrets.get_secret_value(
                SecretId=os.environ['GOOGLE_SECRET_ARN']
            )['SecretString']
        )
        self.api_key = secret['api_key']
    
    def invoke(self, model_id: str, prompt: str, max_tokens: int = 4096) -> str:
        import urllib.request
        
        url = f'https://generativelanguage.googleapis.com/v1/models/{model_id}:generateContent?key={self.api_key}'
        req = urllib.request.Request(
            url,
            data=json.dumps({
                "contents": [{"parts": [{"text": prompt}]}],
                "generationConfig": {
                    "maxOutputTokens": max_tokens,
                    "temperature": 0.1
                }
            }).encode(),
            headers={'Content-Type': 'application/json'}
        )
        
        with urllib.request.urlopen(req, timeout=45) as resp:
            body = json.loads(resp.read())
            return body['candidates'][0]['content']['parts'][0]['text']

# プロバイダファクトリ
PROVIDERS = {
    'bedrock': BedrockProvider,
    'openai': OpenAIProvider,
    'google': GoogleProvider
}

def handler(event, context):
    """民主型投票の個別分析Lambda
    
    Step Functions Map Stateから呼び出される。
    各LLMプロバイダに隔離されたリクエストを送信し、
    構造化された分析結果を返す。
    """
    input_data = VotingAnalysisInput.model_validate(event)
    
    # プロバイダの初期化
    provider = PROVIDERS[input_data.llm_provider]()
    
    # 分析プロンプトの構築(観点別の指示を含む)
    prompt = f"""
{input_data.perspective}

## 検証対象

### 意図(Why/What/How/Not)
{json.dumps(input_data.intent, ensure_ascii=False, indent=2)}

### 対応する成果物
{json.dumps(input_data.artifact, ensure_ascii=False, indent=2)}

### プロジェクトコンテキスト
{json.dumps(input_data.context, ensure_ascii=False, indent=2)}

## 出力形式
以下のJSON形式で出力してください:
{{
  "status": "passed" | "warning" | "failed",
  "confidence_score": 0.0〜1.0,
  "rationale": "判断の根拠(具体的に)",
  "issues": [
    {{
      "severity": "critical" | "warning" | "info",
      "description": "問題の説明",
      "suggestion": "修正提案"
    }}
  ]
}}
"""
    
    # LLM呼び出し
    response_text = provider.invoke(
        model_id=input_data.llm_model,
        prompt=prompt
    )
    
    # JSON抽出とバリデーション
    import re
    json_match = re.search(r'\{[\s\S]*\}', response_text)
    if not json_match:
        raise ValueError("LLM出力からJSONを抽出できませんでした")
    
    result = json.loads(json_match.group())
    
    return VotingAnalysisOutput(
        llm_provider=input_data.llm_provider,
        llm_model=input_data.llm_model,
        status=result['status'],
        confidence_score=result['confidence_score'],
        rationale=result['rationale'],
        issues=result.get('issues', []),
        analyzed_at=datetime.utcnow().isoformat()
    ).model_dump()

投票集計Lambda

from collections import Counter
from typing import List
from pydantic import BaseModel

class AggregateVotesOutput(BaseModel):
    """投票集計の最終結果"""
    status: str
    confidence_score: float
    mode: str  # normal / degraded / deferred
    consensus: str
    majority_opinion: dict = None
    dissenting_opinions: list = Field(default_factory=list)
    abstentions: list = Field(default_factory=list)
    all_results: list = Field(default_factory=list)
    metadata: dict = Field(default_factory=dict)

def handler(event, context):
    """投票集計: Map Stateの出力を集約して最終判定"""
    
    voting_results = event.get('votingResults', [])
    min_required_votes = event.get('votingConfig', {}).get('min_votes', 2)
    
    # 有効票と棄権を分離
    valid_votes = []
    abstentions = []
    
    for result in voting_results:
        if result.get('status') == 'abstention':
            abstentions.append({
                'llm_provider': result.get('llm_provider', 'unknown'),
                'reason': result.get('reason', 'unknown')
            })
        else:
            valid_votes.append(result)
    
    # 全LLM障害
    if len(valid_votes) == 0:
        return AggregateVotesOutput(
            status="deferred",
            confidence_score=0.0,
            mode="deferred",
            consensus="0/3 - all failed",
            abstentions=abstentions,
            metadata={"reason": "全LLMが応答不能"}
        ).model_dump()
    
    # 縮退モード: 1票のみ
    if len(valid_votes) < min_required_votes:
        vote = valid_votes[0]
        return AggregateVotesOutput(
            status=vote['status'],
            confidence_score=max(0.0, vote['confidence_score'] - 0.3),
            mode="degraded",
            consensus=f"1/{len(voting_results)} - degraded mode",
            all_results=valid_votes,
            abstentions=abstentions,
            metadata={"warning": "縮退モード: 信頼度に0.3のペナルティ適用"}
        ).model_dump()
    
    # 通常モード: 多数決
    status_counts = Counter(v['status'] for v in valid_votes)
    
    # 三者三様チェック
    if len(status_counts) == len(valid_votes) and len(valid_votes) >= 3:
        return AggregateVotesOutput(
            status="needs_review",
            confidence_score=0.0,
            mode="normal",
            consensus=f"三者三様 - 人間レビュー必要",
            all_results=valid_votes,
            metadata={"reason": "全LLMの判断が異なる"}
        ).model_dump()
    
    # 多数決
    majority_status = status_counts.most_common(1)[0][0]
    majority_votes = [v for v in valid_votes if v['status'] == majority_status]
    dissenting_votes = [v for v in valid_votes if v['status'] != majority_status]
    
    # 信頼度: 多数派の平均
    avg_confidence = sum(v['confidence_score'] for v in majority_votes) / len(majority_votes)
    
    return AggregateVotesOutput(
        status=majority_status,
        confidence_score=round(avg_confidence, 3),
        mode="normal",
        consensus=f"{len(majority_votes)}/{len(valid_votes)} agreed on {majority_status}",
        majority_opinion={
            "status": majority_status,
            "llms": [v['llm_provider'] for v in majority_votes],
            "rationale": majority_votes[0]['rationale']
        },
        dissenting_opinions=[
            {
                "llm_provider": v['llm_provider'],
                "status": v['status'],
                "rationale": v['rationale'],
                "confidence": v['confidence_score']
            }
            for v in dissenting_votes
        ],
        abstentions=abstentions,
        all_results=valid_votes,
        metadata={
            "voting_timestamp": datetime.utcnow().isoformat(),
            "total_llms": len(voting_results),
            "valid_votes": len(valid_votes),
            "abstention_count": len(abstentions)
        }
    ).model_dump()

議論フェーズの実装(オプション)

MUST制約の検証など、高精度が求められる場合は「議論フェーズ」を組み込みます。これはStep Functionsのループで実現します。

def build_debate_round2_prompt(
    initial_analysis: dict,
    other_analyses: list,
    perspective: str
) -> str:
    """議論フェーズRound 2のプロンプト構築
    
    Round 1の結果を参照し、自分の分析を再評価する。
    独立性を「意図的に緩和」する唯一のフェーズ。
    """
    other_text = "\n\n".join([
        f"### エキスパート {i+1} ({a['llm_provider']})\n"
        f"判定: {a['status']} (信頼度: {a['confidence_score']})\n"
        f"根拠: {a['rationale']}"
        for i, a in enumerate(other_analyses)
    ])
    
    return f"""
{perspective}

## あなたの初期分析(Round 1)
判定: {initial_analysis['status']}
信頼度: {initial_analysis['confidence_score']}
根拠: {initial_analysis['rationale']}

## 他のエキスパートの分析
{other_text}

## 指示
他のエキスパートの分析を参考に、あなたの初期分析を再評価してください。

1. 他のエキスパートが指摘した点で、あなたが見落としていたものはありますか?
2. 他のエキスパートの分析に同意できない点はありますか?その理由は?
3. 再評価の結果、あなたの判断は変わりますか?

最終的な判断を、根拠とともにJSON形式で出力してください。
"""

主要フローのシーケンス図

ここまでの内容を、時間軸に沿ったシーケンス図で整理します。

フロー1: ドキュメント登録 → 意図抽出 → ベクトル保存

フロー2: 意図乖離検証(カスケード → 民主型投票)

フロー3: 議論フェーズ付き民主型投票

Bedrockの活用

モデルの使い分け

IDDでは、タスクに応じてBedrockの異なるモデルを使い分けます。

タスク モデル 理由
意図抽出 Claude Sonnet 4.5 バランスの良い性能とコスト
深い分析 Claude Opus 4.6 複雑な判断が必要な場合
軽量チェック Claude Haiku 4.5 高速・低コスト
ベクトル埋め込み Titan Embeddings v2 1536次元、多言語対応

カスケード処理の実装

Why: 全検証を高精度モデル(Opus)で実行すると、1件あたり$0.58、月$580のLLMコストが発生する。カスケード処理により、大部分をHaiku($0.01/件)やSonnet($0.06/件)で処理し、本当に判断が難しいケースのみOpusにエスカレーションすることで約90%のコスト削減を実現する。

Not: 「最初から全件Opusで処理する」アプローチを採用しなかった理由は、コストだけでなくレイテンシもある。Opusの応答時間はHaikuの5-10倍であり、開発者へのフィードバック速度が著しく低下する。また、「全件を軽量モデルだけで処理する」アプローチも採用しなかった——MUST制約の検証など高精度が求められるケースでは、軽量モデルの判断では信頼性が不十分なためである。

class CascadeLLMProcessor:
    """カスケード処理によるコスト最適化"""
    
    MODELS = {
        'light': 'anthropic.claude-haiku-4-5-20251001-v1:0',
        'standard': 'anthropic.claude-sonnet-4-5-20250929-v1:0',
        'deep': 'anthropic.claude-opus-4-6-v1'
    }
    
    def __init__(self, bedrock_client, dynamodb_table):
        self.processors = {
            level: LLMProcessor(
                bedrock_client, 
                dynamodb_table, 
                model_id=model_id
            )
            for level, model_id in self.MODELS.items()
        }
    
    def process_with_cascade(
        self,
        content: str,
        system_prompt: str,
        output_schema: Type[T],
        confidence_threshold: float = 0.8
    ) -> T:
        """カスケード処理で最適なモデルを選択"""
        
        # Level 1: 軽量チェック
        light_result = self.processors['light'].process(
            content, system_prompt, output_schema
        )
        
        if light_result.confidence_score >= confidence_threshold:
            return light_result
        
        # Level 2: 標準分析
        standard_result = self.processors['standard'].process(
            content, system_prompt, output_schema
        )
        
        if standard_result.confidence_score >= confidence_threshold:
            return standard_result
        
        # Level 3: 深い分析
        return self.processors['deep'].process(
            content, system_prompt, output_schema
        )

ベクトル埋め込みの生成とOpenSearch Serverlessへの保存

class EmbeddingGenerator:
    """Amazon Titan Embeddings v2を使用したベクトル生成"""
    
    MODEL_ID = "amazon.titan-embed-text-v2:0"
    
    def __init__(self, bedrock_client):
        self.bedrock = bedrock_client
    
    def generate_embedding(self, text: str) -> List[float]:
        """テキストをベクトルに変換"""
        
        # テキストの前処理(最大8000トークン)
        processed_text = text[:32000]  # 大まかな文字数制限
        
        response = self.bedrock.invoke_model(
            modelId=self.MODEL_ID,
            body=json.dumps({
                "inputText": processed_text,
                "dimensions": 1536,
                "normalize": True
            })
        )
        
        response_body = json.loads(response['body'].read())
        return response_body['embedding']
    
    def generate_batch_embeddings(
        self, 
        texts: List[str]
    ) -> List[List[float]]:
        """バッチでベクトル生成"""
        return [self.generate_embedding(text) for text in texts]


class IntentIndexer:
    """意図をOpenSearch Serverlessに登録"""
    
    def __init__(self, opensearch_client, embedding_generator):
        self.client = opensearch_client
        self.embedder = embedding_generator
        self.index_name = "intent-embeddings"
    
    def index_intent(self, parsed_result: LLMParserOutput, org_id: str,
                     project_id: str, subproject_id: str) -> dict:
        """抽出された意図をベクトル化してインデックスに登録"""
        
        # Why要素を中心にベクトル化(最も重要な要素)
        embed_text = f"{parsed_result.intent.why}\n{parsed_result.intent.what}"
        if parsed_result.intent.how:
            embed_text += f"\n{parsed_result.intent.how}"
        
        embedding = self.embedder.generate_embedding(embed_text)
        
        document = {
            "id": parsed_result.artifact_id,
            "org_id": org_id,
            "project_id": project_id,
            "subproject_id": subproject_id,
            "artifact_type": parsed_result.artifact_type,
            "content": parsed_result.summary,
            "intent_why": parsed_result.intent.why,
            "intent_what": parsed_result.intent.what,
            "intent_how": parsed_result.intent.how,
            "intent_not": parsed_result.intent.not_chosen,
            "embedding": embedding,
            "constraints": [
                {"text": c.text, "level": c.level}
                for c in parsed_result.constraints
            ],
            "created_at": datetime.utcnow().isoformat(),
            "updated_at": datetime.utcnow().isoformat()
        }
        
        return self.client.index(
            index=self.index_name,
            id=parsed_result.artifact_id,
            body=document
        )

設定管理(AppConfig)

動的設定の管理

Why: IDDの運用では、「信頼度閾値を0.8から0.7に一時的に下げたい」「民主型投票を一時的に無効にしたい」といった設定変更が、デプロイなしで必要になる。AppConfigにより、Lambda関数の再デプロイなしで設定をリアルタイム変更できる。

Not: 環境変数やハードコードを採用しなかった理由は、変更のたびにLambdaの再デプロイが必要になり、設定変更の反映に数分のダウンタイムが発生するため。また、S3上のJSONファイルを定期ポーリングする方式も検討したが、AppConfigの方が段階的ロールアウト(Feature Flag的な運用)に対応しており、設定変更のリスクを低減できる。

AppConfigを使用して、実行時に設定を変更できるようにしています。

import boto3
from functools import lru_cache
import json

class AppConfigClient:
    """AppConfigから設定を取得"""
    
    def __init__(
        self,
        application: str,
        environment: str,
        configuration: str
    ):
        self.client = boto3.client('appconfigdata')
        self.application = application
        self.environment = environment
        self.configuration = configuration
        self._token = None
    
    def start_session(self):
        """設定セッションを開始"""
        response = self.client.start_configuration_session(
            ApplicationIdentifier=self.application,
            EnvironmentIdentifier=self.environment,
            ConfigurationProfileIdentifier=self.configuration
        )
        self._token = response['InitialConfigurationToken']
    
    def get_config(self) -> dict:
        """最新の設定を取得"""
        if not self._token:
            self.start_session()
        
        response = self.client.get_latest_configuration(
            ConfigurationToken=self._token
        )
        
        self._token = response['NextPollConfigurationToken']
        
        if response['Configuration']:
            return json.loads(response['Configuration'].read())
        
        return {}

# 設定例
config = {
    "llm": {
        "default_model": "anthropic.claude-sonnet-4-5-20250929-v1:0",
        "max_tokens": 4096,
        "temperature": 0.1
    },
    "validation": {
        "confidence_threshold": 0.8,
        "require_unanimous": false,
        "min_votes": 2
    },
    "caching": {
        "enabled": true,
        "ttl_hours": 24
    },
    "feature_flags": {
        "multi_agent_voting": true,
        "cascade_processing": true
    }
}

コスト最適化

Why: なぜコスト最適化を設計段階で考えるのか

IDDは「便利だが高コスト」では普及しません。開発チームが月額数百ドル程度で運用できることが、採用の前提条件です。特にLLM利用料は従量課金であり、設計判断(カスケード処理、キャッシング)がコストに直結します。

Not: なぜ「コスト度外視で最高精度」を目指さないのか

全件をOpus + 5 LLMで検証すれば精度は最大化できるが、月額$2,000を超え、中小チームには導入不可能になる。IDDの価値は「完璧な検証」ではなく「実用的なコストで十分な精度の検証を継続的に回すこと」にある。

想定コスト

以下は、中規模プロジェクト(月1,000件の検証)での想定コストです。カスケード処理により、大部分はLevel 1/2で処理が完了し、民主型投票(Level 3)に到達するのは全体の約5〜10%を想定しています。

基盤サービス

サービス 使用量 月額概算
Lambda 100万リクエスト、10万GB秒 $2
Step Functions 15万状態遷移 $4
DynamoDB 10GB、100万RCU/WCU $10
OpenSearch Serverless 4 OCU(最小構成) $260
S3 10GB $0.3
SQS 10万メッセージ $0.1
小計(基盤) 約$276

LLM利用料(カスケード処理込み)

処理 件数(想定) 1件あたり 月額概算
Level 1(Haiku 4.5) 1,000件 $0.01 $10
Level 2(Sonnet 4.5) 250件(25%エスカレーション) $0.06 $15
Level 3 民主型投票 50件(5%エスカレーション) $0.58 $29
Titan Embeddings 1,000件 $0.001 $1
小計(LLM) 約$55
合計: 約$331/月

コスト削減のポイント

1. カスケード処理(最大の削減効果)
全件民主型投票なら月$580のLLMコストが、カスケード処理で$55に。約90%削減

2. キャッシング
同じ入力に対する再処理を避ける。DynamoDBのTTLで自動削除。特に意図抽出はキャッシュヒット率が高い。

3. ハッシュベースの差分検出
変更のないファイルは再処理しない。S3のETagとDynamoDBのハッシュを比較。

4. OpenSearch Serverlessの段階的導入
PoC → Aurora pgvector、本番成長期 → OpenSearch Serverless。データ量に応じた最適選択。

5. 予算認識型スケジューリング
第8回で説明したCostAwareVotingSchedulerにより、日次予算に応じて検証モードを動的に切り替え。

まとめ: この記事自体がIDDの実践である

今回は、AWS上でのサーバーレス実装について詳しく説明しました。しかし、技術選定の表面的な説明にとどまらず、IDD自身が掲げる「Why/Notを明示する」原則を、この記事自体で実践しました。

振り返ると、この記事で明示したNotは以下の通りです:

設計判断 選んだもの 選ばなかったもの(Not)
コンピュート Lambda(サーバーレス) ECS/EKS/Fargate/EC2
プロジェクト階層 3層(Org→Project→SubProject) フラット構造、2層、4層以上
成果物ストレージ S3(プレフィックス階層) Git、Confluence、DynamoDB直接保存
メタデータDB DynamoDB(シングルテーブル) Aurora、マルチテーブル
ベクトル検索 OpenSearch Serverless Aurora pgvector
オーケストレーション Step Functions SWF、自前実装
LLMプロバイダ 3社併用(Bedrock + 外部API) 単一プロバイダ、5社以上
コスト戦略 カスケード処理 全件高精度モデル、全件軽量モデル

設計ドキュメントにNotを書くことで、将来の開発者が「なぜこの構成なのか」だけでなく「なぜ他の構成でないのか」を理解でき、状況が変わったときに適切に再判断できます。 これが、IDDが「意図の乖離」を防ぐための、最も基本的な実践です。

次回予告

次回は、少し哲学的な話に戻ります。「意図を理解する」とはどういうことか。現象学、解釈学、言語ゲームといった哲学的概念を借りながら、意図乖離という問題の深層を探っていきます。


📘 IDD連載ナビゲーション

◀️ 前回: 第8回: AIマルチエージェントと集合知

▶️ 次回: 第10回: 哲学的な問い

Virtual Craft Tech Blog

Discussion