Amazon S3 Vectorsでacl.jsonではなくmetadata filterを選んだ制限付きRAG試してみた

に公開

はじめに

Fusicのレオナです。
2025年12月にGAされ、Amazon Bedrock Knowledge BasesのベクトルストアとしてS3 Vectorsを使えるようになりました。

本ブログでは、Amazon S3 VectorsとAmazon Bedrock Knowledge Basesを使って、Amazon Cognitoのグループごとに検索できるドキュメントを制限するRAGを作ってみます。

Amazon S3 Vectors自体については以前にブログを書いているので、先にそちらをご覧ください。
https://zenn.dev/fusic/articles/14a98be48d9266

検証データにはallganize/RAG-Evaluation-Dataset-JAを使います。

https://huggingface.co/datasets/allganize/RAG-Evaluation-Dataset-JA

日本語RAG評価向けデータセットのdomain情報を使って、ユーザーごとの検索範囲を制御します。domainにはfinanceitmanufacturingpublicretailなどがあり、文書の分野を表します。この値をCognito groupと対応づけることで、ユーザーが検索できる文書をドメイン単位で制限します。

今回は以下を検証してみます。

1.financeグループのユーザーはfinanceドメインの文書だけ検索できる
2.financeitの両方に所属するユーザーは、両方のドメインを検索できる
3.どの許可グループにも所属しないユーザーは403を返す

前提

ローカルのPython実行環境はuvを使う前提です。また、AWS LambdaのランタイムをPython 3.14にするので、ローカルでもPython 3.14の仮想環境を作っておきます。

uv venv --python 3.14
source .venv/bin/activate

取り込みスクリプトを動かすときは、scripts/requirements.txtuv pip installで入れます。

uv pip install -r scripts/requirements.txt

Terraformはinfra/envs/devから実行します。

cd infra/envs/dev
terraform init
terraform apply

構成

Cognito group: finance
metadata.domain: finance
RetrievalFilter: {"in": {"key": "domain", "value": ["finance"]}}

この対応がずれると、認可は通っているのに検索結果が返らない状態になります。

全体のコード

検索範囲制御の再現に必要な最小構成だけ載せます。README.md.env、実値のterraform.tfvars、Terraformのstate/plan、.terraform/.venv/、単体テスト、e2eテスト、任意のStreamlit UIは省略しています。

