Open18

Pythonの並列化処理

shimakaze_softshimakaze_soft

プロセスは、1つ以上のスレッドとその実行に必要なリソース群(CPU cores, network, file pointers etc...)、をひとまとめにしているもの。

Python ではこのリソース群のことをGILと呼ぶ。

  • Global Interpreter Lock

1つのプロセスは複数のスレッドを保持することができるが、GILはたった1つしか持つことができない。

Pythonではスレッドが実行される際、そのスレッドがGILを所有しているかがチェックされる。

そのため、1つのプロセスで1度に実行できるスレッドは常に1つだけになる。
また CPU の1つのコアで1度に実行できるスレッドも1つだけです。

どんなにコア数が多いCPUを使用していても、プロセスが1つだけしか動いていない状況下では、Pythonのプログラムで動作しているコアは1つだけということになる。

すなわち、Python におけるマルチスレッドというのは、GILを保有しているスレッドがGILを手放し、それを受け取った他のスレッドが実行され、そのスレッドがまた GIL を手放して...

ということを繰り返す。

これらのGIL の受け渡しによって制御、実現されている。
(Dining philosophers problem - 食事する哲学者の問題 - を解決している)

https://www.hos.co.jp/blog/20150202/

https://qiita.com/vkgtaro/items/09749f003eed534fded6

shimakaze_softshimakaze_soft

マルチスレッドは意味ないのか?

意味を持つときもあるが、意味がないどころか逆効果になることさえある。

Q. Python においてマルチスレッドが意味を成すのはどんな時なのか?

ある1つのスレッドにおいて (CPU の動作速度からすれば) 莫大な待ち時間が発生する場合です。

例えば、ユーザーからの入力を受け付けるファイルにアクセスする、等を含む処理を実行する場合。

このような場合、その時間のかかる操作を待つ間にGIL を一旦開放し、他のスレッドを動作させ、時間がかかる操作が終了した時点で GIL を返して処理を続けることで、シングルスレッド実行では延々と待っていなければならない時間を有効活用できる。

Python におけるスレッド活用法は、「待ち時間を減らす」に尽きる。

そのため、待ち時間がほぼ発生しないような複雑な計算処理を実行するだけの場合、いくら複数のスレッドに分割してもGILの受け渡し等に関わるオーバーヘッドが生じる分、かえって全体の処理速度が落ちてしまう

shimakaze_softshimakaze_soft

百聞は一見に如かずということで実際に動かして見てみる。

sample.py
import time
from threading import Thread

def ask_user():
    start = time.time()
    user_input = input('input name : ')

    print(f'Hello, {user_input}!')
    print(f'ask_user, {time.time() - start}')


def complex_calculation():
    start = time.time()

    print("Start calculating ...")
    [x**2 for x in range(30000000)]
    print(f"complex_calculation, {time.time() - start}")


# シングルスレッドで実行
start = time.time()

ask_user()
complex_calculation()

print(f"Single thread total time, {time.time() - start}\n")

sample.pyを実行しみた結果はこちら。

$ python sample.py
input name : shimakaze
Hello, shimakaze!
ask_user, 1.5849285125732422
Start calculating ...
complex_calculation, 2.773388385772705
Single thread total time, 4.3588409423828125

続けて複数のスレッドで動作させる。

# 複数のスレッドで動作
thread1 = Thread(target=complex_calculation)
thread2 = Thread(target=ask_user)

start = time.time()

thread1.start()
thread2.start()

thread1.join()
thread2.join()

print(f"Two threads total time, {time.time() - start}")

実行結果。

$ python sample.py
Start calculating ...
input name : shimakaze
Hello, shimakaze!
ask_user, 2.1318256855010986
complex_calculation, 3.1480064392089844
Two threads total time, 3.1487057209014893

total timecomplex_calculation の実行時間がほぼ変わらないことに注目。

すなわち、ask_user の実行時間のほとんど全てはユーザーからの入力待ち時間で、このような状況の場合は複数スレッドにおける実行は非常に有効であることが分かる。


2つのスレッドとも複雑な計算をしなければいけない場合を見てみる。

まずはシングルスレッド

start = time.time()
complex_calculation()
complex_calculation()
print(f"Single thread total time, {time.time() - start}\n")

# Start calculating ...
# complex_calculation, 11.879278659820557
# Start calculating ...
# complex_calculation, 11.791674613952637
# Single thread total time, 23.671953201293945

