📘

複数の実行中プロセスをテキストファイルで簡易的に把握するPythonのデコレータ

2021/05/01に公開3

例えば複数のジョブをサーバに投げている時、いま自分がどのジョブを投げているのかを忘れてしまうことがある。
自分は深層学習の実験用スクリプトを投げた時に、どの実験を投げているのか結構忘れる。

もちろんmlflowなどの実験管理ツールを使えば、どのモデルの学習がどれくらい進んでいるかはある程度把握できるのだけど、「学習用スクリプトをどんなコマンドでいつどのサーバで投げたか」あたりの情報はあんまり管理できない(たぶん)。
あとローカルサーバでいいとはいえ、確認のために何か立ち上げたりするのは面倒。

なので、学習用スクリプトが投げられた時に、main()をラップすることで、どのプロセスがいつどう立ち上がったかを把握できるデコレータを書いた。

以下のような感じで使える、memoize_runnnig_processesが今回作ったデコレータ。
memoizeという名前がいいのかは微妙。

hoge.py
@memoize_runnnig_processes
def main():
    exp = HogeExperiment()
    exp.run()
    
if __name__ == "__main__":
    main()

これをやると、カレントディレクトリに.runnning-processes.txtというテキストファイルに実行中プロセスの情報を書き込む、その中身はこんな感じになる。

.runnning-processes.txt
server1	2021年5月1日 3時17分36.110417秒	hoge.py arg1
server2	2021年5月1日 9時52分0.868838秒	huga.py arg1 arg2 arg3

状態の管理は簡易的にテキストファイルでやることにした。
mmapとかいろいろな手段があると思うけど、NFS上の複数サーバ構成とかも統一的に扱いたい場合はファイルベースでやり取りするのがいいんじゃないかという気がする。

コードは以下の通り、普通のデコレータ。
排他ロックをとって、テキストファイルの各行を生きているプロセスに対応させている。Pythonのfinallyで、途中で投げているスクリプトが死んでも、きっちりテキストファイルの該当行を消してから死ぬようにした。

memoize_runnnig_processes.py
import os, sys, fcntl
from datetime import datetime
from pathlib import Path


def memoize_runnnig_processes(func):
    memory = Path("./.runnning-processes.txt")
    memory.touch()

    nodename = os.uname().nodename
    now = datetime.today().strftime('%Y年%-m月%-d日 %-H時%-M分%-S.%f秒')
    args = " ".join(sys.argv)
    l = f"{nodename}\t{now}\t{args}"

    def inner(*args, **kwargs):
        # add process status
        try:
            with memory.open("r+") as f:
                fcntl.flock(f, fcntl.LOCK_EX)

                lines = set(line.strip() for line in f.readlines() if line.strip())
                lines.add(l)
                lines = sorted(lines)

                f.truncate(0)
                f.seek(os.SEEK_SET)
                f.write("\n".join(lines))
                f.flush()

            return func(*args, **kwargs)

        # remove process status
        finally:
            with memory.open("r+") as f:
                fcntl.flock(f, fcntl.LOCK_EX)

                lines = set(line.strip() for line in f.readlines() if line.strip())
                lines.discard(l)
                lines = sorted(lines)

                f.truncate(0)
                f.seek(os.SEEK_SET)
                f.write("\n".join(lines))
                f.flush()

    return inner

.runnning-processes.txtは事前に作っておいてもいいけど、touchすればファイルがない場合は作成、みたいな挙動を実現できて便利。

発展として、Pythonのデコレータで書くんじゃなくてコマンドを作っちゃうみたいな方針もあると思う。
memo python hoge.pyみたいな感じで動かせるやつ。時間あったらやろうと思う。

もっといいやり方もあると思うので、何かあったら教えてください🙏

Discussion

hpphpp

追記:

この記事に書いてあるデコレータの発展版を、いまの自分の環境で使っている。厳密にはちょっとだけ記事投稿時からアップデートしてある。以下に実際のコードを示す。
processesというディレクトリに実行中プロセス/実行が終了したプロセスを記録しつつ、killコマンドなどにも対応するようにしている。
DEADBEATS は自作のslack API bindingライブラリのもの、プロセスの生死をslackに通知するようにしているだけなので、特に複雑なことをやっているわけではない。

advaced.py
import collections, os, sys, fcntl
from datetime import datetime
from pathlib import Path
from deadbeats import DEADBEATS
import signal

