個人で地図にAIをマッピングしてみた #4 AWS API 構築編
1. はじめに
前回の記事(個人で地図にAIをマッピングしてみた #3 AWSバッチ処理編)では、地図AI向けに大規模なWikipediaデータをAWS上で並列分散バッチ処理する構成について紹介しました。
今回はその続きとして、構築したナレッジベースを自然言語で検索できるAPIとして公開する方法を紹介します。
このAPIによって、次のような体験が実現できます。
📱「このあたりの有名な歴史的建造物って何かある?」
ユーザーの現在地に基づいて、Wikipediaベースのナレッジから関連情報を抽出し、生成AIでわかりやすく整形して応答。
つまり、「自然言語クエリ」 × 「地理情報」 × 「分散ナレッジベース」 を組み合わせることで、地図アプリやチャットアシスタントなどに応用可能な検索体験を実現します。
このAPIは以下のようなAWS構成で構築しました。
- API Gateway + Lambda によるサーバレスなAPI設計
- Bedrock による自然言語Embedding変換
- S3に分散保存されたナレッジデータのベクトル類似検索
- コスト効率を意識したスケーラブルな構成
この記事では、これらの仕組みを実装ベースで具体的に紹介しながら、個人開発でも本番運用可能な検索APIを構築するまでのプロセスを詳しく解説していきます。
2. AWS上の全体構成と処理フロー
本章では、地図AIにおけるWikipediaデータの処理をAWS上でどのように構成しているかを紹介します。
2.1 構成図
以下の図は、地図アプリからデータを取得することも想定し、ローカルで開発したバッチ処理ロジックをAWSに展開し、実際にスケーラブルに動かすための全体構成を示しています。
今回は図の下半分、API連携の構成が中心となります。
上半分のバッチ処理のデプロイと実行に関する構成については、前回の記事(#3 AWSバッチ処理編)で詳しく解説しています。
2.2 クエリ応答APIのざっくりとした構成
地図アプリなどのクライアントが位置情報と自然言語クエリを送信すると、以下のような流れで応答が返ります。
- ユーザーのGETクエリは API Gateway を通じて、Lambda → Bedrock(Embedding / ChatAI) に連携
- 前回のバッチ処理で構築した「地理情報付きナレッジベース分散データ(S3)」を参照
- このナレッジベースは、位置情報のハッシュ値(H3インデックス)をキーとしているため、クエリの位置情報から対応する大域的データを取得
- 対応する大域的データに対して Bedrock Embedding を実施し、入力クエリのEmbeddingと近傍探索(ベクトル類似検索)を行う
- 最も類似したデータを抽出し、APIのレスポンスとして返却
この仕組みによって、自然言語クエリ × 位置情報 に応じた、文脈的に関連性の高い情報検索が可能になります。
3. AWS Lambda関数(Python)実装
この章では、前章で紹介した自然言語クエリAPIの処理フローを、AWS Lambda関数としてPythonでどのように実装するかを解説します。
自然言語クエリと緯度・経度をもとに、地理情報付きナレッジベースから関連情報を取得する Lambda 関数の全体構成を紹介します。
3.1 処理全体の流れ
- クライアントから送信された 緯度 (
lat
)、経度 (lon
)、検索半径 (radius
)、自然言語クエリ (query
) を受け取る -
LocationModel
を使って、指定範囲内にある地理ナレッジデータ(Wikipedia)をS3から取得 -
EmbedModel
によって、自然言語クエリと地理ナレッジをベクトル化し、類似度を計算 - 最も類似する記事を抽出して、JSON形式で応答する
3.2 ディレクトリ構成
lambda関数名/
├── lambda_function.py
├── location_model.py
├── embed_model.py
各ファイルの役割:
- lambda_function.py
Lambdaのエントリーポイントです。
API Gatewayからのクエリパラメータを受け取り、各モデルを呼び出して結果を返却します。 - location_model.py
緯度・経度・範囲から該当する大域的データをS3から取得するクラスです。
緯度・経度・半径から検索範囲の円ポリゴンを生成し、それに重なるH3セルをもとに、S3上の対象データ(Pickle形式)を並列で取得・フィルタします。 - embed_model.py
入力クエリと大域的データのテキストをEmbeddingし、類似度で検索するクラスです。
各データのテキストを構造的に分割(セクション単位)し、自然言語クエリとともにBedrockのEmbedding APIにかけて、ベクトル類似度で最も関連性の高い情報を抽出します。
各Pythonプログラムの具体的な処理内容については、次章(第4章)以降で詳しく説明します。
4. AWS Lambda (Python) の実装解説
4.1 lambda_function.py
この章では、Lambda関数のエントリーポイントである lambda_function.py
の処理内容について詳しく説明します。
API Gateway から受け取ったクエリを起点に、地理情報ナレッジベースの取得と自然言語Embeddingを組み合わせて、最も関連する情報を返す処理フローを実装しています。
def create_output(status_code, output):
# 戻り値のヘッダー
output_header_dict = {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Credentials": True,
"Access-Control-Allow-Methods": "GET",
"Access-Control-Allow-Headers": "Content-Type,X-CSRF-TOKEN"
}
# 出力をログに記録
logging.info(json.dumps({"status_code": status_code, "output": output}, ensure_ascii=False))
# API Gateway形式で返却
return {
"headers": output_header_dict,
"statusCode": status_code,
"body": json.dumps(output),
}
def lambda_handler(event, context):
# 受信イベントのログ出力
logging.info(json.dumps({"event": event, "context": str(context)}, ensure_ascii=False))
# クエリパラメータ取得
query_string_dict = event.get("queryStringParameters", {})
logging.info(json.dumps({"query_string": query_string_dict}, ensure_ascii=False))
# パラメータ取得と変換
lat = float(query_string_dict["lat"])
lon = float(query_string_dict["lon"])
radius = float(query_string_dict.get("radius", "1000"))
query = query_string_dict["query"]
# 位置情報モデルの生成とデータ読み込み
model_location = LocationModel(lat, lon, radius)
model_location.load_model()
# データが見つからなければ空配列を返す
if len(model_location.data) == 0:
return create_output(200, [])
# Embeddingモデルの定義と処理
model_embed = EmbedModel(model_location.data, query)
model_embed.chunk_data()
model_embed.embedding()
# クエリに最も近いデータを取得
data = model_embed.get_query_result()
# 結果を返却
return create_output(200, data)
処理のポイント解説
- create_output()
Lambdaの戻り値を API Gateway形式のJSONレスポンス に整形
CORS対策として Access-Control-Allow-Origin: * などのヘッダーも含めて返却 - lambda_handler()
クエリパラメータとして lat, lon, radius, query を受け取る
LocationModel を使って、指定範囲内のナレッジをS3から取得
データがなければ即時空のレスポンスを返却
EmbedModel を使って、クエリと各ナレッジをベクトル化・類似度計算
最も関連性の高い1件の情報を抽出して返却
このように、lambda_function.py は全体の制御ロジックをシンプルにまとめており、実際の重い処理は LocationModel と EmbedModel に委譲する構成になっています。
4.2 location_model.py
指定した緯度・経度と検索半径に基づいて、H3セルを使ってS3から関連データを取得するクラスです。
import os
import boto3
import pyproj
from shapely.geometry import Point
from shapely.ops import transform
import h3
from h3 import LatLngPoly
import itertools
from io import BytesIO
import pickle
from concurrent.futures import ThreadPoolExecutor
class LocationModel(object):
def __calc_circle_polygon(self):
proj_wgs84 = pyproj.CRS('EPSG:4326')
proj_aeqd = pyproj.CRS(proj='aeqd', lat_0=self.lat, lon_0=self.lon)
project = pyproj.Transformer.from_crs(proj_aeqd, proj_wgs84, always_xy=True).transform
point = Point(0, 0).buffer(self.radius)
self.circle_polygon = transform(project, point)
def __init__(self, lat, lon, radius):
self.h3_resolution = 5
self.max_workers = 5
self.lat = lat
self.lon = lon
self.radius = radius if radius is not None else 1000
self.__calc_circle_polygon()
self.s3_key_prefix = os.path.join("wikipedia", "location")
aws_region = os.environ.get("AWS_REGION")
aws_session = boto3.session.Session(region_name=aws_region)
s3_resource = aws_session.resource("s3")
self.s3_bucket = s3_resource.Bucket("バケット名")
self.data_model_key = "data.pkl"
def __calc_covering_h3_list(self):
coords = list(map(lambda x : (x[1], x[0]), self.circle_polygon.exterior.coords))
polyfill_h3_set = set(h3.polygon_to_cells(LatLngPoly(coords), self.h3_resolution))
perimeter_h3_list = list(map(lambda x : h3.latlng_to_cell(*x, self.h3_resolution), coords))
interpolated_h3_iter = itertools.pairwise(perimeter_h3_list)
interpolated_h3_iter = map(lambda x : h3.grid_path_cells(x[0], x[1]) if x[0] != x[1] else [x[0]], interpolated_h3_iter)
interpolated_h3_set = set(itertools.chain(*interpolated_h3_iter))
h3_set = polyfill_h3_set | set(perimeter_h3_list) | interpolated_h3_set
self.h3_list = list(h3_set)
def __load_s3_object(self, model_key):
s3_object_iter = map(lambda x : os.path.join(self.s3_key_prefix, x, model_key), self.h3_list)
s3_object_iter = map(lambda x : self.s3_bucket.objects.filter(Prefix=x), s3_object_iter)
s3_object_iter = itertools.chain(*s3_object_iter)
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
s3_object_iter = executor.map(lambda x : pickle.load(BytesIO(x.get()["Body"].read())), s3_object_iter)
return itertools.chain(*s3_object_iter)
def load_model(self):
self.__calc_covering_h3_list()
data_iter = self.__load_s3_object(self.data_model_key)
f = lambda x : x if self.circle_polygon.contains(Point(x["lon"], x["lat"])) else None
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
data_iter = executor.map(f, data_iter)
self.data = list(filter(None, data_iter))
処理のポイント解説
- 検索範囲を平面ポリゴンに変換(pyproj + shapely)
- 円をカバーする H3 セルリストを生成
- S3 上の各セルに対応する data.pkl を並列で読み込み
- 緯度・経度が実際に検索円内にあるものだけをフィルタして返却
4.3 embed_model.py
このクラスは、自然言語クエリと地理データ(Wikipediaテキスト)をBedrockでEmbeddingし、コサイン類似度によって最も関連性の高い1件を選出します。
import mwparserfromhell
import itertools
import boto3
import json
import numpy as np
from concurrent.futures import ThreadPoolExecutor
class EmbedModel(object):
def __init__(self, data, query):
self.skip_title_keywords = [
"脚注", "出典", "文献", "外部リンク", "関連項目", "注釈", "備考", "ギャラリー", "ナビゲーション"
]
self.max_workers = 5
self.data = data
self.query = query
self.bedrock_model_id = "amazon.titan-embed-text-v2:0"
self.bedrock_client = boto3.client("bedrock-runtime", region_name="ap-northeast-1")
def __chunk_text(self, name, text):
chunk_iter = mwparserfromhell.parse(text).get_sections(include_lead=True, include_headings=True, flat=True)
chunk_iter = map(lambda x : (
x.filter_headings()[0].title.strip() if len(x.filter_headings()) > 0 else None,
x.strip_code().strip()
), chunk_iter)
chunk_iter = filter(lambda x : len(x[1]) > 0, chunk_iter)
chunk_iter = filter(lambda x : x[0] is None or (
x[0] != x[1] and not any(map(lambda y: y in x[0], self.skip_title_keywords))
), chunk_iter)
chunk_iter = map(lambda x : x[1], chunk_iter)
chunk_iter = itertools.chain([name], chunk_iter)
return chunk_iter
def chunk_data(self):
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
chunk_iter = executor.map(lambda x : self.__chunk_text(x["name"], x["text"]), self.data)
chunk_iter = enumerate(chunk_iter)
chunk_iter = map(lambda x : itertools.product([x[0]], x[1]), chunk_iter)
chunk_iter = itertools.chain(*chunk_iter)
self.idx_chunk_data, self.chunk_data = zip(*chunk_iter)
def __embedding_text(self, text):
native_request = { "inputText": text }
request = json.dumps(native_request)
response = self.bedrock_client.invoke_model(modelId=self.bedrock_model_id, body=request)
model_response = json.loads(response["body"].read())
embedding = model_response["embedding"]
return embedding
def embedding(self):
data_iter = itertools.chain([self.query], self.chunk_data)
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
embedding_data = executor.map(self.__embedding_text, data_iter)
self.embed_query = next(embedding_data)
self.embed_data = list(embedding_data)
def cosine_similarity(self):
vec_query = np.array(self.embed_query)
vecs_data = np.array(self.embed_data)
dot_products = np.dot(vecs_data, vec_query)
norm_query = np.linalg.norm(vec_query)
norms_data = np.linalg.norm(vecs_data, axis=1)
similarities = dot_products / (norm_query * norms_data + 1e-10)
return similarities.tolist()
def get_query_result(self):
score = self.cosine_similarity()
result_iter = zip(self.idx_chunk_data, score)
f = lambda x : x[0]
result_iter = itertools.groupby(sorted(result_iter, key=f), key=f)
result_iter = map(lambda x : (x[0], list(map(lambda y : y[1], x[1]))), result_iter)
result_iter = map(lambda x : (x[0], sum(x[1]) / len(x[1])), result_iter)
max_idx, max_score = max(result_iter, key=lambda x : x[1])
return self.data[max_idx]
処理のポイント解説
- Wikipediaテキストをセクション単位に分割し、ハルシネーションを起こしそうな見出し(生成AIが事実とは異なる内容を出力しやすいセクション)をスキップ
- クエリと各テキストチャンクをBedrockでEmbedding
- クエリベクトルと全チャンクのベクトルとのコサイン類似度を計算
- スコアを平均化し、最も類似度が高かった元記事1件を選出して返却
5. AWS Lambda Layer用にPythonライブラリをアーカイブ
今回は、地理的ビッグデータの処理をできるだけ軽量に保ち、レスポンススピードを重視した構成とするため、従来型の AWS Lambda + Layer を採用しています。
この構成では、shapely や pyproj などの外部ライブラリを事前にアーカイブし、Layer として登録する必要があります。
なお、Lambda では Docker コンテナを用いた構成も可能ですが、コールドスタートの問題があり、レスポンス速度を求める API にはオーバースペックと判断しました。
この章では、実際に使っているライブラリを前提に、Layer用の .zip
アーカイブを作成する方法を紹介します。
5.1 Pythonバージョンと使用しているライブラリ
Pythonバージョン : 3.12
ライブラリ
shapely==2.0.4
h3==4.2.2
numpy==1.26.4
pyproj==3.7.1
mwparserfromhell==0.6.6
5.2 zipアーカイブを作成するためのシェルスクリプト
以下の deploy.sh を使って、ライブラリを Layer 用にzip圧縮します。
#!/bin/bash
rm -rf $1_$2
mkdir $1_$2
cd $1_$2
python -m pip install $1==$2 \
--python-version $3 \
--target=python \
--platform manylinux2014_x86_64 \
--implementation cp \
--no-deps \
--only-binary=:all: \
--no-cache-dir \
--upgrade
# 不要ファイルを削除して容量削減
find python -name "*.pyc" -exec rm -rf {} +
find python -name "__pycache__" -exec rm -rf {} +
find python -name "test" -type d -exec rm -rf {} +
find python -name "tests" -type d -exec rm -rf {} +
find python -name "doc" -type d -exec rm -rf {} +
find python -name "datasets" -type d -exec rm -rf {} +
find python -name "experimental" -type d -exec rm -rf {} +
zip -r $1_$2.zip python/
使用例 (Python3.12でShapely==2.0.4をzip化)
bash deploy.sh shapely 2.0.4 3.12
処理のポイント解説
- --platform manylinux2014_x86_64 で Lambda 環境との互換性を確保
- --only-binary=:all: によりソースビルドを避け、ビルドサイズを短縮
- find コマンドで不要なテスト・ドキュメントを削除し、Layerサイズを削減
5.3 AWS LambdaでのLayer登録
- S3にzipファイルをアップロード
- AWSコンソールで「レイヤー > レイヤーの作成」
- S3に保存した <lib>_<version>.zip を指定
6. Lambda関数のデプロイとテスト
この章では、Pythonコード本体をZIPアーカイブし、AWS Lambdaにアップロード → Layerと組み合わせて動作確認を行う手順を解説します。
6.1 Lambda関数を作成(AWSコンソール)
- AWSマネジメントコンソールから「Lambda > 関数を作成」を選択
- 以下の設定で関数を作成
- 関数名を指定
- ランタイム:Python 3.12
- 実行ロール:適切なIAMロール(S3 / Bedrockへのアクセスが必要)
6.2 Pythonコードのアーカイブとアップロード
Lambdaにアップロードするコードは、以下のように .zip にまとめます。
zip source.zip lambda_function.py location_model.py embed_model.py
この source.zip を AWSのLambda コンソールでアップロードします。
6.3 Lambda Layer の指定
- Lambda関数画面の「レイヤー」セクションを開き、「レイヤーを追加」をクリック
- 5章で作成した Layer(例:shapely_2.0.4 など)を選択
- Lambda関数にレイヤーが適用されたことを確認します
6.4 テストイベントの作成と実行
「テスト」ボタンから新しいテストイベントを作成し、以下のような JSON を入力します。
{
"queryStringParameters": {
"lat": "36.69887898852606",
"lon": "138.3129466385537",
"radius": "2000",
"query": "博物館"
}
}
6.5 テスト結果の確認
S3 からナレッジデータを取得し、Bedrock で Embedding を実施、最も関連性の高い情報が返されます。
{
"headers": {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Credentials": true,
"Access-Control-Allow-Methods": "GET",
"Access-Control-Allow-Headers": "Content-Type,X-CSRF-TOKEN"
},
"statusCode": 200,
"body": "{\"id\": \"4086371\", \"name\": \"日本のあかり博物館\", \"lat\": 36.69511111111111, \"lon\": 138.3151388888889, \"text\": \"...(略)...\"}"
}
- 設定済みリソース : 1024 MB
- 実行時間:1453.25 ms
- 使用中の最大メモリ : 174 MB
7. API Gatewayの構築と連携
この章では、テスト済みの Lambda 関数を REST API として外部に公開するために、API Gateway を設定する手順を紹介します。
7.1 API Gateway の設定手順
- REST API を作成
APIタイプは「REST API」
新規リソース /query を作成し、GETメソッドを追加 - 統合タイプに Lambda 関数を指定
統合タイプ:「Lambda 関数」
対象:本記事で作成した Lambda 関数名 - CORS を有効化(必要な場合)
「アクション」>「CORSの有効化」
Access-Control-Allow-Origin などを許可 - デプロイ
ステージを新規作成してデプロイ
公開エンドポイント URL が発行される
7.2 実行例(curl)
下記のように GET リクエストを実行することで、Lambda関数と同じように検索結果が返ってきます。
curl "https://発行されたアドレス/?lat=36.69887898852606&lon=138.3129466385537&radius=2000&query=博物館"
8. まとめ
今回は、地理情報付きナレッジベースを自然言語で検索できるようにするための検索API構築プロセスについて紹介しました。
API Gateway + Lambda + Bedrock + S3 を組み合わせて、位置情報に応じたナレッジをEmbedding類似度で検索・応答する仕組みを、個人開発レベルで構築できることが実証できました。
また、Dockerなどの重量級構成ではなく、Lambda + Layer による軽量かつ高速なAPI構成とすることで、生成AIを活用した検索でもレスポンス性能を維持しつつ運用できることが確認できました。
自然言語 × 地理情報 × 分散ナレッジという構成により、ユーザーの直感的なクエリに対応する検索体験を地図アプリやチャットUIへと展開できる基盤が整いつつあります。
次回は、この検索APIをモバイルや地図アプリとどのように連携・活用していくかという「アプリ組み込み編」に進みます。
地図上での対話的な検索や、生成AIを使ったレコメンドなど、よりユーザー体験に近いレイヤーを取り上げていく予定です。
ぜひ引き続きご覧いただければ嬉しいです!
Discussion