続いて複数のスレッド

thread1 = Thread(target=complex_calculation)
thread2 = Thread(target=complex_calculation)

start = time.time()

thread1.start()
thread2.start()

thread1.join()
thread2.join()

print(f"Two threads total time, {time.time() - start}")

# Start calculating ...
# Start calculating ...
# complex_calculation, 24.083377838134766
# complex_calculation, 24.088377952575684
# Two threads total time, 24.089377880096436

複数スレッドでの実行の方が合計処理時間が増加している。
これは GIL の受け渡し等によるオーバーヘッドが生じている反面、待ち時間がほぼないため逆効果になってしまっている。

Python におけるマルチスレッドの利用は、待ち時間を有効活用するためのものといえる。

shimakaze_softshimakaze_soft

マルチプロセスの場合はどうなるか

プロセスベースの並列処理の場合、GILはプロセス単位で作用するので fork して生成された子プロセスは親プロセスの GIL の影響を受けずに並列処理ができる。

multiprocessing モジュールで、スレッドとよく似たインターフェースでプロセスを扱える。

ちなみにPython にはプロセスベースの並列処理における子プロセスの生成方法がいくつか用意されているが、ここでは Linux 環境のデフォルトである fork を前提とする。


スレッドはメモリ空間を共有するが、プロセスはメモリ空間も独立している

スレッドは親プロセスとメモリ空間を共有するので、例えばグローバル変数の値を変更すればスレッドの呼び出し元もその変更結果を取得できる。

一方で子プロセスは fork のタイミングで親プロセスからコピーされたメモリ空間を持つため、親プロセスの持っていた変数を参照することができるが、子プロセス側で変数を変更したとしても、その子プロセスが終了すれば破棄されて親プロセスには影響しない。

sample.py
from multiprocessing import Process
from threading import Thread

g_value = 0


def worker() -> None:
    global g_value
    g_value += 1
    print(f"in worker               : {g_value=}")


print(f"before thread execution : {g_value=}")
thread = Thread(target=worker)
thread.start()
thread.join()
print(f"after thread execution  : {g_value=}")
print('--' * 20)

print(f"before process execution: {g_value=}")
process = Process(target=worker)
process.start()
process.join()
print(f"after process execution : {g_value=}")
print('--' * 20)

実行結果は以下のようになる。

$ python sample.py
before thread execution : global_value=0
in worker               : global_value=1
after thread execution  : global_value=1
----------------------------------------
before process execution: global_value=1
in worker               : global_value=2
after process execution : global_value=1
----------------------------------------

スレッドベースの並列処理の方は、スレッドが起動して g_value に 1 加算した結果を呼び出し側でも受け取れた。

次にプロセスベースの並列処理の方は、worker 関数内の出力は g_value=2 となっているのでグローバル変数の値を参照することはできる。
しかし worker 関数が終了(子プロセスが終了)して親プロセスに制御が戻ると g_value=1 に戻ってしまった。
なぜならば g_value=2 に更新されたのは子プロセスの中だけで、親プロセス側の g_value は影響を受けない

ちなみにこのサンプルプログラムは1つのプログラム内でスレッドとプロセスを両方とも生成してるが、プロセス生成のタイミングで複数スレッドが動作している状況は本質的に安全ではない点は注意。

プロセスベースの並列処理を起動する前にスレッドを終了しておくと安全。

https://docs.python.org/ja/3/library/multiprocessing.html#contexts-and-start-methods

shimakaze_softshimakaze_soft

データの受け渡しはプロセス間通信

データの受け渡しに利用した Queue クラスについて考える。

スレッドの場合はメモリ空間を共有しているので、データのやりとりは同じメモリ空間内で直接行うことができる。

一方でプロセスベースの並列処理で生成される子プロセスは親子関係こそあるものの相互に独立したプロセスなので、データのやりとりにはプロセス間通信を利用してバイト列を送受信する。

スレッドとプロセスは両方とも似たインターフェースを持つ Queue クラスでやりとりするが、実はその内部実装は大きく異なる。

from collections import defaultdict
from threading import Thread
from queue import Queue

def calc(v: int = 10000000, queue: Queue):
    s: int = sum([x**2 for x in range(v)])
    queue.put(s)
    print(f's {s}')

