🚲

AWS BedrockでAthenaを操作する分析AIエージェントを作ってみた

に公開

🎯 はじめに

AWS Bedrock で Claude モデル(Claude 3 Sonnet)が利用可能になったので、
Athena を経由してオープンデータを分析する「AIデータアナリストエージェント」を構築してみました。

Claude が自動で SQL を生成し、Athena でクエリを実行、結果を自然文で要約して返す構成です。


🗺 構成概要

  • Bedrock (Claude 3 Sonnet): 質問理解とSQL生成

  • Lambda (Python): Athenaクエリ実行、結果JSON返却

  • Athena + Glue: データ参照

  • S3: 元データとAthena出力の保管


🚲 利用するデータ:シェアサイクル Divvyとは?

Divvy はアメリカ・イリノイ州シカゴのシェアサイクルサービスです。
シカゴ市と Lyft 社が運営しており、利用履歴データをオープンデータとして公開しています。

  • 公開サイト: City of Chicago Data Portal - Divvy Trips
  • データ形式: CSV(1か月単位で提供)
  • 主なカラム例:
    カラム名 説明
    ride_id 走行ID
    rideable_type 車種(classic_bike, electric_bikeなど)
    started_at / ended_at 開始・終了時刻
    start_station_name / end_station_name 駅名
    member_casual 会員区分(member / casual)

🪣 S3にデータを格納

1か月分のCSVファイルをまとめて、Athena から参照できるように S3 にアップロードします。

例:

s3://divvy-opendata/
└── curated_trips/
├── year=2025/month=07/divvy_trips_2025_07.csv
├── year=2025/month=08/divvy_trips_2025_08.csv
└── ...

💡 ポイント

  • 年月でパーティションを切ると、Athenaのクエリコストを大幅に削減できます。
  • 後述のSQL生成ルールでも、Claudeに「WHERE year=... AND month='..' を常に付ける」と教えています。

🧩 Glueデータカタログの設定

Athenaで利用するために、Glueデータカタログでテーブルを作成します。

CREATE EXTERNAL TABLE divvy.curated_trips (
  ride_id string,
  rideable_type string,
  started_at timestamp,
  ended_at timestamp,
  start_station_name string,
  end_station_name string,
  member_casual string
)
PARTITIONED BY (year int, month string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES ('serialization.format' = ',')
LOCATION 's3://divvy-opendata/curated_trips/'
TBLPROPERTIES ('has_encrypted_data'='false');

⚙️ Lambda の役割

Claude(Bedrock Agent)が生成した SQL を受け取り、Athena に投げて結果を JSON で返します。
返り値は “素のJSON”{columns, rows, query_id})にするのがポイントです。


📦 ランタイムと環境変数

  • ランタイム: Python 3.12
  • ハンドラー: lambda_function.handler(または lambda_function.lambda_handler 互換あり)
  • 環境変数:
    • ATHENA_DATABASE = divvy
    • ATHENA_OUTPUT = s3://<あなたのバケット>/athena/query-results/
    • ATHENA_WORKGROUP = primary(任意)

🧰 最小 IAM(Lambda 実行ロール)

{
  "Version": "2012-10-17",
  "Statement": [
    { "Effect": "Allow", "Action": [
        "athena:StartQueryExecution","athena:GetQueryExecution","athena:GetQueryResults"
      ], "Resource": "*" },
    { "Effect": "Allow", "Action": [
        "glue:GetDatabase","glue:GetTable","glue:GetPartitions"
      ], "Resource": "*" },
    { "Effect": "Allow", "Action": [
        "s3:GetObject","s3:PutObject","s3:ListBucket","s3:GetBucketLocation"
      ], "Resource": [
        "arn:aws:s3:::<データ/結果のバケット>", "arn:aws:s3:::<データ/結果のバケット>/*"
      ]}
  ]
}

🧾 Lambda コード(コメント・ログ付き)

import json, os, re, time
import boto3

