AWS BedrockでAthenaを操作する分析AIエージェントを作ってみた
🎯 はじめに
AWS Bedrock で Claude モデル(Claude 3 Sonnet)が利用可能になったので、
Athena を経由してオープンデータを分析する「AIデータアナリストエージェント」を構築してみました。
Claude が自動で SQL を生成し、Athena でクエリを実行、結果を自然文で要約して返す構成です。
🗺 構成概要
-
Bedrock (Claude 3 Sonnet): 質問理解とSQL生成
-
Lambda (Python): Athenaクエリ実行、結果JSON返却
-
Athena + Glue: データ参照
-
S3: 元データとAthena出力の保管
🚲 利用するデータ:シェアサイクル Divvyとは?
Divvy はアメリカ・イリノイ州シカゴのシェアサイクルサービスです。
シカゴ市と Lyft 社が運営しており、利用履歴データをオープンデータとして公開しています。
- 公開サイト: City of Chicago Data Portal - Divvy Trips
- データ形式: CSV(1か月単位で提供)
- 主なカラム例:
カラム名 説明 ride_id
走行ID rideable_type
車種(classic_bike, electric_bikeなど) started_at
/ended_at
開始・終了時刻 start_station_name
/end_station_name
駅名 member_casual
会員区分(member / casual)
🪣 S3にデータを格納
1か月分のCSVファイルをまとめて、Athena から参照できるように S3 にアップロードします。
例:
s3://divvy-opendata/
└── curated_trips/
├── year=2025/month=07/divvy_trips_2025_07.csv
├── year=2025/month=08/divvy_trips_2025_08.csv
└── ...
💡 ポイント
- 年月でパーティションを切ると、Athenaのクエリコストを大幅に削減できます。
- 後述のSQL生成ルールでも、Claudeに「
WHERE year=... AND month='..'
を常に付ける」と教えています。
🧩 Glueデータカタログの設定
Athenaで利用するために、Glueデータカタログでテーブルを作成します。
CREATE EXTERNAL TABLE divvy.curated_trips (
ride_id string,
rideable_type string,
started_at timestamp,
ended_at timestamp,
start_station_name string,
end_station_name string,
member_casual string
)
PARTITIONED BY (year int, month string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES ('serialization.format' = ',')
LOCATION 's3://divvy-opendata/curated_trips/'
TBLPROPERTIES ('has_encrypted_data'='false');
⚙️ Lambda の役割
Claude(Bedrock Agent)が生成した SQL を受け取り、Athena に投げて結果を JSON で返します。
返り値は “素のJSON”({columns, rows, query_id}
)にするのがポイントです。
📦 ランタイムと環境変数
- ランタイム: Python 3.12
- ハンドラー:
lambda_function.handler
(またはlambda_function.lambda_handler
互換あり) - 環境変数:
ATHENA_DATABASE = divvy
ATHENA_OUTPUT = s3://<あなたのバケット>/athena/query-results/
-
ATHENA_WORKGROUP = primary
(任意)
🧰 最小 IAM(Lambda 実行ロール)
{
"Version": "2012-10-17",
"Statement": [
{ "Effect": "Allow", "Action": [
"athena:StartQueryExecution","athena:GetQueryExecution","athena:GetQueryResults"
], "Resource": "*" },
{ "Effect": "Allow", "Action": [
"glue:GetDatabase","glue:GetTable","glue:GetPartitions"
], "Resource": "*" },
{ "Effect": "Allow", "Action": [
"s3:GetObject","s3:PutObject","s3:ListBucket","s3:GetBucketLocation"
], "Resource": [
"arn:aws:s3:::<データ/結果のバケット>", "arn:aws:s3:::<データ/結果のバケット>/*"
]}
]
}
🧾 Lambda コード(コメント・ログ付き)
import json, os, re, time
import boto3
# --- クライアントと環境変数の準備 ---
athena = boto3.client("athena") # Athena クエリ実行用クライアント
DB = os.environ.get("ATHENA_DATABASE", "divvy") # クエリ対象のDB
OUTPUT = os.environ["ATHENA_OUTPUT"] # 結果のS3出力先
WORKGROUP = os.environ.get("ATHENA_WORKGROUP", "primary") # ワークグループ
# --- セキュリティチェック用の正規表現 ---
SAFE_SQL = re.compile(r"^\s*select\s", re.I) # SELECT で始まるか
BLOCK = re.compile(
r"\b(drop|insert|update|delete|create|alter|grant|revoke|msck|unload|ctas)\b", re.I
) # 危険な DDL/DML を禁止
def _run_athena(sql: str, limit: int | None = None):
"""Athena に SQL を投げ、columns/rows を返すユーティリティ"""
print(f"[INFO] Starting Athena query. SQL(before limit): {sql}, limit={limit}")
# 1) SQL バリデーション
if not SAFE_SQL.search(sql) or BLOCK.search(sql):
print("[ERROR] Invalid SQL detected")
raise ValueError("Only SELECT queries are allowed.")
# 2) LIMIT 追加(未指定なら付けない/既にあれば触らない)
if limit and re.search(r"\blimit\s+\d+", sql, re.I) is None:
sql = f"{sql.rstrip().rstrip(';')} LIMIT {int(limit)}"
print(f"[INFO] SQL after adding LIMIT: {sql}")
# 3) 実行開始
q = athena.start_query_execution(
QueryString=sql,
QueryExecutionContext={"Database": DB},
ResultConfiguration={"OutputLocation": OUTPUT},
WorkGroup=WORKGROUP
)
qid = q["QueryExecutionId"]
print(f"[INFO] QueryExecutionId={qid}")
# 4) ステータスをポーリング
while True:
meta = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]
state = meta["Status"]["State"]
print(f"[DEBUG] Query status={state}")
if state in ("SUCCEEDED","FAILED","CANCELLED"):
break
time.sleep(0.6)
if state != "SUCCEEDED":
reason = meta["Status"].get("StateChangeReason", "")
print(f"[ERROR] Athena query failed. State={state} Reason={reason}")
raise RuntimeError(f"Athena status={state} {reason}")
# 5) 結果取得(1行目=カラム名, 2行目以降=データ)
res = athena.get_query_results(QueryExecutionId=qid)
rows_all = res["ResultSet"]["Rows"]
cols = [c["VarCharValue"] for c in rows_all[0]["Data"]]
rows = []
for r in rows_all[1:]:
rows.append([d.get("VarCharValue") if d else None for d in r["Data"]])
print(f"[INFO] Query finished. Columns={cols}, RowCount={len(rows)}")
return {"columns": cols, "rows": rows, "query_id": qid}
def _extract_payload(event):
"""Bedrock→Lambda の入力形式ゆらぎを吸収して {sql, limit} を取り出す"""
try:
snapshot = json.dumps(event)[:800]
except Exception:
snapshot = str(event)[:800]
print(f"[DEBUG] Raw event: {snapshot}")
# 1) { "parameters": { "sql": "...", "limit": 100 } }
if isinstance(event, dict) and "parameters" in event and isinstance(event["parameters"], dict):
print("[INFO] Detected Bedrock style payload with 'parameters'")
return event["parameters"]
# 2) { "body": "{\"sql\":\"...\"}" } または { "body": { "sql": "..." } }
if isinstance(event, dict) and "body" in event:
body = event["body"]
if isinstance(body, str):
try:
parsed = json.loads(body)
print("[INFO] Detected body as JSON string")
return parsed
except Exception as e:
print(f"[WARN] body string JSON decode failed: {e}")
if isinstance(body, dict):
print("[INFO] Detected body as dict")
return body
# 3) { "sql": "...", "limit": 100 } のフラット形式
if isinstance(event, dict) and "sql" in event:
print("[INFO] Detected flat style payload with 'sql'")
return event
print("[WARN] Could not extract payload, returning empty dict")
return {}
def handler(event, context):
"""メインハンドラ:SQL抽出→Athena実行→素のJSONで返却"""
print("=== Lambda handler START ===")
try:
payload = _extract_payload(event)
sql = payload.get("sql")
limit = payload.get("limit")
print(f"[INFO] Extracted sql={sql}, limit={limit}")
if not sql:
print("[ERROR] Missing SQL in request")
return {"columns": ["error"], "rows": [["missing sql"]]}
out = _run_athena(sql, limit)
print("[INFO] Returning successful response")
# BedrockのAPIスキーマと一致する“素のJSON”で返す
return {"columns": out["columns"], "rows": out["rows"], "query_id": out["query_id"]}
except Exception as e:
print(f"[EXCEPTION] {str(e)}")
# 失敗時も素のJSONを返せば、Bedrock側の処理が継続しやすい
return {"columns": ["error"], "rows": [[str(e)]]}
# 互換: ランタイム設定が lambda_function.lambda_handler の場合
def lambda_handler(event, context):
return handler(event, context)
🧪 単体テスト用イベント(Lambda コンソール)
{
"parameters": {
"sql": "SELECT count(*) FROM divvy.curated_trips WHERE year=2025 AND month='07'",
"limit": 10
}
}
期待結果(例)
{"columns":["_col0"], "rows":[["763330"]], "query_id":"..."}
🔁 うまくいかない時のチェック
-
返り値が statusCode/body ではなく素のJSON になっているか
-
ATHENA_OUTPUT の S3 パスに書き込み権限があるか
-
CloudWatch Logs の
-
[DEBUG] Raw event: で入力形式
-
[INFO] QueryExecutionId= / status= で進捗
-
[ERROR] / [EXCEPTION] で失敗箇所
を確認する
-
🤖 Bedrock Agent を作成する
1) エージェント作成
- モデル: Claude 3 Sonnet(組織で許可されたもの)
- リージョン: Lambda と同じ(例: ap-northeast-1)
-
エージェント名:
divvy-analyst
- 説明: シカゴDivvyのトリップデータ分析エージェント
2) エージェント向け指示(System Prompt)
Claude が Athenaで動く Presto SQL を使うように促します。
**WARNING: Athena uses Presto SQL. Common mistakes to avoid:**
- NEVER use DATEDIFF() - use date_diff() instead (lowercase with underscore)
- NEVER use EXTRACT(EPOCH FROM ...) or EXTRACT(DOW FROM ...)
- For time differences: date_diff('minute', start_time, end_time)
You are a data analyst for Chicago's bikeshare (Divvy).
- Generate concise, correct Athena SQL that queries table divvy.curated_trips only.
- Columns: ride_id, rideable_type, started_at, ended_at, start_station_id, start_station_name, end_station_id, end_station_name, start_lat, start_lng, end_lat, end_lng, member_casual, year, month.
- Time grouping/filters use started_at. Default to 2025-07 if the user gives no timeframe.
- Always add WHERE year=... AND month='..' to minimize scanned data. Avoid SELECT * unless needed.
- After calling the tool, summarize results plainly (JA ok) with key numbers and any caveats.
- Only SELECT queries are allowed. Never write DDL/DML.
**Athena (Presto) date/time functions - USE THESE:**
Duration: date_diff('minute', started_at, ended_at)
Weekday: day_of_week(started_at)
Hour: hour(started_at)
Date: date(started_at)
**Example for trip duration:**
SELECT
MAX(date_diff('minute', started_at, ended_at)) as max_minutes,
AVG(date_diff('minute', started_at, ended_at)) as avg_minutes
FROM divvy.curated_trips
WHERE year = '2025' AND month = '07'
🧩 アクショングループの定義
3) 「API スキーマで定義」を選択し、OpenAPI を貼り付け
(/run_sql を 1 本だけ定義。パスに description が必須です)
{
"openapi": "3.0.1",
"info": { "title": "Athena SQL Runner", "version": "1.0.0" },
"paths": {
"/run_sql": {
"description": "Execute an Athena SELECT query and return columns/rows.",
"post": {
"operationId": "run_sql",
"summary": "Run Athena SQL",
"description": "Accepts a SELECT-only SQL string and optional row limit, executes via Athena, and returns tabular results.",
"requestBody": {
"required": true,
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"sql": { "type": "string", "description": "Athena SQL (SELECT only)" },
"limit": { "type": "integer", "minimum": 1, "maximum": 5000, "description": "Optional LIMIT if not present in SQL" }
},
"required": ["sql"]
}
}
}
},
"responses": {
"200": {
"description": "Query results",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"columns": { "type": "array", "items": { "type": "string" } },
"rows": { "type": "array", "items": { "type": "array", "items": { "type": "string" } } },
"query_id": { "type": "string" }
},
"required": ["columns","rows"]
}
}
}
}
}
}
}
}
}
4) 統合先として 既存の Lambda 関数 を選択
analytical_ai_agent_test(先に作成したもの)
パラメータマッピングは Passthrough(リクエスト本文のまま)でOK
🔐 権限設定(IAM)
5) エージェント実行ロールに Lambda 実行権限を付与
(Bedrock のエージェント画面「許可」リンクからロールを開き、ポリシーをアタッチ)
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowInvokeLambdaForAgent",
"Effect": "Allow",
"Action": ["lambda:InvokeFunction"],
"Resource": [
"arn:aws:lambda:ap-northeast-1:<ACCOUNT_ID>:function:analytical_ai_agent_test",
"arn:aws:lambda:ap-northeast-1:<ACCOUNT_ID>:function:analytical_ai_agent_test:*"
]
},
{
"Sid": "AllowInvokeClaude",
"Effect": "Allow",
"Action": ["bedrock:InvokeModel", "bedrock:InvokeModelWithResponseStream"],
"Resource": "*"
}
]
}
6) (必要に応じて)Lambda のリソースベースポリシー
同一アカウントなら通常不要ですが、組織の制約で必要な場合があります。
コンソールで プリンシパル:bedrock.amazonaws.com、アクション:lambda:InvokeFunction。
可能なら SourceArn にエージェントエイリアス ARN(arn:aws:bedrock:ap-northeast-1:...:agent-alias/<AgentId>/<AliasId>)を指定して絞り込み。
🚀 準備とデプロイ
-
右上の 「準備」(= Build)を実行 → PREPARED になるまで待つ
-
エイリアス作成(例:test)
-
下書きをデプロイ(test エイリアスに)
-
右ペインのテストで、エイリアスを test に切り替えて質問を実行
🧪 テスト例(Bedrock のテストウィンドウ)
例1:平均利用時間(会員 vs 非会員)
2025年7月の会員/非会員ごとの平均利用時間を出して
期待されるSQL(概形):
SELECT member_casual,
AVG(date_diff('minute', started_at, ended_at)) AS avg_minutes,
COUNT(*) AS ride_count
FROM divvy.curated_trips
WHERE year = 2025 AND month = '07'
GROUP BY member_casual
ORDER BY member_casual;
例2:最も利用された貸出ステーション
一番利用された貸出ステーションはどこ?
期待されるSQL(概形):
SELECT start_station_name, COUNT(*) AS trip_count
FROM divvy.curated_trips
WHERE year = 2025 AND month = '07'
GROUP BY start_station_name
ORDER BY trip_count DESC
LIMIT 10;
🔍 トレースの見方(重要)
-
右ペイン 「トレース」 → トレースステップ を展開
-
アクション呼び出し(operationId: run_sql)の 入力ペイロード に SQL が入っているか確認
-
Tool result に Lambda から返ってきた {columns, rows} が見えるはず
🪵 CloudWatch Logs で追跡
Lambda のログには次が出ます:
-
[DEBUG] Raw event: … Bedrock→Lambda の入力JSON
-
[INFO] QueryExecutionId=... … Athena実行ID
-
[DEBUG] Query status=... … 実行状態の推移
-
[INFO] Query finished. Columns=..., RowCount=... … 返却前の要約
-
[EXCEPTION] ... … エラー内容
🩹 トラブルシューティング早見表
症状 | 主な原因 | 対処方法 |
---|---|---|
AccessDeniedException: InvokeFunction (Lambda 呼び出し拒否) |
Bedrock エージェントの実行ロールに lambda:InvokeFunction が付与されていない、または ARN が間違っている |
エージェント実行ロールの IAM ポリシーに以下を追加:json<br>{ "Effect": "Allow", "Action": "lambda:InvokeFunction", "Resource": "arn:aws:lambda:ap-northeast-1:<ACCOUNT_ID>:function:analytical_ai_agent_test" }<br>
|
The server encountered an error processing the Lambda response | Lambda の返り値が Bedrock の API スキーマと一致していない(例: statusCode や body を含んでいる) |
Lambda の return を { "columns": [...], "rows": [...], "query_id": "..." } の 素の JSON 形式 に修正 |
Tool 呼び出しが行われない | Claude が /run_sql を呼び出していない |
System Prompt に以下を明示:Always call the /run_sql action to execute your generated SQL
|
結果が空 (0件) | WHERE 句の year / month が存在しない、または値がずれている |
Athena コンソールで手動クエリ確認:SELECT COUNT(*) FROM divvy.curated_trips WHERE year=2025 AND month='07';
|
Lambda が別リージョンで動いている | Bedrock エージェントのリージョンが us-east-1 (バージニア北部)になっている |
Lambda と同じリージョン(例: ap-northeast-1 東京)でエージェントを作成し直す |
Missing credentials / Athena permissions error | Lambda 実行ロールに Athena or S3 の権限が不足 | IAM ポリシーで以下を確認:athena:StartQueryExecution , athena:GetQueryResults , s3:GetObject , s3:PutObject
|
Query fails with INVALID_CAST_ARGUMENT | Athena のカラム型と CSV の値が不一致 | Glue テーブル定義のカラム型を string に戻す、または cast() で明示的に変換 |
JSONDecodeError in Lambda | Bedrock → Lambda の event 形式が "body" に JSON 文字列で渡されている |
_extract_payload() 関数で json.loads(event["body"]) するよう処理を追加 |
Timeout | 大きなクエリで Athena の完了待機に時間がかかっている | Lambda タイムアウト時間を 30 秒以上 に設定、または LIMIT 付きのクエリにする |
💡 ポイント
- Bedrock Agent ↔ Lambda ↔ Athena の間で起こるエラーは、ほとんどが 権限 or レスポンス形式 に関係します。
- CloudWatch Logs の
[INFO]
/[DEBUG]
/[EXCEPTION]
を追えば、原因が確実に分かります。
✅ 仕上げのチェックリスト
-
Bedrock エージェントと Lambda のリージョン一致
-
エージェント実行ロールに lambda:InvokeFunction
-
必要に応じて Lambda リソースベースポリシーに bedrock.amazonaws.com
-
Lambda 返り値は 素のJSON({columns, rows, query_id})
-
テストは エイリアス(test など)に対して行う
🎉 まとめ
-
たった3サービス(Bedrock/Lambda/Athena)の最小構成で、AIアナリストが完成。
-
Claude が自然言語→SQL→Athena実行→自然文要約までを自動化。
-
実データ(Divvy)での分析にそのまま応用可能。
Discussion