def memoize_runnnig_processes(func):
    save_dir = Path("./processes")
    save_dir.mkdir(exist_ok=True, parents=True)

    running = save_dir / "runnning.txt"
    running.touch()

    finished = save_dir / "finished.txt"
    finished.touch()

    def inner(*args, **kwargs):
        nodename = os.uname().nodename
        pid = os.getpid()
        start_time = datetime.today().strftime('%Y年%-m月%-d日 %-H時%-M分%-S.%f秒')
        sys_args = " ".join(sys.argv)
        l = f"{nodename}\t{pid}\t{start_time}\t{sys_args}"

        DEADBEATS.start_thread(
            nodename=nodename,
            start_time=start_time,
            sys_args=sys_args,
        )
        err = None

        def cleanup(l):
            with running.open("r+") as f:
                fcntl.flock(f, fcntl.LOCK_EX)

                lines = set(line.strip() for line in f.readlines() if line.strip())
                lines.discard(l)
                lines = sorted(lines)

                f.truncate(0)
                f.seek(os.SEEK_SET)
                f.write("\n".join(lines))
                f.flush()

            end_time = datetime.today().strftime('%Y年%-m月%-d日 %-H時%-M分%-S.%f秒')
            l = f"{nodename}\t{start_time}\t{end_time}\t{sys_args}"
            if err is not None: l += f"\t{err}"

            with finished.open("a") as f:
                fcntl.flock(f, fcntl.LOCK_EX)
                f.write(l + "\n")
                f.flush()

            DEADBEATS.ping(end_time=end_time)

        signal.signal(signal.SIGINT, lambda: cleanup(l))
        signal.signal(signal.SIGTERM, lambda: cleanup(l))

        # add process status
        try:
            with running.open("r+") as f:
                fcntl.flock(f, fcntl.LOCK_EX)

                lines = set(line.strip() for line in f.readlines() if line.strip())
                lines.add(l)
                lines = sorted(lines)

                f.truncate(0)
                f.seek(os.SEEK_SET)
                f.write("\n".join(lines))
                f.flush()

            return func(*args, **kwargs)

        except Exception as e:
            DEADBEATS.ping(exception=e)
            err = str(e)

        # remove process status
        finally:
            cleanup(l)


    return inner
hpphpp
import os, sys, fcntl
from datetime import datetime
from pathlib import Path
import signal


def memoize_runnnig_processes(func):
    save_dir = Path("./processes")
    save_dir.mkdir(exist_ok=True, parents=True)

    running = save_dir / "runnning.txt"
    running.touch()

    successed = save_dir / "successed.txt"
    successed.touch()

    failed = save_dir / "failed.txt"
    failed.touch()

    def inner(*args, **kwargs):
        nodename = os.uname().nodename
        pid = os.getpid()
        start_time = datetime.today().strftime("%Y年%-m月%-d日 %-H時%-M分%-S.%f秒")
        sys_args = " ".join(sys.argv)
        err = []

        def cleanup(start_line):
            with running.open("r+") as f:
                fcntl.flock(f, fcntl.LOCK_EX)

                lines = set(line.strip() for line in f.readlines() if line.strip())
                lines.discard(start_line)
                lines = sorted(lines)

                f.truncate(0)
                f.seek(os.SEEK_SET)
                f.write("\n".join(lines))
                f.flush()

            end_time = datetime.today().strftime("%Y年%-m月%-d日 %-H時%-M分%-S.%f秒")
            end_line = f"{nodename}\t{start_time}\t{end_time}\t{sys_args}"

            if err:
                end_line += f"\t{err[0]}"
                file = failed
            else:
                file = successed

            with file.open("a") as f:
                fcntl.flock(f, fcntl.LOCK_EX)
                f.write(end_line + "\n")
                f.flush()

        start_line = f"{nodename}\t{pid}\t{start_time}\t{sys_args}"

        signal.signal(signal.SIGINT, lambda: cleanup(start_line))
        signal.signal(signal.SIGTERM, lambda: cleanup(start_line))

        # add process status
        try:
            with running.open("r+") as f:
                fcntl.flock(f, fcntl.LOCK_EX)

                lines = set(line.strip() for line in f.readlines() if line.strip())
                lines.add(start_line)
                lines = sorted(lines)

                f.truncate(0)
                f.seek(os.SEEK_SET)
                f.write("\n".join(lines))
                f.flush()

            return func(*args, **kwargs)

        except BaseException as e:
            ename = type(e).__name__
            err.append(f"{ename}: {e}")

        # remove process status
        finally:
            cleanup(start_line)

    return inner

エラーが発生したら別ファイルに書き出すことにした、これで死んだプロセスがよりわかりやすくなった。

hpphpp

自分はhydraと一緒に使っているので、こんな感じの見た目になります。

import hydra
from omegaconf import DictConfig
from src.utils import memoize_runnnig_processes

@memoize_runnnig_processes
@hydra.main(config_path="configs/", config_name="config.yaml")
def main(config: DictConfig) -> None:
    // do something


if __name__ == '__main__':
    main()