個人で地図にAIをマッピングしてみた #2 ローカル開発編
1. はじめに
前回の記事(個人で地図にAIをマッピングしてみた #1 設計編)では、地図AIの全体像とアーキテクチャ設計について紹介しました。
今回はその実装フェーズとして、「並列分散処理のロジック構築」に焦点を当てていきます。
最終目標は地図上にAIを構築することです。
そのためには、ユーザーの位置情報とクエリに応じて、適切なコンテキスト(知識)を抽出し、生成AIに渡せるような仕組みを構築する必要があります。
しかし、地図データや位置データはもともと膨大な情報量を持ち、その空間的な演算には高い計算コストが発生します。
この巨大なデータをできるだけ小さく分散化し、高速かつ効率的に必要最小限のデータを読み込む設計が必要となります。
本プロジェクトでは、こうした要件を満たすために、並列分散バッチ処理を使って地理情報付きナレッジベースを構築しています。
個人開発であることを踏まえ、巨大な専用データベースを使わずとも高速でアクセスできる設計を目指しました。
今回は、いきなりクラウドでスケールさせるのではなく、まずはローカル環境でロジックを開発・検証し、確かな足場を固めていきます。
本記事では、そのローカル開発フェーズにおいて、Wikipediaデータをもとにどのような処理を試作したのかを紹介していきます。
2. 処理のフロー
この章では、ローカル環境で構築した並列分散処理ロジックの全体的な流れを紹介します。
Wikipediaのアーカイブデータ(.bz2
形式)を起点とし、最終的には地理情報付きのナレッジベースを構築するための一連の処理を以下のように設計しました。
実行環境については、次章「3. 実行環境(ローカル開発)」をご参照ください。
次のステップで設計しました:
- Wikipediaアーカイブデータの取得
- Sparkの起動
- Wikipediaデータのロードとパース
- 位置情報の抽出
- 位置情報のH3ハッシュ化
- ページ本文のチャンク化 & Embedding生成 ※現在は処理フローから削除しています。
- H3ハッシュ値ごとにデータをグルーピング
- データをS3へ出力
※当初は「ステップ6.ページ本文のチャンク化 & Embedding生成」をバッチ処理内に含めていましたが、データ取得APIとの整合性を考慮して、そのステップを削除しました。
代わりに AWS Lambda関数内でこの処理を行う構成 に変更しました。
この変更により、Embeddingモデルのバージョン変更や構成の切替が柔軟になり、サーバレス構成との親和性も高まりました。
ステップ1: Wikipediaアーカイブデータの取得
まず初めに、処理対象となるWikipediaの日本語アーカイブデータを取得します。
- Wikimediaの公式サイトWikimedia Downloads から、日本語Wikipediaの最新アーカイブ(
jawiki-latest-pages-articles.xml.bz2
)を取得します。 - 日本語Wikipediaの本文のみを含んでおり、緯度・経度の情報を持つ記事も多数含まれています。
- このアーカイブを読み込み、地理情報付きナレッジベースの元データとして活用します。
ステップ2: Sparkの起動
- 並列分散処理ライブラリとして、Apache Spark を使用します。
- 処理はすべて Spark 上で実行する前提で設計しています。
- ローカル環境では PySpark を用い、数コアを利用してWikipediaアーカイブデータを処理します。
- AWSで運用する際も、Amazon EMRのSparkを利用する想定です。
環境構築方法については、次章「実行環境(ローカル開発)」で解説します。
ステップ3: Wikipediaデータのロードとパース
-
.bz2
ファイルをストリーム展開しつつ、XMLパーサでページ単位に読み込んでいきます。 -
<page>
タグごとに、タイトル・本文・テンプレートなどの要素を抽出します。
ステップ4: 位置情報の抽出
- Wikipedia記事内の
{{Coord|...}}
など位置情報を表すテンプレートから正規表現で、緯度・経度を抽出します。 - 正規表現を用いて緯度・経度を取り出し、十進法に変換して扱います。
ステップ5: 位置情報のH3ハッシュ化
- 抽出した緯度・経度をもとに、
h3
ライブラリを利用してハッシュ値(H3インデックス)を計算します。 - これにより、位置情報を空間グリッド上でインデックス化することができ、グルーピングや検索範囲の絞り込みが容易になります。
ステップ6: ページ本文のチャンク化 & Embedding生成 (現在は処理フローから削除しています。)
- Wikipedia各ページの本文を
sentence-transformers
などを用いてベクトル化(Embedding)します。 - 精度向上を目的に、本文は章や段落単位で分割(チャンク化)し、それぞれを個別にベクトル化します。
- 位置情報 × 意味情報 をもったナレッジデータの基礎になります。
ステップ7: H3ハッシュ値ごとにデータをグルーピング
- 計算したh3ハッシュ値をキーとして、近隣エリア単位でデータをグループ化します。
- 各グループには、本文・章ごとのEmbedding・ページ情報を含めてまとめます。
- この構造は、Key-Valueストア形式 に整理されており、
位置情報 -> h3ハッシュエンコード → 関連データのセット
のような形でアクセス可能です。
この形式により、検索時には対象エリアのデータだけを効率よく抽出できるようになります。
さらに、Amazon S3 はもともと Key-Valueストアに近い構造でデータを管理しているため、この設計は S3 上にデータを配置する際にも非常に相性が良く、ストレージ構成の最適化にもつながります。
高速かつ最小限の読み込みで、生成AIへのコンテキスト提供できるのが大きな利点です。
ステップ8: データをS3へ出力
最終的な処理結果は、Amazon S3 に出力します。
- h3 ハッシュ値をキーとしてパスに設定します。
- 本文・章ごとの Embedding・ページ情報を含むデータを JSON 形式で構造化し、さらに
pkl
化して保存します。
この形式により、後続のRAGモデルをAPIで構築するにあたり、高速に対象エリアのコンテキストを取得できるDBが完成します。
3. 実行環境(ローカル開発)
本記事で紹介しているバッチ処理ロジックは、クラウド環境に移行する前に、ローカル環境で開発・検証を行っています。
ここでは、その際に使用した実行環境やライブラリを紹介します。
使用環境
- OS:macOS(Unix系を想定)
- Python:3.12.1
- Java (JDK):17(Spark実行に必要)
Spark環境
Apache SparkのダウンロードサイトApache Spark に従いインストールします。
Pythonライブラリ
- pyspark: 3.5.0
- mwparserfromhell: 0.6.6
- h3: 4.2.2
- sentence-transformers: 4.0.1
4. 実装ロジックの詳細
ここからは、前章で紹介した処理フローに沿って、実際にどのようなコードでバッチ処理ロジックを構築しているのかを詳しく解説していきます。
本記事で紹介している実装は、すべてローカルの Spark 環境で動作するように設計されており、最終的には Amazon EMR 上でも同様の処理が実行できる構成になっています。
扱う処理は以下のとおりです:
- Wikipediaデータの読み込みとパース
- 緯度・経度の抽出(テンプレート処理)
- 位置情報のハッシュ化(H3)
- ページテキストのチャンク分割とEmbedding
- グルーピングと出力処理(KVS形式)
それぞれの処理について、具体的なコードとそのポイントを順に見ていきます。
Sparkの起動
spark-xml パッケージを使用して Wikipedia のXMLデータをパース可能にしています。
S3 読み書きのために s3a プロトコルと必要な設定を付与しています。
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("App Name") \
.config("spark.hadoop.fs.s3a.endpoint", "s3.ap-northeast-1.amazonaws.com") \
.config("spark.hadoop.fs.s3a.endpoint.region", "ap-northeast-1") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \
.config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.17.0") \
.getOrCreate()
Wikipediaアーカイブデータのロード
Wikipediaの .xml.bz2
アーカイブは、spark-xml
パッケージを使ってロードします。
読み込み対象のファイルパスには、ローカル環境のパスだけでなく、S3 上のパスも指定可能です。
from pyspark.sql.functions import col, length
df = spark \
.read \
.format("xml") \
.option("rowTag", "page") \
.load("path") # wikipediaデータのパスを指定します。ローカル環境でもS3でも可能です。
# id, name, text を抽出
df = df.select(
col("id").cast("string"),
col("title").alias("name"),
col("revision.text._VALUE").alias("text"),
)
# 空データを除外
df = df.filter(
((col("name").isNotNull()) & (length(col("name")) > 0)) &
((col("text").isNotNull()) & (length(col("text")) > 0))
)
位置情報の抽出
本文中に含まれる {{Coord|...}}
形式のテンプレートから、緯度・経度を抽出します。
Sparkの UDF(ユーザー定義関数)を用いて、カスタムロジックで位置情報を取り出しています。
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, DoubleType
import mwparserfromhell
import functools
def calc_location_dms_list(dms_list, sign):
try:
val_iter = map(lambda x : (60**x[0], float(x[1])), enumerate(dms_list))
val = functools.reduce(lambda x1,x2 : x1 + x2[1] / x2[0], val_iter, 0) * sign
except ValueError:
val = None
return val
def calc_location_filter_template_japanese_location(filter_template):
try:
lat_deg = float(filter_template.get(1).value.strip())
lat_min = float(filter_template.get(2).value.strip())
lat_sec = float(filter_template.get(3).value.strip())
lat = dms_to_decimal(lat_deg, lat_min, lat_sec, 1)
lon_deg = float(filter_template.get(4).value.strip())
lon_min = float(filter_template.get(5).value.strip())
lon_sec = float(filter_template.get(6).value.strip())
lon = dms_to_decimal(lon_deg, lon_min, lon_sec, 1)
return lat, lon
except (ValueError, IndexError):
return None, None
def extract_text_location(text):
# filter_templatesを抽出
filter_templates = mwparserfromhell.parse(text).filter_templates()
# filter_templatesからcoordを抽出
filter_templates_coord = filter(lambda x : x.name.strip().lower() == "coord", filter_templates)
filter_template_coord = next(filter_templates_coord, None)
# 緯度経度を取得
location = calc_location_filter_template_coord(filter_template_coord) if filter_template_coord is not None else None
return location
# UDF を定義(text から lat, lon を抽出)
extract_text_location_udf = udf(
extract_text_location,
StructType([
StructField("lat", DoubleType(), True),
StructField("lon", DoubleType(), True)
])
)
# UDF を適用して location カラムを生成
df = df.withColumn("location", extract_text_location_udf(col("text")))
# lat, lon カラムに分割し、location カラムは削除
df = df \
.withColumn("lat", col("location.lat")) \
.withColumn("lon", col("location.lon")) \
.drop("location")
# 緯度経度が存在するデータのみ抽出
df = df.filter(col("lat").isNotNull() & col("lon").isNotNull())
実際の処理では {{Coord|...}}
以外の形式からも緯度・経度を抽出していますが、本記事では割愛しています。
位置情報ハッシュ値の抽出
緯度・経度をもとに、H3 ライブラリを使って空間インデックスを計算します。
この処理により、検索時に対象エリアを素早く絞り込むことが可能になります。
H3 Python版公式ドキュメント:uber.github.io/h3-py
H3の 粒度(resolution) は用途に応じて調整する必要があります。
本プロジェクトでは以下のように設定しています:
-
設定例:
h3_resolution = 5
(半径およそ 9.87km の粒度) - 粒度を小さくしすぎると → 出力ファイル数が増えてバッチの処理時間が長くなります
- 粒度を大きくしすぎると → 各エリアに含まれるデータ量が多くなり、このデータを読み込むAPIのレスポンス性能に影響します
H3 resolutionの最適値は、ユースケースに応じた 「塩梅」 が重要になります。
# locationのh3を求める
def extract_location_h3(lat, lon):
h3_resolution = 5 # h3の粒度を設定します。
return h3.latlng_to_cell(lat, lon, h3_resolution)
# UDF
extract_location_h3_udf = udf(
extract_location_h3,
StringType()
)
# h3抽出
df = df.withColumn("h3", extract_location_h3_udf(col("lat"), col("lon")))
ページテキストのチャンク化&Embedding (現在は処理フローから削除しています。)
Wikipediaページの本文を、章単位でチャンク化し、それぞれをベクトル化(Embedding)します。
sentence-transformers
を使用し、多言語対応の軽量モデルを利用しています。
ベクトル化には、メモリ効率の観点から RDD + mapPartitions を使用しています。
Embedding はチャンク単位のリストとして生成されます。
from sentence_transformers import SentenceTransformer
import mwparserfromhell
import itertools
# 使用モデル(多言語対応)
sentence_transformer_model_name = "paraphrase-multilingual-MiniLM-L12-v2"
sentence_transformer_model = SentenceTransformer(sentence_transformer_model_name, device='cpu')
# 本文をチャンク化(章ごとに分割)
def extract_text_chunk(name, text):
chunk_iter = mwparserfromhell.parse(text).get_sections(include_lead=True, include_headings=True, flat=True)
chunk_iter = map(lambda x : (
x.strip_code().strip()
), chunk_iter)
chunk_iter = itertools.chain([name], chunk_iter)
return list(chunk_iter)
# チャンクをベクトル化
def embed_text(partition_iter):
# textのchunk
partition_iter = map(lambda x : (x, extract_text_chunk(x["name"], x["text"])), partition_iter)
# chunkのembed
partition_iter = map(lambda x : {
"id": x[0]["id"],
"name": x[0]["name"],
"h3": x[0]["h3"],
"text": x[0]["text"],
"lat": x[0]["lat"],
"lon": x[0]["lon"],
"embedding": sentence_transformer_model.encode(x[1])
}, partition_iter)
return partition_iter
# rddに変換して処理
rdd = df.rdd
# textのembeded
rdd = rdd.mapPartitions(model_embed.embed_text)
出力
キーに H3 ハッシュ値、値に Wikipedia ページテキストと Embedding ベクトルのイテレータが保持されています。
この値をリスト化し、Pickle(.pkl
)形式で保存し、boto3
ライブラリを用いて S3 に出力します。
この構成により、位置情報ベースで素早くコンテキストを取得可能な、
軽量かつスケーラブルな Key-Value 型のナレッジベースが完成します。
5. まとめ
今回は、地理情報付きナレッジベースの構築に向けて、Wikipediaアーカイブをもとにしたバッチ処理ロジックをローカル環境で開発する流れを紹介しました。
大量のページデータから緯度・経度を抽出し、H3インデックスで空間的に整理しながら、ページの内容をチャンク単位で意味ベクトル化(Embedding)していきます。
そのうえで、Key-Valueストア形式で保存し、S3上にスケーラブルに展開可能なデータにまとめます。
地理情報と意味情報を結合することで、ユーザーの位置とクエリに応じて「最適なコンテキストを選ぶ」というRAG的思考が現実的に可能になることを目的としています。
地図AI × 個人開発の挑戦で実現したコア部分です。
次回は、このロジックをAWSで実行する方法について説明します。
地図上での検索や生成AIとの連携にどのように運用するのか?という「実践編」に進みます。
引き続きご覧いただければ嬉しいです。
Discussion