queue = Queue()
thread = Thread(target=calc, args=(100000, queue))
thread.start()
thread.join()
s = queue.get()
print(s)

子プロセスにデータを渡す方法

プロセスベースの並列処理で子プロセスを作成する負荷(つまり fork の負荷)はそれほど高くない。
そこで親から子に情報を受け渡す場合は親プロセス側で渡したいデータを生成した後に子プロセスを生成する方法がパフォーマンス的に優れている。

ちなみに子プロセスはリソースをコピーするものの、CoW(コピーオンライト)という仕組みのおかげで読み取りだけなら実際のメモリ領域を消費しないため、その面でもデメリットはない。

Processargs1GBの巨大な文字列を渡してみる。

from multiprocessing import Process, Queue

def worker(target: str, queue: Queue):
    queue.put(len(target))

# 1GB相当の文字列を生成
data_str = "A" * 1 * 1024 * 1024 * 1024

queue = Queue()

# 1GB相当の文字列を渡して子プロセスを開始
process = Process(target=worker, args=(data_str, queue))
process.start()
process.join()

# 実行結果の受け取り
print(queue.get())

巨大なデータだが、親プロセスが保持するメモリ空間をそのまま持つ子プロセスは実質コピーすることなくデータの受け渡しが可能なので1秒もかからず処理が終了した。

1073741824

real    0m1.322s
user    0m0.896s
sys     0m0.427s

次はプロセスを生成した後に Queue で子プロセスに1GBのデータを送信した所、9秒以上かかってしまう。

from multiprocessing import Process, Queue

def worker(queue: Queue):
    queue.put(len(queue.get()))

queue = Queue()

# 子プロセスを開始
process = Process(target=worker, args=(queue,))
process.start()

# 1GB相当の文字列を生成して子プロセスに送信
data_str = "A" * 1 * 1024 * 1024 * 1024
queue.put(data_str)
process.join()

# 実行結果の受け取り
print(queue.get())
1073741824

real    0m9.562s
user    0m3.744s
sys     0m7.014s

親プロセス側はシリアライズして子プロセスにデータを送信し、子プロセスはそれをデシリアライズして受け取るという流れで、同じ変数に対するメモリ領域を何度も確保してコピーした結果パフォーマンスが劣化する。

親プロセスから子プロセスへデータを渡す場合は親プロセス側でデータを作成した後に子プロセスを生成する方法が CoW を効率的に利用できて良さそう。

ただし子プロセスから親プロセスへデータを渡す場合は同じ方法を使うことができないので、multiprocessing.Queue などを利用したプロセス間通信に頼らざるをえない。


ちなみにスレッドで問題になった GIL はシステムコール実行中は解放される。
そのためストレージIOやDB操作がボトルネックならばスレッドによる並列処理でもある程度のパフォーマンス改善が可能
スレッドならば扱いやすいので両者の特徴をしっかり考えた上で適切な並列処理の方法を選ぶべき。

shimakaze_softshimakaze_soft

そもそも並列処理と並行処理とは

どちらも複数処理を同時に行う方法を意味してはいる。

並列処理

コンピュータに複数の処理装置を内蔵し、複数の命令の流れを同時に実行すること

https://e-words.jp/w/並列処理.html

1 ==処理A==処理A==処理A==処理A==処理A==
2 ==処理B==処理B==処理B==処理B==処理B==
<========== time ==========>

並行処理

コンピュータの単一の処理装置を複数の命令の流れで共有し、同時に実行状態に置くこと

https://e-words.jp/w/並列処理.html#Section_並行処理

1 ==処理A==処理B==処理B==処理A==処理B==
<========== time ==========>

I / Oバウンドとは

処理の実行時間がI/Oの速度に起因する場合、その処理をI/Oバウンドな処理という。

一般的に入出力に依存するので、これらはI/Oバウンドな処理と言える。

  • ファイルの読み書き
  • データの検索
  • ネットワークの待ち時間

計算の完了にかかる時間が、主に入出力操作の完了を待機するために費やされる期間によって決定される条件

https://en.wikipedia.org/wiki/I/O_bound

  • プログラムのディスクとの入出力(Input/Output、I / O)による負担のこと
  • ファイルの読み書き、DBへの接続、ネットワーク通信で発生することが多い
  • 処理よりもデータの読み書きに多くの時間を費やす場合

青はプログラムが作業を行っている時間であり、赤はI/O操作が完了するまでの待ち時間

