🐡
ローカルのElasticSearchでAWS ALB(Amazon Application Load Bのログを分析できるようにする
本番でissueが発生した場合、AWS ALB(Amazon Application Load Balancer)のログ分析のニーズがあると思います。本記事では、ローカル環境のElasticSearchにログを投入する方法を紹介します。ローカル実行のため、コストを気にする必要もありません。
ElasticSearch投入後は、Kibana DevToolsなどで、任意のクエリを発行してissueの対象となるリクエストを絞り込むことを想定しています。
Amazon Athenaでも同様のことが実施できますが、ElasticSearchの方が汎用的な技術スタックなので、応用が効くと思います。
- 該当のツールのソースコード
前提
Docker
とPython3
がインストールされている前提とします。
ElasticSearchを起動
docker-compose
でサクッと立ち上げましょう。
8系の定義が(少し)面倒だったので、7系にしています。
$ docker-compose up -d
しばらくすると、Kibanaにアクセスできるはずなので、以下URLにブラウザでアクセスしてみましょう。
ツールのセットアップ
- git clone して、ツールをダウンロードします。
git clone git@github.com:Thirosue/tools.git
- ツールディレクトリまで移動
cd tools/alb-log-importer-to-es/
- 必要なモジュール(elasticsearch, pandas)をインストール
pip3 install -r requirements.txt
インストール ログ
$ pip3 install -r requirements.txt
WARNING: Skipping /opt/homebrew/lib/python3.11/site-packages/numpy-1.24.3-py3.11.egg-info due to invalid metadata entry 'name'
WARNING: Skipping /opt/homebrew/lib/python3.11/site-packages/numpy-1.24.3-py3.11.egg-info due to invalid metadata entry 'name'
Requirement already satisfied: elasticsearch<8.0.0,>=7.0.0 in /opt/homebrew/lib/python3.11/site-packages (from -r requirements.txt (line 1)) (7.17.9)
Requirement already satisfied: pandas in /opt/homebrew/lib/python3.11/site-packages (from -r requirements.txt (line 2)) (2.0.2)
Requirement already satisfied: urllib3<2,>=1.21.1 in /opt/homebrew/lib/python3.11/site-packages (from elasticsearch<8.0.0,>=7.0.0->-r requirements.txt (line 1)) (1.26.16)
Requirement already satisfied: certifi in /opt/homebrew/lib/python3.11/site-packages (from elasticsearch<8.0.0,>=7.0.0->-r requirements.txt (line 1)) (2023.5.7)
Requirement already satisfied: python-dateutil>=2.8.2 in /opt/homebrew/lib/python3.11/site-packages (from pandas->-r requirements.txt (line 2)) (2.8.2)
Requirement already satisfied: pytz>=2020.1 in /opt/homebrew/lib/python3.11/site-packages (from pandas->-r requirements.txt (line 2)) (2023.3)
Requirement already satisfied: tzdata>=2022.1 in /opt/homebrew/lib/python3.11/site-packages (from pandas->-r requirements.txt (line 2)) (2023.3)
Requirement already satisfied: numpy>=1.21.0 in /opt/homebrew/lib/python3.11/site-packages (from pandas->-r requirements.txt (line 2)) (1.24.3)
Requirement already satisfied: six>=1.5 in /opt/homebrew/lib/python3.11/site-packages (from python-dateutil>=2.8.2->pandas->-r requirements.txt (line 2)) (1.16.0)
WARNING: Skipping /opt/homebrew/lib/python3.11/site-packages/numpy-1.24.3-py3.11.egg-info due to invalid metadata entry 'name'
WARNING: Skipping /opt/homebrew/lib/python3.11/site-packages/numpy-1.24.3-py3.11.egg-info due to invalid metadata entry 'name'
WARNING: Skipping /opt/homebrew/lib/python3.11/site-packages/numpy-1.24.3-py3.11.egg-info due to invalid metadata entry 'name'
WARNING: Skipping /opt/homebrew/lib/python3.11/site-packages/numpy-1.24.3-py3.11.egg-info due to invalid metadata entry 'name'
ElasticSearchへ投入
取り込み対象のファイルをツールディレクトリに配置し、以下コマンドを実行します。
targetFile=${取り込み対象のファイル名}
を指定して、ツールを実行します。
- 投入コマンド
% targetFile=alb.tsv python3 app.py
Indexed 1000 documents
Indexed 1000 documents
Indexed 196 documents
{'statusCode': 200, 'body': '{"message": "alb created"}'}
ツールのソースコード
1000行ごとに分割して、bulk insertしています。
app.py
import os
import json
import time
import pandas as pd
import itertools
from elasticsearch import Elasticsearch, helpers
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
ES_HOST = os.getenv("hostName", 'localhost')
TARGET_FILE = os.getenv(
"targetFile", 'alb.tsv')
def lambda_handler(event, context): ## lambdaをベースにしたが、特に意味はなし...
def chunked_iterable(iterable, chunk_size):
it = iter(iterable)
while True:
chunk = tuple(itertools.islice(it, chunk_size))
if not chunk:
return
yield chunk
client = Elasticsearch(
[ES_HOST],
scheme="http",
port=9200,
)
csv_file = event['target_file'] if 'target_file' in event else TARGET_FILE
filename = f"./{csv_file}"
_csv = os.path.basename(filename)
index_name, _ = os.path.splitext(_csv) ## ファイル名のプレフィックスをインデックス名にする
df = pd.read_csv(filename, sep='\t') ## タブ区切りで読み込む
data_json = df.to_json(orient='records')
data = json.loads(data_json)
chunk_size = 1000 ## 1000行ごとチャンクする
for chunk in chunked_iterable(data, chunk_size):
actions = [
{
"_index": index_name,
"_source": record,
}
for record in chunk
]
helpers.bulk(client, actions) ## bulk insert
time.sleep(1)
print(f'Indexed {len(actions)} documents')
return {
"statusCode": 200,
"body": json.dumps({
"message": f'{index_name} created',
}),
}
if __name__ == '__main__':
event = {}
res = lambda_handler(event=event, context=None)
print(res)
Kibanaでリクエストを解析する
処理時間の掛かっているリクエストを抽出/集約などして、問題の原因となるリクエストを特定しましょう。
10秒以上処理時間が掛かっているリクエストを抽出
GET alb/_search
{
"query": {
"range": {
"target_processing_time": {
"gte": 10
}
}
},
"_source": [
"client:port",
"request",
"target_processing_time"
],
"sort": [
{
"target_processing_time": {
"order": "desc"
}
}
]
}
Discussion