🦙

サンドボックス用AWS環境のKPIを毎週報告する機能をつけました

に公開

本記事の目標

先端技術開発グループ(WAND)の佐藤碧です。福原と小島と私は、弊社のエンジニアが使うためのサンドボックス用AWS環境(デモ環境)の保守運用を行っています。さらに、私たちはApplication Load Balancer(ALB)を提供しており、このALBにアプリケーションを登録することで、弊社のVPN経由で誰でもアクセスできるようなデモアプリを簡単に公開できます。
デモ環境を運用する中で「どのくらいのユーザーにデモ環境を利用していただいているか」「どのデモアプリが人気であるか」を可視化し、実際にユーザーに使用していただけているかを監視することになりました。本記事では、設計時や実装時の注意点や考慮事項をお伝えします。

作成物

記事を読む前に、最終的な成果物を紹介します。

作成物
毎週月曜日の朝9:00にTeamsに送信されるKPIレポート

上記のような表を、毎週月曜日の朝9:00にTeamsへWorkflows経由で送信するアプリケーションを作成しました。表には以下の情報が含まれます:

  • ACTIVE USER: 過去1週間・1ヶ月・3ヶ月のアクティブユーザー数
  • ACCESS COUNT: 過去1週間・1ヶ月・3ヶ月の各デモアプリへの期間別アクセス数

アーキテクチャ設計

アーキテクチャ図

最終的にはシンプルなサーバーレスアーキテクチャになりました。Amazon EventBridge SchedulerからAWS Lambda(Lambda)を呼び出す形はよく見られますが、この形になるまでにいくつか考慮したことがあるので、それらを解説していきます。

ダッシュボード作成の見送り

最初の予定では、Amazon OpenSearch ServiceAmazon QuickSightを用いたダッシュボードを作成する予定でした。これらのうちどのツールを使うのが適しているかを小島が検証しました。その結果としてOpenSearchが最も適していると判断しましたが、リアルタイム性を必要としないことや月額コストが発生する点も課題でした。
KPIの可視化という目的のもとでは、定期的にTeamsに結果を送信すれば十分と判断し、ダッシュボードの作成を見送ることにしました。

ログの取得先

「どのくらいのユーザーにデモ環境を利用していただいているか」「どのデモアプリが人気であるか」の2つの指標について、1週間・1ヶ月・3ヶ月の3期間で集計し、まとめてTeamsで報告する設計にしました。そこでそれぞれのログをどのように取得するのがよいかを考えました。

1. ユーザー利用状況の把握

「どのくらいのユーザーにデモ環境を利用していただいているか」の指標は、AWS CloudTrail(CloudTrail)から取得することにしました。CloudTrailはデフォルトでは90日間のイベント履歴のみを保存します。証跡を作成して明示的に設定をすることでS3バケットにログを配信できます。証跡が配信されたS3バケットに対してAmazon Athenaでクエリを実行する構成です。

ユーザー利用状況の把握クエリ

2. デモアプリの人気度測定

「どのデモアプリが人気であるか」の指標はALBのアクセスログから取得することにしました。ALBがアクセスログをS3に保存する設定を有効化し、S3バケットにログを保存します。こちらも同様にそのS3バケットに対してAthenaでクエリを実行します。

デモアプリの人気度測定クエリ

実装時のポイント

1. Amazon Athenaのパーティション射影

Athenaでクエリを実行する際のメタデータ取得を高速化する機能です。