https://files.realpython.com/media/IOBound.4810a888b457.png

https://realpython.com/python-concurrency/

CPUバウンド

CPUバウンドとは、処理の実行時間がCPUの計算速度に左右されるような状態のことを指す。

CPUバウンドな処理を行なっている場合、CPUはひっきりなしに計算を行なっているので、アイドル時間が無い。

  • プログラムが原因で、CPUにかかる負荷のこと
  • 処理速度がCPUに制約される
  • 外部リソースからの入出力は行わないが、処理が完了するまでに相当の時間を要する場合

数値計算のようにCPUを使い続けるような処理とかで発生することが多い

https://files.realpython.com/media/CPUBound.d2d32cb2626c.png

https://realpython.com/python-concurrency/#when-is-concurrency-useful

https://realpython.com/python-concurrency/

https://qiita.com/nyax/items/659b07cd755f2ced563f


I/Oバウンドな処理の場合、CPUの待ち時間が発生するので、処理速度を向上させるには並行処理が有効。

一方、CPUバウンドな処理の場合、CPUの遊びは無い。

この場合は並行処理を行なっても処理速度は向上しない。CPUバウンドな処理の時間短縮には、並列処理を行うしか無い。

shimakaze_softshimakaze_soft

マルチプロセスとマルチスレッドのコードの比較

以下のシングルスレッドの普通のコードをもとに並列化処理を施していく

sample.py
import math
import time

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419
]


def is_prime(n: int):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


def main():
    startTime = time.time()
    for number, prime in zip(PRIMES, map(is_prime, PRIMES)):
        print(f'{number} is prime: {prime}')
    endTime = time.time()
    runTime = endTime - startTime
    print(f'Time:{runTime}[sec]')


if __name__ == '__main__':
    main()

自分の環境での実行結果は以下になる

$ python sample.py
112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
Time:0.5858039855957031[sec]

ProcessPoolExecutorでマルチプロセス

sample.py
import concurrent.futures
import math
import time

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419
]


def is_prime(n: int):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


def main():
    startTime = time.time()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print(f'{number} is prime: {prime}')
    endTime = time.time()
    runTime = endTime - startTime
    print(f'Time:{runTime}[sec]')


if __name__ == '__main__':
    main()

実行結果は更に短縮されていることがわかる。

$ python sample.py
112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
Time:0.2622056007385254[sec]

マルチスレッドを試す

上記のようなCPUバウンドな処理では、マルチスレッドではGILの制約があるため、高速化を行うことは基本的にできません。

GILの制約があるマルチスレッドではあるものの、I / Oは制約が解除される。今回はI/Oバウンドなコードを書いて、それの並列化のためにconcurrent.futuresを試してみる。

sample.py
import urllib.request
import time

urls: list[str] = [
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://some-made-up-domain.com/'
]

# Retrieve a single page and report the URL and contents


def load_url(url: str, timeout: int):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


def get_detail():
    # Start the load operations and mark each future with its URL
    for url in urls:
        try:
            data: bytes = load_url(url, 60)
        except Exception as exc:
            print(f'{url} generated an exception: {exc}')
        else:
            print(f'{url} page is {len(data)} bytes')


def main():
    startTime = time.time()
    get_detail()
    endTime = time.time()
    runTime = endTime - startTime
    print(f'Time:{runTime}[sec]')


if __name__ == '__main__':
    main()

実行結果は更に短縮されていることがわかる。

$ python sample.py
http://www.foxnews.com/ page is 283042 bytes
http://www.cnn.com/ page is 1145006 bytes
http://europe.wsj.com/ generated an exception: HTTP Error 403: Forbidden
http://www.bbc.co.uk/ page is 490155 bytes
http://some-made-up-domain.com/ page is 484 bytes
Time:1.290802001953125[sec]

並列化で高速化する

ThreadPoolExecutorでマルチスレッドにする

sample.py
import concurrent.futures
import urllib.request
import time


urls: list[str] = [
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://some-made-up-domain.com/'
]

# Retrieve a single page and report the URL and contents


def load_url(url: str, timeout: int):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


def get_detail():
    # Start the load operations and mark each future with its URL
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        future_to_url = {executor.submit(load_url, url, 60): url for url in urls}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data: bytes = future.result()
            except Exception as exc:
                print(f'{url} generated an exception: {exc}')
            else:
                print(f'{url} page is {len(data)} bytes')


