🌏

個人で地図にAIをマッピングしてみた #3 AWSバッチ処理編

に公開

1. はじめに

前回の記事(個人で地図にAIをマッピングしてみた #2 ローカル開発編)では、地図AI向けに大規模なWikipediaデータを並列分散処理するロジック構成について紹介しました。

今回はその実装フェーズをクラウドにデプロイするための「AWS上でバッチ処理を実行する方法」に焦点を当てていきます。

最終目標は、地図上にAIをマッピングし、ユーザーの位置情報とクエリに応じて適切なコンテキスト(知識)を抽出し、生成AIに渡せる仕組みを構築することです。

しかし、地図データや位置情報はそもそも情報量が膨大で、空間的な演算には高い計算コストが発生します。
この巨大なデータをいかに小さく分割・分散し、必要最小限のデータを高速かつ効率的に処理できるかが鍵となります。

本プロジェクトでは、こうした要件を満たすために、並列分散バッチ処理を使って地理情報付きナレッジベースを構築しています。
個人開発であることを踏まえ、巨大な専用データベースを使わずとも高速でアクセスできる設計を目指しました。

今回は、そのバッチ処理をいよいよAWS環境にデプロイし、実際に実行する手順を紹介します。
Wikipediaのアーカイブデータをもとに、どのようにAWS環境へデプロイしたのかを順を追って紹介していきます。


2. AWS上の全体構成と処理フロー

本章では、地図AIにおけるWikipediaデータの処理をAWS上でどのように構成しているかを紹介します。

2.1 構成図

以下の図は、ローカルで開発したバッチ処理ロジックをAWSに展開し、実際にスケーラブルに動かすための全体構成を示しています。
またこの後、地図アプリからデータを取得することも想定しているため、図の下部にはAPI連携の構成も含めています。

今回は図の上半分、バッチ処理のデプロイと実行に関する構成が中心となります。
API部分については次回の記事で詳しく解説します。


2.2 主な構成要素

VPC構成(上部)

  • プライベートサブネット内に、ソースデプロイ用の EC2 (Deploy Machine)を配置
  • 同じくプライベートサブネット内に、EMRバッチ処理用の複数台で構成された EC2 (EMR Batch Cluster) を自動配置
  • NAT GatewaySSM Endpoint を設置し、外部アクセスと運用性を確保
  • S3 Endpoint を通じて、Wikipediaデータを読み込み
  • なお、NAT GatewayやSSM Endpointは常時起動するとコストがかかるため、利用しないときは削除

バッチ実行フロー(中央)

  • 開発者が AWS Console から、Lambda (EMR Batch I/F) を起点に、EMRを実行
  • Lambda (EMR Batch I/F) は、Spark処理用の EC2(複数台) (EMR Batch Cluster)を起動
  • あわせて EventBridge による定期実行設定(cron)を作成し、Lambda (Monitoring I/F) でバッチ状態をモニタリング
  • データは分散形式でS3に保存
  • 処理が完了すると、EMR Batch ClusterやNAT GatewayやSSM Endpointは自動で削除

クエリ応答API(下部・次回の記事で解説予定)

  • ユーザーのGETクエリは API Gateway を通じて、Lambda → Bedrock(Embedding / ChatAI) に連携
  • 応答の際には、今回のバッチ処理で構築した「地理情報付きナレッジベース分散データ(S3)」を参照

2.3 処理フローのざっくり流れ

  1. ローカルPCから、WikipediaアーカイブをS3にアップロード
  2. バッチ処理ロジックをGitHub経由でデプロイマシンにデプロイし、AMIイメージを作成
  3. Lambdaを起点に、EMRクラスターとその監視cronを起動
  4. Spark処理でデータをH3グリッドで分散化してS3に出力
  5. クラスタ終了後、LambdaでNATゲートウェイやEC2などのリソースを削除
  6. 構築されたデータは、地図アプリからGETアクセスされる

3. AWS環境設定

この章では、実際にAWS上にバッチ処理を実行するための環境構築を行います。
構成図で示したとおり、主に以下のリソースを設定していきます。

  • VPC
  • サブネット(パブリック/プライベート)
  • NAT Gateway、SSM・S3エンドポイント
  • セキュリティグループ
  • EC2インスタンス(Deploy Machine)

個人開発のため、これらのリソースを最小限の構成で必要なときだけ起動し、不要なときに削除できるようにしています。

この章では、それぞれの設定方法や構築手順について紹介していきます。


3.1 VPCとサブネットの構築

本プロジェクトでは、バッチ処理用のEMRと、ソースデプロイ用のEC2を配置するために、専用のVPCを構築しています。
VPCは 2つのアベイラビリティゾーン(AZ)にまたがる構成とし、パブリックサブネットとプライベートサブネットをそれぞれ用意しています。

以下が、VPC構築時の具体的な設定値です。

項目 設定値
作成するリソース VPCなど
自動タグの自動生成 自動生成にチェック
タグ名 任意の名前 → mapai(今回のプロジェクト名)
IPv4 CIDR ブロック 172.31.0.0/16
IPv6 CIDR ブロック Amazon提供のIPv6 CIDRブロック
テナンシー デフォルト
アベイラビリティゾーン(AZ)の数 2
第1アベイラビリティゾーン ap-northeast-1a (東京リージョン)
パブリックサブネットの数 2
プライベートサブネットの数 2
ap-northeast-1aのパブリックサブネットCIDRブロック 172.31.0.0/20
ap-northeast-1cのパブリックサブネットCIDRブロック 172.31.16.0/20
ap-northeast-1aのプライベートサブネットCIDRブロック 172.31.128.0/20
ap-northeast-1cのプライベートサブネットCIDRブロック 172.31.144.0/20
NATゲートウェイ なし(この時点では作成せず、Lambdaで後から自動設定)
Egress Only インターネットゲートウェイ いいえ
VPCエンドポイント S3ゲートウェイ
DNSホスト名を有効化 ✅ チェック
DNS解決を有効化 ✅ チェック
追加のタグ 設定しない

補足:NAT Gateway や SSM エンドポイントはコスト対策のため、必要なときにだけLambdaで動的に作成・削除しています。このLambdaについては後半の章で詳しく説明します。


3.2 セキュリティグループの設定

バッチ処理用やデプロイ用のEC2、Lambda、NAT Gatewayなどが正しく通信できるように、セキュリティグループは以下のように設定します。

インバウンドルール (※今回はすべて許可)

項目 設定値
タイプ すべてのトラフィック
プロトコル すべて
ポート範囲 すべて

アウトバウンドルール(IPv4 / IPv6)

IPバージョン タイプ プロトコル ポート範囲
IPv4 すべてのトラフィック すべて すべて
IPv6 すべてのトラフィック すべて すべて

補足:実運用では必要最小限の許可に絞るのが望ましいですが、今回は開発フェーズのためフル開放としています。


3.3 NAT Gatewayの設定

NAT Gatewayは、プライベートサブネット内のリソース(EMR、Lambdaなど)がインターネットにアクセスするために必要です。

項目 設定値
名前 名前(例:mapai-natgw)
サービスカテゴリ パブリックサブネット
接続タイプ パブリック
Elastic IP 割り当て ID 「Elastic IP を割り当て」を選択
追加設定 なし

3.4 各種VPCエンドポイントの設定(SSM / S3)

3.4.1 SSMエンドポイント(Private Link)

項目 設定値
名前タグ mapai-ssm(任意)
サービスカテゴリ AWSのサービス
サービス com.amazonaws.ap-northeast-1.ssm
VPC 対象のVPCを選択
サブネット プライベートサブネットを選択
IPアドレスタイプ IPv4
セキュリティグループ 上記のセキュリティグループを選択
ポリシー フルアクセス
タグ 自動生成のみ

補足:このエンドポイントにより、EC2やEMRがSSM経由で外部に出ることなく制御可能になります。

3.4.2 ssmmessagesエンドポイント(SSMセッションマネージャ用)

項目 設定値
名前タグ mapai-ssmmessages(任意)
サービスカテゴリ AWSのサービス
サービス com.amazonaws.ap-northeast-1.ssmmessages
VPC 対象のVPCを選択
サブネット プライベートサブネットを選択
IPアドレスタイプ IPv4
セキュリティグループ 上記で作成したセキュリティグループを選択
ポリシー フルアクセス
タグ 自動生成のみ

補足:ssmmessages エンドポイントは、EC2 などが SSM セッションマネージャを通じて管理されるために必要な通信を担います。


3.4.3 S3ゲートウェイエンドポイントの設定

S3ゲートウェイエンドポイントは、プライベートサブネットからS3へ直接アクセスするために必要な設定です。
これは NAT Gateway を経由せずに S3 へアクセスできるため、コスト削減にも効果的です。

VPC作成時のウィザードでチェックを入れておくことで、自動的に作成することもできます。


3.5 ソースデプロイ用EC2の構築

Sparkバッチ処理のスクリプト管理や、EMRジョブのデプロイを行うために、専用のEC2インスタンス(Deploy Machine)を構築します。
このインスタンスは、Python環境を整えたうえで、GitHubから処理エンジンをクローン・インストールするための中継マシンとして使用します。
また、このEC2のAMIイメージをもとに、EMRがクラスターを構成し、Sparkの並列分散処理を行います。
つまり、Deploy MachineはソースをデプロイしたEMRクラスターのベースとなる実行環境でもあるという位置づけです。


3.5.1 インスタンスの起動設定

AWSマネジメントコンソールから EC2 > インスタンスを起動 を選び、以下の通りに設定します。

項目 設定値
名前 任意(例:mapai-deploy
Amazonマシンイメージ Amazon Linux 2 AMI(HMV)
アーキテクチャ 64ビット (Arm)
インスタンスタイプ t4g.nano(最小スペックかつ最小コスト)
キーペア 既存のキーペアを選択
VPC 作成済みのVPCを選択
サブネット プライベートサブネット
パブリックIPの自動割り当て 無効化
IPv6の自動割り当て 無効化
セキュリティグループ 共通のセキュリティグループを選択
ストレージ設定 デフォルトのままでOK
IAMインスタンスプロファイル EMR操作用IAMロール(例:EC2EMRAdminRole

3.5.2 pyenv による Python 3.12.1 のインストール

Amazon Linux 2 は標準で古い Python が入っているため、pyenv を使って最新版の 3.12.1 をインストールします。

# ec2-user に切り替え
sudo su - ec2-user

# 依存パッケージのインストール
sudo yum install openssl11 openssl11-devel -y
sudo yum install gcc make zlib-devel bzip2 bzip2-devel readline-devel \
sqlite sqlite-devel tk-devel libffi-devel xz-devel git -y

# pyenv のクローン
sudo git clone https://github.com/pyenv/pyenv.git /usr/local/pyenv

# 環境変数設定
sudo vi /etc/profile.d/pyenv.sh
# 以下を記述
export PYENV_ROOT="/usr/local/pyenv"
export PATH="${PYENV_ROOT}/bin:${PATH}"
eval "$(pyenv init -)"

# 権限を調整
sudo chmod 777 /usr/local/pyenv/

# 一度ログアウト → 再ログインして反映
# その後 Python をインストール
pyenv install 3.12.1
pyenv global 3.12.1

# OpenSSL 11 は削除(競合回避のため)
sudo yum remove openssl11 openssl11-devel -y

3.5.3 バッチエンジンソースのインストール

続いて、GitHubからバッチ処理用のPythonパッケージをデプロイします。

# SSH鍵を生成
ssh-keygen -t rsa

# 公開鍵を GitHub に登録(Deploy Key など)
cat ~/.ssh/id_rsa.pub | pbcopy

# 接続テスト
ssh -T git@github.com

# ソースインストール(developブランチを想定)
python -m pip install git+ssh://git@github.com/your-org/your-repo.git@develop -U -I

your-org/your-repo の部分は、実際のGitHubリポジトリに置き換えてください。


3.5.4 AMIイメージの作成

ここまでで、Deploy Machine に必要な環境(Python 3.12.1、バッチ処理エンジン、SSH設定など)が整いました。
この状態の EC2 をベースに AMI(Amazon Machine Image)を作成し、EMR クラスターで再利用できるようにします。

AMIの作成は、AWSの EC2 Console から行います。

  • EC2 インスタンスの「イメージ > イメージの作成」から作成できます。
  • 作成後は、出力された AMI ID を控えておき、後で EMR の起動時に使用します。

3.5.5 EMR実行ロールの作成

最後に、EMRクラスターを起動するために必要な IAM ロールを作成します。

aws emr create-default-roles

これにより以下の2つのIAMロールが作成されます。
1.EMR_DefaultRole
2.EMR_EC2_DefaultRole

これらのロールが作成されていれば、EMR クラスターを起動・制御する準備が整った状態となります。

3.6 この章のポイント

  • デプロイマシンは Armアーキテクチャ(t4g)+Amazon Linux 2 でコストと互換性を両立
  • Python環境を pyenv で管理し、柔軟なバージョン切り替えを可能にする
  • GitHubからSSH経由でデプロイする方法を明示しており、セキュアなCI/CDの導入基盤としても参考になる構成
  • EMR実行に必要なロールも自動作成でカバー

4. EMR Sparkバッチの起動(Lambda)

この章では、実際にSparkバッチ処理をAWS上で実行する構成について紹介します。
本プロジェクトでは、以下のような流れでバッチを起動・監視・後処理まで自動化しています。

  1. LambdaからEMRクラスターを起動
  2. EMRがSparkバッチ処理を実行
  3. EventBridgeで定期監視
  4. クラスタ終了後、VPC/NATなどの一時リソースを削除

4.1 bootstrap.shの準備

このシェルスクリプトは、EMRノードの起動時に実行されるブートストラップアクションです。

#!/bin/bash
sudo yum remove openssl-devel -y
sudo yum install openssl11 openssl11-devel -y

Amazon Linux 2(EMRノード)ではOpenSSL周りに互換性の問題があるため、pyenv を使ったPython環境に合わせるためにこの設定を入れています。
この bootstrap.sh は s3://<batch-bucket>/script/bootstrap.sh にアップロードしておきます。


4.2 exec_mapai_batch.py の準備

以下のPythonスクリプトが EMRクラスタ上で実行されるSparkバッチ処理の起点です。
実行対象の処理を引数で指定し、モジュール単位で処理を切り替える構成になっています。

import argparse
from importlib import import_module

predict_names = [
  "wikipedia_location",
]

parser = argparse.ArgumentParser(description="mapai_batch")
parser.add_argument("-n", "--name", required=True, choices=predict_names)
parser.add_argument("-i", "--s3_input", required=True, nargs=2)
parser.add_argument("-o", "--s3_output", nargs=2)
args = parser.parse_args()

module_path = f"mapai_batch.main.{args.name}"
module = import_module(module_path)
module.main(args)

この構成のポイント

  1. --name で実行バッチ名(wikipedia_location)を指定
  2. S3 入出力は --s3_input, --s3_output で指定
  3. mainメソッドは mapai_batch.main.<name>.py に配置

4.3 Lambda関数の構成

EMRバッチの実行は、Lambda関数からトリガーされます。
Lambdaでは入力引数を受け取り、バッチの実行を Batch クラスに委譲します。

def lambda_handler(event, context):
    action = event.get("action")  # start / end / heartbeat
    exec_name = event.get("exec_name")
    batch_model = Batch(action, exec_name, ...)
    
    return {
        "statusCode": 200,
        "actionStatus": batch_model.do_action()
    } if batch_model.check_param() else {
        "statusCode": 400
    }

Lambda自体は軽量に保ち、バッチロジックは Batch クラスに記述しています。


4.4 Batchクラスによる実行制御

Batch クラスは以下の3つのアクションに対応しています。

action 内容
start VPC/NATの作成 → EMRクラスター起動
heartbeat Sparkステップの状態確認
end EventBridgeの削除、他にEMRがなければVPC削除
class Batch(object):
    def do_action(self):
        if self.action == "start":
            return self.__start()
        elif self.action == "heartbeat":
            return self.__end() if self.__heartbeat() in ["COMPLETED", "FAILED"] else True
        elif self.action == "end":
            return self.__end()
アクション (action) 処理のタイミング 主な処理内容
"start" バッチ処理の開始時 - VPC構成(NAT / SSM)を起動
- EMRクラスターを立ち上げてSparkバッチを実行- EventBridgeルール(監視)を作成
"heartbeat" EMR実行中の定期監視 - EMRのステップ状態を確認
- "COMPLETED" or "FAILED" なら end 処理を実行
"end" バッチ処理の終了時 - EventBridgeルールを削除
- 他のEMRが動作していなければVPC構成(NAT / SSM)も削除

処理の完了を自動で検出し、後続のリソース解放まで管理します。


4.5 EMRクラスによるSpark実行

EMRクラスは、設定ファイルと環境変数を読み込んでEMRクラスターを立ち上げ、Spark処理を実行します。

self.configurations = [
  {
    "Classification": "spark-env",
    "Configurations": [{
      "Classification": "export",
      "Properties": {
        "PYSPARK_PYTHON": "/usr/local/pyenv/versions/3.12.1/bin/python"
      }
    }]
  },
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.hadoop.fs.s3a.endpoint": "s3.ap-northeast-1.amazonaws.com",
      ...
    }
  },
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.nodemanager.resource.memory-mb": "14336"
    }
  }
]

