Amazon S3 Vectorsでacl.jsonではなくmetadata filterを選んだ制限付きRAG試してみた
はじめに
Fusicのレオナです。
2025年12月にGAされ、Amazon Bedrock Knowledge BasesのベクトルストアとしてS3 Vectorsを使えるようになりました。
本ブログでは、Amazon S3 VectorsとAmazon Bedrock Knowledge Basesを使って、Amazon Cognitoのグループごとに検索できるドキュメントを制限するRAGを作ってみます。
Amazon S3 Vectors自体については以前にブログを書いているので、先にそちらをご覧ください。
検証データにはallganize/RAG-Evaluation-Dataset-JAを使います。
日本語RAG評価向けデータセットのdomain情報を使って、ユーザーごとの検索範囲を制御します。domainにはfinance、it、manufacturing、public、retailなどがあり、文書の分野を表します。この値をCognito groupと対応づけることで、ユーザーが検索できる文書をドメイン単位で制限します。
今回は以下を検証してみます。
1.financeグループのユーザーはfinanceドメインの文書だけ検索できる
2.financeとitの両方に所属するユーザーは、両方のドメインを検索できる
3.どの許可グループにも所属しないユーザーは403を返す
前提
ローカルのPython実行環境はuvを使う前提です。また、AWS LambdaのランタイムをPython 3.14にするので、ローカルでもPython 3.14の仮想環境を作っておきます。
uv venv --python 3.14
source .venv/bin/activate
取り込みスクリプトを動かすときは、scripts/requirements.txtをuv pip installで入れます。
uv pip install -r scripts/requirements.txt
Terraformはinfra/envs/devから実行します。
cd infra/envs/dev
terraform init
terraform apply
構成