def main():
    startTime = time.time()
    get_detail()
    endTime = time.time()
    runTime = endTime - startTime
    print(f'Time:{runTime}[sec]')


if __name__ == '__main__':
    main()

実行結果は更に短縮されていることがわかる。

$ python sample.py
root@b0a88061d4ca:/workspaces/recustomer-auth# python .github/workflows/sample.py 
http://europe.wsj.com/ generated an exception: HTTP Error 403: Forbidden
http://www.bbc.co.uk/ page is 490155 bytes
http://www.cnn.com/ page is 1145006 bytes
http://www.foxnews.com/ page is 283043 bytes
http://some-made-up-domain.com/ page is 484 bytes
Time:0.5555202960968018[sec]

  • 並列処理
  • 並行処理

これらは常に必要であるとは限らず、必要でない時に使用してもその恩恵を受けることはできない。

スレッドとプロセスの動作とか、並行処理と並列処理への理解が、コードのボトルネックの発見だったり実装に役立つ。

マルチプロセス実装時の注意点

マルチプロセスの場合はCPUバウンドな処理の場合は処理時間は短くできる。しかし、プログラムの実行時に プロセスを起動するために若干の時間(オーバーヘッド)が生じる

ものによってはオーバーヘッドにより、通常の処理よりも遅くなる場合もある。

またメモリをダイレクトに共有することができない。
そのままだとプロセス間で変数などをやりとりする際にはpickle化されてコピーが生成され受け渡しがされる。
それだと、大きなデータを渡したりするとメモリを瞬間的に膨大に消費したり、pickle化などのオーバーヘッドで期待したほど速くならないケースもある。

これを改善するためには、ビルトインの共有メモリ(shared memory)の機能を使ったり、共有メモリ用のライブラリなどを利用する。

同時に更新したりしないように注意は必要になるものの、pickle化を省けるので処理時間が短くなり省メモリで扱える。

shimakaze_softshimakaze_soft

Pythonの場合、正確に言えばCPythonの場合はマルチスレッド=並行処理マルチプロセス=並列処理になる。

並行処理がGILによってCPUバウンドな処理の高速化面で制限がある一方、並列処理は基本的にGILの制限がかからないのでCPUバウンドな処理の高速化が期待できる。

並行処理の各モジュール

並列/並行処理を実現するために以下のようなモジュールを提供。

  • threading - マルチスレッド処理を行うためのモジュール
  • multiprocessing - マルチプロセス処理を行うためのモジュール
  • concurrent.futures - 並列タスク実行モジュール
    • ThreadPoolExecutor - マルチスレッド実装の並列タスク実行クラス
    • ProcessPoolExecutor - マルチプロセス実装の並列タスク実行クラス

concurrent.futuresはざっくりと説明するとthreading及びmultiprocessingをより扱いやすくしたようなモジュール。

大量の処理を効率よく並列に処理させたい場合、threadingmultiprocessingを使う場合よりもconcurrent.futuresの方が同じことを短いコード量で実現できるため、特に理由がない限りはconcurrent.futuresを使うのが良い。

thread

古いPython2のバージョンだとこのモジュールしかなかったりするものの、基本的には使い勝手が悪いので使わない。
Python3では間違って使わないように、_threadとアンダースコアがつけられている。

threading

thread上位互換。並行処理のベーシックなビルトインモジュール。
インターフェイスが大分親切になった。
Python3はもちろんのこと、2.7とかでももう使えるので、基本的にはthreadを使うくらいならthreadingを使うことになる。

concurrent.futures

Python3.2以降に登場。基本的にthreadingよりもさらに優秀。
なお、futureは並列処理のFutureパターンに由来。(1960~1970年代などに発展し、提案された結構昔からあるもの)

  • スレッド数の上限を指定して、スレッドの使いまわしなどをしたりしてくれる

  • また、マルチスレッドとマルチプロセスの切り替えも1行変える程度で、このモジュールで扱えるので、途中で変えたくなったり比較してみる際などにも便利。

  • マルチスレッドの際にはThreadPoolExecutorクラスを指定。マルチプロセスに切り替えたい場合にはこれをProcessPoolExecutorに変更するだけで切り替えができる。

  • max_workersの引数に、最大スレッド数(もしくはプロセス数)を指定。指定を省略した場合、CPU数から算出されたスレッド数が指定される