# --- クライアントと環境変数の準備 ---
athena = boto3.client("athena")  # Athena クエリ実行用クライアント
DB = os.environ.get("ATHENA_DATABASE", "divvy")  # クエリ対象のDB
OUTPUT = os.environ["ATHENA_OUTPUT"]             # 結果のS3出力先
WORKGROUP = os.environ.get("ATHENA_WORKGROUP", "primary")  # ワークグループ

# --- セキュリティチェック用の正規表現 ---
SAFE_SQL = re.compile(r"^\s*select\s", re.I)  # SELECT で始まるか
BLOCK = re.compile(
    r"\b(drop|insert|update|delete|create|alter|grant|revoke|msck|unload|ctas)\b", re.I
)  # 危険な DDL/DML を禁止

def _run_athena(sql: str, limit: int | None = None):
    """Athena に SQL を投げ、columns/rows を返すユーティリティ"""
    print(f"[INFO] Starting Athena query. SQL(before limit): {sql}, limit={limit}")

    # 1) SQL バリデーション
    if not SAFE_SQL.search(sql) or BLOCK.search(sql):
        print("[ERROR] Invalid SQL detected")
        raise ValueError("Only SELECT queries are allowed.")

    # 2) LIMIT 追加(未指定なら付けない/既にあれば触らない)
    if limit and re.search(r"\blimit\s+\d+", sql, re.I) is None:
        sql = f"{sql.rstrip().rstrip(';')} LIMIT {int(limit)}"
        print(f"[INFO] SQL after adding LIMIT: {sql}")

    # 3) 実行開始
    q = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": DB},
        ResultConfiguration={"OutputLocation": OUTPUT},
        WorkGroup=WORKGROUP
    )
    qid = q["QueryExecutionId"]
    print(f"[INFO] QueryExecutionId={qid}")

    # 4) ステータスをポーリング
    while True:
        meta = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]
        state = meta["Status"]["State"]
        print(f"[DEBUG] Query status={state}")
        if state in ("SUCCEEDED","FAILED","CANCELLED"):
            break
        time.sleep(0.6)

    if state != "SUCCEEDED":
        reason = meta["Status"].get("StateChangeReason", "")
        print(f"[ERROR] Athena query failed. State={state} Reason={reason}")
        raise RuntimeError(f"Athena status={state} {reason}")

    # 5) 結果取得(1行目=カラム名, 2行目以降=データ)
    res = athena.get_query_results(QueryExecutionId=qid)
    rows_all = res["ResultSet"]["Rows"]
    cols = [c["VarCharValue"] for c in rows_all[0]["Data"]]
    rows = []
    for r in rows_all[1:]:
        rows.append([d.get("VarCharValue") if d else None for d in r["Data"]])

    print(f"[INFO] Query finished. Columns={cols}, RowCount={len(rows)}")
    return {"columns": cols, "rows": rows, "query_id": qid}

def _extract_payload(event):
    """Bedrock→Lambda の入力形式ゆらぎを吸収して {sql, limit} を取り出す"""
    try:
        snapshot = json.dumps(event)[:800]
    except Exception:
        snapshot = str(event)[:800]
    print(f"[DEBUG] Raw event: {snapshot}")

    # 1) { "parameters": { "sql": "...", "limit": 100 } }
    if isinstance(event, dict) and "parameters" in event and isinstance(event["parameters"], dict):
        print("[INFO] Detected Bedrock style payload with 'parameters'")
        return event["parameters"]

    # 2) { "body": "{\"sql\":\"...\"}" } または { "body": { "sql": "..." } }
    if isinstance(event, dict) and "body" in event:
        body = event["body"]
        if isinstance(body, str):
            try:
                parsed = json.loads(body)
                print("[INFO] Detected body as JSON string")
                return parsed
            except Exception as e:
                print(f"[WARN] body string JSON decode failed: {e}")
        if isinstance(body, dict):
            print("[INFO] Detected body as dict")
            return body

    # 3) { "sql": "...", "limit": 100 } のフラット形式
    if isinstance(event, dict) and "sql" in event:
        print("[INFO] Detected flat style payload with 'sql'")
        return event

    print("[WARN] Could not extract payload, returning empty dict")
    return {}