Cognito group: finance
metadata.domain: finance
RetrievalFilter: {"in": {"key": "domain", "value": ["finance"]}}
この対応がずれると、認可は通っているのに検索結果が返らない状態になります。
全体のコード
検索範囲制御の再現に必要な最小構成だけ載せます。README.md、.env、実値のterraform.tfvars、Terraformのstate/plan、.terraform/、.venv/、単体テスト、e2eテスト、任意のStreamlit UIは省略しています。
.
|-- infra/
| |-- envs/dev/
| | |-- backend.tf
| | |-- providers.tf
| | |-- variables.tf
| | |-- main.tf
| | |-- outputs.tf
| | `-- terraform.tfvars.example
| `-- modules/
| |-- storage/
| | |-- providers.tf
| | |-- variables.tf
| | |-- main.tf
| | `-- outputs.tf
| |-- kb/
| | |-- providers.tf
| | |-- variables.tf
| | |-- main.tf
| | `-- outputs.tf
| |-- api/
| | |-- providers.tf
| | |-- variables.tf
| | |-- main.tf
| | `-- outputs.tf
| |-- cognito/
| | |-- variables.tf
| | |-- main.tf
| | `-- outputs.tf
| `-- observability/
| |-- providers.tf
| |-- variables.tf
| |-- main.tf
| `-- outputs.tf
|-- lambda/rag_handler/
| |-- handler.py
| `-- requirements.txt
`-- scripts/
|-- ingest_allganize.py
`-- requirements.txt
全体のコード
terraform {
backend "local" {
path = "terraform.tfstate"
}
}
terraform {
required_version = ">= 1.7.0"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 6.46"
}
archive = {
source = "hashicorp/archive"
version = "~> 2.5"
}
random = {
source = "hashicorp/random"
version = "~> 3.6"
}
}
}
provider "aws" {
region = var.region
default_tags {
tags = {
Project = var.project
Environment = var.environment
ManagedBy = "terraform"
}
}
}
variable "project" {
type = string
description = "プロジェクト名のプレフィックス。リソース名とタグに使います。"
default = "strict-rag"
}
variable "environment" {
type = string
default = "dev"
}
variable "region" {
type = string
description = "AWSリージョン。主な検証対象はap-northeast-1です。S3 VectorsやBedrockモデルが使えない場合はus-east-1に切り替えます。"
default = "ap-northeast-1"
}
variable "domains" {
type = list(string)
description = "ACL用のドメイングループ。finance / manufacturing / public / it / retail の5つを使います。"
default = ["finance", "manufacturing", "public", "it", "retail"]
}
variable "embedding_model_id" {
type = string
description = "Bedrockの埋め込みモデルID。Cohere multilingual v3とTitan v2はいずれも多言語対応で1024次元です。"
default = "cohere.embed-multilingual-v3"
}
variable "embedding_dimension" {
type = number
description = "ベクトル次元数。埋め込みモデルの出力次元と合わせます。Cohere v3 = 1024、Titan v2 = 1024です。"
default = 1024
}
variable "generation_model_id" {
type = string
description = "RetrieveAndGenerateで使うBedrock生成モデルID。anthropic.claude-sonnet-4-6のような基盤モデルID、またはjp.anthropic.claude-sonnet-4-6のような推論プロファイルIDを指定します。"
default = "anthropic.claude-sonnet-4-6"
}
variable "callback_urls" {
type = list(string)
description = "Cognito Hosted UIのコールバックURL。"
default = ["http://localhost:8501/"]
}
variable "logout_urls" {
type = list(string)
default = ["http://localhost:8501/"]
}
variable "demo_users" {
type = map(object({
email = string
groups = list(string)
}))
description = "apply時に作成するデモ用Cognitoユーザー。初期パスワードは自動生成し、outputで確認します。"
default = {
finance_user = {
email = "finance_user@example.com"
groups = ["finance"]
}
manufacturing_user = {
email = "manufacturing_user@example.com"
groups = ["manufacturing"]
}
multi_user = {
email = "multi_user@example.com"
groups = ["finance", "it"]
}
no_group_user = {
email = "no_group_user@example.com"
groups = []
}
}
}
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}
data "aws_partition" "current" {}
locals {
account_id = data.aws_caller_identity.current.account_id
partition = data.aws_partition.current.partition
region = data.aws_region.current.region
name = "${var.project}-${var.environment}"
embedding_model_arn = "arn:${local.partition}:bedrock:${local.region}::foundation-model/${var.embedding_model_id}"
generation_model_arn = startswith(var.generation_model_id, "arn:") ? var.generation_model_id : (length(regexall("^(jp|apac|us|eu|global)\\.", var.generation_model_id)) > 0 ? "arn:${local.partition}:bedrock:${local.region}:${local.account_id}:inference-profile/${var.generation_model_id}" : "arn:${local.partition}:bedrock:${local.region}::foundation-model/${var.generation_model_id}")
}
resource "random_id" "suffix" {
byte_length = 3
}
module "cognito" {
source = "../../modules/cognito"
name = local.name
domains = var.domains
callback_urls = var.callback_urls
logout_urls = var.logout_urls
demo_users = var.demo_users
domain_suffix = random_id.suffix.hex
}
module "storage" {
source = "../../modules/storage"
name = local.name
suffix = random_id.suffix.hex
embedding_dimension = var.embedding_dimension
}
module "kb" {
source = "../../modules/kb"
name = local.name
corpus_bucket_arn = module.storage.corpus_bucket_arn
corpus_bucket_name = module.storage.corpus_bucket_name
vector_bucket_arn = module.storage.vector_bucket_arn
vector_bucket_name = module.storage.vector_bucket_name
vector_index_arn = module.storage.vector_index_arn
vector_index_name = module.storage.vector_index_name
embedding_model_arn = local.embedding_model_arn
}
module "api" {
source = "../../modules/api"
name = local.name
kb_id = module.kb.knowledge_base_id
generation_model_arn = local.generation_model_arn
allowed_groups = var.domains
cognito_issuer = module.cognito.issuer
cognito_audience = module.cognito.client_id
}
module "observability" {
source = "../../modules/observability"
name = local.name
}
output "region" {
value = local.region
}
output "cognito_user_pool_id" {
value = module.cognito.user_pool_id
}
output "cognito_client_id" {
value = module.cognito.client_id
}
output "cognito_hosted_ui_domain" {
value = module.cognito.hosted_ui_domain
}
output "cognito_issuer" {
value = module.cognito.issuer
}
output "demo_user_initial_passwords" {
value = module.cognito.demo_user_initial_passwords
sensitive = true
}
output "corpus_bucket_name" {
value = module.storage.corpus_bucket_name
}
output "vector_bucket_name" {
value = module.storage.vector_bucket_name
}
output "vector_index_name" {
value = module.storage.vector_index_name
}
output "knowledge_base_id" {
value = module.kb.knowledge_base_id
}
output "data_source_id" {
value = module.kb.data_source_id
}
output "api_base_url" {
value = module.api.invoke_url
}
output "env_example_lines" {
description = "Streamlitをデプロイ済みインフラに接続するため、app/.envに貼り付ける値。"
value = <<-EOT
AWS_REGION=${local.region}
COGNITO_USER_POOL_ID=${module.cognito.user_pool_id}
COGNITO_CLIENT_ID=${module.cognito.client_id}
COGNITO_DOMAIN=${module.cognito.hosted_ui_domain}
COGNITO_ISSUER=${module.cognito.issuer}
API_BASE_URL=${module.api.invoke_url}
EOT
}
project = "strict-rag"
environment = "dev"
region = "ap-northeast-1"
domains = ["finance", "manufacturing", "public", "it", "retail"]
embedding_model_id = "cohere.embed-multilingual-v3"
embedding_dimension = 1024
# Claude Sonnet 4.6の基盤モデルID。東京の地理的クロスリージョン推論を使う場合は、
# 代わりに"jp.anthropic.claude-sonnet-4-6"を指定します。
generation_model_id = "anthropic.claude-sonnet-4-6"
callback_urls = ["http://localhost:8501/"]
logout_urls = ["http://localhost:8501/"]
terraform {
required_providers {
aws = { source = "hashicorp/aws" }
}
}
variable "name" {
type = string
}
variable "suffix" {
type = string
description = "S3バケット名をグローバルに一意にするために付けるランダムサフィックス。"
}
variable "embedding_dimension" {
type = number
default = 1024
}
resource "aws_s3_bucket" "corpus" {
bucket = "${var.name}-corpus-${var.suffix}"
force_destroy = true
}
resource "aws_s3_bucket_versioning" "corpus" {
bucket = aws_s3_bucket.corpus.id
versioning_configuration {
status = "Enabled"
}
}
resource "aws_s3_bucket_server_side_encryption_configuration" "corpus" {
bucket = aws_s3_bucket.corpus.id
rule {
apply_server_side_encryption_by_default {
sse_algorithm = "AES256"
}
}
}
resource "aws_s3_bucket_public_access_block" "corpus" {
bucket = aws_s3_bucket.corpus.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
resource "aws_s3_bucket_ownership_controls" "corpus" {
bucket = aws_s3_bucket.corpus.id
rule {
object_ownership = "BucketOwnerEnforced"
}
}
resource "aws_s3vectors_vector_bucket" "this" {
vector_bucket_name = "${var.name}-vec-${var.suffix}"
encryption_configuration {
sse_type = "AES256"
}
}
resource "aws_s3vectors_index" "this" {
index_name = "${var.name}-idx"
vector_bucket_name = aws_s3vectors_vector_bucket.this.vector_bucket_name
data_type = "float32"
dimension = var.embedding_dimension
distance_metric = "cosine"
metadata_configuration {
non_filterable_metadata_keys = [
"AMAZON_BEDROCK_TEXT",
"AMAZON_BEDROCK_METADATA",
]
}
}
output "corpus_bucket_name" {
value = aws_s3_bucket.corpus.id
}
output "corpus_bucket_arn" {
value = aws_s3_bucket.corpus.arn
}
output "vector_bucket_name" {
value = aws_s3vectors_vector_bucket.this.vector_bucket_name
}
output "vector_bucket_arn" {
value = aws_s3vectors_vector_bucket.this.vector_bucket_arn
}
output "vector_index_name" {
value = aws_s3vectors_index.this.index_name
}
output "vector_index_arn" {
value = aws_s3vectors_index.this.index_arn
}
terraform {
required_providers {
aws = { source = "hashicorp/aws" }
}
}
variable "name" {
type = string
}
variable "corpus_bucket_arn" {
type = string
}
variable "corpus_bucket_name" {
type = string
}
variable "vector_bucket_arn" {
type = string
}
variable "vector_bucket_name" {
type = string
}
variable "vector_index_arn" {
type = string
}
variable "vector_index_name" {
type = string
}
variable "embedding_model_arn" {
type = string
}
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}
data "aws_partition" "current" {}
locals {
account_id = data.aws_caller_identity.current.account_id
region = data.aws_region.current.region
}
data "aws_iam_policy_document" "kb_assume" {
statement {
effect = "Allow"
actions = ["sts:AssumeRole"]
principals {
type = "Service"
identifiers = ["bedrock.amazonaws.com"]
}
condition {
test = "StringEquals"
variable = "aws:SourceAccount"
values = [local.account_id]
}
}
}
resource "aws_iam_role" "kb" {
name = "${var.name}-kb-role"
assume_role_policy = data.aws_iam_policy_document.kb_assume.json
}
data "aws_iam_policy_document" "kb_inline" {
statement {
sid = "ReadCorpusObjects"
effect = "Allow"
actions = ["s3:GetObject"]
resources = [
"${var.corpus_bucket_arn}/*",
]
}
statement {
sid = "ListCorpusBucket"
effect = "Allow"
actions = ["s3:ListBucket"]
resources = [
var.corpus_bucket_arn,
]
}
statement {
sid = "UseEmbeddingModel"
effect = "Allow"
actions = [
"bedrock:InvokeModel",
]
resources = [
var.embedding_model_arn,
]
}
statement {
sid = "S3VectorsIndexOps"
effect = "Allow"
actions = [
"s3vectors:GetVectorBucket",
"s3vectors:GetIndex",
"s3vectors:PutVectors",
"s3vectors:GetVectors",
"s3vectors:ListVectors",
"s3vectors:DeleteVectors",
"s3vectors:QueryVectors",
]
resources = [
var.vector_bucket_arn,
var.vector_index_arn,
]
}
}
resource "aws_iam_role_policy" "kb" {
name = "${var.name}-kb-policy"
role = aws_iam_role.kb.id
policy = data.aws_iam_policy_document.kb_inline.json
}
resource "aws_bedrockagent_knowledge_base" "main" {
name = "${var.name}-kb"
role_arn = aws_iam_role.kb.arn
description = "Strict RAG用のKnowledge Base。検索時に`domain`メタデータでアクセス範囲を制限します。"
knowledge_base_configuration {
type = "VECTOR"
vector_knowledge_base_configuration {
embedding_model_arn = var.embedding_model_arn
}
}
storage_configuration {
type = "S3_VECTORS"
s3_vectors_configuration {
index_arn = var.vector_index_arn
}
}
depends_on = [aws_iam_role_policy.kb]
}
resource "aws_bedrockagent_data_source" "corpus" {
name = "${var.name}-s3-corpus"
knowledge_base_id = aws_bedrockagent_knowledge_base.main.id
data_source_configuration {
type = "S3"
s3_configuration {
bucket_arn = var.corpus_bucket_arn
inclusion_prefixes = ["corpus/"]
}
}
data_deletion_policy = "DELETE"
}
output "knowledge_base_id" {
value = aws_bedrockagent_knowledge_base.main.id
}
output "knowledge_base_arn" {
value = aws_bedrockagent_knowledge_base.main.arn
}
output "data_source_id" {
value = aws_bedrockagent_data_source.corpus.data_source_id
}
output "kb_role_arn" {
value = aws_iam_role.kb.arn
}
terraform {
required_providers {
aws = { source = "hashicorp/aws" }
archive = { source = "hashicorp/archive" }
}
}
variable "name" {
type = string
}
variable "kb_id" {
type = string
}
variable "generation_model_arn" {
type = string
}
variable "allowed_groups" {
type = list(string)
description = "許可するドメイングループのホワイトリスト。LambdaにはALLOWED_GROUPS環境変数として渡し、検索前に呼び出し元の`cognito:groups`クレームと突き合わせます。"
}
variable "cognito_issuer" {
type = string
}
variable "cognito_audience" {
type = string
}
variable "lambda_timeout" {
type = number
default = 30
}
variable "lambda_memory" {
type = number
default = 512
}
variable "num_results" {
type = number
default = 8
}
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}
data "aws_partition" "current" {}
locals {
account_id = data.aws_caller_identity.current.account_id
region = data.aws_region.current.region
partition = data.aws_partition.current.partition
lambda_src_dir = "${path.root}/../../../lambda/rag_handler"
lambda_zip_path = "${path.module}/.build/rag_handler.zip"
}
data "archive_file" "lambda" {
type = "zip"
source_dir = local.lambda_src_dir
output_path = local.lambda_zip_path
excludes = ["tests", "tests/__init__.py", "tests/test_filter.py", "__pycache__"]
}
data "aws_iam_policy_document" "lambda_assume" {
statement {
effect = "Allow"
actions = ["sts:AssumeRole"]
principals {
type = "Service"
identifiers = ["lambda.amazonaws.com"]
}
}
}
resource "aws_iam_role" "lambda" {
name = "${var.name}-lambda-role"
assume_role_policy = data.aws_iam_policy_document.lambda_assume.json
}
resource "aws_iam_role_policy_attachment" "lambda_logs" {
role = aws_iam_role.lambda.name
policy_arn = "arn:${local.partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}
data "aws_iam_policy_document" "lambda_inline" {
statement {
sid = "BedrockKbRetrieveAndGenerate"
effect = "Allow"
actions = [
"bedrock:Retrieve",
"bedrock:RetrieveAndGenerate",
]
resources = [
"arn:${local.partition}:bedrock:${local.region}:${local.account_id}:knowledge-base/${var.kb_id}",
]
}
statement {
sid = "BedrockInvokeGenerationModel"
effect = "Allow"
actions = [
"bedrock:InvokeModel",
"bedrock:GetInferenceProfile",
]
resources = [
var.generation_model_arn,
"arn:${local.partition}:bedrock:*::foundation-model/anthropic.claude-sonnet-4-*",
]
}
}
resource "aws_iam_role_policy" "lambda" {
name = "${var.name}-lambda-policy"
role = aws_iam_role.lambda.id
policy = data.aws_iam_policy_document.lambda_inline.json
}
resource "aws_cloudwatch_log_group" "lambda" {
name = "/aws/lambda/${var.name}-rag-handler"
retention_in_days = 14
}
resource "aws_lambda_function" "rag_handler" {
function_name = "${var.name}-rag-handler"
role = aws_iam_role.lambda.arn
handler = "handler.handler"
runtime = "python3.14"
filename = data.archive_file.lambda.output_path
source_code_hash = data.archive_file.lambda.output_base64sha256
timeout = var.lambda_timeout
memory_size = var.lambda_memory
environment {
variables = {
KB_ID = var.kb_id
GENERATION_MODEL_ARN = var.generation_model_arn
ALLOWED_GROUPS = join(",", var.allowed_groups)
NUM_RESULTS = tostring(var.num_results)
}
}
depends_on = [
aws_iam_role_policy.lambda,
aws_iam_role_policy_attachment.lambda_logs,
aws_cloudwatch_log_group.lambda,
]
}
resource "aws_apigatewayv2_api" "this" {
name = "${var.name}-api"
protocol_type = "HTTP"
cors_configuration {
allow_methods = ["POST", "OPTIONS"]
allow_headers = ["authorization", "content-type"]
allow_origins = ["http://localhost:8501"]
max_age = 300
}
}
resource "aws_apigatewayv2_authorizer" "jwt" {
api_id = aws_apigatewayv2_api.this.id
authorizer_type = "JWT"
identity_sources = ["$request.header.Authorization"]
name = "${var.name}-cognito-jwt"
jwt_configuration {
audience = [var.cognito_audience]
issuer = var.cognito_issuer
}
}
resource "aws_apigatewayv2_integration" "lambda" {
api_id = aws_apigatewayv2_api.this.id
integration_type = "AWS_PROXY"
integration_uri = aws_lambda_function.rag_handler.invoke_arn
payload_format_version = "2.0"
}
resource "aws_apigatewayv2_route" "ask" {
api_id = aws_apigatewayv2_api.this.id
route_key = "POST /ask"
target = "integrations/${aws_apigatewayv2_integration.lambda.id}"
authorization_type = "JWT"
authorizer_id = aws_apigatewayv2_authorizer.jwt.id
}
resource "aws_apigatewayv2_stage" "default" {
api_id = aws_apigatewayv2_api.this.id
name = "$default"
auto_deploy = true
default_route_settings {
throttling_rate_limit = 5
throttling_burst_limit = 10
}
access_log_settings {
destination_arn = aws_cloudwatch_log_group.api.arn
format = jsonencode({
requestId = "$context.requestId"
sourceIp = "$context.identity.sourceIp"
requestTime = "$context.requestTime"
route = "$context.routeKey"
status = "$context.status"
protocol = "$context.protocol"
integration = "$context.integrationStatus"
claimsSub = "$context.authorizer.claims.sub"
claimsGroups = "$context.authorizer.claims.cognito:groups"
})
}
}
resource "aws_cloudwatch_log_group" "api" {
name = "/aws/apigw/${var.name}-api"
retention_in_days = 14
}
resource "aws_lambda_permission" "apigw" {
statement_id = "AllowAPIGatewayInvoke"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.rag_handler.function_name
principal = "apigateway.amazonaws.com"
source_arn = "${aws_apigatewayv2_api.this.execution_arn}/*/*"
}
output "invoke_url" {
value = aws_apigatewayv2_api.this.api_endpoint
}
output "ask_route" {
value = "POST ${aws_apigatewayv2_api.this.api_endpoint}/ask"
}
output "lambda_name" {
value = aws_lambda_function.rag_handler.function_name
}
output "lambda_role_arn" {
value = aws_iam_role.lambda.arn
}
variable "name" {
type = string
}
variable "domains" {
type = list(string)
description = "グループ名。それぞれCognito groupになり、文書の`domain`メタデータの値にもなります。"
}
variable "callback_urls" {
type = list(string)
}
variable "logout_urls" {
type = list(string)
}
variable "demo_users" {
type = map(object({
email = string
groups = list(string)
}))
}
variable "domain_suffix" {
type = string
description = "Cognito Hosted UIのドメインプレフィックスをグローバルに一意にするために付けるランダムサフィックス。"
}
resource "aws_cognito_user_pool" "this" {
name = "${var.name}-pool"
username_attributes = ["email"]
auto_verified_attributes = ["email"]
password_policy {
minimum_length = 12
require_lowercase = true
require_uppercase = true
require_numbers = true
require_symbols = false
temporary_password_validity_days = 7
}
admin_create_user_config {
allow_admin_create_user_only = true
}
account_recovery_setting {
recovery_mechanism {
name = "verified_email"
priority = 1
}
}
schema {
name = "email"
attribute_data_type = "String"
required = true
mutable = true
string_attribute_constraints {
min_length = 1
max_length = 256
}
}
}
resource "aws_cognito_user_pool_domain" "this" {
domain = "${var.name}-${var.domain_suffix}"
user_pool_id = aws_cognito_user_pool.this.id
}
resource "aws_cognito_user_pool_client" "this" {
name = "${var.name}-app"
user_pool_id = aws_cognito_user_pool.this.id
generate_secret = false
prevent_user_existence_errors = "ENABLED"
allowed_oauth_flows_user_pool_client = true
allowed_oauth_flows = ["code"]
allowed_oauth_scopes = ["openid", "email", "profile"]
callback_urls = var.callback_urls
logout_urls = var.logout_urls
supported_identity_providers = ["COGNITO"]
explicit_auth_flows = [
"ALLOW_USER_SRP_AUTH",
"ALLOW_REFRESH_TOKEN_AUTH",
"ALLOW_ADMIN_USER_PASSWORD_AUTH",
]
id_token_validity = 1
access_token_validity = 1
refresh_token_validity = 30
token_validity_units {
id_token = "hours"
access_token = "hours"
refresh_token = "days"
}
read_attributes = ["email", "email_verified"]
write_attributes = ["email"]
}
resource "aws_cognito_user_group" "this" {
for_each = toset(var.domains)
name = each.value
user_pool_id = aws_cognito_user_pool.this.id
description = "domain '${each.value}' 用のACLグループ。このグループのメンバーは、同じdomainが付いた文書を検索できます。"
}
resource "random_password" "demo_user" {
for_each = var.demo_users
length = 16
special = true
override_special = "!@#$%^&*()-_=+"
min_lower = 2
min_upper = 2
min_numeric = 2
min_special = 1
}
resource "aws_cognito_user" "demo" {
for_each = var.demo_users
user_pool_id = aws_cognito_user_pool.this.id
username = each.value.email
password = random_password.demo_user[each.key].result
message_action = "SUPPRESS"
desired_delivery_mediums = ["EMAIL"]
attributes = {
email = each.value.email
email_verified = "true"
}
}
locals {
user_group_pairs = merge([
for user_key, user in var.demo_users : {
for group in user.groups :
"${user_key}_${group}" => { user_key = user_key, group = group }
}
]...)
}
resource "aws_cognito_user_in_group" "demo" {
for_each = local.user_group_pairs
user_pool_id = aws_cognito_user_pool.this.id
username = aws_cognito_user.demo[each.value.user_key].username
group_name = aws_cognito_user_group.this[each.value.group].name
}
output "user_pool_id" {
value = aws_cognito_user_pool.this.id
}
output "user_pool_arn" {
value = aws_cognito_user_pool.this.arn
}
output "client_id" {
value = aws_cognito_user_pool_client.this.id
}
output "hosted_ui_domain" {
description = "Cognito Hosted UIのドメインプレフィックス(AWS側のサフィックスを除く)。完全なURLは https://<domain>.auth.<region>.amazoncognito.com です。"
value = aws_cognito_user_pool_domain.this.domain
}
output "issuer" {
value = "https://cognito-idp.${data.aws_region.current.region}.amazonaws.com/${aws_cognito_user_pool.this.id}"
}
output "demo_user_initial_passwords" {
description = "デモユーザーの初期パスワード。Cognitoにより初回ログイン時にパスワード変更が要求されます。"
value = {
for k, v in var.demo_users :
k => {
email = v.email
password = random_password.demo_user[k].result
groups = v.groups
}
}
sensitive = true
}
data "aws_region" "current" {}
terraform {
required_providers {
aws = { source = "hashicorp/aws" }
}
}
variable "name" {
type = string
}
variable "retention_days" {
type = number
default = 30
}
resource "aws_cloudwatch_log_group" "audit" {
name = "/strict-rag/${var.name}/audit"
retention_in_days = var.retention_days
}
output "audit_log_group_name" {
value = aws_cloudwatch_log_group.audit.name
}
"""Strict RAGのLambdaハンドラー。
ACL強制の前提:
1. グループ所属の信頼できる情報源は、API GatewayのJWT Authorizerが検証して
イベントに付与したCognito ID tokenのクレームです。bodyは信頼しない入力として扱い、
`filter`が含まれていても無視します。
2. 検索フィルターは、許可されたドメイングループのホワイトリストからサーバー側で生成します。
3. 呼び出し元に一致するドメイングループがない場合は、Bedrockを呼ぶ前にHTTP 403を返します。
"""
import json
import logging
import os
import boto3
logger = logging.getLogger()
logger.setLevel(logging.INFO)
KB_ID = os.environ["KB_ID"]
GENERATION_MODEL_ARN = os.environ["GENERATION_MODEL_ARN"]
ALLOWED_GROUPS = frozenset(
g.strip() for g in os.environ.get("ALLOWED_GROUPS", "").split(",") if g.strip()
)
NUM_RESULTS = int(os.environ.get("NUM_RESULTS", "8"))
_runtime = None
def _get_runtime():
global _runtime
if _runtime is None:
_runtime = boto3.client("bedrock-agent-runtime")
return _runtime
def parse_groups(claims):
"""JWTクレームから呼び出し元のドメイングループを取り出します。
このLambdaはAPI Gateway HTTP API連携のpayload format 2.0で呼び出されます。
そのイベント形式やテスト用スタブでは、`cognito:groups`がリストの場合もあれば、
"[finance, it]"のようなJSON風文字列の場合もあるため、両方を処理します。
そのうえでALLOWED_GROUPSと突き合わせ、Cognito group名の誤設定によって
任意のドメインへ権限昇格しないようにします。
"""
raw = claims.get("cognito:groups", [])
if isinstance(raw, str):
stripped = raw.strip().lstrip("[").rstrip("]")
raw = [s for s in stripped.replace(",", " ").split() if s]
elif not isinstance(raw, list):
raw = []
return sorted({g for g in raw if g in ALLOWED_GROUPS})
def build_filter(groups):
return {"in": {"key": "domain", "value": groups}}
def _response(status, body):
return {
"statusCode": status,
"headers": {"Content-Type": "application/json"},
"body": json.dumps(body, ensure_ascii=False),
}
def _shape_citations(citations):
shaped = []
for c in citations or []:
for ref in c.get("retrievedReferences", []) or []:
shaped.append(
{
"uri": (ref.get("location") or {}).get("s3Location", {}).get("uri"),
"metadata": ref.get("metadata") or {},
}
)
return shaped
def handler(event, _ctx):
try:
claims = event["requestContext"]["authorizer"]["jwt"]["claims"]
except (KeyError, TypeError):
return _response(401, {"error": "missing JWT claims"})
groups = parse_groups(claims)
if not groups:
return _response(403, {"error": "no allowed domain group on caller"})
try:
body = json.loads(event.get("body") or "{}")
except json.JSONDecodeError:
return _response(400, {"error": "invalid JSON body"})
question = body.get("question")
if not isinstance(question, str) or not question.strip():
return _response(400, {"error": "`question` is required"})
if "filter" in body:
logger.warning(
"ignoring client-supplied filter for user=%s", claims.get("cognito:username")
)
server_filter = build_filter(groups)
resp = _get_runtime().retrieve_and_generate(
input={"text": question},
retrieveAndGenerateConfiguration={
"type": "KNOWLEDGE_BASE",
"knowledgeBaseConfiguration": {
"knowledgeBaseId": KB_ID,
"modelArn": GENERATION_MODEL_ARN,
"retrievalConfiguration": {
"vectorSearchConfiguration": {
"numberOfResults": NUM_RESULTS,
"filter": server_filter,
}
},
},
},
)
return _response(
200,
{
"answer": resp.get("output", {}).get("text", ""),
"citations": _shape_citations(resp.get("citations")),
"user_groups": groups,
"applied_filter": server_filter,
},
)
boto3>=1.35.0
"""AllganizeのPDFコーパスを取得し、S3サイドカーメタデータを書き、KB取り込みを開始します。
AllganizeのHugging Faceデータセット`allganize/RAG-Evaluation-Dataset-JA`は
CSVメタデータのみを公開しています。実際のPDFは`documents.csv`の`url`列が参照する
外部公開サイトにあります。このスクリプトでは以下を行います。
1. Hugging Faceから`documents.csv`をダウンロードする。
2. 各行について、`url`からPDFをダウンロードする。
3. PDFを`s3://{corpus_bucket}/corpus/{domain}/{doc_id}_{file_name}`へアップロードする。
4. `domain` ACL属性を含む対応する`*.metadata.json`サイドカーを書く。
5. `bedrock-agent.start_ingestion_job`を呼び、完了までポーリングする。
`domain`サイドカー属性はLambdaの検索フィルターの基準になるため、サイドカー形式は
AWSのS3データソースが期待する形式に合わせます。
使い方:
python scripts/ingest_allganize.py \\
--kb-id $KB_ID --data-source-id $DS_ID \\
--corpus-bucket $CORPUS_BUCKET
便利なオプション:
--dry-run S3へのアップロードと取り込み開始を行わない
--limit N フィルタリング後の先頭N件だけ処理する
--domain finance,it 指定したdomainだけに絞る
--skip-ingest アップロードだけ行い、取り込みジョブは開始しない
--resume S3キーがすでに存在するPDFをスキップする
"""
from __future__ import annotations
import argparse
import csv
import hashlib
import io
import json
import logging
import os
import sys
import time
from dataclasses import dataclass
from typing import Iterable
import boto3
import urllib.request
import urllib.error
DOCUMENTS_CSV_URL = (
"https://huggingface.co/datasets/allganize/RAG-Evaluation-Dataset-JA"
"/raw/main/documents.csv"
)
DEFAULT_DOMAINS = {"finance", "it", "manufacturing", "public", "retail"}
logging.basicConfig(
level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", stream=sys.stderr
)
log = logging.getLogger("ingest")
@dataclass
class Doc:
domain: str
title: str
page: str
url: str
file_name: str
publisher: str
@property
def doc_id(self) -> str:
return hashlib.sha256(self.url.encode("utf-8")).hexdigest()[:12]
@property
def s3_key(self) -> str:
safe_name = self.file_name.replace(" ", "_")
return f"corpus/{self.domain}/{self.doc_id}_{safe_name}"
@property
def metadata_key(self) -> str:
return f"{self.s3_key}.metadata.json"
def fetch_documents_csv(url: str = DOCUMENTS_CSV_URL) -> list[Doc]:
log.info("Fetching %s", url)
with urllib.request.urlopen(url, timeout=30) as resp:
text = resp.read().decode("utf-8")
reader = csv.DictReader(io.StringIO(text))
docs: list[Doc] = []
for row in reader:
norm = {k.strip(): (v or "").strip() for k, v in row.items()}
docs.append(
Doc(
domain=norm["domain"],
title=norm.get("title", ""),
page=norm.get("page", ""),
url=norm["url"],
file_name=norm["file_name"],
publisher=norm.get("publisher", ""),
)
)
log.info("Loaded %d documents from documents.csv", len(docs))
return docs
def filter_docs(
docs: Iterable[Doc],
allowed_domains: set[str] | None,
limit: int | None,
) -> list[Doc]:
out: list[Doc] = []
for d in docs:
if allowed_domains and d.domain not in allowed_domains:
continue
out.append(d)
if limit is not None and len(out) >= limit:
break
return out
def download_pdf(url: str, timeout: int = 60) -> bytes:
req = urllib.request.Request(url, headers={"User-Agent": "strict-rag-ingest/1.0"})
with urllib.request.urlopen(req, timeout=timeout) as resp:
return resp.read()
def build_sidecar(doc: Doc) -> dict:
"""Bedrock KBのS3データソースで使うサイドカーファイル形式。
S3データソースのメタデータファイルのドキュメントにある
**map**形式(key -> { value: {type, stringValue}, includeForEmbedding })を使います。
Bedrockは簡易的なフラット形式(key -> value)もサポートしていますが、
これはincludeForEmbedding=false相当で、属性ごとの埋め込み挙動を表現できません。
"""
return {
"metadataAttributes": {
"domain": {
"value": {"type": "STRING", "stringValue": doc.domain},
"includeForEmbedding": False,
},
"doc_id": {
"value": {"type": "STRING", "stringValue": doc.doc_id},
"includeForEmbedding": False,
},
"source_url": {
"value": {"type": "STRING", "stringValue": doc.url},
"includeForEmbedding": False,
},
"publisher": {
"value": {"type": "STRING", "stringValue": doc.publisher},
"includeForEmbedding": False,
},
"title": {
"value": {"type": "STRING", "stringValue": doc.title},
"includeForEmbedding": True,
},
}
}
def already_uploaded(s3, bucket: str, key: str) -> bool:
try:
s3.head_object(Bucket=bucket, Key=key)
return True
except s3.exceptions.ClientError:
return False
def upload_doc(
s3,
bucket: str,
doc: Doc,
pdf_bytes: bytes,
sidecar: dict,
dry_run: bool,
) -> None:
if dry_run:
log.info("[dry-run] would upload %s (%d bytes) + %s", doc.s3_key, len(pdf_bytes), doc.metadata_key)
return
s3.put_object(
Bucket=bucket,
Key=doc.s3_key,
Body=pdf_bytes,
ContentType="application/pdf",
)
s3.put_object(
Bucket=bucket,
Key=doc.metadata_key,
Body=json.dumps(sidecar, ensure_ascii=False).encode("utf-8"),
ContentType="application/json",
)
def start_and_wait(
bedrock_agent,
kb_id: str,
ds_id: str,
timeout_minutes: int = 60,
poll_seconds: int = 10,
) -> dict:
log.info("Starting ingestion job kb=%s ds=%s", kb_id, ds_id)
started = bedrock_agent.start_ingestion_job(
knowledgeBaseId=kb_id, dataSourceId=ds_id
)
job_id = started["ingestionJob"]["ingestionJobId"]
log.info("Ingestion job started: %s", job_id)
deadline = time.time() + timeout_minutes * 60
while time.time() < deadline:
job = bedrock_agent.get_ingestion_job(
knowledgeBaseId=kb_id,
dataSourceId=ds_id,
ingestionJobId=job_id,
)["ingestionJob"]
status = job["status"]
stats = job.get("statistics", {})
log.info(
" job %s status=%s indexed=%s scanned=%s failed=%s",
job_id,
status,
stats.get("numberOfDocumentsIndexed", "-"),
stats.get("numberOfDocumentsScanned", "-"),
stats.get("numberOfDocumentsFailed", "-"),
)
if status in ("COMPLETE", "FAILED", "STOPPED"):
if status != "COMPLETE":
log.error("Ingestion job ended with status=%s reasons=%s", status, job.get("failureReasons"))
return job
time.sleep(poll_seconds)
raise TimeoutError(f"Ingestion job {job_id} did not complete within {timeout_minutes}m")
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="AllganizeのPDFをBedrock KBへ取り込む")
p.add_argument("--corpus-bucket", required=False, default=os.environ.get("CORPUS_BUCKET"))
p.add_argument("--kb-id", required=False, default=os.environ.get("KB_ID"))
p.add_argument("--data-source-id", required=False, default=os.environ.get("DATA_SOURCE_ID"))
p.add_argument("--region", required=False, default=os.environ.get("AWS_REGION"))
p.add_argument("--limit", type=int, default=None)
p.add_argument("--domain", default=None, help="取り込み対象にするdomainのカンマ区切りリスト")
p.add_argument("--dry-run", action="store_true")
p.add_argument("--skip-ingest", action="store_true")
p.add_argument("--resume", action="store_true", help="S3キーがすでに存在する文書をスキップする")
p.add_argument("--errors-out", default="ingest_errors.jsonl")
return p.parse_args()
def main() -> int:
args = parse_args()
if not args.dry_run and not args.corpus_bucket:
log.error("--corpus-bucket is required (or set $CORPUS_BUCKET) unless --dry-run")
return 2
if not args.skip_ingest and not args.dry_run and not (args.kb_id and args.data_source_id):
log.error("--kb-id and --data-source-id are required unless --skip-ingest or --dry-run")
return 2
allowed: set[str] | None = (
{d.strip() for d in args.domain.split(",") if d.strip()} if args.domain else None
)
if allowed:
unknown = allowed - DEFAULT_DOMAINS
if unknown:
log.warning("Domains not in the default Allganize set: %s", sorted(unknown))
session = boto3.Session(region_name=args.region) if args.region else boto3.Session()
s3 = session.client("s3") if not args.dry_run else None
bedrock_agent = session.client("bedrock-agent") if not (args.dry_run or args.skip_ingest) else None
docs = fetch_documents_csv()
docs = filter_docs(docs, allowed, args.limit)
log.info("After filter: %d documents", len(docs))
errors: list[dict] = []
uploaded = 0
skipped = 0
for i, doc in enumerate(docs, start=1):
log.info("[%d/%d] %s/%s %s", i, len(docs), doc.domain, doc.file_name, doc.url)
try:
if args.resume and not args.dry_run and already_uploaded(s3, args.corpus_bucket, doc.s3_key):
log.info(" → skip (already in S3)")
skipped += 1
continue
pdf_bytes = download_pdf(doc.url)
sidecar = build_sidecar(doc)
upload_doc(s3, args.corpus_bucket, doc, pdf_bytes, sidecar, dry_run=args.dry_run)
uploaded += 1
except urllib.error.HTTPError as e:
err = {"file_name": doc.file_name, "domain": doc.domain, "url": doc.url, "error": f"HTTPError {e.code}: {e.reason}"}
log.error(" HTTP %d for %s", e.code, doc.url)
errors.append(err)
except Exception as e: # noqa: BLE001
err = {"file_name": doc.file_name, "domain": doc.domain, "url": doc.url, "error": repr(e)}
log.exception(" failed: %s", e)
errors.append(err)
if errors:
with open(args.errors_out, "w", encoding="utf-8") as f:
for e in errors:
f.write(json.dumps(e, ensure_ascii=False) + "\n")
log.warning("%d documents failed — see %s", len(errors), args.errors_out)
log.info("Upload phase done: uploaded=%d skipped=%d failed=%d", uploaded, skipped, len(errors))
if args.skip_ingest or args.dry_run:
log.info("Ingestion job skipped (--skip-ingest or --dry-run).")
return 0
job = start_and_wait(bedrock_agent, args.kb_id, args.data_source_id)
return 0 if job["status"] == "COMPLETE" else 1
if __name__ == "__main__":
sys.exit(main())
boto3>=1.35.0
単一Knowledge Base + metadata filterにした理由
ドメインごとにKnowledge Baseを分ける方法も考えましたが、今回は単一Knowledge Baseに全ドキュメントを入れて、metadata filterで検索範囲を制限する構成にしました。
| 観点 | 単一Knowledge Base + filter | ドメインごとにKnowledge Base |
|---|---|---|
| 構成 | シンプル | ドメイン数だけ増える |
| 複数ドメイン検索 |
in filterで自然にできる |
結果のマージが必要 |
| 取り込み | ingestion pipelineが1つ | ドメインごとに分けて管理できる |
| 分離 | metadataで論理的に分ける | 物理的に分けやすい |
どちらが正解というより、制御したい単位に合わせて選ぶものだと思っています。
| 観点 | 単一Knowledge Base + metadata filterが向くケース |
acl.jsonを使う別設計が向くケース |
|---|---|---|
| 制御単位 |
finance/itのようなグループ単位で分けたい |
ユーザー単位、文書単位で細かく分けたい |
| 権限情報の置き場所 |
domainなど少数のmetadataで表せる |
acl.jsonを権限のsource of truthにしたい |
| 検索対象の決め方 | 1つのKnowledge Baseに対してmetadata filterをかけたい |
acl.jsonを見て、検索前に対象のKnowledge Base/index/document scopeを決めたい |
| 分離の考え方 | metadataによる論理分離でよい | Knowledge Baseやindexを分ける設計にしたい |
| 検索方式 | semantic searchが中心 | hybrid searchやkeyword条件も含めて、対象文書集合を組み立てたい |
今回は、finance/it/manufacturingのようなドメイン単位で検索範囲を分けられれば十分なので、単一Knowledge Base + metadata filterにしています。ユーザー単位や文書単位で細かく制御したい場合は、acl.jsonを権限のsource of truthにして、そこから許可されたKnowledge Base/S3 Vectors index/document scopeにルーティングする設計も選択肢になります。
注意点として、acl.jsonを置くだけではBedrock Knowledge BasesのRetrievalFilterにはなりません。厳密に制限したい場合は、acl.jsonをLambdaなど検索前の処理で参照し、対象を絞る必要があります。
今回の実装では、RetrievalFilterはLambdaがCognitoのJWT claimsから生成します。
実装解説
Step.1 TerraformでS3 VectorsとKnowledge Baseを作成する
Terraform AWS providerは6.46を使っています。
S3 VectorsとBedrock Knowledge Basesは以下のAWS providerリソースで作成しています。
aws_s3vectors_vector_bucketaws_s3vectors_indexaws_bedrockagent_knowledge_baseaws_bedrockagent_data_source
S3 Vectors側は以下のように作成しています。
resource "aws_s3vectors_vector_bucket" "this" {
vector_bucket_name = "${var.name}-vec-${var.suffix}"
encryption_configuration {
sse_type = "AES256"
}
}
resource "aws_s3vectors_index" "this" {
index_name = "${var.name}-idx"
vector_bucket_name = aws_s3vectors_vector_bucket.this.vector_bucket_name
data_type = "float32"
dimension = var.embedding_dimension
distance_metric = "cosine"
metadata_configuration {
non_filterable_metadata_keys = [
"AMAZON_BEDROCK_TEXT",
"AMAZON_BEDROCK_METADATA",
]
}
}
domainはfilterに使いたいのでfilterable metadataとして残します。
一方で、Bedrock Knowledge BasesがS3 Vectorsに保存するチャンク本文やBedrock管理メタデータはサイズが大きくなりやすいため、non_filterable_metadata_keysに逃がしています。
実際にaws s3vectors list-vectors --return-metadataで確認すると、Bedrock KBが保存したvectorにはAMAZON_BEDROCK_TEXT/AMAZON_BEDROCK_METADATA/domainなどが入っていました。
{
"domain": "manufacturing",
"doc_id": "cf003e4fe3cc",
"title": "JAS活用マニュアル",
"AMAZON_BEDROCK_TEXT": "...",
"AMAZON_BEDROCK_METADATA": "..."
}
non_filterable_metadata_keysに入れたキーはfilter条件には使えませんが、metadata自体を保存しないという意味ではありません。そのため、--return-metadataで確認するとAMAZON_BEDROCK_TEXTやAMAZON_BEDROCK_METADATAも返ります。
S3 Vectorsのmetadataはデフォルトでfilterableになります。Bedrock KB + S3 Vectorsではcustom metadataに上限があり、S3 Vectorsのmetadata filteringドキュメントではcustom metadataは1KB、metadata keysは35個までと説明されています。検索条件に使わない大きなキーはnon-filterableにしておくのが安全です。
Knowledge Base側はstorage_configuration.type = "S3_VECTORS"を指定します。
resource "aws_bedrockagent_knowledge_base" "main" {
name = "${var.name}-kb"
role_arn = aws_iam_role.kb.arn
description = "Strict RAG用のKnowledge Base。検索時に`domain`メタデータでアクセス範囲を制限します。"
knowledge_base_configuration {
type = "VECTOR"
vector_knowledge_base_configuration {
embedding_model_arn = var.embedding_model_arn
}
}
storage_configuration {
type = "S3_VECTORS"
s3_vectors_configuration {
index_arn = var.vector_index_arn
}
}
depends_on = [aws_iam_role_policy.kb]
}
掲載コードのvariables.tfとterraform.tfvars.exampleでは、埋め込みモデルを差し替えやすいようにcohere.embed-multilingual-v3をデフォルトにしています。今回の検証では、実値のterraform.tfvarsでembedding_model_id = "amazon.titan-embed-text-v2:0"、generation_model_id = "jp.anthropic.claude-sonnet-4-6"に上書きしました。2026年5月24日時点で、AWS公式のClaude Sonnet 4.6 Model Cardにはbase model IDとしてanthropic.claude-sonnet-4-6、JPのGeo inference IDとしてjp.anthropic.claude-sonnet-4-6が載っています。
生成モデルにはinference profile IDまたはARNを指定できます。今回はTerraform側でinference profile ARNを組み立ててLambdaに渡しています。
Step.2 S3にドキュメントとmetadataを置く
Bedrock Knowledge BasesのS3 data sourceでは、元ドキュメントと同じ場所に*.metadata.jsonを置くことでmetadataを付与できます。
今回の配置は以下です。
corpus/finance/d45a82c097ef_230829_main.pdf
corpus/finance/d45a82c097ef_230829_main.pdf.metadata.json
metadata sidecarは以下の形式にしています。
{
"metadataAttributes": {
"domain": {
"value": { "type": "STRING", "stringValue": "finance" },
"includeForEmbedding": false
},
"doc_id": {
"value": { "type": "STRING", "stringValue": "d45a82c097ef" },
"includeForEmbedding": false
},
"source_url": {
"value": { "type": "STRING", "stringValue": "https://example.com/source.pdf" },
"includeForEmbedding": false
},
"publisher": {
"value": { "type": "STRING", "stringValue": "金融庁" },
"includeForEmbedding": false
},
"title": {
"value": { "type": "STRING", "stringValue": "2023事務年度 金融行政方針" },
"includeForEmbedding": true
}
}
}
domainは検索範囲制御用のキーなので、includeForEmbeddingはfalseにしています。検索の意味的な近さにdomain名を混ぜたいわけではなく、filter条件としてだけ使いたいためです。
Bedrock Knowledge BasesのS3 data source connectorでは、上記のようにmetadataAttributes配下でvalueとincludeForEmbeddingを指定する形式が使えます。{"metadataAttributes": {"domain": "finance"}}のような簡易形式もありますが、その場合はincludeForEmbedding: false相当になり、属性ごとのembedding含有有無を指定できません。
S3 data sourceのmetadata fileは、Bedrock Knowledge BasesのS3 data source connectorドキュメントにある通り、元ファイル名の末尾に.metadata.jsonを付け、同じS3 prefixに置く必要があります。
ここでacl.jsonのような独自ファイルを別に置かず、Bedrock Knowledge Basesのmetadataとしてdomainを持たせているのは、RetrievalFilterがKnowledge Baseに取り込まれたmetadata attributeに対して効くためです。
別ファイルのacl.jsonにアクセス制御情報を持たせるだけだと、Bedrock側のvector searchではその情報をfilter条件として使えません。Lambdaでacl.jsonを読んで検索後に除外する設計もできますが、その場合は範囲外チャンクが一度検索結果に出てきてしまいます。今回はretrievalの段階で絞り込みたいので、PDFと一緒に*.metadata.jsonを置き、チャンクにdomain metadataを持たせる構成にしています。
取り込み前にdry-runで数件だけ確認します。
uv run python scripts/ingest_allganize.py --dry-run --limit 3
Terraformのoutputから必要な環境変数をexportして、実際に取り込みます。
export AWS_REGION="$(terraform -chdir=infra/envs/dev output -raw region)"
export CORPUS_BUCKET="$(terraform -chdir=infra/envs/dev output -raw corpus_bucket_name)"
export KB_ID="$(terraform -chdir=infra/envs/dev output -raw knowledge_base_id)"
export DATA_SOURCE_ID="$(terraform -chdir=infra/envs/dev output -raw data_source_id)"
uv run python scripts/ingest_allganize.py --resume
取り込みスクリプトでは、Hugging Faceのdocuments.csvを読み、PDFをダウンロードしてS3にアップロードし、metadata sidecarを生成しています。
def build_sidecar(doc: Doc) -> dict:
return {
"metadataAttributes": {
"domain": {
"value": {"type": "STRING", "stringValue": doc.domain},
"includeForEmbedding": False,
},
"doc_id": {
"value": {"type": "STRING", "stringValue": doc.doc_id},
"includeForEmbedding": False,
},
"source_url": {
"value": {"type": "STRING", "stringValue": doc.url},
"includeForEmbedding": False,
},
"publisher": {
"value": {"type": "STRING", "stringValue": doc.publisher},
"includeForEmbedding": False,
},
"title": {
"value": {"type": "STRING", "stringValue": doc.title},
"includeForEmbedding": True,
},
}
}
Allganizeのdocuments.csvには65件のURLが載っていますが、参照先は外部の公開PDFです。そのため、404やtimeoutで失敗するものがありました。
自分の環境では31件をS3にアップロードでき、Bedrock KBのingestion jobは以下の結果になりました。
status=COMPLETE
numberOfDocumentsScanned=31
numberOfNewDocumentsIndexed=30
numberOfDocumentsFailed=1
失敗したURLはingest_errors.jsonlに出すようにしています。再実行時は--resumeを付けると、すでにS3にあるPDFはスキップできます。
Step.3 LambdaでRetrievalFilterを作る
LambdaではAPI Gateway JWT Authorizerが検証したCognito ID tokenのclaimsを使います。
ALLOWED_GROUPS = frozenset(
g.strip() for g in os.environ.get("ALLOWED_GROUPS", "").split(",") if g.strip()
)
def parse_groups(claims):
raw = claims.get("cognito:groups", [])
if isinstance(raw, str):
stripped = raw.strip().lstrip("[").rstrip("]")
raw = [s for s in stripped.replace(",", " ").split() if s]
elif not isinstance(raw, list):
raw = []
return sorted({g for g in raw if g in ALLOWED_GROUPS})
def build_filter(groups):
return {"in": {"key": "domain", "value": groups}}
この構成ではAPI Gateway HTTP APIのpayload format 2.0でLambdaを呼び出しています。cognito:groupsはeventの形やテスト用stubによってlistまたは"[finance, it]"のような文字列として扱えるようにし、カンマ区切りとスペース区切りの両方を処理しています。
handler側では、許可グループがない場合はBedrockを呼ぶ前に403を返します。
groups = parse_groups(claims)
if not groups:
return _response(403, {"error": "no allowed domain group on caller"})
server_filter = build_filter(groups)
RetrieveAndGenerateにはLambdaが生成したserver_filterだけを渡します。
resp = _get_runtime().retrieve_and_generate(
input={"text": question},
retrieveAndGenerateConfiguration={
"type": "KNOWLEDGE_BASE",
"knowledgeBaseConfiguration": {
"knowledgeBaseId": KB_ID,
"modelArn": GENERATION_MODEL_ARN,
"retrievalConfiguration": {
"vectorSearchConfiguration": {
"numberOfResults": NUM_RESULTS,
"filter": server_filter,
}
},
},
},
)
Bedrock Knowledge BasesのRetrievalFilterでは、in operatorを使うことで複数ドメインのunion検索ができます。
{
"in": {
"key": "domain",
"value": ["finance", "it"]
}
}
RetrievalFilterの詳細はAPI Referenceにまとまっています。
Step.4 API GatewayのJWT Authorizerを設定する
API GatewayはHTTP APIを使い、Cognito User PoolをissuerにしたJWT Authorizerを設定しています。
resource "aws_apigatewayv2_authorizer" "jwt" {
api_id = aws_apigatewayv2_api.this.id
authorizer_type = "JWT"
identity_sources = ["$request.header.Authorization"]
name = "${var.name}-cognito-jwt"
jwt_configuration {
audience = [var.cognito_audience]
issuer = var.cognito_issuer
}
}
HTTP APIのJWT Authorizerがtokenを検証し、Lambdaは検証済みclaimsだけを見ます。
エンドユーザーにAWS credentialやbedrock:Retrieve権限を渡さないことも重要です。直接Bedrock Knowledge Baseを呼べるIAM権限をユーザー側に渡すと、API Gateway+Lambdaの認可を通らずに検索できてしまいます。
検証パート
Step.1 検証用ユーザーを確認する
ここでは、デプロイ済みのAPIにCognitoユーザーのID tokenを付けて、検索範囲と回答がどう変わるかを確認します。
確認のためにheadlessでPOST /askを叩いていますが、tokenをコードやシェルに直書きする運用を推奨しているわけではありません。ここでは検索範囲制御の挙動を見るための確認手段として扱います。
検証用のCognitoユーザーは以下のように作成しています。
| ユーザー | 所属グループ | 検索できるdomain |
|---|---|---|
finance_user |
finance |
finance |
manufacturing_user |
manufacturing |
manufacturing |
multi_user |
finance, it
|
finance, it
|
no_group_user |
なし | なし |
この対応から、finance_userでは金融系の文書だけ、manufacturing_userでは製造系の文書だけを使って回答できることを確認します。multi_userは複数グループに所属しているため、financeとitのunionで検索できます。
確認する観点は以下です。
-
applied_filterがCognito groupから作られているか - 回答内容が許可されたdomainの文書に基づいているか
- citationsのdomainが許可範囲に収まっているか
- 権限外の質問で範囲外文書が返らないか
Step.2 finance_userでfinanceの質問を確認する
finance_userはCognito group financeに所属しています。まず金融行政方針について聞きます。
question:
2023事務年度の金融行政方針について教えてください
レスポンスでは、Lambdaが生成したfilterがfinanceになっています。
{
"user_groups": ["finance"],
"applied_filter": {
"in": {
"key": "domain",
"value": ["finance"]
}
}
}
citationsもfinanceのS3 objectだけになりました。
s3://.../corpus/finance/d45a82c097ef_230829_main.pdf
回答も金融行政方針の内容になっています。
2023事務年度の金融行政方針は、大きく以下の4つの柱で構成されています。
Ⅰ.経済や国民生活の安定を支え、その後の成長へと繋ぐ
Ⅱ.社会課題解決と経済成長を両立させる金融システムの構築
Ⅲ.金融システムの安定・信頼を確保する
Ⅳ.金融行政を絶えず進化・深化させる
finance_userではfinanceの文書だけが検索され、回答もその範囲で生成されていることが確認できました。
Step.3 finance_userでmanufacturingの質問を確認する
次に、finance_userのままmanufacturing側にありそうな質問を投げます。
question:
JAS規格の活用効果について教えてください
この場合もfilterはfinanceのままです。
{
"applied_filter": {
"in": {
"key": "domain",
"value": ["finance"]
}
}
}
回答は以下のようになりました。
提供された検索結果には、JAS規格(日本農林規格)の活用効果に関する情報は含まれていません。
検索結果は主に暗号資産・デジタル資産に関する規制動向や金融規制に関する内容となっており、
JAS規格についての記載は見当たりません。
citationsは0件でした。
citations=[]
JAS規格の文書はmanufacturingにありますが、finance_userのfilterはfinanceのままなので、範囲外文書は回答に使われません。
Step.4 manufacturing_userでmanufacturingの質問を確認する
次に、同じ質問をmanufacturing_userで投げます。
question:
JAS規格の活用効果について教えてください
manufacturing_userのfilterはmanufacturingになります。
{
"user_groups": ["manufacturing"],
"applied_filter": {
"in": {
"key": "domain",
"value": ["manufacturing"]
}
}
}
今度はJAS規格の活用効果について回答が返りました。
JAS規格・認証の活用効果として、以下のような点が挙げられます。
- 消費者・取引先への品質保証
- 販売・単価へのプラス効果
- 技術の維持・向上
- 輸出への活用(有機JASの場合)
citationsもmanufacturingのJAS活用マニュアルだけになっています。
domain=manufacturing
s3://.../corpus/manufacturing/cf003e4fe3cc_index-11.pdf
Step.5 multi_userでunion検索を確認する
multi_userはfinanceとitの両方に所属しています。
この場合、Lambdaは以下のfilterを作ります。
{
"in": {
"key": "domain",
"value": ["finance", "it"]
}
}
in operatorを使うことで、1回のKnowledge Base queryで複数ドメインを検索できます。
IT分野のセキュリティ動向を聞くと、回答はITの文書に基づいて生成されました。
IT分野のセキュリティ動向について、以下のような主要なトピックが挙げられます。
- 能動的サイバー防御(Active Cyber Defense)
- IoT機器のセキュリティ
- 情報通信ネットワークの安全性・信頼性の確保
- IPA「情報セキュリティ10大脅威2025」
- AIとセキュリティ
citationsはitの文書になっています。
domain=it
s3://.../corpus/it/b6bf640ae47a_report.pdf
s3://.../corpus/it/d18854f20c9e_01point.pdf
s3://.../corpus/it/7147f11497ff_skillup_guideline.pdf
multi_userはfinanceとitのunionで検索できるため、itの質問にも回答できています。
Step.6 no_group_userで403を確認する
許可グループに所属していないno_group_userで同じAPIを叩くと、Bedrockを呼ぶ前に403を返します。
status=403
body={"error":"no allowed domain group on caller"}
検索対象domainを作れないユーザーは、そもそもKnowledge Baseに問い合わせません。
Step.7 retrievalの段階でfilterが効いているか確認する
filterがretrievalの段階で効いていることを確認するため、生成を介さないbedrock-agent-runtime retrieveを直接呼びます。
まず、金融行政方針のクエリをfinance filterで検索します。
aws bedrock-agent-runtime retrieve \
--knowledge-base-id "$KB_ID" \
--retrieval-query '{"text":"2023事務年度の金融行政方針"}' \
--retrieval-configuration '{
"vectorSearchConfiguration": {
"numberOfResults": 5,
"filter": {
"in": {
"key": "domain",
"value": ["finance"]
}
}
}
}'
結果はfinanceの金融庁文書がscore 0.95以上で返りました。
score=0.970 domain=finance s3://.../corpus/finance/d45a82c097ef_230829_main.pdf
score=0.961 domain=finance s3://.../corpus/finance/d45a82c097ef_230829_main.pdf
score=0.961 domain=finance s3://.../corpus/finance/d45a82c097ef_230829_main.pdf
同じクエリをmanufacturing filterで検索すると、金融庁文書は返らず、manufacturingの文書だけが返ります。
score=0.592 domain=manufacturing s3://.../corpus/manufacturing/f36bfba8a37e_DM_Industry_Vision_3.0.pdf
score=0.580 domain=manufacturing s3://.../corpus/manufacturing/7be6151d5624_keshouhin-standard.pdf
score=0.573 domain=manufacturing s3://.../corpus/manufacturing/ac869f1a7748_110329-1a.pdf
逆に、JAS規格に関するクエリをfilterなしで検索するとmanufacturingのJAS文書が上位に出ます。
question:
繊維製品の安全基準やJIS規格について
score=0.715 domain=manufacturing s3://.../corpus/manufacturing/cf003e4fe3cc_index-11.pdf
score=0.713 domain=manufacturing s3://.../corpus/manufacturing/cf003e4fe3cc_index-11.pdf
score=0.711 domain=manufacturing s3://.../corpus/manufacturing/cf003e4fe3cc_index-11.pdf
これをfinance filterに変えると、JAS文書は返らず、finance domainの中で近いものだけが返ります。
score=0.602 domain=finance s3://.../corpus/finance/825c59b2823f_jimukyoku.pdf
score=0.583 domain=finance s3://.../corpus/finance/7157b2ea051e_news240340_1.pdf
score=0.579 domain=finance s3://.../corpus/finance/f2efd977da57_i-xvii.pdf
この確認から、少なくともRetrieve APIの返却結果としては、filterに合わないdomainのチャンクは返っていないことが分かります。
S3 Vectorsのmetadata filteringドキュメントでも、vector searchとfilter evaluationはtandemに実行され、filter条件に合うvectorが返ると説明されています。
最後に
Amazon S3 VectorsとAmazon Bedrock Knowledge Basesを使って、Cognito groupをアクセス制御の条件として扱う制限付きRAGを構築しました。
今回確認できたことは以下です。
- S3 VectorsをBedrock Knowledge Basesのvector storeとして使える
- Cognito groupから
RetrievalFilterをサーバー側で生成できる -
RetrieveAPIで確認すると、filterに合わないdomainのチャンクは返らない
RAGを作りたいが、ユーザー制御をしたい場合や「検索できる文書の範囲」をユーザーごとに分けたいケースは多いと思います。
S3 Vectors + Bedrock Knowledge Basesのmetadata filterを使うと、Knowledge Baseをドメインごとに分けずに、Lambdaで検索範囲を制御できます。同じ構成を組む方の役に立てば幸いです。
Discussion