import os
from concurrent import futures

max_workers = os.cpu_count()
futures.ThreadPoolExecutor(max_workers=max_workers)

並列処理の各モジュール

multiprocessing

ProcessPoolといった選択肢がある。

Processのほうが生成コストが低く、並列化して実行する関数毎にオブジェクトが生成される。
そのため、呼び出す関数の数が膨大だとその生成コストが高くなったりメモリを食ったりする。

Poolは基本的には引数でコア数などでworker数を指定して、各worker数ごとにタスクの処理が開始し、1つの処理が終わったらキューにある次のタスクの処理を開始する。
そのため、呼び出す関数が膨大でもProcessのように膨大にオブジェクトが作られたりはしない

使い分けの基準

呼び出す関数の数が膨大で、1つ1つのタスクがライトな場合 -> Processで大量にオブジェクトを生成するのはコストが高いのでPoolを使う。

呼び出す関数は少ないけど、1つ1つの関数の処理時間が長い場合 -> 単体の生成コストが低いProcessを使う。

※そもそも、一度に大量にProcessオブジェクトを走らせ始めるとエラーで怒られる。そういった制限も含め、呼び出す関数の数が多くなる場合にはPoolのほうが制御がシンプルになる。

他にもPool側が「前のタスクが終わるまで次のタスクに移らない」という挙動なため、I/O関係の操作が重いものがあるとProcessのほうが有利な場合がある。

例えとして、Processで20個同時にファイルを開いたりして操作ができたりする一方で、Poolでworkerの数を4にした場合には4個までしか同時には開かれない。

そのため、I/O関係での待ち時間が結構ある場合にはProcessを使ったほうが速くなるケースがある

concurrent.futures

ProcessPoolExecutorというクラスを指定する。

multiprocessing.Procesのコードのサンプル

返却値が必要な場合には、Managerクラスを使って、プロセス間で共有される変数を使って、そちらに設定する必要がある。

このあたりはPoolの方がシンプルで直感的。

辞書の共有される変数を使用して結果をメインのプロセスで参照するサンプルは以下になる。

sample.py
from multiprocessing import Manager, Process


def sample(initial_num: int, returned_dict, process_index: int):
    for _ in range(100):
        initial_num += 1
    returned_dict[process_index] = initial_num


manager = Manager()
returned_dict = manager.dict()
process_list = []

for i in range(4):
    kwargs = {
        'initial_num': 50,
        'returned_dict': returned_dict,
        'process_index': i,
    }
    process = Process(
        target=sample,
        kwargs=kwargs
    )

    process.start()
    process.join()

print(str(returned_dict))
# {0: 150, 1: 150, 2: 150, 3: 150}

メインプロセスで正常に別プロセスの結果が設定されていることが分かる。

multiprocessing.Poolのコードのサンプル

Poolのコンストラクタで、最大のプロセス数を指定す。
使う環境のCPUのコア数などに応じて設定。concurrent.futuresと同じように、withステートメントで扱うとシンプルに利用できる。

処理の指定には以下の複数のメソッドが存在する

  • map : 単体の関数を何度も実行するケースで、且つその関数が1つの引数のみを受け取る場合
  • map_async : 複数の関数を何度も実行するケースで、且つその関数が1つの引数のみを受け取るケース
  • starmap : 単体の関数を何度も実行するケースで、且つその関数が複数の引数を必要とする場合
  • starmap_async : 複数の関数を何度も実行するケースで、且つその関数が複数の引数を必要とする場合

Processのサンプルと比べて、呼び出し回数が多く1回の関数の処理がライトなケースを想定してサンプルを試す。

sample.py
from multiprocessing import Pool


def sample(initial_num: int):
    for _ in range(100):
        initial_num += 1
    return initial_num


result_list = None
initial_num_list = list(range(10))
with Pool(processes=4) as p:
    result_list = p.map(func=sample, iterable=initial_num_list)

print(result_list)

shimakaze_softshimakaze_soft

同時マルチスレッディング - SMT

従来、一つのコアは一度に一つのスレッドしか実行できなかった。

しかしスレッドを実行している間、CPUの回路全てを使うことはなく、空きが生じてしまう。

そのため、空いている部分利用するため、一つのコアに対して一度に複数のスレッドを実行させた仕組みが同時マルチスレッディングになる。