ディレクトリ構造
.
|-- infra/
|   |-- envs/dev/
|   |   |-- backend.tf
|   |   |-- providers.tf
|   |   |-- variables.tf
|   |   |-- main.tf
|   |   |-- outputs.tf
|   |   `-- terraform.tfvars.example
|   `-- modules/
|       |-- storage/
|       |   |-- providers.tf
|       |   |-- variables.tf
|       |   |-- main.tf
|       |   `-- outputs.tf
|       |-- kb/
|       |   |-- providers.tf
|       |   |-- variables.tf
|       |   |-- main.tf
|       |   `-- outputs.tf
|       |-- api/
|       |   |-- providers.tf
|       |   |-- variables.tf
|       |   |-- main.tf
|       |   `-- outputs.tf
|       |-- cognito/
|       |   |-- variables.tf
|       |   |-- main.tf
|       |   `-- outputs.tf
|       `-- observability/
|           |-- providers.tf
|           |-- variables.tf
|           |-- main.tf
|           `-- outputs.tf
|-- lambda/rag_handler/
|   |-- handler.py
|   `-- requirements.txt
`-- scripts/
    |-- ingest_allganize.py
    `-- requirements.txt
全体のコード
infra/envs/dev/backend.tf
terraform {
  backend "local" {
    path = "terraform.tfstate"
  }
}
infra/envs/dev/providers.tf
terraform {
  required_version = ">= 1.7.0"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 6.46"
    }
    archive = {
      source  = "hashicorp/archive"
      version = "~> 2.5"
    }
    random = {
      source  = "hashicorp/random"
      version = "~> 3.6"
    }
  }
}

provider "aws" {
  region = var.region

  default_tags {
    tags = {
      Project     = var.project
      Environment = var.environment
      ManagedBy   = "terraform"
    }
  }
}
infra/envs/dev/variables.tf
variable "project" {
  type        = string
  description = "プロジェクト名のプレフィックス。リソース名とタグに使います。"
  default     = "strict-rag"
}

variable "environment" {
  type    = string
  default = "dev"
}

variable "region" {
  type        = string
  description = "AWSリージョン。主な検証対象はap-northeast-1です。S3 VectorsやBedrockモデルが使えない場合はus-east-1に切り替えます。"
  default     = "ap-northeast-1"
}

variable "domains" {
  type        = list(string)
  description = "ACL用のドメイングループ。finance / manufacturing / public / it / retail の5つを使います。"
  default     = ["finance", "manufacturing", "public", "it", "retail"]
}

variable "embedding_model_id" {
  type        = string
  description = "Bedrockの埋め込みモデルID。Cohere multilingual v3とTitan v2はいずれも多言語対応で1024次元です。"
  default     = "cohere.embed-multilingual-v3"
}

variable "embedding_dimension" {
  type        = number
  description = "ベクトル次元数。埋め込みモデルの出力次元と合わせます。Cohere v3 = 1024、Titan v2 = 1024です。"
  default     = 1024
}

variable "generation_model_id" {
  type        = string
  description = "RetrieveAndGenerateで使うBedrock生成モデルID。anthropic.claude-sonnet-4-6のような基盤モデルID、またはjp.anthropic.claude-sonnet-4-6のような推論プロファイルIDを指定します。"
  default     = "anthropic.claude-sonnet-4-6"
}

variable "callback_urls" {
  type        = list(string)
  description = "Cognito Hosted UIのコールバックURL。"
  default     = ["http://localhost:8501/"]
}

variable "logout_urls" {
  type    = list(string)
  default = ["http://localhost:8501/"]
}

variable "demo_users" {
  type = map(object({
    email  = string
    groups = list(string)
  }))
  description = "apply時に作成するデモ用Cognitoユーザー。初期パスワードは自動生成し、outputで確認します。"
  default = {
    finance_user = {
      email  = "finance_user@example.com"
      groups = ["finance"]
    }
    manufacturing_user = {
      email  = "manufacturing_user@example.com"
      groups = ["manufacturing"]
    }
    multi_user = {
      email  = "multi_user@example.com"
      groups = ["finance", "it"]
    }
    no_group_user = {
      email  = "no_group_user@example.com"
      groups = []
    }
  }
}
infra/envs/dev/main.tf
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}
data "aws_partition" "current" {}

locals {
  account_id = data.aws_caller_identity.current.account_id
  partition  = data.aws_partition.current.partition
  region     = data.aws_region.current.region
  name       = "${var.project}-${var.environment}"

  embedding_model_arn  = "arn:${local.partition}:bedrock:${local.region}::foundation-model/${var.embedding_model_id}"
  generation_model_arn = startswith(var.generation_model_id, "arn:") ? var.generation_model_id : (length(regexall("^(jp|apac|us|eu|global)\\.", var.generation_model_id)) > 0 ? "arn:${local.partition}:bedrock:${local.region}:${local.account_id}:inference-profile/${var.generation_model_id}" : "arn:${local.partition}:bedrock:${local.region}::foundation-model/${var.generation_model_id}")
}

resource "random_id" "suffix" {
  byte_length = 3
}

module "cognito" {
  source = "../../modules/cognito"

  name          = local.name
  domains       = var.domains
  callback_urls = var.callback_urls
  logout_urls   = var.logout_urls
  demo_users    = var.demo_users
  domain_suffix = random_id.suffix.hex
}

module "storage" {
  source = "../../modules/storage"

  name                = local.name
  suffix              = random_id.suffix.hex
  embedding_dimension = var.embedding_dimension
}

module "kb" {
  source = "../../modules/kb"

  name                = local.name
  corpus_bucket_arn   = module.storage.corpus_bucket_arn
  corpus_bucket_name  = module.storage.corpus_bucket_name
  vector_bucket_arn   = module.storage.vector_bucket_arn
  vector_bucket_name  = module.storage.vector_bucket_name
  vector_index_arn    = module.storage.vector_index_arn
  vector_index_name   = module.storage.vector_index_name
  embedding_model_arn = local.embedding_model_arn
}

module "api" {
  source = "../../modules/api"

  name                 = local.name
  kb_id                = module.kb.knowledge_base_id
  generation_model_arn = local.generation_model_arn
  allowed_groups       = var.domains
  cognito_issuer       = module.cognito.issuer
  cognito_audience     = module.cognito.client_id
}

module "observability" {
  source = "../../modules/observability"

  name = local.name
}
infra/envs/dev/outputs.tf
output "region" {
  value = local.region
}

output "cognito_user_pool_id" {
  value = module.cognito.user_pool_id
}

output "cognito_client_id" {
  value = module.cognito.client_id
}

output "cognito_hosted_ui_domain" {
  value = module.cognito.hosted_ui_domain
}

output "cognito_issuer" {
  value = module.cognito.issuer
}

output "demo_user_initial_passwords" {
  value     = module.cognito.demo_user_initial_passwords
  sensitive = true
}

output "corpus_bucket_name" {
  value = module.storage.corpus_bucket_name
}

output "vector_bucket_name" {
  value = module.storage.vector_bucket_name
}

output "vector_index_name" {
  value = module.storage.vector_index_name
}

output "knowledge_base_id" {
  value = module.kb.knowledge_base_id
}

output "data_source_id" {
  value = module.kb.data_source_id
}

output "api_base_url" {
  value = module.api.invoke_url
}

output "env_example_lines" {
  description = "Streamlitをデプロイ済みインフラに接続するため、app/.envに貼り付ける値。"
  value       = <<-EOT
    AWS_REGION=${local.region}
    COGNITO_USER_POOL_ID=${module.cognito.user_pool_id}
    COGNITO_CLIENT_ID=${module.cognito.client_id}
    COGNITO_DOMAIN=${module.cognito.hosted_ui_domain}
    COGNITO_ISSUER=${module.cognito.issuer}
    API_BASE_URL=${module.api.invoke_url}
  EOT
}
infra/envs/dev/terraform.tfvars.example
project     = "strict-rag"
environment = "dev"
region      = "ap-northeast-1"

domains = ["finance", "manufacturing", "public", "it", "retail"]

embedding_model_id  = "cohere.embed-multilingual-v3"
embedding_dimension = 1024

# Claude Sonnet 4.6の基盤モデルID。東京の地理的クロスリージョン推論を使う場合は、
# 代わりに"jp.anthropic.claude-sonnet-4-6"を指定します。
generation_model_id = "anthropic.claude-sonnet-4-6"

callback_urls = ["http://localhost:8501/"]
logout_urls   = ["http://localhost:8501/"]
infra/modules/storage/providers.tf
terraform {
  required_providers {
    aws = { source = "hashicorp/aws" }
  }
}
infra/modules/storage/variables.tf
variable "name" {
  type = string
}

variable "suffix" {
  type        = string
  description = "S3バケット名をグローバルに一意にするために付けるランダムサフィックス。"
}

variable "embedding_dimension" {
  type    = number
  default = 1024
}
infra/modules/storage/main.tf
resource "aws_s3_bucket" "corpus" {
  bucket        = "${var.name}-corpus-${var.suffix}"
  force_destroy = true
}

resource "aws_s3_bucket_versioning" "corpus" {
  bucket = aws_s3_bucket.corpus.id
  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_server_side_encryption_configuration" "corpus" {
  bucket = aws_s3_bucket.corpus.id
  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }
  }
}

resource "aws_s3_bucket_public_access_block" "corpus" {
  bucket                  = aws_s3_bucket.corpus.id
  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

resource "aws_s3_bucket_ownership_controls" "corpus" {
  bucket = aws_s3_bucket.corpus.id
  rule {
    object_ownership = "BucketOwnerEnforced"
  }
}

resource "aws_s3vectors_vector_bucket" "this" {
  vector_bucket_name = "${var.name}-vec-${var.suffix}"

  encryption_configuration {
    sse_type = "AES256"
  }
}

resource "aws_s3vectors_index" "this" {
  index_name         = "${var.name}-idx"
  vector_bucket_name = aws_s3vectors_vector_bucket.this.vector_bucket_name

  data_type       = "float32"
  dimension       = var.embedding_dimension
  distance_metric = "cosine"

  metadata_configuration {
    non_filterable_metadata_keys = [
      "AMAZON_BEDROCK_TEXT",
      "AMAZON_BEDROCK_METADATA",
    ]
  }
}
infra/modules/storage/outputs.tf
output "corpus_bucket_name" {
  value = aws_s3_bucket.corpus.id
}

output "corpus_bucket_arn" {
  value = aws_s3_bucket.corpus.arn
}

output "vector_bucket_name" {
  value = aws_s3vectors_vector_bucket.this.vector_bucket_name
}

output "vector_bucket_arn" {
  value = aws_s3vectors_vector_bucket.this.vector_bucket_arn
}

output "vector_index_name" {
  value = aws_s3vectors_index.this.index_name
}

output "vector_index_arn" {
  value = aws_s3vectors_index.this.index_arn
}
infra/modules/kb/providers.tf
terraform {
  required_providers {
    aws = { source = "hashicorp/aws" }
  }
}
infra/modules/kb/variables.tf
variable "name" {
  type = string
}

variable "corpus_bucket_arn" {
  type = string
}

variable "corpus_bucket_name" {
  type = string
}

variable "vector_bucket_arn" {
  type = string
}

variable "vector_bucket_name" {
  type = string
}

variable "vector_index_arn" {
  type = string
}

variable "vector_index_name" {
  type = string
}

variable "embedding_model_arn" {
  type = string
}
infra/modules/kb/main.tf
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}
data "aws_partition" "current" {}

locals {
  account_id = data.aws_caller_identity.current.account_id
  region     = data.aws_region.current.region
}

data "aws_iam_policy_document" "kb_assume" {
  statement {
    effect  = "Allow"
    actions = ["sts:AssumeRole"]

    principals {
      type        = "Service"
      identifiers = ["bedrock.amazonaws.com"]
    }

    condition {
      test     = "StringEquals"
      variable = "aws:SourceAccount"
      values   = [local.account_id]
    }
  }
}

resource "aws_iam_role" "kb" {
  name               = "${var.name}-kb-role"
  assume_role_policy = data.aws_iam_policy_document.kb_assume.json
}

data "aws_iam_policy_document" "kb_inline" {
  statement {
    sid     = "ReadCorpusObjects"
    effect  = "Allow"
    actions = ["s3:GetObject"]
    resources = [
      "${var.corpus_bucket_arn}/*",
    ]
  }

  statement {
    sid     = "ListCorpusBucket"
    effect  = "Allow"
    actions = ["s3:ListBucket"]
    resources = [
      var.corpus_bucket_arn,
    ]
  }

  statement {
    sid    = "UseEmbeddingModel"
    effect = "Allow"
    actions = [
      "bedrock:InvokeModel",
    ]
    resources = [
      var.embedding_model_arn,
    ]
  }

  statement {
    sid    = "S3VectorsIndexOps"
    effect = "Allow"
    actions = [
      "s3vectors:GetVectorBucket",
      "s3vectors:GetIndex",
      "s3vectors:PutVectors",
      "s3vectors:GetVectors",
      "s3vectors:ListVectors",
      "s3vectors:DeleteVectors",
      "s3vectors:QueryVectors",
    ]
    resources = [
      var.vector_bucket_arn,
      var.vector_index_arn,
    ]
  }
}

resource "aws_iam_role_policy" "kb" {
  name   = "${var.name}-kb-policy"
  role   = aws_iam_role.kb.id
  policy = data.aws_iam_policy_document.kb_inline.json
}

resource "aws_bedrockagent_knowledge_base" "main" {
  name        = "${var.name}-kb"
  role_arn    = aws_iam_role.kb.arn
  description = "Strict RAG用のKnowledge Base。検索時に`domain`メタデータでアクセス範囲を制限します。"

  knowledge_base_configuration {
    type = "VECTOR"
    vector_knowledge_base_configuration {
      embedding_model_arn = var.embedding_model_arn
    }
  }

  storage_configuration {
    type = "S3_VECTORS"
    s3_vectors_configuration {
      index_arn = var.vector_index_arn
    }
  }

  depends_on = [aws_iam_role_policy.kb]
}

resource "aws_bedrockagent_data_source" "corpus" {
  name              = "${var.name}-s3-corpus"
  knowledge_base_id = aws_bedrockagent_knowledge_base.main.id

  data_source_configuration {
    type = "S3"
    s3_configuration {
      bucket_arn         = var.corpus_bucket_arn
      inclusion_prefixes = ["corpus/"]
    }
  }

  data_deletion_policy = "DELETE"
}
infra/modules/kb/outputs.tf
output "knowledge_base_id" {
  value = aws_bedrockagent_knowledge_base.main.id
}

output "knowledge_base_arn" {
  value = aws_bedrockagent_knowledge_base.main.arn
}

output "data_source_id" {
  value = aws_bedrockagent_data_source.corpus.data_source_id
}

output "kb_role_arn" {
  value = aws_iam_role.kb.arn
}
infra/modules/api/providers.tf
terraform {
  required_providers {
    aws     = { source = "hashicorp/aws" }
    archive = { source = "hashicorp/archive" }
  }
}
infra/modules/api/variables.tf
variable "name" {
  type = string
}

variable "kb_id" {
  type = string
}

variable "generation_model_arn" {
  type = string
}

variable "allowed_groups" {
  type        = list(string)
  description = "許可するドメイングループのホワイトリスト。LambdaにはALLOWED_GROUPS環境変数として渡し、検索前に呼び出し元の`cognito:groups`クレームと突き合わせます。"
}

variable "cognito_issuer" {
  type = string
}

variable "cognito_audience" {
  type = string
}

variable "lambda_timeout" {
  type    = number
  default = 30
}

variable "lambda_memory" {
  type    = number
  default = 512
}

variable "num_results" {
  type    = number
  default = 8
}
infra/modules/api/main.tf
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}
data "aws_partition" "current" {}

locals {
  account_id      = data.aws_caller_identity.current.account_id
  region          = data.aws_region.current.region
  partition       = data.aws_partition.current.partition
  lambda_src_dir  = "${path.root}/../../../lambda/rag_handler"
  lambda_zip_path = "${path.module}/.build/rag_handler.zip"
}

data "archive_file" "lambda" {
  type        = "zip"
  source_dir  = local.lambda_src_dir
  output_path = local.lambda_zip_path
  excludes    = ["tests", "tests/__init__.py", "tests/test_filter.py", "__pycache__"]
}

data "aws_iam_policy_document" "lambda_assume" {
  statement {
    effect  = "Allow"
    actions = ["sts:AssumeRole"]

    principals {
      type        = "Service"
      identifiers = ["lambda.amazonaws.com"]
    }
  }
}

resource "aws_iam_role" "lambda" {
  name               = "${var.name}-lambda-role"
  assume_role_policy = data.aws_iam_policy_document.lambda_assume.json
}

resource "aws_iam_role_policy_attachment" "lambda_logs" {
  role       = aws_iam_role.lambda.name
  policy_arn = "arn:${local.partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

data "aws_iam_policy_document" "lambda_inline" {
  statement {
    sid    = "BedrockKbRetrieveAndGenerate"
    effect = "Allow"
    actions = [
      "bedrock:Retrieve",
      "bedrock:RetrieveAndGenerate",
    ]
    resources = [
      "arn:${local.partition}:bedrock:${local.region}:${local.account_id}:knowledge-base/${var.kb_id}",
    ]
  }

  statement {
    sid    = "BedrockInvokeGenerationModel"
    effect = "Allow"
    actions = [
      "bedrock:InvokeModel",
      "bedrock:GetInferenceProfile",
    ]
    resources = [
      var.generation_model_arn,
      "arn:${local.partition}:bedrock:*::foundation-model/anthropic.claude-sonnet-4-*",
    ]
  }
}

resource "aws_iam_role_policy" "lambda" {
  name   = "${var.name}-lambda-policy"
  role   = aws_iam_role.lambda.id
  policy = data.aws_iam_policy_document.lambda_inline.json
}

resource "aws_cloudwatch_log_group" "lambda" {
  name              = "/aws/lambda/${var.name}-rag-handler"
  retention_in_days = 14
}

resource "aws_lambda_function" "rag_handler" {
  function_name    = "${var.name}-rag-handler"
  role             = aws_iam_role.lambda.arn
  handler          = "handler.handler"
  runtime          = "python3.14"
  filename         = data.archive_file.lambda.output_path
  source_code_hash = data.archive_file.lambda.output_base64sha256
  timeout          = var.lambda_timeout
  memory_size      = var.lambda_memory

  environment {
    variables = {
      KB_ID                = var.kb_id
      GENERATION_MODEL_ARN = var.generation_model_arn
      ALLOWED_GROUPS       = join(",", var.allowed_groups)
      NUM_RESULTS          = tostring(var.num_results)
    }
  }

  depends_on = [
    aws_iam_role_policy.lambda,
    aws_iam_role_policy_attachment.lambda_logs,
    aws_cloudwatch_log_group.lambda,
  ]
}

resource "aws_apigatewayv2_api" "this" {
  name          = "${var.name}-api"
  protocol_type = "HTTP"

  cors_configuration {
    allow_methods = ["POST", "OPTIONS"]
    allow_headers = ["authorization", "content-type"]
    allow_origins = ["http://localhost:8501"]
    max_age       = 300
  }
}

resource "aws_apigatewayv2_authorizer" "jwt" {
  api_id           = aws_apigatewayv2_api.this.id
  authorizer_type  = "JWT"
  identity_sources = ["$request.header.Authorization"]
  name             = "${var.name}-cognito-jwt"

  jwt_configuration {
    audience = [var.cognito_audience]
    issuer   = var.cognito_issuer
  }
}

resource "aws_apigatewayv2_integration" "lambda" {
  api_id                 = aws_apigatewayv2_api.this.id
  integration_type       = "AWS_PROXY"
  integration_uri        = aws_lambda_function.rag_handler.invoke_arn
  payload_format_version = "2.0"
}

resource "aws_apigatewayv2_route" "ask" {
  api_id             = aws_apigatewayv2_api.this.id
  route_key          = "POST /ask"
  target             = "integrations/${aws_apigatewayv2_integration.lambda.id}"
  authorization_type = "JWT"
  authorizer_id      = aws_apigatewayv2_authorizer.jwt.id
}

resource "aws_apigatewayv2_stage" "default" {
  api_id      = aws_apigatewayv2_api.this.id
  name        = "$default"
  auto_deploy = true

  default_route_settings {
    throttling_rate_limit  = 5
    throttling_burst_limit = 10
  }

  access_log_settings {
    destination_arn = aws_cloudwatch_log_group.api.arn
    format = jsonencode({
      requestId    = "$context.requestId"
      sourceIp     = "$context.identity.sourceIp"
      requestTime  = "$context.requestTime"
      route        = "$context.routeKey"
      status       = "$context.status"
      protocol     = "$context.protocol"
      integration  = "$context.integrationStatus"
      claimsSub    = "$context.authorizer.claims.sub"
      claimsGroups = "$context.authorizer.claims.cognito:groups"
    })
  }
}

resource "aws_cloudwatch_log_group" "api" {
  name              = "/aws/apigw/${var.name}-api"
  retention_in_days = 14
}

resource "aws_lambda_permission" "apigw" {
  statement_id  = "AllowAPIGatewayInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.rag_handler.function_name
  principal     = "apigateway.amazonaws.com"
  source_arn    = "${aws_apigatewayv2_api.this.execution_arn}/*/*"
}
infra/modules/api/outputs.tf
output "invoke_url" {
  value = aws_apigatewayv2_api.this.api_endpoint
}

output "ask_route" {
  value = "POST ${aws_apigatewayv2_api.this.api_endpoint}/ask"
}

output "lambda_name" {
  value = aws_lambda_function.rag_handler.function_name
}

output "lambda_role_arn" {
  value = aws_iam_role.lambda.arn
}
infra/modules/cognito/variables.tf
variable "name" {
  type = string
}

variable "domains" {
  type        = list(string)
  description = "グループ名。それぞれCognito groupになり、文書の`domain`メタデータの値にもなります。"
}

variable "callback_urls" {
  type = list(string)
}

variable "logout_urls" {
  type = list(string)
}

variable "demo_users" {
  type = map(object({
    email  = string
    groups = list(string)
  }))
}

variable "domain_suffix" {
  type        = string
  description = "Cognito Hosted UIのドメインプレフィックスをグローバルに一意にするために付けるランダムサフィックス。"
}
infra/modules/cognito/main.tf
resource "aws_cognito_user_pool" "this" {
  name = "${var.name}-pool"

  username_attributes      = ["email"]
  auto_verified_attributes = ["email"]

  password_policy {
    minimum_length                   = 12
    require_lowercase                = true
    require_uppercase                = true
    require_numbers                  = true
    require_symbols                  = false
    temporary_password_validity_days = 7
  }

  admin_create_user_config {
    allow_admin_create_user_only = true
  }

  account_recovery_setting {
    recovery_mechanism {
      name     = "verified_email"
      priority = 1
    }
  }

  schema {
    name                = "email"
    attribute_data_type = "String"
    required            = true
    mutable             = true
    string_attribute_constraints {
      min_length = 1
      max_length = 256
    }
  }
}

resource "aws_cognito_user_pool_domain" "this" {
  domain       = "${var.name}-${var.domain_suffix}"
  user_pool_id = aws_cognito_user_pool.this.id
}

resource "aws_cognito_user_pool_client" "this" {
  name         = "${var.name}-app"
  user_pool_id = aws_cognito_user_pool.this.id

  generate_secret               = false
  prevent_user_existence_errors = "ENABLED"

  allowed_oauth_flows_user_pool_client = true
  allowed_oauth_flows                  = ["code"]
  allowed_oauth_scopes                 = ["openid", "email", "profile"]
  callback_urls                        = var.callback_urls
  logout_urls                          = var.logout_urls
  supported_identity_providers         = ["COGNITO"]

  explicit_auth_flows = [
    "ALLOW_USER_SRP_AUTH",
    "ALLOW_REFRESH_TOKEN_AUTH",
    "ALLOW_ADMIN_USER_PASSWORD_AUTH",
  ]

  id_token_validity      = 1
  access_token_validity  = 1
  refresh_token_validity = 30

  token_validity_units {
    id_token      = "hours"
    access_token  = "hours"
    refresh_token = "days"
  }

  read_attributes  = ["email", "email_verified"]
  write_attributes = ["email"]
}

resource "aws_cognito_user_group" "this" {
  for_each     = toset(var.domains)
  name         = each.value
  user_pool_id = aws_cognito_user_pool.this.id
  description  = "domain '${each.value}' 用のACLグループ。このグループのメンバーは、同じdomainが付いた文書を検索できます。"
}

resource "random_password" "demo_user" {
  for_each = var.demo_users

  length           = 16
  special          = true
  override_special = "!@#$%^&*()-_=+"
  min_lower        = 2
  min_upper        = 2
  min_numeric      = 2
  min_special      = 1
}

resource "aws_cognito_user" "demo" {
  for_each = var.demo_users

  user_pool_id             = aws_cognito_user_pool.this.id
  username                 = each.value.email
  password                 = random_password.demo_user[each.key].result
  message_action           = "SUPPRESS"
  desired_delivery_mediums = ["EMAIL"]

  attributes = {
    email          = each.value.email
    email_verified = "true"
  }
}

locals {
  user_group_pairs = merge([
    for user_key, user in var.demo_users : {
      for group in user.groups :
      "${user_key}_${group}" => { user_key = user_key, group = group }
    }
  ]...)
}

resource "aws_cognito_user_in_group" "demo" {
  for_each = local.user_group_pairs

  user_pool_id = aws_cognito_user_pool.this.id
  username     = aws_cognito_user.demo[each.value.user_key].username
  group_name   = aws_cognito_user_group.this[each.value.group].name
}
infra/modules/cognito/outputs.tf
output "user_pool_id" {
  value = aws_cognito_user_pool.this.id
}

output "user_pool_arn" {
  value = aws_cognito_user_pool.this.arn
}

output "client_id" {
  value = aws_cognito_user_pool_client.this.id
}

output "hosted_ui_domain" {
  description = "Cognito Hosted UIのドメインプレフィックス(AWS側のサフィックスを除く)。完全なURLは https://<domain>.auth.<region>.amazoncognito.com です。"
  value       = aws_cognito_user_pool_domain.this.domain
}

output "issuer" {
  value = "https://cognito-idp.${data.aws_region.current.region}.amazonaws.com/${aws_cognito_user_pool.this.id}"
}

output "demo_user_initial_passwords" {
  description = "デモユーザーの初期パスワード。Cognitoにより初回ログイン時にパスワード変更が要求されます。"
  value = {
    for k, v in var.demo_users :
    k => {
      email    = v.email
      password = random_password.demo_user[k].result
      groups   = v.groups
    }
  }
  sensitive = true
}

data "aws_region" "current" {}
infra/modules/observability/providers.tf
terraform {
  required_providers {
    aws = { source = "hashicorp/aws" }
  }
}
infra/modules/observability/variables.tf
variable "name" {
  type = string
}

variable "retention_days" {
  type    = number
  default = 30
}
infra/modules/observability/main.tf
resource "aws_cloudwatch_log_group" "audit" {
  name              = "/strict-rag/${var.name}/audit"
  retention_in_days = var.retention_days
}
infra/modules/observability/outputs.tf
output "audit_log_group_name" {
  value = aws_cloudwatch_log_group.audit.name
}
lambda/rag_handler/handler.py
"""Strict RAGのLambdaハンドラー。

ACL強制の前提:
  1. グループ所属の信頼できる情報源は、API GatewayのJWT Authorizerが検証して
     イベントに付与したCognito ID tokenのクレームです。bodyは信頼しない入力として扱い、
     `filter`が含まれていても無視します。
  2. 検索フィルターは、許可されたドメイングループのホワイトリストからサーバー側で生成します。
  3. 呼び出し元に一致するドメイングループがない場合は、Bedrockを呼ぶ前にHTTP 403を返します。
"""

import json
import logging
import os

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

KB_ID = os.environ["KB_ID"]
GENERATION_MODEL_ARN = os.environ["GENERATION_MODEL_ARN"]
ALLOWED_GROUPS = frozenset(
    g.strip() for g in os.environ.get("ALLOWED_GROUPS", "").split(",") if g.strip()
)
NUM_RESULTS = int(os.environ.get("NUM_RESULTS", "8"))

_runtime = None


def _get_runtime():
    global _runtime
    if _runtime is None:
        _runtime = boto3.client("bedrock-agent-runtime")
    return _runtime


def parse_groups(claims):
    """JWTクレームから呼び出し元のドメイングループを取り出します。

    このLambdaはAPI Gateway HTTP API連携のpayload format 2.0で呼び出されます。
    そのイベント形式やテスト用スタブでは、`cognito:groups`がリストの場合もあれば、
    "[finance, it]"のようなJSON風文字列の場合もあるため、両方を処理します。
    そのうえでALLOWED_GROUPSと突き合わせ、Cognito group名の誤設定によって
    任意のドメインへ権限昇格しないようにします。
    """
    raw = claims.get("cognito:groups", [])
    if isinstance(raw, str):
        stripped = raw.strip().lstrip("[").rstrip("]")
        raw = [s for s in stripped.replace(",", " ").split() if s]
    elif not isinstance(raw, list):
        raw = []
    return sorted({g for g in raw if g in ALLOWED_GROUPS})


def build_filter(groups):
    return {"in": {"key": "domain", "value": groups}}


def _response(status, body):
    return {
        "statusCode": status,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(body, ensure_ascii=False),
    }


def _shape_citations(citations):
    shaped = []
    for c in citations or []:
        for ref in c.get("retrievedReferences", []) or []:
            shaped.append(
                {
                    "uri": (ref.get("location") or {}).get("s3Location", {}).get("uri"),
                    "metadata": ref.get("metadata") or {},
                }
            )
    return shaped


def handler(event, _ctx):
    try:
        claims = event["requestContext"]["authorizer"]["jwt"]["claims"]
    except (KeyError, TypeError):
        return _response(401, {"error": "missing JWT claims"})

    groups = parse_groups(claims)
    if not groups:
        return _response(403, {"error": "no allowed domain group on caller"})

    try:
        body = json.loads(event.get("body") or "{}")
    except json.JSONDecodeError:
        return _response(400, {"error": "invalid JSON body"})

    question = body.get("question")
    if not isinstance(question, str) or not question.strip():
        return _response(400, {"error": "`question` is required"})

    if "filter" in body:
        logger.warning(
            "ignoring client-supplied filter for user=%s", claims.get("cognito:username")
        )

    server_filter = build_filter(groups)

    resp = _get_runtime().retrieve_and_generate(
        input={"text": question},
        retrieveAndGenerateConfiguration={
            "type": "KNOWLEDGE_BASE",
            "knowledgeBaseConfiguration": {
                "knowledgeBaseId": KB_ID,
                "modelArn": GENERATION_MODEL_ARN,
                "retrievalConfiguration": {
                    "vectorSearchConfiguration": {
                        "numberOfResults": NUM_RESULTS,
                        "filter": server_filter,
                    }
                },
            },
        },
    )

    return _response(
        200,
        {
            "answer": resp.get("output", {}).get("text", ""),
            "citations": _shape_citations(resp.get("citations")),
            "user_groups": groups,
            "applied_filter": server_filter,
        },
    )
lambda/rag_handler/requirements.txt
boto3>=1.35.0
scripts/ingest_allganize.py
"""AllganizeのPDFコーパスを取得し、S3サイドカーメタデータを書き、KB取り込みを開始します。

AllganizeのHugging Faceデータセット`allganize/RAG-Evaluation-Dataset-JA`は
CSVメタデータのみを公開しています。実際のPDFは`documents.csv`の`url`列が参照する
外部公開サイトにあります。このスクリプトでは以下を行います。

    1. Hugging Faceから`documents.csv`をダウンロードする。
    2. 各行について、`url`からPDFをダウンロードする。
    3. PDFを`s3://{corpus_bucket}/corpus/{domain}/{doc_id}_{file_name}`へアップロードする。
    4. `domain` ACL属性を含む対応する`*.metadata.json`サイドカーを書く。
    5. `bedrock-agent.start_ingestion_job`を呼び、完了までポーリングする。