def handler(event, context):
    """メインハンドラ:SQL抽出→Athena実行→素のJSONで返却"""
    print("=== Lambda handler START ===")
    try:
        payload = _extract_payload(event)
        sql = payload.get("sql")
        limit = payload.get("limit")
        print(f"[INFO] Extracted sql={sql}, limit={limit}")

        if not sql:
            print("[ERROR] Missing SQL in request")
            return {"columns": ["error"], "rows": [["missing sql"]]}

        out = _run_athena(sql, limit)
        print("[INFO] Returning successful response")
        # BedrockのAPIスキーマと一致する“素のJSON”で返す
        return {"columns": out["columns"], "rows": out["rows"], "query_id": out["query_id"]}

    except Exception as e:
        print(f"[EXCEPTION] {str(e)}")
        # 失敗時も素のJSONを返せば、Bedrock側の処理が継続しやすい
        return {"columns": ["error"], "rows": [[str(e)]]}

# 互換: ランタイム設定が lambda_function.lambda_handler の場合
def lambda_handler(event, context):
    return handler(event, context)

🧪 単体テスト用イベント(Lambda コンソール)

{
  "parameters": {
    "sql": "SELECT count(*) FROM divvy.curated_trips WHERE year=2025 AND month='07'",
    "limit": 10
  }
}

期待結果(例)

{"columns":["_col0"], "rows":[["763330"]], "query_id":"..."}

🔁 うまくいかない時のチェック

  • 返り値が statusCode/body ではなく素のJSON になっているか

  • ATHENA_OUTPUT の S3 パスに書き込み権限があるか

  • CloudWatch Logs の

    • [DEBUG] Raw event: で入力形式

    • [INFO] QueryExecutionId= / status= で進捗

    • [ERROR] / [EXCEPTION] で失敗箇所
      を確認する


🤖 Bedrock Agent を作成する

1) エージェント作成

  • モデル: Claude 3 Sonnet(組織で許可されたもの)
  • リージョン: Lambda と同じ(例: ap-northeast-1)
  • エージェント名: divvy-analyst
  • 説明: シカゴDivvyのトリップデータ分析エージェント

2) エージェント向け指示(System Prompt)

Claude が Athenaで動く Presto SQL を使うように促します。

**WARNING: Athena uses Presto SQL. Common mistakes to avoid:**
- NEVER use DATEDIFF() - use date_diff() instead (lowercase with underscore)
- NEVER use EXTRACT(EPOCH FROM ...) or EXTRACT(DOW FROM ...)
- For time differences: date_diff('minute', start_time, end_time)

You are a data analyst for Chicago's bikeshare (Divvy).
- Generate concise, correct Athena SQL that queries table divvy.curated_trips only.
- Columns: ride_id, rideable_type, started_at, ended_at, start_station_id, start_station_name, end_station_id, end_station_name, start_lat, start_lng, end_lat, end_lng, member_casual, year, month.
- Time grouping/filters use started_at. Default to 2025-07 if the user gives no timeframe.
- Always add WHERE year=... AND month='..' to minimize scanned data. Avoid SELECT * unless needed.
- After calling the tool, summarize results plainly (JA ok) with key numbers and any caveats.
- Only SELECT queries are allowed. Never write DDL/DML.

**Athena (Presto) date/time functions - USE THESE:**
Duration: date_diff('minute', started_at, ended_at)
Weekday: day_of_week(started_at)
Hour: hour(started_at)
Date: date(started_at)

**Example for trip duration:**
SELECT 
  MAX(date_diff('minute', started_at, ended_at)) as max_minutes,
  AVG(date_diff('minute', started_at, ended_at)) as avg_minutes
FROM divvy.curated_trips
WHERE year = '2025' AND month = '07'

🧩 アクショングループの定義