同時に複数スレッドを処理できることから、従来より性能が上がる。
CPUの空きを利用してスレッドをこなしているので、単純に性能が2倍になるわけではない。

https://selfsryo-blog.s3.ap-northeast-1.amazonaws.com/media/article/220225/smt.png

仮想メモリ空間、仮想アドレス

話をプロセスに戻して、プロセスにはOSからメモリ空間が割り当てられる。

メモリにはアドレスが割り振られているが、実際に使えるメモリのアドレスは飛び飛びになっている場合が多い。

しかし、プロセスからはさも一つのメモリ空間のようにみえている。

これを仮想メモリ空間といい、この仮想メモリのアドレスを仮想アドレス(または論理アドレス)という。
この仕組みにより、プロセスは実際のメモリのアドレス(物理アドレス)を意識する必要がなく、仮想アドレスのみ意識しておけばよい

https://selfsryo-blog.s3.ap-northeast-1.amazonaws.com/media/article/220225/vertial_memori.png

MMU - Memory Management Unit

プロセスにメモリを用意し、物理アドレスと仮想アドレスの変換を行う部分。

スレッドセーフ

MMUによって、プロセスにはプロセス単位でメモリが割り当てられ、他のプロセスのメモリには干渉できないようになっている。

しかし、スレッドはプロセス単位に割り当てられたメモリを利用する。

そのためマルチスレッドの場合、スレッド同士でメモリを共有することになる。
この仕様によって、意図せずデータが書き換えられてしまったり、互いに処理待ちとなるデッドロックが発生してしまったりする。

このような不具合が発生しないようになっていることを、スレッドセーフという。

https://selfsryo-blog.s3.ap-northeast-1.amazonaws.com/media/article/220225/thread_safe.png

マルチスレッドでは同じメモリを使う分、データの受け渡しが用意にできる、というメリットがある。

TLBフラッシュ

プロセスを切り替える際、MMUは以前のプロセスのキャッシュをクリアする。

そうしないと、以前のプロセスの領域に干渉できてしまう。

このキャッシュのクリアのことを、TLBフラッシュという。

マルチスレッドの場合、メモリの切り替えを必要としないのでTLBフラッシュは発生しない。

コンテキストスイッチ

またまたプロセスを切り替える際の話。

コンテキストとはCPUの状態のこと。

これをメモリに保存しておくことで、プロセスを切り替えたとしても、戻ってきた際にプロセスをロードして復元できる。

コンテキストスイッチとは、この切り替えのことを指す。

https://selfs-ryo.com/detail/python_concurrent_and_parallel

shimakaze_softshimakaze_soft

asyncio

https://speakerdeck.com/jrfk/asgiapurikesiyonru-men-kowakunaiasyncioji-chu-tofei-tong-qi-io

https://testdriven.io/blog/concurrency-parallelism-asyncio

import asyncio

# コルーチン関数の定義


async def async_multi_sleep(periods: list[float]) -> None:
    for period in periods:
        await asyncio.sleep(period)

periods = [1.0, 2.0, 3.0]

# コルーチンの実行
asyncio.run(async_multi_sleep(periods))

上記の通り、大まかの手順としては以下の通りです。

  • async defでコルーチン関数を定義する
  • asyncioでコルーチンを実行する

ただし、上記を実行しても合計時間は6秒かかる。
コルーチンを並行処理するには Task を作成する必要がある。

import asyncio

# コルーチン関数の定義
async def async_multi_sleep(periods: list[float]) -> None:
    tasks = []

    # タスクの作成
    for period in periods:
        task = asyncio.create_task(asyncio.sleep(period))
        tasks.append(task)

    ´# 他のタスクに順番を譲る
    for task in tasks:
        await task

periods = [1.0, 2.0, 3.0]

# コルーチンの実行
asyncio.run(async_multi_sleep(periods))

上記で実装した場合、3秒で処理が終了する。
また、上記の内容はasyncio.gatherを用いて以下のように実装することもできる。

async def async_multi_sleep(periods: list[float]) -> None:
    await asyncio.gather(*[asyncio.sleep(period) for period in periods])

periods = [1.0, 2.0, 3.0]

# コルーチンの実行
asyncio.run(async_multi_sleep(periods))
  • スリープ : asyncio.sleep
  • File I/O : aiofiles
  • Network I/O - HTTP : aiohttp
  • Network I/O - DB : aiosqlite