Sparkジョブ起動コマンド:

self.spark_args = [
  "spark-submit",
  "--jars",
  "s3://bucket_name/jars/spark-xml_2.12-0.17.0.jar",
  "s3://bucket_name/script/exec_geoai_batch.py",
  "-n", self.name,
  "-i", self.aws_region, self.data_bucket_name,
  "-o", self.aws_region, self.data_bucket_name
]
引数 説明
"spark-submit" Sparkのジョブを起動する標準コマンド
"--jars" 外部jarファイル(依存ライブラリ)を指定
"s3://...spark-xml_2.12-0.17.0.jar" XML形式のファイルをパースするためのDatabricksのライブラリ
"s3://.../exec_geoai_batch.py" Spark処理の実行本体(Pythonスクリプト)
-n <name> 実行するバッチ名。これがモジュール名に対応している
-i <region> <bucket> 入力データのあるS3リージョンとバケット名
-o <region> <bucket> 出力データの保存先(デバック用に省略可としています)

4.6 EventBridgeによる自動監視

EventBridge クラスは、定期的にLambdaを呼び出すルールを作成します(10分間隔)。

self.event_bridge_client.put_rule(
  Name=self.rule_name,
  ScheduleExpression="rate(10 minutes)"
)

self.event_bridge_client.put_targets(
  Rule=self.rule_name,
  Targets=[{
    "Id": self.cluster_id,
    "Arn": self.lambda_arn,
    "Input": json.dumps({
        "action": "heartbeat",
        "exec_name": self.exec_name,
        "cluster_id": self.cluster_id,
        "is_delete_vpc": self.is_delete_vpc
    })
  }]
)

