🐻‍❄️

polarsのマルチプロセス化はコンテキスト(spawn)を指定しよう

2024/01/13に公開

TL;DR

polarsでマルチプロセス化する時はspawnを指定すること

マルチプロセス化したら動かなくなった

polarsでマルチプロセス化を行う処理を書いたところ
MacOSは動くが、Linuxは動かなくなる事象が発生しました。

使用したCSVはこの通りです。

sample.csv
code,name,age
0001,hoge,20
0002,fuga,10
0003,piyo,32
0004,foo,15
0005,bar,60

今回作成したソースコードは
CSV1列目のcodeに合致する箇所を個別に取得する処理です。

main.py
from concurrent.futures import ProcessPoolExecutor, as_completed

import polars as pl

TARGET_CODES = ["0001", "00002", "0005"]
CSV_PATH = "sample.csv"


def _filter(code: str, df: pl.DataFrame) -> None:
    df = df.filter(pl.col("code") == code)
    print(df)


def main() -> None:
    dtype = {"code": pl.String, "name": pl.String, "age": pl.Int64}
    df = pl.read_csv(CSV_PATH, dtypes=dtype)

    # マルチプロセスで起動
    with ProcessPoolExecutor() as executor:
        args = [(code, df) for code in TARGET_CODES]
        processes = {executor.submit(_filter, *arg) for arg in args}

        for p in as_completed(processes):
            p.result()


if __name__ == "__main__":
    main()

原因は子プロセスの作り方

調査したところ、公式Docsにマルチプロセスの説明がありました。

ChatGPTに要約してもらった結果がこちら。

重要なポイントとして、「fork」メソッドを使用すると、Polarsが提供するマルチスレッド機能と競合する可能性があるため、代わりに「spawn」または「forkserver」メソッドを使用するように推奨されています。これにより、マルチスレッド機能との競合を避けつつ、マルチプロセッシングが安全に行われます。
例外的なケースを除いては「spawn」メソッドが最も安全で推奨されると説明されています。

spawnで子プロセスを作成すれば良いとのこと。
先程のコードを直しました。

main.py
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import get_context

import polars as pl

TARGET_CODES = ["0001", "00002", "0005"]
CSV_PATH = "sample.csv"


def _filter(code: str, df: pl.DataFrame) -> None:
    df = df.filter(pl.col("code") == code)
    print(df)


def main() -> None:
    dtype = {"code": pl.String, "name": pl.String, "age": pl.Int64}
    df = pl.read_csv(CSV_PATH, dtypes=dtype)

    # マルチプロセスで起動
    # get_context追加
    with ProcessPoolExecutor(mp_context=get_context("spawn")) as executor:
        args = [(code, df) for code in TARGET_CODES]
        processes = {executor.submit(_filter, *arg) for arg in args}

        for p in as_completed(processes):
            p.result()


if __name__ == "__main__":
    main()

実行したところ、MacOSとLinuxで動作しました!

MacOSとLinuxでは子プロセスのデフォルトが違う

spawnを指定するとMacOS・Linuxともに動作しました。
何故MacOSとLinuxで挙動が変わったのでしょうか。
調査したところ、標準ライブラリの説明にありました。

multiprocessing コンテキストと開始

spawn

Available on POSIX and Windows platforms. The default on Windows and macOS.
POSIXおよびWindowsプラットフォームで利用可能です。WindowsおよびmacOSではデフォルトの設定です。

fork

Available on POSIX systems. Currently the default on POSIX except macOS.
POSIXシステムで利用可能です。現在、macOS以外のPOSIXシステムではデフォルトの設定です。

MacOSはspawn、Linuxだとforkがデフォルトでした。
そのため、Linuxだとデッドロックになり処理が止まってしまったようですね。

MacOSがspawnになった経緯も記述されていました。

On macOS, the spawn start method is now the default. The fork start method should be considered unsafe as it can lead to crashes of the subprocess as macOS system libraries may start threads. See bpo-33725.
macOS では、 spawn 開始方式がデフォルトになりました。 fork 開始方法は、サブプロセスのクラッシュを引き起こす可能性があるため、安全ではありません。