通常のパーティションだと、S3上のデータを日付などで分割して保存します(e.g. s3://bucket-name/logs/2025/11/06/data.log)。この方式には次のような課題があります:

  1. パーティション情報をAWS Glue Data Catalogに事前に登録しておく必要がある
  2. 日付でパーティションを行うので、毎日パーティション情報を登録する作業が発生する
  3. クエリ実行前にGlue Data Catalogからメタデータを取得する必要がある
  4. パーティション数が増えると、メタデータ取得にかかる時間が増える

これらの問題をパーティション射影を用いることによって解決できます。パーティションのパスをテーブル定義に基づいて自動的に生成できます。具体的には、WHERE句で指定された条件から直接S3パスを構築してくれます。これによって、以下の恩恵を受けることができます:

  1. 高速化: S3パス構築のためのメタデータ取得が不要になりGlue Data Catalogへの問い合わせがなくなる
  2. 運用効率化: パーティション情報の管理が必要なくなり登録作業も不要になる
  3. スケーラビリティ: パーティション数の増加によるクエリ時間の増加がなくなる

今回は、CloudTrailログとALBアクセスログの両方に対してパーティション射影を設定し、大量のログデータに対して高速なクエリ実行を実現しました。

2. クエリの並列実行による高速化

LambdaからAthenaを実行する際、合計で6つのクエリを行う必要があります。6つを直列で実行するとかなり時間がかかります。Pythonのboto3はクエリ実行(start_query_execution)とクエリ結果取得(get_query_results)が分離されています。この特性を利用して、以下のような並列実行を実装しました:

  1. 全てのクエリを一斉に実行開始
  2. 各クエリのステータスを定期的にポーリング
  3. 完了したものから順次結果を取得

これにより、最も遅いクエリの実行時間+若干のオーバーヘッドまで実行時間を短縮することができます。実際の実装を紹介します:

import time
from typing import Any
import boto3

session = boto3.Session()
athena_client = session.client("athena", region_name="ap-northeast-1")


def post_athena_queries(queries: dict[str, str], force: bool = False) -> dict[str, str]:
    """
    クエリを実行する。クエリ結果までは受け取らない。
    実行が開始されなかったクエリに関しては、クエリIDに空文字列が入る。
    Args:
        queries: クエリ名とクエリ文字列のmap
        force: Trueの場合、実行失敗時に例外を発生させる
    Returns:
        クエリ名とクエリIDのmap
    """
    results = dict()
    for query_name, query in queries.items():
        try:
            query_start_response = athena_client.start_query_execution(
                QueryString=query,
                QueryExecutionContext={
                    "Database": DATABASE,
                },
                ResultConfiguration={
                    "OutputLocation": f"{QUERY_OUTPUT_S3}/{query_name}",
                    "EncryptionConfiguration": {
                        "EncryptionOption": "SSE_S3",
                    }
                },
                WorkGroup=WORK_GROUP,
            )
        except Exception as e:
            print(f"Error in executing {query_name} query: {e}")
            results[query_name] = ""
            if force:
                raise Exception(f"Force stopped {query_name}")
        else:
            query_execution_id = query_start_response["QueryExecutionId"]
            print(f"Executing {query_name} query")
            print(f"The ID is {query_execution_id}")
            results[query_name] = query_execution_id
    return results


def get_athena_queries(queries: dict[str, str]) -> dict[str, list[Any]]:
    """
    クエリの実行結果を取得する。
    取得できなかったクエリの実行結果に関しては、空のリストになる。
    全体として RETRY_CNT の回数だけトライする。
    Args:
        queries: クエリ名とクエリIDのmap。クエリIDが空文字列の場合はスキップする。
    Returns:
        queries: クエリ名とクエリの実行結果。整形はされない。
    """
    results: dict[str, list[Any]] = dict()
    for i in range(1, 1 + RETRY_CNT):
        for query_name, query_id in queries.items():
            if query_name in results or query_id == "":
                continue
            try:
                query_check_response = athena_client.get_query_execution(
                    QueryExecutionId=query_id,
                )
            except Exception as e:
                print(f"Error in fetching {query_name} query result: {e}")
                results[query_name] = list()
            else:
                print("Fetched the status.")
                match query_check_response["QueryExecution"]["Status"]["State"]:
                    case "SUCCEEDED":
                        print("Status is SUCCEEDED.")
                        try:
                            query_result_response = athena_client.get_query_results(
                                QueryExecutionId=query_id,
                            )
                            result = query_result_response["ResultSet"]["Rows"][1:]
                        except Exception as e:
                            print(f"Getting Result is FAILED in {query_name}: {e}")
                            results[query_name] = list()
                        else:
                            print(f"Query {query_name} result is:\n{result}")
                            results[query_name] = result
                    case "FAILED":
                        print("Status is FAILED.")
                        results[query_name] = list()
                    case status:
                        print(f"Status is {status}. Retry after {i * 10} seconds.")
        if len(results) == len(queries):
            break
        else:
            time.sleep(i * 10)
    for query_name, query_id in queries.items():
        if query_name not in results and query_id != "":
            try:
                athena_client.stop_query_execution(
                    QueryExecutionId=query_id
                )
            except Exception as e:
                print(f"Query {query_name} did not stop.: {e}")
            else:
                print(f"Query {query_name} did not succeed in time.")
            results[query_name] = list()
    return results

3. ALBアクセスログのフォーマット変更に対する柔軟性

ALBのアクセスログはフォーマットが公開されていますが、AWSによって不定期に項目が追加されることがあります。このフォーマット変更の情報はドキュメントの変更履歴に記載されますが、既存のAthenaテーブル定義は自動的には更新されないため、手動で対応する必要があります。
実際に2025年8月に行われたフォーマットの変更によって、新しいフィールドが追加されました。この影響で、既存のテーブル定義では新しいログが正常にパースできず、クエリ結果が取得できなくなりました。
この問題への対応として、テーブル定義の最後に additional_column string というフィールドを追加し、将来的なフィールド追加にも柔軟に対応できるようにしました。この方法は、クラスメソッドさんの記事が参考になりました。素晴らしい記事をありがとうございます。

まとめ

本記事では、サンドボックス環境のKPIをどのように集計しているかを解説しました。
アーキテクチャを検討する流れとAmazon Athenaを使用するときのテクニックを重点的に解説しました。

さらにデモ環境の利用を促進するため、人気のアプリケーションの内容を解説する機能なども実装していきたいと考えています。

エクサウィザーズ Tech Blog

Discussion