【Python】Elasticsearch内に溜まった古いindexをまるっと削除するツールを作った
はじめに
この記事は ZOZO #2 Advent Calendar 2022 15日目の記事になります。
Elasticsearch内に溜まった古いindexをまるっと削除するツールを作ったので紹介します。
やりたいこと
以下の観点でindexを削除するツールを作りました。
- 作成してから〇〇日以上経過したindexを削除したい
- Aliasに紐づくindexは削除したくない
- その他削除したくないindexを個別指定できる
- 例)「test_index」は削除したくない
- ワイルドカードで削除するindexを制御
- 例)「hoge_*」と「fuga_*」に該当するindexを削除
結論:ツールの実行
最終的にツールを実行すると以下のようになります。
$ poetry run python ./src/main.py
================================
test-1
test-2
test-4
test-5
合計:4件
サイズ:8.12 GB
================================
Delete all indices? (Y/N): Y
=> Target indices have deleted in http_auth.
Exit this program.
ツールでは、削除可能なindex一覧や件数、サイズを表示します。
また、実行後すぐ削除されるわけではなく、「Y」と入力しないと削除されないので安心してください。
検討したこと
当初はツールよりもElasticsearch公式で提供されている機能でやりたかったことが実現できないか検討しました
ILM (Index Lifecycle Management)
作成してから一定期間が過ぎたindexを削除する機能は、公式で提供されています。
ILM:Index Lifecycle Managementと呼ばれています。
indexを作成する際に、ライフサイクルポリシーを設定してあげることで、例えば作成から30日間たったindexはhot
からwarm
へ、さらに時間がたちindexへのアクセスがなくなればcold
へ、そして最後はdelete
のフェーズで削除する。といったライフサイクルの設定ができます。
ILMでは実現できない
公式の機能を使うことで、安全にかつ明示的にindexを管理できるようになるため、当初は検討しましたが、aliasに紐づくindexやindex名が「hoge_*
」や「fuga_*
」といったパターンにマッチしたindexを削除するといった。設定が提供されていなかったため、ツールで実現することにしました。
Curator
他にもElastic社が提供しているCuratorという運用ツールでIndexやSnapshotsの管理ができます。
最近だとElasticsearch ILMにCuratorの機能が取って代わられていますが、より柔軟な管理が可能になります。
ちなみに、先日行われたElasticOn Tokyo 2022でCuratorの作者と会って実現可能か聞いてみましたがとりあえずGitHubのIssueに投げて欲しいと言われたので今度投げて見ます。
ツールの環境
ここからはツールの紹介をします。
想定する環境は以下の通りです。
- Elasticsearch v7.17
- Python v3.10
- elasticsearch 7.17.0
- python-decouple 3.6
接続するElasticsearchのバージョンによってPythonのelasticsearch
クライアントのバージョンも合わせる必要があるので適宜変更してください。
ファイル構成
最終的なファイル構成は以下のようになります。
.
|-- .gitignore
|-- README.md
|-- poetry.lock
|-- pyproject.toml
|-- src
| |-- .env
| |-- .env.bk
| `-- main.py
`-- tests
|-- __init__.py
`-- test.py
コードの説明
コードの全体像は./src/main.pyを参照してください。
メインの処理はmain()
に書いています。
def main():
# 準備
es: Elasticsearch | None = switch_es_client_by_env()
if es is None:
print("Please set enviroment variable APP_ENV.")
print("Exit this program.")
sys.exit(1)
target_indices: list[str] = config("TARGET_INDICES").split(",")
# 削除対象のindexを抽出
deletable_indices = select_deletable_indices(
es.indices.get(index=target_indices))
if not deletable_indices.keys():
print(f"Target index is not found in {config('APP_ENV')}.")
# 結果の表示
print("================================")
for index in deletable_indices:
print(index)
print(f"合計:{len(deletable_indices.keys())}件")
# 削除対象のindexサイズを取得
total_size_in_bytes: int = get_indices_size(
es, list(deletable_indices.keys()))
print(f"サイズ:{convert_size(total_size_in_bytes)}")
print("================================")
# 削除するかの確認
key_input: str = input("Delete all indices? (Y/N): ")
# 削除 or 終了
if key_input in ["Y", "y", "yes"]:
print(f"=> Target indices have deleted in {config('APP_ENV')}.")
es.indices.delete(index=list(deletable_indices.keys()))
print("Exit this program.")
ここでやっている処理の流れはこんな感じです
-
elasticsearch
クライアントを使ってElasticsearchと接続 - Elasticsearchからindexの情報一覧を取得
- 削除可能なindexを抽出
- 削除可能なindexの数とサイズを算出
- 削除するかの確認
- 削除 or 終了
以降では、ここで読んでいる関数についてそれぞれ簡単に解説します。
Elasticsearchへの接続
環境変数のAPP_ENV
の値によって接続するElasticsearchを切り替えています。
def switch_es_client_by_env() -> (Elasticsearch | None):
match config('APP_ENV'):
case "local":
return Elasticsearch(hosts=[config("ELASTICSEARCH_LOCAL_HOST")])
case "http_auth":
return Elasticsearch(
hosts=[config("ELASTICSEARCH_HOST")],
http_auth=(config("ELASTICSEARCH_ID"), config("ELASTICSEARCH_PASSWORD")))
case _:
return None
ここではlocal
だとローカル用のElasticsearchへ、http_auth
だと接続にIDとパスワードが必要なElasticsearchへ接続します。
削除可能なIndexを抽出
引数にes.indices.get(index=target_indices)
が渡されています。target_indices
は 環境変数TARGET_INDICES
で指定されたindex名(ワイルドカードを含む)になります。
例えば、TARGET_INDICES=hoge_*,fuga_*
が指定されていた場合、index名がhoge_*
とfuga_*
に該当するindex情報を取得してきます。
# 削除可能なindexを選択
def select_deletable_indices(indices):
return dict(filter(
lambda index: is_deletable_index(index[0], index[1]),
indices.items()))
この関数では、指定されたindex名のパターンに一致する条件の他に、次に紹介する関数で削除可能と判断されたindexを抽出しています。
削除可能な条件
引数に指定されたindex名とindexの情報をもとに削除可能なindexか判断します。
# 削除可能なindexか判定
def is_deletable_index(index_name: str, property) -> bool:
# 初期のindexとaliasに紐づくindexは除外
if index_name.startswith(".") or property["aliases"]:
return False
# その他、除外するindexがあればスキップ
if index_name in excluded_indicies:
return False
# 寿命を超えたindexが削除対象
return is_not_alive_index(property)
def is_not_alive_index(property) -> bool:
# datetimeでミリ秒のEpoch Timeが扱えないので秒に変換
formatted_epoch_time = int(property["settings"]
["index"]["creation_date"]) / 1000
creation_datetime = datetime.fromtimestamp(formatted_epoch_time)
# 経過した日数を算出
elapsed_days = (now - creation_datetime).days
# 寿命を超えていた場合
return int(config("MAX_INDEX_AGE_DAYS")) < elapsed_days
削除の条件は以下の通りです。
- aliasnに紐づくindexと初期から存在するindex(
.kibana_7.17.0_001
など)は削除しない - 環境変数
EXCLUDED_INDICIES
で指定されたindexは削除しない - 環境変数
MAX_INDEX_AGE_DAYS
で指定された日数を超えるindexを削除する
削除する全てのIndexのサイズを計測
indexのサイズはes.indicies.get
メソッドで取得できる情報になかったため、es.indicies.stats
メソッドを使っています。
indexのサイズが書かれたオブジェクトはかなり階層が深く_all.total.store.size_in_bytes
にありました。
この関数は、削除可能なindex全てのサイズを取得して、その総和を返します。
# 指定された複数のindexサイズを取得
def get_indices_size(es: Elasticsearch, indices: list[str]) -> int:
total_size_in_bytes = 0
for i in range(0, len(indices), 100):
# 大量のindexを指定するとサイズオーバーでリクエストできないので分ける
indices_stats = es.indices.stats(
index=indices[i: i + 100], filter_path=['_all'])
total_size_in_bytes += indices_stats['_all']['total']['store']['size_in_bytes']
return total_size_in_bytes
サイズを単位に変換
1000Bを1KBや1000000を1MBに変換する関数も作成しました。
# bytesを適切なサイズ表記に変換
def convert_size(size: int) -> str:
units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB")
i = math.floor(math.log(size, 1024)) if size > 0 else 0
size = round(size / 1024 ** i, 2)
return f"{size} {units[i]}"
インストール
ツールのインストールについて紹介します。
poetryで管理しているので無い方は下記記事が参考になると思います
下記コマンドを実行してリポジトリーをクローンして依存パッケージをインストールします。
$ git clone https://github.com/sattosan/old_index_removal_tool.git
$ cd old_index_removal_tool
$ poetry install
実行
poetryコマンドでmain.py
を実行すると削除可能なindex一覧が表示されるので、
削除する場合はYをその後入力。やっぱり辞める場合はNやその他文字を入力すると取りやめます。
$ poetry run python ./src/main.py
================================
test-1
test-2
test-4
test-5
合計:4件
サイズ:8.12 GB
================================
Delete all indices? (Y/N): Y
=> Target indices have deleted in http_auth.
Exit this program.
件数以外にも、indexのサイズも表示します。意外と放置していたindexをかき集めると結構なサイズになったりするので、定期的に消したいですね。
今後の展望
例えばこのツールをクーロンで実行できるようにすると自動で定期的にお掃除してくれたり、AWSのLambdaとSlackを連携してSlackのコマンドで削除したりなど色々応用はできそうです。
また、Curatorでできれば管理したいのでIssueを挙げて公式機能で実現できるとわかればまた記事を書きたいと思います。
おわりに
古いindexを削除するツールを紹介しました。
本来公式が提供するILMで管理したかったのですが、調べた限りやりたいことができなかったので暫定対応としてツールを作成しました。(このやり方でできるよ!などご知見あるかたいましたら教えてください。)
ElasticOn Tokyo 2022の話ではILMの話もあったので、今後機能がより拡充され、より使いやすくなることを願っています。
Discussion