python3.8で変更。
詳しいことはbpo-33725内で議論されています。
forkで開始するとシステムライブラリのスレッドを子プロセスも引き継ぐため、予期せぬ競合が発生するようです。
(詳しいことはもう少し調べて追記します…)

forkとspawnの違い

MacOSではspawn、Linuxではforkが作成されることがわかりました。
そもそもspawnforkは何が違うのでしょうか。

先程紹介したmultiprocessing コンテキストと開始にそれぞれの挙動についての話がありました。

spawn

親プロセスは新たに Python インタープリタープロセスを開始します。
プロセスはプロセスオブジェクトの run() メソッドの実行に必要なリソースのみ継承します。
親プロセスからの不要なファイル記述子とハンドルは継承されません。

fork

親プロセスは os.fork() を使用して Python インタープリターをフォークします。
子プロセスはそれが開始されるとき、事実上親プロセスと同一になります。
親プロセスのリソースはすべて子プロセスに継承されます。
マルチスレッドプロセスのフォークは安全性に問題があることに注意してください。

spawnは各リソースのインスタンスが生成され、forkは親プロセスのリソースをそのまま継承するみたいです。

forkだとリソースロックが起こりやすい

spawnとforkのプロセスについてなんとなく理解できました。
なぜ、forkで作成されたLinuxのマルチプロセスは動作しなかったのでしょうか。

こちらの記事がクリティカルな答えを出していました
この記事ではMatplotlibでマルチプロセス化する時に動作しなかった原因について記述されています。

https://britishgeologicalsurvey.github.io/science/python-forking-vs-spawn/

この中のWhy my code was hangingの章に記述があります。

Resources that have been locked by threads in the parent process remain locked when you fork the process. However, the thread that holds the lock (and would eventually release the resource) is not transferred. Anything else that needs the resource is stuck waiting and the process hangs at waiter.acquire(). Using spawn creates of fresh instances of each resource so none are in a locked state.

ChatGPTに説明してもらいました。

親プロセスのスレッドによってロックされたリソースは、プロセスをフォークするときにも引き続きロックされたままになります。ただし、そのロックを保持しているスレッド(そして最終的にはリソースを解放するであろうスレッド)は、新しいプロセスには引き継がれません。これにより、他の部分でそのリソースが必要な場合、その処理は待ち続け、プロセスは waiter.acquire() のような箇所で停滞することがあります。
一方で、spawnを使用すると、各リソースについて新しいインスタンスが作成されます。その結果、どのリソースもロックされた状態ではなく、問題が発生しにくくなります。

forkだとロックされたリソースがそのまま引き継がれるため、子プロセス側でロック解放を待ち続けてしまうみたいです。
spawnだと前述の通りリソースは新しく作られるため、ロックされた状態が起きにくくなります。

紹介した記事はMatplotlibの事例でした。これはpolarsでも同様の問題が発生すると考えられます。
だからspawnにすることで解決できたと推定されます。

紹介した記事にspawnforkによる違いを検証したコードもあります。
気になる方はチェックしてみてください。

まとめ

polarsでマルチプロセス化する時はコンテキストにspawnを指定するようにしましょう。

最後になりますが、polars Docsに次のような記述もありました。

When not to use multiprocessing
Before we dive into the details, it is important to emphasize that Polars has been built from the start to use all your CPU cores.
that the multiprocessing module can improve your code performance in these cases.

ChatGPTによる要約

Polarsは元々CPUコアを最大限に活用するように設計されており、通常の操作ではPythonのMultiprocessingモジュールを使用してもパフォーマンスが向上しづらいです。

通常操作ではマルチプロセス化してもパフォーマンスが上がりにくいとのこと。
今回の例もパフォーマンスが上がってないのかも…。今度検証してみます。
他のシングルスレッドライブラリとかけ合わせる時に使用を検討すると良さそうです。

参考文献

GitHubで編集を提案

Discussion