🐡

ローカルのElasticSearchでAWS ALB(Amazon Application Load Bのログを分析できるようにする

2023/06/03に公開

本番でissueが発生した場合、AWS ALB(Amazon Application Load Balancer)のログ分析のニーズがあると思います。本記事では、ローカル環境のElasticSearchにログを投入する方法を紹介します。ローカル実行のため、コストを気にする必要もありません。

ElasticSearch投入後は、Kibana DevToolsなどで、任意のクエリを発行してissueの対象となるリクエストを絞り込むことを想定しています。

Amazon Athenaでも同様のことが実施できますが、ElasticSearchの方が汎用的な技術スタックなので、応用が効くと思います。

https://zenn.dev/toshiro3/articles/63ae1f6a75b925

  • 該当のツールのソースコード

https://github.com/Thirosue/tools/tree/main/alb-log-importer-to-es

前提

DockerPython3がインストールされている前提とします。

ElasticSearchを起動

docker-composeでサクッと立ち上げましょう。

8系の定義が(少し)面倒だったので、7系にしています。

$ docker-compose up -d

しばらくすると、Kibanaにアクセスできるはずなので、以下URLにブラウザでアクセスしてみましょう。

http://localhost:5601/app/dev_tools#/console

ツールのセットアップ

  • git clone して、ツールをダウンロードします。
git clone git@github.com:Thirosue/tools.git
  • ツールディレクトリまで移動
cd tools/alb-log-importer-to-es/
  • 必要なモジュール(elasticsearch, pandas)をインストール
pip3 install -r requirements.txt
インストール ログ
$ pip3 install -r requirements.txt
WARNING: Skipping /opt/homebrew/lib/python3.11/site-packages/numpy-1.24.3-py3.11.egg-info due to invalid metadata entry 'name'
WARNING: Skipping /opt/homebrew/lib/python3.11/site-packages/numpy-1.24.3-py3.11.egg-info due to invalid metadata entry 'name'
Requirement already satisfied: elasticsearch<8.0.0,>=7.0.0 in /opt/homebrew/lib/python3.11/site-packages (from -r requirements.txt (line 1)) (7.17.9)
Requirement already satisfied: pandas in /opt/homebrew/lib/python3.11/site-packages (from -r requirements.txt (line 2)) (2.0.2)
Requirement already satisfied: urllib3<2,>=1.21.1 in /opt/homebrew/lib/python3.11/site-packages (from elasticsearch<8.0.0,>=7.0.0->-r requirements.txt (line 1)) (1.26.16)
Requirement already satisfied: certifi in /opt/homebrew/lib/python3.11/site-packages (from elasticsearch<8.0.0,>=7.0.0->-r requirements.txt (line 1)) (2023.5.7)
Requirement already satisfied: python-dateutil>=2.8.2 in /opt/homebrew/lib/python3.11/site-packages (from pandas->-r requirements.txt (line 2)) (2.8.2)
Requirement already satisfied: pytz>=2020.1 in /opt/homebrew/lib/python3.11/site-packages (from pandas->-r requirements.txt (line 2)) (2023.3)
Requirement already satisfied: tzdata>=2022.1 in /opt/homebrew/lib/python3.11/site-packages (from pandas->-r requirements.txt (line 2)) (2023.3)
Requirement already satisfied: numpy>=1.21.0 in /opt/homebrew/lib/python3.11/site-packages (from pandas->-r requirements.txt (line 2)) (1.24.3)
Requirement already satisfied: six>=1.5 in /opt/homebrew/lib/python3.11/site-packages (from python-dateutil>=2.8.2->pandas->-r requirements.txt (line 2)) (1.16.0)
WARNING: Skipping /opt/homebrew/lib/python3.11/site-packages/numpy-1.24.3-py3.11.egg-info due to invalid metadata entry 'name'
WARNING: Skipping /opt/homebrew/lib/python3.11/site-packages/numpy-1.24.3-py3.11.egg-info due to invalid metadata entry 'name'
WARNING: Skipping /opt/homebrew/lib/python3.11/site-packages/numpy-1.24.3-py3.11.egg-info due to invalid metadata entry 'name'
WARNING: Skipping /opt/homebrew/lib/python3.11/site-packages/numpy-1.24.3-py3.11.egg-info due to invalid metadata entry 'name'

ElasticSearchへ投入

取り込み対象のファイルをツールディレクトリに配置し、以下コマンドを実行します。
targetFile=${取り込み対象のファイル名}を指定して、ツールを実行します。

  • 投入コマンド
% targetFile=alb.tsv python3 app.py
Indexed 1000 documents
Indexed 1000 documents
Indexed 196 documents
{'statusCode': 200, 'body': '{"message": "alb created"}'}
ツールのソースコード

1000行ごとに分割して、bulk insertしています。

app.py
import os
import json
import time
import pandas as pd
import itertools
from elasticsearch import Elasticsearch, helpers

import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

ES_HOST = os.getenv("hostName", 'localhost')
TARGET_FILE = os.getenv(
    "targetFile", 'alb.tsv')


def lambda_handler(event, context):  ## lambdaをベースにしたが、特に意味はなし...
    def chunked_iterable(iterable, chunk_size):
        it = iter(iterable)
        while True:
            chunk = tuple(itertools.islice(it, chunk_size))
            if not chunk:
                return
            yield chunk

    client = Elasticsearch(
        [ES_HOST],
        scheme="http",
        port=9200,
    )

    csv_file = event['target_file'] if 'target_file' in event else TARGET_FILE
    filename = f"./{csv_file}"
    _csv = os.path.basename(filename)
    index_name, _ = os.path.splitext(_csv)  ## ファイル名のプレフィックスをインデックス名にする

    df = pd.read_csv(filename, sep='\t') ## タブ区切りで読み込む
    data_json = df.to_json(orient='records')

    data = json.loads(data_json)

    chunk_size = 1000 ## 1000行ごとチャンクする

    for chunk in chunked_iterable(data, chunk_size):
        actions = [
            {
                "_index": index_name,
                "_source": record,
            }
            for record in chunk
        ]

        helpers.bulk(client, actions)  ## bulk insert
        time.sleep(1)
        print(f'Indexed {len(actions)} documents')

    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": f'{index_name} created',
        }),
    }


if __name__ == '__main__':
    event = {}
    res = lambda_handler(event=event, context=None)

    print(res)

Kibanaでリクエストを解析する

処理時間の掛かっているリクエストを抽出/集約などして、問題の原因となるリクエストを特定しましょう。

10秒以上処理時間が掛かっているリクエストを抽出
GET alb/_search
{
  "query": {
    "range": {
      "target_processing_time": {
        "gte": 10
      }
    }
  },
  "_source": [
    "client:port",
    "request",
    "target_processing_time"
  ],
  "sort": [
    {
      "target_processing_time": {
        "order": "desc"
      }
    }
  ]
}

Discussion