🗑️

【Python】Elasticsearch内に溜まった古いindexをまるっと削除するツールを作った

2022/12/14に公開約7,900字

はじめに

この記事は ZOZO #2 Advent Calendar 2022 15日目の記事になります。

Elasticsearch内に溜まった古いindexをまるっと削除するツールを作ったので紹介します。
https://github.com/sattosan/old_index_removal_tool

やりたいこと

以下の観点でindexを削除するツールを作りました。

  • 作成してから〇〇日以上経過したindexを削除したい
  • Aliasに紐づくindexは削除したくない
  • その他削除したくないindexを個別指定できる
    • 例)「test_index」は削除したくない
  • ワイルドカードで削除するindexを制御
    • 例)「hoge_*」と「fuga_*」に該当するindexを削除

結論:ツールの実行

最終的にツールを実行すると以下のようになります。

$ poetry run python ./src/main.py
================================
test-1
test-2
test-4
test-5
合計:4件
サイズ:8.12 GB
================================
Delete all indices? (Y/N): Y
=> Target indices have deleted in http_auth.
Exit this program.

ツールでは、削除可能なindex一覧や件数、サイズを表示します。
また、実行後すぐ削除されるわけではなく、「Y」と入力しないと削除されないので安心してください。

検討したこと

当初はツールよりもElasticsearch公式で提供されている機能でやりたかったことが実現できないか検討しました

ILM (Index Lifecycle Management)

作成してから一定期間が過ぎたindexを削除する機能は、公式で提供されています。
ILM:Index Lifecycle Managementと呼ばれています。
https://www.elastic.co/guide/en/elasticsearch/reference/current/index-lifecycle-management.html

indexを作成する際に、ライフサイクルポリシーを設定してあげることで、例えば作成から30日間たったindexはhotからwarmへ、さらに時間がたちindexへのアクセスがなくなればcoldへ、そして最後はdeleteのフェーズで削除する。といったライフサイクルの設定ができます。

ILMでは実現できない

公式の機能を使うことで、安全にかつ明示的にindexを管理できるようになるため、当初は検討しましたが、aliasに紐づくindexやindex名が「hoge_*」や「fuga_*」といったパターンにマッチしたindexを削除するといった。設定が提供されていなかったため、ツールで実現することにしました。

Curator

他にもElastic社が提供しているCuratorという運用ツールでIndexやSnapshotsの管理ができます。
https://curator.readthedocs.io/en/latest/

最近だとElasticsearch ILMにCuratorの機能が取って代わられていますが、より柔軟な管理が可能になります。

ちなみに、先日行われたElasticOn Tokyo 2022でCuratorの作者と会って実現可能か聞いてみましたがとりあえずGitHubのIssueに投げて欲しいと言われたので今度投げて見ます。
https://www.elasticon.com/event/d10b9524-5bd9-4355-aa2e-f01b63580506

ツールの環境

ここからはツールの紹介をします。
想定する環境は以下の通りです。

  • Elasticsearch v7.17
  • Python v3.10
    • elasticsearch 7.17.0
    • python-decouple 3.6

接続するElasticsearchのバージョンによってPythonのelasticsearchクライアントのバージョンも合わせる必要があるので適宜変更してください。

ファイル構成

最終的なファイル構成は以下のようになります。