ジョブが完了すると自動で end アクションに切り替わり、VPCやNATゲートウェイを削除します。


4.7 VPC構成の自動起動と削除

NAT Gateway や SSM エンドポイントなど、コストの高い一時的リソースは、バッチ実行時にのみ動的に構築し、処理完了後に自動で削除するように設計しています。
この制御は、別の処理でも使用しているため、専用のLambda関数によって行われています。

呼び出し元

def invoke(self, action):
    payload_json = {
        "action": action,
        "services": ["nat_gateway", "ssm_endpoints"]
    }

    self.lambda_client.invoke(
        FunctionName="mapai_vpc_setting",
        InvocationType="Event",
        Payload=json.dumps(payload_json)
    )

バッチ起動時には "start"、終了時には "delete" が指定され、
指定されたサービス(NAT Gateway、SSMエンドポイントなど)に応じて動作が分岐します。


Lambdaの中身(抜粋)

vpcを設定するLambdaでは、サービスごとのモジュール(NatGateway, SSMEndpoints)を呼び出し、個別に起動・削除を行います。

service_iter = [
    NatGateway(...) if "nat_gateway" in service_set else None,
    SSMEndpoints(...) if "ssm_endpoints" in service_set else None,
]

for service in service_iter:
    if service:
        service.do_action()