3) 「API スキーマで定義」を選択し、OpenAPI を貼り付け

(/run_sql を 1 本だけ定義。パスに description が必須です)

{
  "openapi": "3.0.1",
  "info": { "title": "Athena SQL Runner", "version": "1.0.0" },
  "paths": {
    "/run_sql": {
      "description": "Execute an Athena SELECT query and return columns/rows.",
      "post": {
        "operationId": "run_sql",
        "summary": "Run Athena SQL",
        "description": "Accepts a SELECT-only SQL string and optional row limit, executes via Athena, and returns tabular results.",
        "requestBody": {
          "required": true,
          "content": {
            "application/json": {
              "schema": {
                "type": "object",
                "properties": {
                  "sql":   { "type": "string",  "description": "Athena SQL (SELECT only)" },
                  "limit": { "type": "integer", "minimum": 1, "maximum": 5000, "description": "Optional LIMIT if not present in SQL" }
                },
                "required": ["sql"]
              }
            }
          }
        },
        "responses": {
          "200": {
            "description": "Query results",
            "content": {
              "application/json": {
                "schema": {
                  "type": "object",
                  "properties": {
                    "columns":  { "type": "array", "items": { "type": "string" } },
                    "rows":     { "type": "array", "items": { "type": "array", "items": { "type": "string" } } },
                    "query_id": { "type": "string" }
                  },
                  "required": ["columns","rows"]
                }
              }
            }
          }
        }
      }
    }
  }
}


4) 統合先として 既存の Lambda 関数 を選択

analytical_ai_agent_test(先に作成したもの)

パラメータマッピングは Passthrough(リクエスト本文のまま)でOK

🔐 権限設定(IAM)

5) エージェント実行ロールに Lambda 実行権限を付与

(Bedrock のエージェント画面「許可」リンクからロールを開き、ポリシーをアタッチ)

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowInvokeLambdaForAgent",
      "Effect": "Allow",
      "Action": ["lambda:InvokeFunction"],
      "Resource": [
        "arn:aws:lambda:ap-northeast-1:<ACCOUNT_ID>:function:analytical_ai_agent_test",
        "arn:aws:lambda:ap-northeast-1:<ACCOUNT_ID>:function:analytical_ai_agent_test:*"
      ]
    },
    {
      "Sid": "AllowInvokeClaude",
      "Effect": "Allow",
      "Action": ["bedrock:InvokeModel", "bedrock:InvokeModelWithResponseStream"],
      "Resource": "*"
    }
  ]
}

6) (必要に応じて)Lambda のリソースベースポリシー

同一アカウントなら通常不要ですが、組織の制約で必要な場合があります。
コンソールで プリンシパル:bedrock.amazonaws.com、アクション:lambda:InvokeFunction。
可能なら SourceArn にエージェントエイリアス ARN(arn:aws:bedrock:ap-northeast-1:...:agent-alias/<AgentId>/<AliasId>)を指定して絞り込み。

🚀 準備とデプロイ

  1. 右上の 「準備」(= Build)を実行 → PREPARED になるまで待つ

  2. エイリアス作成(例:test)

  3. 下書きをデプロイ(test エイリアスに)

  4. 右ペインのテストで、エイリアスを test に切り替えて質問を実行

🧪 テスト例(Bedrock のテストウィンドウ)

例1:平均利用時間(会員 vs 非会員)

2025年7月の会員/非会員ごとの平均利用時間を出して

期待されるSQL(概形):

SELECT member_casual,
       AVG(date_diff('minute', started_at, ended_at)) AS avg_minutes,
       COUNT(*) AS ride_count
FROM divvy.curated_trips
WHERE year = 2025 AND month = '07'
GROUP BY member_casual
ORDER BY member_casual;

例2:最も利用された貸出ステーション

一番利用された貸出ステーションはどこ?

期待されるSQL(概形):

SELECT start_station_name, COUNT(*) AS trip_count
FROM divvy.curated_trips
WHERE year = 2025 AND month = '07'
GROUP BY start_station_name
ORDER BY trip_count DESC
LIMIT 10;