.
|-- .gitignore
|-- README.md
|-- poetry.lock
|-- pyproject.toml
|-- src
|   |-- .env
|   |-- .env.bk
|   `-- main.py
`-- tests
    |-- __init__.py
    `-- test.py

コードの説明

コードの全体像は./src/main.pyを参照してください。

メインの処理はmain()に書いています。

./src/main.py
def main():
    # 準備
    es: Elasticsearch | None = switch_es_client_by_env()
    if es is None:
        print("Please set enviroment variable APP_ENV.")
        print("Exit this program.")
        sys.exit(1)

    target_indices: list[str] = config("TARGET_INDICES").split(",")

    # 削除対象のindexを抽出
    deletable_indices = select_deletable_indices(
        es.indices.get(index=target_indices))

    if not deletable_indices.keys():
        print(f"Target index is not found in {config('APP_ENV')}.")

    # 結果の表示
    print("================================")
    for index in deletable_indices:
        print(index)
    print(f"合計:{len(deletable_indices.keys())}件")

    # 削除対象のindexサイズを取得
    total_size_in_bytes: int = get_indices_size(
        es, list(deletable_indices.keys()))
    print(f"サイズ:{convert_size(total_size_in_bytes)}")
    print("================================")

    # 削除するかの確認
    key_input: str = input("Delete all indices? (Y/N): ")

    # 削除 or 終了
    if key_input in ["Y", "y", "yes"]:
        print(f"=> Target indices have deleted in {config('APP_ENV')}.")
        es.indices.delete(index=list(deletable_indices.keys()))
    print("Exit this program.")

ここでやっている処理の流れはこんな感じです

  1. elasticsearchクライアントを使ってElasticsearchと接続
  2. Elasticsearchからindexの情報一覧を取得
  3. 削除可能なindexを抽出
  4. 削除可能なindexの数とサイズを算出
  5. 削除するかの確認
  6. 削除 or 終了

以降では、ここで読んでいる関数についてそれぞれ簡単に解説します。

Elasticsearchへの接続

環境変数のAPP_ENVの値によって接続するElasticsearchを切り替えています。

def switch_es_client_by_env() -> (Elasticsearch | None):
    match config('APP_ENV'):
        case "local":
            return Elasticsearch(hosts=[config("ELASTICSEARCH_LOCAL_HOST")])
        case "http_auth":
            return Elasticsearch(
                hosts=[config("ELASTICSEARCH_HOST")],
                http_auth=(config("ELASTICSEARCH_ID"), config("ELASTICSEARCH_PASSWORD")))
        case _:
            return None

ここではlocalだとローカル用のElasticsearchへ、http_authだと接続にIDとパスワードが必要なElasticsearchへ接続します。

削除可能なIndexを抽出

引数にes.indices.get(index=target_indices)が渡されています。target_indicesは 環境変数TARGET_INDICESで指定されたindex名(ワイルドカードを含む)になります。

例えば、TARGET_INDICES=hoge_*,fuga_*が指定されていた場合、index名がhoge_*fuga_*に該当するindex情報を取得してきます。

# 削除可能なindexを選択
def select_deletable_indices(indices):
    return dict(filter(
        lambda index: is_deletable_index(index[0], index[1]),
        indices.items()))

この関数では、指定されたindex名のパターンに一致する条件の他に、次に紹介する関数で削除可能と判断されたindexを抽出しています。

削除可能な条件

引数に指定されたindex名とindexの情報をもとに削除可能なindexか判断します。

# 削除可能なindexか判定
def is_deletable_index(index_name: str, property) -> bool:
    # 初期のindexとaliasに紐づくindexは除外
    if index_name.startswith(".") or property["aliases"]:
        return False

    # その他、除外するindexがあればスキップ
    if index_name in excluded_indicies:
        return False

    # 寿命を超えたindexが削除対象
    return is_not_alive_index(property)
    
def is_not_alive_index(property) -> bool:
    # datetimeでミリ秒のEpoch Timeが扱えないので秒に変換
    formatted_epoch_time = int(property["settings"]
                               ["index"]["creation_date"]) / 1000
    creation_datetime = datetime.fromtimestamp(formatted_epoch_time)
    # 経過した日数を算出
    elapsed_days = (now - creation_datetime).days

    # 寿命を超えていた場合
    return int(config("MAX_INDEX_AGE_DAYS")) < elapsed_days

削除の条件は以下の通りです。

  • aliasnに紐づくindexと初期から存在するindex(.kibana_7.17.0_001など)は削除しない
  • 環境変数EXCLUDED_INDICIESで指定されたindexは削除しない
  • 環境変数MAX_INDEX_AGE_DAYSで指定された日数を超えるindexを削除する

削除する全てのIndexのサイズを計測

indexのサイズはes.indicies.getメソッドで取得できる情報になかったため、es.indicies.statsメソッドを使っています。

indexのサイズが書かれたオブジェクトはかなり階層が深く_all.total.store.size_in_bytesにありました。

この関数は、削除可能なindex全てのサイズを取得して、その総和を返します。

# 指定された複数のindexサイズを取得
def get_indices_size(es: Elasticsearch, indices: list[str]) -> int:
    total_size_in_bytes = 0
    for i in range(0, len(indices), 100):
        # 大量のindexを指定するとサイズオーバーでリクエストできないので分ける
        indices_stats = es.indices.stats(
            index=indices[i: i + 100], filter_path=['_all'])
        total_size_in_bytes += indices_stats['_all']['total']['store']['size_in_bytes']

    return total_size_in_bytes

サイズを単位に変換

1000Bを1KBや1000000を1MBに変換する関数も作成しました。

# bytesを適切なサイズ表記に変換
def convert_size(size: int) -> str:
    units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB")
    i = math.floor(math.log(size, 1024)) if size > 0 else 0
    size = round(size / 1024 ** i, 2)

    return f"{size} {units[i]}"

インストール

ツールのインストールについて紹介します。
poetryで管理しているので無い方は下記記事が参考になると思います
https://qiita.com/ksato9700/items/b893cf1db83605898d8a

下記コマンドを実行してリポジトリーをクローンして依存パッケージをインストールします。

$ git clone https://github.com/sattosan/old_index_removal_tool.git
$ cd old_index_removal_tool
$ poetry install

実行

poetryコマンドでmain.pyを実行すると削除可能なindex一覧が表示されるので、
削除する場合はYをその後入力。やっぱり辞める場合はNやその他文字を入力すると取りやめます。

$ poetry run python ./src/main.py
================================
test-1
test-2
test-4
test-5
合計:4件
サイズ:8.12 GB
================================
Delete all indices? (Y/N): Y
=> Target indices have deleted in http_auth.
Exit this program.

件数以外にも、indexのサイズも表示します。意外と放置していたindexをかき集めると結構なサイズになったりするので、定期的に消したいですね。

今後の展望

例えばこのツールをクーロンで実行できるようにすると自動で定期的にお掃除してくれたり、AWSのLambdaとSlackを連携してSlackのコマンドで削除したりなど色々応用はできそうです。

また、Curatorでできれば管理したいのでIssueを挙げて公式機能で実現できるとわかればまた記事を書きたいと思います。

おわりに

古いindexを削除するツールを紹介しました。

本来公式が提供するILMで管理したかったのですが、調べた限りやりたいことができなかったので暫定対応としてツールを作成しました。(このやり方でできるよ!などご知見あるかたいましたら教えてください。)

ElasticOn Tokyo 2022の話ではILMの話もあったので、今後機能がより拡充され、より使いやすくなることを願っています。

Discussion

ログインするとコメントできます