`domain`サイドカー属性はLambdaの検索フィルターの基準になるため、サイドカー形式は
AWSのS3データソースが期待する形式に合わせます。

使い方:

    python scripts/ingest_allganize.py \\
        --kb-id $KB_ID --data-source-id $DS_ID \\
        --corpus-bucket $CORPUS_BUCKET

便利なオプション:
    --dry-run                 S3へのアップロードと取り込み開始を行わない
    --limit N                 フィルタリング後の先頭N件だけ処理する
    --domain finance,it       指定したdomainだけに絞る
    --skip-ingest             アップロードだけ行い、取り込みジョブは開始しない
    --resume                  S3キーがすでに存在するPDFをスキップする
"""

from __future__ import annotations

import argparse
import csv
import hashlib
import io
import json
import logging
import os
import sys
import time
from dataclasses import dataclass
from typing import Iterable

import boto3
import urllib.request
import urllib.error

DOCUMENTS_CSV_URL = (
    "https://huggingface.co/datasets/allganize/RAG-Evaluation-Dataset-JA"
    "/raw/main/documents.csv"
)

DEFAULT_DOMAINS = {"finance", "it", "manufacturing", "public", "retail"}

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", stream=sys.stderr
)
log = logging.getLogger("ingest")


@dataclass
class Doc:
    domain: str
    title: str
    page: str
    url: str
    file_name: str
    publisher: str

    @property
    def doc_id(self) -> str:
        return hashlib.sha256(self.url.encode("utf-8")).hexdigest()[:12]

    @property
    def s3_key(self) -> str:
        safe_name = self.file_name.replace(" ", "_")
        return f"corpus/{self.domain}/{self.doc_id}_{safe_name}"

    @property
    def metadata_key(self) -> str:
        return f"{self.s3_key}.metadata.json"


def fetch_documents_csv(url: str = DOCUMENTS_CSV_URL) -> list[Doc]:
    log.info("Fetching %s", url)
    with urllib.request.urlopen(url, timeout=30) as resp:
        text = resp.read().decode("utf-8")
    reader = csv.DictReader(io.StringIO(text))
    docs: list[Doc] = []
    for row in reader:
        norm = {k.strip(): (v or "").strip() for k, v in row.items()}
        docs.append(
            Doc(
                domain=norm["domain"],
                title=norm.get("title", ""),
                page=norm.get("page", ""),
                url=norm["url"],
                file_name=norm["file_name"],
                publisher=norm.get("publisher", ""),
            )
        )
    log.info("Loaded %d documents from documents.csv", len(docs))
    return docs


def filter_docs(
    docs: Iterable[Doc],
    allowed_domains: set[str] | None,
    limit: int | None,
) -> list[Doc]:
    out: list[Doc] = []
    for d in docs:
        if allowed_domains and d.domain not in allowed_domains:
            continue
        out.append(d)
        if limit is not None and len(out) >= limit:
            break
    return out


def download_pdf(url: str, timeout: int = 60) -> bytes:
    req = urllib.request.Request(url, headers={"User-Agent": "strict-rag-ingest/1.0"})
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.read()


def build_sidecar(doc: Doc) -> dict:
    """Bedrock KBのS3データソースで使うサイドカーファイル形式。

    S3データソースのメタデータファイルのドキュメントにある
    **map**形式(key -> { value: {type, stringValue}, includeForEmbedding })を使います。
    Bedrockは簡易的なフラット形式(key -> value)もサポートしていますが、
    これはincludeForEmbedding=false相当で、属性ごとの埋め込み挙動を表現できません。
    """
    return {
        "metadataAttributes": {
            "domain": {
                "value": {"type": "STRING", "stringValue": doc.domain},
                "includeForEmbedding": False,
            },
            "doc_id": {
                "value": {"type": "STRING", "stringValue": doc.doc_id},
                "includeForEmbedding": False,
            },
            "source_url": {
                "value": {"type": "STRING", "stringValue": doc.url},
                "includeForEmbedding": False,
            },
            "publisher": {
                "value": {"type": "STRING", "stringValue": doc.publisher},
                "includeForEmbedding": False,
            },
            "title": {
                "value": {"type": "STRING", "stringValue": doc.title},
                "includeForEmbedding": True,
            },
        }
    }


def already_uploaded(s3, bucket: str, key: str) -> bool:
    try:
        s3.head_object(Bucket=bucket, Key=key)
        return True
    except s3.exceptions.ClientError:
        return False


def upload_doc(
    s3,
    bucket: str,
    doc: Doc,
    pdf_bytes: bytes,
    sidecar: dict,
    dry_run: bool,
) -> None:
    if dry_run:
        log.info("[dry-run] would upload %s (%d bytes) + %s", doc.s3_key, len(pdf_bytes), doc.metadata_key)
        return
    s3.put_object(
        Bucket=bucket,
        Key=doc.s3_key,
        Body=pdf_bytes,
        ContentType="application/pdf",
    )
    s3.put_object(
        Bucket=bucket,
        Key=doc.metadata_key,
        Body=json.dumps(sidecar, ensure_ascii=False).encode("utf-8"),
        ContentType="application/json",
    )


def start_and_wait(
    bedrock_agent,
    kb_id: str,
    ds_id: str,
    timeout_minutes: int = 60,
    poll_seconds: int = 10,
) -> dict:
    log.info("Starting ingestion job kb=%s ds=%s", kb_id, ds_id)
    started = bedrock_agent.start_ingestion_job(
        knowledgeBaseId=kb_id, dataSourceId=ds_id
    )
    job_id = started["ingestionJob"]["ingestionJobId"]
    log.info("Ingestion job started: %s", job_id)

    deadline = time.time() + timeout_minutes * 60
    while time.time() < deadline:
        job = bedrock_agent.get_ingestion_job(
            knowledgeBaseId=kb_id,
            dataSourceId=ds_id,
            ingestionJobId=job_id,
        )["ingestionJob"]
        status = job["status"]
        stats = job.get("statistics", {})
        log.info(
            "  job %s status=%s indexed=%s scanned=%s failed=%s",
            job_id,
            status,
            stats.get("numberOfDocumentsIndexed", "-"),
            stats.get("numberOfDocumentsScanned", "-"),
            stats.get("numberOfDocumentsFailed", "-"),
        )
        if status in ("COMPLETE", "FAILED", "STOPPED"):
            if status != "COMPLETE":
                log.error("Ingestion job ended with status=%s reasons=%s", status, job.get("failureReasons"))
            return job
        time.sleep(poll_seconds)

    raise TimeoutError(f"Ingestion job {job_id} did not complete within {timeout_minutes}m")


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(description="AllganizeのPDFをBedrock KBへ取り込む")
    p.add_argument("--corpus-bucket", required=False, default=os.environ.get("CORPUS_BUCKET"))
    p.add_argument("--kb-id", required=False, default=os.environ.get("KB_ID"))
    p.add_argument("--data-source-id", required=False, default=os.environ.get("DATA_SOURCE_ID"))
    p.add_argument("--region", required=False, default=os.environ.get("AWS_REGION"))
    p.add_argument("--limit", type=int, default=None)
    p.add_argument("--domain", default=None, help="取り込み対象にするdomainのカンマ区切りリスト")
    p.add_argument("--dry-run", action="store_true")
    p.add_argument("--skip-ingest", action="store_true")
    p.add_argument("--resume", action="store_true", help="S3キーがすでに存在する文書をスキップする")
    p.add_argument("--errors-out", default="ingest_errors.jsonl")
    return p.parse_args()


def main() -> int:
    args = parse_args()

    if not args.dry_run and not args.corpus_bucket:
        log.error("--corpus-bucket is required (or set $CORPUS_BUCKET) unless --dry-run")
        return 2
    if not args.skip_ingest and not args.dry_run and not (args.kb_id and args.data_source_id):
        log.error("--kb-id and --data-source-id are required unless --skip-ingest or --dry-run")
        return 2

    allowed: set[str] | None = (
        {d.strip() for d in args.domain.split(",") if d.strip()} if args.domain else None
    )
    if allowed:
        unknown = allowed - DEFAULT_DOMAINS
        if unknown:
            log.warning("Domains not in the default Allganize set: %s", sorted(unknown))

    session = boto3.Session(region_name=args.region) if args.region else boto3.Session()
    s3 = session.client("s3") if not args.dry_run else None
    bedrock_agent = session.client("bedrock-agent") if not (args.dry_run or args.skip_ingest) else None

    docs = fetch_documents_csv()
    docs = filter_docs(docs, allowed, args.limit)
    log.info("After filter: %d documents", len(docs))

    errors: list[dict] = []
    uploaded = 0
    skipped = 0

    for i, doc in enumerate(docs, start=1):
        log.info("[%d/%d] %s/%s  %s", i, len(docs), doc.domain, doc.file_name, doc.url)
        try:
            if args.resume and not args.dry_run and already_uploaded(s3, args.corpus_bucket, doc.s3_key):
                log.info("  → skip (already in S3)")
                skipped += 1
                continue
            pdf_bytes = download_pdf(doc.url)
            sidecar = build_sidecar(doc)
            upload_doc(s3, args.corpus_bucket, doc, pdf_bytes, sidecar, dry_run=args.dry_run)
            uploaded += 1
        except urllib.error.HTTPError as e:
            err = {"file_name": doc.file_name, "domain": doc.domain, "url": doc.url, "error": f"HTTPError {e.code}: {e.reason}"}
            log.error("  HTTP %d for %s", e.code, doc.url)
            errors.append(err)
        except Exception as e:  # noqa: BLE001
            err = {"file_name": doc.file_name, "domain": doc.domain, "url": doc.url, "error": repr(e)}
            log.exception("  failed: %s", e)
            errors.append(err)

    if errors:
        with open(args.errors_out, "w", encoding="utf-8") as f:
            for e in errors:
                f.write(json.dumps(e, ensure_ascii=False) + "\n")
        log.warning("%d documents failed — see %s", len(errors), args.errors_out)

    log.info("Upload phase done: uploaded=%d skipped=%d failed=%d", uploaded, skipped, len(errors))

    if args.skip_ingest or args.dry_run:
        log.info("Ingestion job skipped (--skip-ingest or --dry-run).")
        return 0

    job = start_and_wait(bedrock_agent, args.kb_id, args.data_source_id)
    return 0 if job["status"] == "COMPLETE" else 1


if __name__ == "__main__":
    sys.exit(main())
scripts/requirements.txt
boto3>=1.35.0

単一Knowledge Base + metadata filterにした理由

ドメインごとにKnowledge Baseを分ける方法も考えましたが、今回は単一Knowledge Baseに全ドキュメントを入れて、metadata filterで検索範囲を制限する構成にしました。

観点 単一Knowledge Base + filter ドメインごとにKnowledge Base
構成 シンプル ドメイン数だけ増える
複数ドメイン検索 in filterで自然にできる 結果のマージが必要
取り込み ingestion pipelineが1つ ドメインごとに分けて管理できる
分離 metadataで論理的に分ける 物理的に分けやすい

どちらが正解というより、制御したい単位に合わせて選ぶものだと思っています。

観点 単一Knowledge Base + metadata filterが向くケース acl.jsonを使う別設計が向くケース
制御単位 finance/itのようなグループ単位で分けたい ユーザー単位、文書単位で細かく分けたい
権限情報の置き場所 domainなど少数のmetadataで表せる acl.jsonを権限のsource of truthにしたい
検索対象の決め方 1つのKnowledge Baseに対してmetadata filterをかけたい acl.jsonを見て、検索前に対象のKnowledge Base/index/document scopeを決めたい
分離の考え方 metadataによる論理分離でよい Knowledge Baseやindexを分ける設計にしたい
検索方式 semantic searchが中心 hybrid searchやkeyword条件も含めて、対象文書集合を組み立てたい

今回は、finance/it/manufacturingのようなドメイン単位で検索範囲を分けられれば十分なので、単一Knowledge Base + metadata filterにしています。ユーザー単位や文書単位で細かく制御したい場合は、acl.jsonを権限のsource of truthにして、そこから許可されたKnowledge Base/S3 Vectors index/document scopeにルーティングする設計も選択肢になります。

注意点として、acl.jsonを置くだけではBedrock Knowledge BasesのRetrievalFilterにはなりません。厳密に制限したい場合は、acl.jsonをLambdaなど検索前の処理で参照し、対象を絞る必要があります。

今回の実装では、RetrievalFilterはLambdaがCognitoのJWT claimsから生成します。

実装解説

Step.1 TerraformでS3 VectorsとKnowledge Baseを作成する

Terraform AWS providerは6.46を使っています。

S3 VectorsとBedrock Knowledge Basesは以下のAWS providerリソースで作成しています。

  • aws_s3vectors_vector_bucket
  • aws_s3vectors_index
  • aws_bedrockagent_knowledge_base
  • aws_bedrockagent_data_source

S3 Vectors側は以下のように作成しています。

resource "aws_s3vectors_vector_bucket" "this" {
  vector_bucket_name = "${var.name}-vec-${var.suffix}"

  encryption_configuration {
    sse_type = "AES256"
  }
}

resource "aws_s3vectors_index" "this" {
  index_name         = "${var.name}-idx"
  vector_bucket_name = aws_s3vectors_vector_bucket.this.vector_bucket_name

  data_type       = "float32"
  dimension       = var.embedding_dimension
  distance_metric = "cosine"

  metadata_configuration {
    non_filterable_metadata_keys = [
      "AMAZON_BEDROCK_TEXT",
      "AMAZON_BEDROCK_METADATA",
    ]
  }
}

domainはfilterに使いたいのでfilterable metadataとして残します。

一方で、Bedrock Knowledge BasesがS3 Vectorsに保存するチャンク本文やBedrock管理メタデータはサイズが大きくなりやすいため、non_filterable_metadata_keysに逃がしています。

実際にaws s3vectors list-vectors --return-metadataで確認すると、Bedrock KBが保存したvectorにはAMAZON_BEDROCK_TEXT/AMAZON_BEDROCK_METADATA/domainなどが入っていました。

{
  "domain": "manufacturing",
  "doc_id": "cf003e4fe3cc",
  "title": "JAS活用マニュアル",
  "AMAZON_BEDROCK_TEXT": "...",
  "AMAZON_BEDROCK_METADATA": "..."
}

non_filterable_metadata_keysに入れたキーはfilter条件には使えませんが、metadata自体を保存しないという意味ではありません。そのため、--return-metadataで確認するとAMAZON_BEDROCK_TEXTAMAZON_BEDROCK_METADATAも返ります。

S3 Vectorsのmetadataはデフォルトでfilterableになります。Bedrock KB + S3 Vectorsではcustom metadataに上限があり、S3 Vectorsのmetadata filteringドキュメントではcustom metadataは1KB、metadata keysは35個までと説明されています。検索条件に使わない大きなキーはnon-filterableにしておくのが安全です。

Knowledge Base側はstorage_configuration.type = "S3_VECTORS"を指定します。

resource "aws_bedrockagent_knowledge_base" "main" {
  name        = "${var.name}-kb"
  role_arn    = aws_iam_role.kb.arn
  description = "Strict RAG用のKnowledge Base。検索時に`domain`メタデータでアクセス範囲を制限します。"

  knowledge_base_configuration {
    type = "VECTOR"
    vector_knowledge_base_configuration {
      embedding_model_arn = var.embedding_model_arn
    }
  }

  storage_configuration {
    type = "S3_VECTORS"
    s3_vectors_configuration {
      index_arn = var.vector_index_arn
    }
  }

  depends_on = [aws_iam_role_policy.kb]
}

掲載コードのvariables.tfterraform.tfvars.exampleでは、埋め込みモデルを差し替えやすいようにcohere.embed-multilingual-v3をデフォルトにしています。今回の検証では、実値のterraform.tfvarsembedding_model_id = "amazon.titan-embed-text-v2:0"generation_model_id = "jp.anthropic.claude-sonnet-4-6"に上書きしました。2026年5月24日時点で、AWS公式のClaude Sonnet 4.6 Model Cardにはbase model IDとしてanthropic.claude-sonnet-4-6、JPのGeo inference IDとしてjp.anthropic.claude-sonnet-4-6が載っています。

生成モデルにはinference profile IDまたはARNを指定できます。今回はTerraform側でinference profile ARNを組み立ててLambdaに渡しています。

Step.2 S3にドキュメントとmetadataを置く

Bedrock Knowledge BasesのS3 data sourceでは、元ドキュメントと同じ場所に*.metadata.jsonを置くことでmetadataを付与できます。

今回の配置は以下です。

corpus/finance/d45a82c097ef_230829_main.pdf
corpus/finance/d45a82c097ef_230829_main.pdf.metadata.json

metadata sidecarは以下の形式にしています。

{
  "metadataAttributes": {
    "domain": {
      "value": { "type": "STRING", "stringValue": "finance" },
      "includeForEmbedding": false
    },
    "doc_id": {
      "value": { "type": "STRING", "stringValue": "d45a82c097ef" },
      "includeForEmbedding": false
    },
    "source_url": {
      "value": { "type": "STRING", "stringValue": "https://example.com/source.pdf" },
      "includeForEmbedding": false
    },
    "publisher": {
      "value": { "type": "STRING", "stringValue": "金融庁" },
      "includeForEmbedding": false
    },
    "title": {
      "value": { "type": "STRING", "stringValue": "2023事務年度 金融行政方針" },
      "includeForEmbedding": true
    }
  }
}

domainは検索範囲制御用のキーなので、includeForEmbeddingfalseにしています。検索の意味的な近さにdomain名を混ぜたいわけではなく、filter条件としてだけ使いたいためです。

Bedrock Knowledge BasesのS3 data source connectorでは、上記のようにmetadataAttributes配下でvalueincludeForEmbeddingを指定する形式が使えます。{"metadataAttributes": {"domain": "finance"}}のような簡易形式もありますが、その場合はincludeForEmbedding: false相当になり、属性ごとのembedding含有有無を指定できません。

S3 data sourceのmetadata fileは、Bedrock Knowledge BasesのS3 data source connectorドキュメントにある通り、元ファイル名の末尾に.metadata.jsonを付け、同じS3 prefixに置く必要があります。

ここでacl.jsonのような独自ファイルを別に置かず、Bedrock Knowledge Basesのmetadataとしてdomainを持たせているのは、RetrievalFilterがKnowledge Baseに取り込まれたmetadata attributeに対して効くためです。

別ファイルのacl.jsonにアクセス制御情報を持たせるだけだと、Bedrock側のvector searchではその情報をfilter条件として使えません。Lambdaでacl.jsonを読んで検索後に除外する設計もできますが、その場合は範囲外チャンクが一度検索結果に出てきてしまいます。今回はretrievalの段階で絞り込みたいので、PDFと一緒に*.metadata.jsonを置き、チャンクにdomain metadataを持たせる構成にしています。

取り込み前にdry-runで数件だけ確認します。

uv run python scripts/ingest_allganize.py --dry-run --limit 3

Terraformのoutputから必要な環境変数をexportして、実際に取り込みます。

export AWS_REGION="$(terraform -chdir=infra/envs/dev output -raw region)"
export CORPUS_BUCKET="$(terraform -chdir=infra/envs/dev output -raw corpus_bucket_name)"
export KB_ID="$(terraform -chdir=infra/envs/dev output -raw knowledge_base_id)"
export DATA_SOURCE_ID="$(terraform -chdir=infra/envs/dev output -raw data_source_id)"

uv run python scripts/ingest_allganize.py --resume

取り込みスクリプトでは、Hugging Faceのdocuments.csvを読み、PDFをダウンロードしてS3にアップロードし、metadata sidecarを生成しています。

def build_sidecar(doc: Doc) -> dict:
    return {
        "metadataAttributes": {
            "domain": {
                "value": {"type": "STRING", "stringValue": doc.domain},
                "includeForEmbedding": False,
            },
            "doc_id": {
                "value": {"type": "STRING", "stringValue": doc.doc_id},
                "includeForEmbedding": False,
            },
            "source_url": {
                "value": {"type": "STRING", "stringValue": doc.url},
                "includeForEmbedding": False,
            },
            "publisher": {
                "value": {"type": "STRING", "stringValue": doc.publisher},
                "includeForEmbedding": False,
            },
            "title": {
                "value": {"type": "STRING", "stringValue": doc.title},
                "includeForEmbedding": True,
            },
        }
    }

Allganizeのdocuments.csvには65件のURLが載っていますが、参照先は外部の公開PDFです。そのため、404やtimeoutで失敗するものがありました。

自分の環境では31件をS3にアップロードでき、Bedrock KBのingestion jobは以下の結果になりました。

status=COMPLETE
numberOfDocumentsScanned=31
numberOfNewDocumentsIndexed=30
numberOfDocumentsFailed=1

失敗したURLはingest_errors.jsonlに出すようにしています。再実行時は--resumeを付けると、すでにS3にあるPDFはスキップできます。

Step.3 LambdaでRetrievalFilterを作る

LambdaではAPI Gateway JWT Authorizerが検証したCognito ID tokenのclaimsを使います。

ALLOWED_GROUPS = frozenset(
    g.strip() for g in os.environ.get("ALLOWED_GROUPS", "").split(",") if g.strip()
)


def parse_groups(claims):
    raw = claims.get("cognito:groups", [])
    if isinstance(raw, str):
        stripped = raw.strip().lstrip("[").rstrip("]")
        raw = [s for s in stripped.replace(",", " ").split() if s]
    elif not isinstance(raw, list):
        raw = []
    return sorted({g for g in raw if g in ALLOWED_GROUPS})


def build_filter(groups):
    return {"in": {"key": "domain", "value": groups}}

この構成ではAPI Gateway HTTP APIのpayload format 2.0でLambdaを呼び出しています。cognito:groupsはeventの形やテスト用stubによってlistまたは"[finance, it]"のような文字列として扱えるようにし、カンマ区切りとスペース区切りの両方を処理しています。

handler側では、許可グループがない場合はBedrockを呼ぶ前に403を返します。

groups = parse_groups(claims)
if not groups:
    return _response(403, {"error": "no allowed domain group on caller"})

server_filter = build_filter(groups)

RetrieveAndGenerateにはLambdaが生成したserver_filterだけを渡します。

resp = _get_runtime().retrieve_and_generate(
    input={"text": question},
    retrieveAndGenerateConfiguration={
        "type": "KNOWLEDGE_BASE",
        "knowledgeBaseConfiguration": {
            "knowledgeBaseId": KB_ID,
            "modelArn": GENERATION_MODEL_ARN,
            "retrievalConfiguration": {
                "vectorSearchConfiguration": {
                    "numberOfResults": NUM_RESULTS,
                    "filter": server_filter,
                }
            },
        },
    },
)

Bedrock Knowledge BasesのRetrievalFilterでは、in operatorを使うことで複数ドメインのunion検索ができます。

{
  "in": {
    "key": "domain",
    "value": ["finance", "it"]
  }
}

RetrievalFilterの詳細はAPI Referenceにまとまっています。

Step.4 API GatewayのJWT Authorizerを設定する

API GatewayはHTTP APIを使い、Cognito User PoolをissuerにしたJWT Authorizerを設定しています。

resource "aws_apigatewayv2_authorizer" "jwt" {
  api_id           = aws_apigatewayv2_api.this.id
  authorizer_type  = "JWT"
  identity_sources = ["$request.header.Authorization"]
  name             = "${var.name}-cognito-jwt"

  jwt_configuration {
    audience = [var.cognito_audience]
    issuer   = var.cognito_issuer
  }
}

HTTP APIのJWT Authorizerがtokenを検証し、Lambdaは検証済みclaimsだけを見ます。

エンドユーザーにAWS credentialやbedrock:Retrieve権限を渡さないことも重要です。直接Bedrock Knowledge Baseを呼べるIAM権限をユーザー側に渡すと、API Gateway+Lambdaの認可を通らずに検索できてしまいます。

検証パート

Step.1 検証用ユーザーを確認する

ここでは、デプロイ済みのAPIにCognitoユーザーのID tokenを付けて、検索範囲と回答がどう変わるかを確認します。

確認のためにheadlessでPOST /askを叩いていますが、tokenをコードやシェルに直書きする運用を推奨しているわけではありません。ここでは検索範囲制御の挙動を見るための確認手段として扱います。

検証用のCognitoユーザーは以下のように作成しています。

ユーザー 所属グループ 検索できるdomain
finance_user finance finance
manufacturing_user manufacturing manufacturing
multi_user finance, it finance, it
no_group_user なし なし

この対応から、finance_userでは金融系の文書だけ、manufacturing_userでは製造系の文書だけを使って回答できることを確認します。multi_userは複数グループに所属しているため、financeitのunionで検索できます。

確認する観点は以下です。

  • applied_filterがCognito groupから作られているか
  • 回答内容が許可されたdomainの文書に基づいているか
  • citationsのdomainが許可範囲に収まっているか
  • 権限外の質問で範囲外文書が返らないか

Step.2 finance_userでfinanceの質問を確認する

finance_userはCognito group financeに所属しています。まず金融行政方針について聞きます。

question:
2023事務年度の金融行政方針について教えてください

レスポンスでは、Lambdaが生成したfilterがfinanceになっています。

{
  "user_groups": ["finance"],
  "applied_filter": {
    "in": {
      "key": "domain",
      "value": ["finance"]
    }
  }
}

citationsもfinanceのS3 objectだけになりました。

s3://.../corpus/finance/d45a82c097ef_230829_main.pdf

回答も金融行政方針の内容になっています。

2023事務年度の金融行政方針は、大きく以下の4つの柱で構成されています。

Ⅰ.経済や国民生活の安定を支え、その後の成長へと繋ぐ
Ⅱ.社会課題解決と経済成長を両立させる金融システムの構築
Ⅲ.金融システムの安定・信頼を確保する
Ⅳ.金融行政を絶えず進化・深化させる

finance_userではfinanceの文書だけが検索され、回答もその範囲で生成されていることが確認できました。

Step.3 finance_userでmanufacturingの質問を確認する

次に、finance_userのままmanufacturing側にありそうな質問を投げます。

question:
JAS規格の活用効果について教えてください

この場合もfilterはfinanceのままです。

{
  "applied_filter": {
    "in": {
      "key": "domain",
      "value": ["finance"]
    }
  }
}

回答は以下のようになりました。

提供された検索結果には、JAS規格(日本農林規格)の活用効果に関する情報は含まれていません。
検索結果は主に暗号資産・デジタル資産に関する規制動向や金融規制に関する内容となっており、
JAS規格についての記載は見当たりません。

citationsは0件でした。

citations=[]

JAS規格の文書はmanufacturingにありますが、finance_userのfilterはfinanceのままなので、範囲外文書は回答に使われません。

Step.4 manufacturing_userでmanufacturingの質問を確認する

次に、同じ質問をmanufacturing_userで投げます。

question:
JAS規格の活用効果について教えてください

manufacturing_userのfilterはmanufacturingになります。

{
  "user_groups": ["manufacturing"],
  "applied_filter": {
    "in": {
      "key": "domain",
      "value": ["manufacturing"]
    }
  }
}

今度はJAS規格の活用効果について回答が返りました。

JAS規格・認証の活用効果として、以下のような点が挙げられます。

- 消費者・取引先への品質保証
- 販売・単価へのプラス効果
- 技術の維持・向上
- 輸出への活用(有機JASの場合)

citationsもmanufacturingのJAS活用マニュアルだけになっています。

domain=manufacturing
s3://.../corpus/manufacturing/cf003e4fe3cc_index-11.pdf

Step.5 multi_userでunion検索を確認する

multi_userfinanceitの両方に所属しています。

この場合、Lambdaは以下のfilterを作ります。

{
  "in": {
    "key": "domain",
    "value": ["finance", "it"]
  }
}

in operatorを使うことで、1回のKnowledge Base queryで複数ドメインを検索できます。

IT分野のセキュリティ動向を聞くと、回答はITの文書に基づいて生成されました。

IT分野のセキュリティ動向について、以下のような主要なトピックが挙げられます。

- 能動的サイバー防御(Active Cyber Defense)
- IoT機器のセキュリティ
- 情報通信ネットワークの安全性・信頼性の確保
- IPA「情報セキュリティ10大脅威2025」
- AIとセキュリティ

citationsはitの文書になっています。

domain=it
s3://.../corpus/it/b6bf640ae47a_report.pdf
s3://.../corpus/it/d18854f20c9e_01point.pdf
s3://.../corpus/it/7147f11497ff_skillup_guideline.pdf

multi_userfinanceitのunionで検索できるため、itの質問にも回答できています。

Step.6 no_group_userで403を確認する

許可グループに所属していないno_group_userで同じAPIを叩くと、Bedrockを呼ぶ前に403を返します。

status=403
body={"error":"no allowed domain group on caller"}

検索対象domainを作れないユーザーは、そもそもKnowledge Baseに問い合わせません。

Step.7 retrievalの段階でfilterが効いているか確認する

filterがretrievalの段階で効いていることを確認するため、生成を介さないbedrock-agent-runtime retrieveを直接呼びます。

まず、金融行政方針のクエリをfinance filterで検索します。

aws bedrock-agent-runtime retrieve \
  --knowledge-base-id "$KB_ID" \
  --retrieval-query '{"text":"2023事務年度の金融行政方針"}' \
  --retrieval-configuration '{
    "vectorSearchConfiguration": {
      "numberOfResults": 5,
      "filter": {
        "in": {
          "key": "domain",
          "value": ["finance"]
        }
      }
    }
  }'

結果はfinanceの金融庁文書がscore 0.95以上で返りました。

score=0.970 domain=finance s3://.../corpus/finance/d45a82c097ef_230829_main.pdf
score=0.961 domain=finance s3://.../corpus/finance/d45a82c097ef_230829_main.pdf
score=0.961 domain=finance s3://.../corpus/finance/d45a82c097ef_230829_main.pdf

同じクエリをmanufacturing filterで検索すると、金融庁文書は返らず、manufacturingの文書だけが返ります。

score=0.592 domain=manufacturing s3://.../corpus/manufacturing/f36bfba8a37e_DM_Industry_Vision_3.0.pdf
score=0.580 domain=manufacturing s3://.../corpus/manufacturing/7be6151d5624_keshouhin-standard.pdf
score=0.573 domain=manufacturing s3://.../corpus/manufacturing/ac869f1a7748_110329-1a.pdf

逆に、JAS規格に関するクエリをfilterなしで検索するとmanufacturingのJAS文書が上位に出ます。

question:
繊維製品の安全基準やJIS規格について

score=0.715 domain=manufacturing s3://.../corpus/manufacturing/cf003e4fe3cc_index-11.pdf
score=0.713 domain=manufacturing s3://.../corpus/manufacturing/cf003e4fe3cc_index-11.pdf
score=0.711 domain=manufacturing s3://.../corpus/manufacturing/cf003e4fe3cc_index-11.pdf

これをfinance filterに変えると、JAS文書は返らず、finance domainの中で近いものだけが返ります。

score=0.602 domain=finance s3://.../corpus/finance/825c59b2823f_jimukyoku.pdf
score=0.583 domain=finance s3://.../corpus/finance/7157b2ea051e_news240340_1.pdf
score=0.579 domain=finance s3://.../corpus/finance/f2efd977da57_i-xvii.pdf

この確認から、少なくともRetrieve APIの返却結果としては、filterに合わないdomainのチャンクは返っていないことが分かります。

S3 Vectorsのmetadata filteringドキュメントでも、vector searchとfilter evaluationはtandemに実行され、filter条件に合うvectorが返ると説明されています。

最後に

Amazon S3 VectorsとAmazon Bedrock Knowledge Basesを使って、Cognito groupをアクセス制御の条件として扱う制限付きRAGを構築しました。

今回確認できたことは以下です。

  • S3 VectorsをBedrock Knowledge Basesのvector storeとして使える
  • Cognito groupからRetrievalFilterをサーバー側で生成できる
  • Retrieve APIで確認すると、filterに合わないdomainのチャンクは返らない

RAGを作りたいが、ユーザー制御をしたい場合や「検索できる文書の範囲」をユーザーごとに分けたいケースは多いと思います。

S3 Vectors + Bedrock Knowledge Basesのmetadata filterを使うと、Knowledge Baseをドメインごとに分けずに、Lambdaで検索範囲を制御できます。同じ構成を組む方の役に立てば幸いです。

Fusic 技術ブログ

Discussion