🔍 トレースの見方(重要)

  • 右ペイン 「トレース」 → トレースステップ を展開

  • アクション呼び出し(operationId: run_sql)の 入力ペイロード に SQL が入っているか確認

  • Tool result に Lambda から返ってきた {columns, rows} が見えるはず

🪵 CloudWatch Logs で追跡

Lambda のログには次が出ます:

  • [DEBUG] Raw event: … Bedrock→Lambda の入力JSON

  • [INFO] QueryExecutionId=... … Athena実行ID

  • [DEBUG] Query status=... … 実行状態の推移

  • [INFO] Query finished. Columns=..., RowCount=... … 返却前の要約

  • [EXCEPTION] ... … エラー内容

🩹 トラブルシューティング早見表

症状 主な原因 対処方法
AccessDeniedException: InvokeFunction
(Lambda 呼び出し拒否)
Bedrock エージェントの実行ロールに lambda:InvokeFunction が付与されていない、または ARN が間違っている エージェント実行ロールの IAM ポリシーに以下を追加:
json<br>{ "Effect": "Allow", "Action": "lambda:InvokeFunction", "Resource": "arn:aws:lambda:ap-northeast-1:<ACCOUNT_ID>:function:analytical_ai_agent_test" }<br>
The server encountered an error processing the Lambda response Lambda の返り値が Bedrock の API スキーマと一致していない(例: statusCodebody を含んでいる) Lambda の return を { "columns": [...], "rows": [...], "query_id": "..." }素の JSON 形式 に修正
Tool 呼び出しが行われない Claude が /run_sql を呼び出していない System Prompt に以下を明示:
Always call the /run_sql action to execute your generated SQL
結果が空 (0件) WHERE 句の year / month が存在しない、または値がずれている Athena コンソールで手動クエリ確認:
SELECT COUNT(*) FROM divvy.curated_trips WHERE year=2025 AND month='07';
Lambda が別リージョンで動いている Bedrock エージェントのリージョンが us-east-1(バージニア北部)になっている Lambda と同じリージョン(例: ap-northeast-1 東京)でエージェントを作成し直す
Missing credentials / Athena permissions error Lambda 実行ロールに Athena or S3 の権限が不足 IAM ポリシーで以下を確認:
athena:StartQueryExecution, athena:GetQueryResults, s3:GetObject, s3:PutObject
Query fails with INVALID_CAST_ARGUMENT Athena のカラム型と CSV の値が不一致 Glue テーブル定義のカラム型を string に戻す、または cast() で明示的に変換
JSONDecodeError in Lambda Bedrock → Lambda の event 形式が "body" に JSON 文字列で渡されている _extract_payload() 関数で json.loads(event["body"]) するよう処理を追加
Timeout 大きなクエリで Athena の完了待機に時間がかかっている Lambda タイムアウト時間を 30 秒以上 に設定、または LIMIT 付きのクエリにする

💡 ポイント

  • Bedrock Agent ↔ Lambda ↔ Athena の間で起こるエラーは、ほとんどが 権限 or レスポンス形式 に関係します。
  • CloudWatch Logs の [INFO] / [DEBUG] / [EXCEPTION] を追えば、原因が確実に分かります。

✅ 仕上げのチェックリスト

  • Bedrock エージェントと Lambda のリージョン一致

  • エージェント実行ロールに lambda:InvokeFunction

  • 必要に応じて Lambda リソースベースポリシーに bedrock.amazonaws.com

  • Lambda 返り値は 素のJSON({columns, rows, query_id})

  • テストは エイリアス(test など)に対して行う

🎉 まとめ

  • たった3サービス(Bedrock/Lambda/Athena)の最小構成で、AIアナリストが完成。

  • Claude が自然言語→SQL→Athena実行→自然文要約までを自動化。

  • 実データ(Divvy)での分析にそのまま応用可能。

Discussion