それぞれのクラスは start() / delete() を持ち、次のようなことを行います。

  • NAT Gateway
    • EIPを割り当てて作成
    • プライベートサブネットのルートテーブルに 0.0.0.0/0 のルートを追加
  • SSMエンドポイント
    • com.amazonaws.ap-northeast-1.ssm などを対象に Interfaceエンドポイントを作成または削除

これらの処理はすべて boto3 で実装されており、**バッチ処理に必要なネットワーク構成を「必要なときだけ用意し、自動で破棄する」**ことができるようになっています。


NatGatewayクラスの主な処理

  • 既存の NAT Gateway がなければ、新たに作成
  • Elastic IP を指定して NAT Gateway を割り当て
  • 作成完了まで Waiter で待機し、状態確認
  • プライベートサブネットのルートテーブルに 0.0.0.0/0 のルートを追加
  • 削除時はルート削除 → NAT Gateway 削除
aws_session = boto3.session.Session(region_name=aws_region)
client = aws_session.client("ec2")

client.create_nat_gateway(...)
client.create_route(...)
client.delete_nat_gateway(...)

SSMEndpointsクラスの主な処理

  • com.amazonaws.ap-northeast-1.ssm など、2種類の Interface型 VPC エンドポイントを作成
  • すでに存在していればスキップ
  • 削除時は VpcEndpointId をまとめて削除
aws_session = boto3.session.Session(region_name=aws_region)
client = aws_session.client("ec2")

self.client.create_vpc_endpoint(...)
self.client.delete_vpc_endpoints(...)

まとめ

  • コスト最小化:NAT Gatewayなどは使う時だけ起動
  • 柔軟性:環境変数 or 引数によりVPC/サブネット/EIPを切り替え可能
  • 再利用性:Lambda化されているので、複数のバッチで共通利用可能

この構成により、インフラ構成そのものをコードとして扱えるようになり、個人でも企業レベルの運用設計が可能になります。


5. バッチ処理の実行

ここまでの準備が完了したら、いよいよバッチ処理の実行です。
本プロジェクトでは、AWS Console 上から Lambda を手動実行するだけで Spark 処理がスタートします。

5.1 入力と実行

以下のJSONを Lambda に入力すると、自動で EMR クラスターとNATゲートウェイ等のVPCネットワーク構成が起動し、S3 上のスクリプトと設定をもとに分散バッチ処理が実行されます。

{
  "action": "start",
  "exec_name": "wikipedia_location"
}

EMR クラスターの起動から監視、終了後のクリーンアップ(NAT削除など)まで、一連の処理は Lambda と EventBridge により自動化されています。
処理終了後には EMR クラスターなどの自動構成リソースも自動的に削除されます。

5.2 構成と処理時間

現在の構成は以下のとおりです。

  • EC2インスタンス: m7g.xlarge
  • メモリ : 16.0 GB
  • プロセッサ : AWS Graviton3 Processor
  • vCPU : 4
  • 時間単価 : USD 0.2108
  • 台数 : 5

この構成では、おおよそ 60 分程度で処理が完了します。

処理時間を短縮したい場合は以下のように調整します。

  • スケールアウト:インスタンス数を増やす
  • スケールアップ:より高性能なインスタンスタイプ(例:m7g.2xlarge など)に変更する

6. まとめ

今回は、地理情報付きナレッジベースの構築に向けて、Wikipediaアーカイブをもとにしたバッチ処理ロジックをAWSで実行するための設定方法や、処理を自動化するためのAWS Lambda用Pythonプログラムについて紹介しました。
個人開発でも、LambdaとEMRを組み合わせることでスケーラブルかつコスト効率の高いバッチ処理環境が構築できることが分かりました。

地理情報と意味情報を結びつけることで、ユーザーの位置やクエリに応じて「最適なコンテキストを選ぶ」という、RAG的な情報検索が個人でも実現できるようになることを目指しています。
地図AI × 個人開発の挑戦で実現したコア部分です。


次回は、今回生成した分散データをアプリから取得するためのAPIの構成について説明します。
地図上での検索や生成AIとの連携にどのように運用するのか?という「データ取得編」に進みます。
引き続きご覧いただければ嬉しいです。

Discussion