Pythonの並列化処理
プロセスは、1つ以上のスレッドとその実行に必要なリソース群(CPU cores, network, file pointers etc...)、をひとまとめにしているもの。
Python ではこのリソース群のことをGIL
と呼ぶ。
Global Interpreter Lock
1つのプロセスは複数のスレッドを保持することができるが、GILはたった1つしか持つことができない。
Pythonではスレッドが実行される際、そのスレッドがGIL
を所有しているかがチェックされる。
そのため、1つのプロセスで1度に実行できるスレッドは常に1つだけになる。
また CPU の1つのコアで1度に実行できるスレッドも1つだけです。
どんなにコア数が多いCPUを使用していても、プロセスが1つだけしか動いていない状況下では、Pythonのプログラムで動作しているコアは1つだけということになる。
すなわち、Python におけるマルチスレッドというのは、GILを保有しているスレッドがGILを手放し、それを受け取った他のスレッドが実行され、そのスレッドがまた GIL を手放して...
ということを繰り返す。
これらのGIL の受け渡しによって制御、実現されている。
(Dining philosophers problem - 食事する哲学者の問題 - を解決している)
マルチスレッドは意味ないのか?
意味を持つときもあるが、意味がないどころか逆効果になることさえある。
Q. Python においてマルチスレッドが意味を成すのはどんな時なのか?
ある1つのスレッドにおいて (CPU の動作速度からすれば) 莫大な待ち時間が発生する場合です。
例えば、ユーザーからの入力を受け付ける
、ファイルにアクセスする
、等を含む処理を実行する場合。
このような場合、その時間のかかる操作を待つ間にGIL を一旦開放
し、他のスレッドを動作させ、時間がかかる操作が終了した時点で GIL を返して処理を続ける
ことで、シングルスレッド実行では延々と待っていなければならない時間を有効活用できる。
Python におけるスレッド活用法は、「待ち時間を減らす」
に尽きる。
そのため、待ち時間がほぼ発生しないような複雑な計算処理を実行するだけの場合、いくら複数のスレッドに分割してもGILの受け渡し等に関わるオーバーヘッドが生じる分
、かえって全体の処理速度が落ちてしまう
。
百聞は一見に如かずということで実際に動かして見てみる。
import time
from threading import Thread
def ask_user():
start = time.time()
user_input = input('input name : ')
print(f'Hello, {user_input}!')
print(f'ask_user, {time.time() - start}')
def complex_calculation():
start = time.time()
print("Start calculating ...")
[x**2 for x in range(30000000)]
print(f"complex_calculation, {time.time() - start}")
# シングルスレッドで実行
start = time.time()
ask_user()
complex_calculation()
print(f"Single thread total time, {time.time() - start}\n")
sample.py
を実行しみた結果はこちら。
$ python sample.py
input name : shimakaze
Hello, shimakaze!
ask_user, 1.5849285125732422
Start calculating ...
complex_calculation, 2.773388385772705
Single thread total time, 4.3588409423828125
続けて複数のスレッドで動作させる。
# 複数のスレッドで動作
thread1 = Thread(target=complex_calculation)
thread2 = Thread(target=ask_user)
start = time.time()
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(f"Two threads total time, {time.time() - start}")
実行結果。
$ python sample.py
Start calculating ...
input name : shimakaze
Hello, shimakaze!
ask_user, 2.1318256855010986
complex_calculation, 3.1480064392089844
Two threads total time, 3.1487057209014893
total time
と complex_calculation
の実行時間がほぼ変わらないことに注目。
すなわち、ask_user
の実行時間のほとんど全てはユーザーからの入力待ち時間で、このような状況の場合は複数スレッドにおける実行は非常に有効であることが分かる。
2つのスレッドとも複雑な計算をしなければいけない場合
を見てみる。
まずはシングルスレッド
start = time.time()
complex_calculation()
complex_calculation()
print(f"Single thread total time, {time.time() - start}\n")
# Start calculating ...
# complex_calculation, 11.879278659820557
# Start calculating ...
# complex_calculation, 11.791674613952637
# Single thread total time, 23.671953201293945
続いて複数のスレッド
thread1 = Thread(target=complex_calculation)
thread2 = Thread(target=complex_calculation)
start = time.time()
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(f"Two threads total time, {time.time() - start}")
# Start calculating ...
# Start calculating ...
# complex_calculation, 24.083377838134766
# complex_calculation, 24.088377952575684
# Two threads total time, 24.089377880096436
複数スレッドでの実行の方が合計処理時間が増加している。
これは GIL の受け渡し等によるオーバーヘッドが生じている反面、待ち時間がほぼないため逆効果になってしまっている。
Python におけるマルチスレッドの利用は、待ち時間を有効活用するためのものといえる。
マルチプロセスの場合はどうなるか
プロセスベースの並列処理の場合、GIL
はプロセス単位で作用するので fork して生成された子プロセスは親プロセスの GIL の影響を受けずに並列処理ができる。
multiprocessing
モジュールで、スレッドとよく似たインターフェースでプロセスを扱える。
ちなみにPython にはプロセスベースの並列処理における子プロセスの生成方法がいくつか用意
されているが、ここでは Linux 環境のデフォルトである fork を前提とする。
スレッドはメモリ空間を共有するが、プロセスはメモリ空間も独立している
スレッドは親プロセスとメモリ空間を共有するので、例えばグローバル変数の値を変更すればスレッドの呼び出し元もその変更結果を取得できる。
一方で子プロセスは fork のタイミングで親プロセスからコピーされたメモリ空間を持つ
ため、親プロセスの持っていた変数を参照することができるが、子プロセス側で変数を変更したとしても、その子プロセスが終了すれば破棄されて親プロセスには影響しない。
from multiprocessing import Process
from threading import Thread
g_value = 0
def worker() -> None:
global g_value
g_value += 1
print(f"in worker : {g_value=}")
print(f"before thread execution : {g_value=}")
thread = Thread(target=worker)
thread.start()
thread.join()
print(f"after thread execution : {g_value=}")
print('--' * 20)
print(f"before process execution: {g_value=}")
process = Process(target=worker)
process.start()
process.join()
print(f"after process execution : {g_value=}")
print('--' * 20)
実行結果は以下のようになる。
$ python sample.py
before thread execution : global_value=0
in worker : global_value=1
after thread execution : global_value=1
----------------------------------------
before process execution: global_value=1
in worker : global_value=2
after process execution : global_value=1
----------------------------------------
スレッドベースの並列処理の方は、スレッドが起動して g_value に 1 加算した結果を呼び出し側でも受け取れた。
次にプロセスベースの並列処理の方は、worker 関数内の出力は g_value=2
となっているのでグローバル変数の値を参照することはできる。
しかし worker 関数が終了(子プロセスが終了)して親プロセスに制御が戻ると g_value=1
に戻ってしまった。
なぜならば g_value=2
に更新されたのは子プロセスの中だけで、親プロセス側の g_value は影響を受けない
。
ちなみにこのサンプルプログラムは1つのプログラム内でスレッドとプロセスを両方とも生成してるが、プロセス生成のタイミングで複数スレッドが動作している状況は本質的に安全ではない点は注意。
プロセスベースの並列処理を起動する前にスレッドを終了しておくと安全。
データの受け渡しはプロセス間通信
データの受け渡しに利用した Queue クラスについて考える。
スレッドの場合はメモリ空間を共有している
ので、データのやりとりは同じメモリ空間内で直接行うことができる。
一方でプロセスベースの並列処理で生成される子プロセスは親子関係こそあるものの相互に独立したプロセス
なので、データのやりとりにはプロセス間通信を利用してバイト列を送受信する。
スレッドとプロセスは両方とも似たインターフェースを持つ Queue クラスでやりとりするが、実はその内部実装は大きく異なる。
from collections import defaultdict
from threading import Thread
from queue import Queue
def calc(v: int = 10000000, queue: Queue):
s: int = sum([x**2 for x in range(v)])
queue.put(s)
print(f's {s}')
queue = Queue()
thread = Thread(target=calc, args=(100000, queue))
thread.start()
thread.join()
s = queue.get()
print(s)
子プロセスにデータを渡す方法
プロセスベースの並列処理で子プロセスを作成する負荷(つまり fork の負荷)はそれほど高くない。
そこで親から子に情報を受け渡す場合は親プロセス側で渡したいデータを生成した後に子プロセスを生成する方法がパフォーマンス的に優れている。
ちなみに子プロセスはリソースをコピーするものの、CoW(コピーオンライト)
という仕組みのおかげで読み取りだけなら実際のメモリ領域を消費しないため、その面でもデメリットはない。
Process
の args
で1GB
の巨大な文字列を渡してみる。
from multiprocessing import Process, Queue
def worker(target: str, queue: Queue):
queue.put(len(target))
# 1GB相当の文字列を生成
data_str = "A" * 1 * 1024 * 1024 * 1024
queue = Queue()
# 1GB相当の文字列を渡して子プロセスを開始
process = Process(target=worker, args=(data_str, queue))
process.start()
process.join()
# 実行結果の受け取り
print(queue.get())
巨大なデータだが、親プロセスが保持するメモリ空間をそのまま持つ子プロセスは実質コピーすることなくデータの受け渡しが可能なので1秒もかからず処理が終了した。
1073741824
real 0m1.322s
user 0m0.896s
sys 0m0.427s
次はプロセスを生成した後に Queue
で子プロセスに1GB
のデータを送信した所、9秒以上かかってしまう。
from multiprocessing import Process, Queue
def worker(queue: Queue):
queue.put(len(queue.get()))
queue = Queue()
# 子プロセスを開始
process = Process(target=worker, args=(queue,))
process.start()
# 1GB相当の文字列を生成して子プロセスに送信
data_str = "A" * 1 * 1024 * 1024 * 1024
queue.put(data_str)
process.join()
# 実行結果の受け取り
print(queue.get())
1073741824
real 0m9.562s
user 0m3.744s
sys 0m7.014s
親プロセス側はシリアライズして子プロセスにデータを送信し、子プロセスはそれをデシリアライズして受け取るという流れで、同じ変数に対するメモリ領域を何度も確保してコピーした結果パフォーマンスが劣化する。
親プロセスから子プロセスへデータを渡す場合は親プロセス側でデータを作成した後に子プロセスを生成する方法が CoW
を効率的に利用できて良さそう。
ただし子プロセスから親プロセスへデータを渡す場合は同じ方法を使うことができないので、multiprocessing.Queue
などを利用したプロセス間通信に頼らざるをえない。
ちなみにスレッドで問題になった GIL はシステムコール実行中は解放される。
そのためストレージIOやDB操作がボトルネックならばスレッドによる並列処理でもある程度のパフォーマンス改善が可能
。
スレッドならば扱いやすいので両者の特徴をしっかり考えた上で適切な並列処理の方法を選ぶべき。
そもそも並列処理と並行処理とは
どちらも複数処理を同時に行う方法を意味してはいる。
並列処理
コンピュータに複数の処理装置を内蔵し、複数の命令の流れを同時に実行すること
1 ==処理A==処理A==処理A==処理A==処理A==
2 ==処理B==処理B==処理B==処理B==処理B==
<========== time ==========>
並行処理
コンピュータの単一の処理装置を複数の命令の流れで共有し、同時に実行状態に置くこと
1 ==処理A==処理B==処理B==処理A==処理B==
<========== time ==========>
I / Oバウンドとは
処理の実行時間がI/Oの速度に起因する場合、その処理をI/Oバウンドな処理という。
一般的に入出力に依存するので、これらはI/Oバウンドな処理と言える。
- ファイルの読み書き
- データの検索
- ネットワークの待ち時間
計算の完了にかかる時間が、主に入出力操作の完了を待機するために費やされる期間によって決定される条件
- プログラムのディスクとの入出力
(Input/Output、I / O)
による負担のこと - ファイルの読み書き、DBへの接続、ネットワーク通信で発生することが多い
- 処理よりもデータの読み書きに多くの時間を費やす場合
青はプログラムが作業を行っている時間であり、赤はI/O操作が完了するまでの待ち時間
CPUバウンド
CPUバウンドとは、処理の実行時間がCPUの計算速度に左右されるような状態のことを指す。
CPUバウンドな処理を行なっている場合、CPUはひっきりなしに計算を行なっているので、アイドル時間が無い。
- プログラムが原因で、
CPUにかかる負荷
のこと - 処理速度がCPUに制約される
- 外部リソースからの入出力は行わないが、処理が完了するまでに相当の時間を要する場合
数値計算のようにCPUを使い続けるような処理とかで発生することが多い
https://realpython.com/python-concurrency/#when-is-concurrency-useful
I/Oバウンドな処理の場合、CPUの待ち時間が発生するので、処理速度を向上させるには並行処理が有効。
一方、CPUバウンドな処理の場合、CPUの遊びは無い。
この場合は並行処理を行なっても処理速度は向上しない。CPUバウンドな処理の時間短縮には、並列処理を行うしか無い。
マルチプロセスとマルチスレッドのコードの比較
以下のシングルスレッドの普通のコードをもとに並列化処理を施していく
import math
import time
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419
]
def is_prime(n: int):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
startTime = time.time()
for number, prime in zip(PRIMES, map(is_prime, PRIMES)):
print(f'{number} is prime: {prime}')
endTime = time.time()
runTime = endTime - startTime
print(f'Time:{runTime}[sec]')
if __name__ == '__main__':
main()
自分の環境での実行結果は以下になる
$ python sample.py
112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
Time:0.5858039855957031[sec]
ProcessPoolExecutorでマルチプロセス
import concurrent.futures
import math
import time
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419
]
def is_prime(n: int):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
startTime = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print(f'{number} is prime: {prime}')
endTime = time.time()
runTime = endTime - startTime
print(f'Time:{runTime}[sec]')
if __name__ == '__main__':
main()
実行結果は更に短縮されていることがわかる。
$ python sample.py
112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
Time:0.2622056007385254[sec]
マルチスレッドを試す
上記のようなCPUバウンドな処理では、マルチスレッドではGILの制約があるため、高速化を行うことは基本的にできません。
GILの制約があるマルチスレッドではあるものの、I / O
は制約が解除される。今回はI/Oバウンドなコードを書いて、それの並列化のためにconcurrent.futures
を試してみる。
import urllib.request
import time
urls: list[str] = [
'http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/'
]
# Retrieve a single page and report the URL and contents
def load_url(url: str, timeout: int):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
def get_detail():
# Start the load operations and mark each future with its URL
for url in urls:
try:
data: bytes = load_url(url, 60)
except Exception as exc:
print(f'{url} generated an exception: {exc}')
else:
print(f'{url} page is {len(data)} bytes')
def main():
startTime = time.time()
get_detail()
endTime = time.time()
runTime = endTime - startTime
print(f'Time:{runTime}[sec]')
if __name__ == '__main__':
main()
実行結果は更に短縮されていることがわかる。
$ python sample.py
http://www.foxnews.com/ page is 283042 bytes
http://www.cnn.com/ page is 1145006 bytes
http://europe.wsj.com/ generated an exception: HTTP Error 403: Forbidden
http://www.bbc.co.uk/ page is 490155 bytes
http://some-made-up-domain.com/ page is 484 bytes
Time:1.290802001953125[sec]
並列化で高速化する
ThreadPoolExecutor
でマルチスレッドにする
import concurrent.futures
import urllib.request
import time
urls: list[str] = [
'http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/'
]
# Retrieve a single page and report the URL and contents
def load_url(url: str, timeout: int):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
def get_detail():
# Start the load operations and mark each future with its URL
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
future_to_url = {executor.submit(load_url, url, 60): url for url in urls}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data: bytes = future.result()
except Exception as exc:
print(f'{url} generated an exception: {exc}')
else:
print(f'{url} page is {len(data)} bytes')
def main():
startTime = time.time()
get_detail()
endTime = time.time()
runTime = endTime - startTime
print(f'Time:{runTime}[sec]')
if __name__ == '__main__':
main()
実行結果は更に短縮されていることがわかる。
$ python sample.py
root@b0a88061d4ca:/workspaces/recustomer-auth# python .github/workflows/sample.py
http://europe.wsj.com/ generated an exception: HTTP Error 403: Forbidden
http://www.bbc.co.uk/ page is 490155 bytes
http://www.cnn.com/ page is 1145006 bytes
http://www.foxnews.com/ page is 283043 bytes
http://some-made-up-domain.com/ page is 484 bytes
Time:0.5555202960968018[sec]
並列処理
並行処理
これらは常に必要であるとは限らず、必要でない時に使用してもその恩恵を受けることはできない。
スレッドとプロセスの動作とか、並行処理と並列処理への理解が、コードのボトルネックの発見だったり実装に役立つ。
マルチプロセス実装時の注意点
マルチプロセスの場合はCPUバウンドな処理の場合は処理時間は短くできる。しかし、プログラムの実行時に プロセスを起動するために若干の時間(オーバーヘッド)が生じる
。
ものによってはオーバーヘッドにより、通常の処理よりも遅くなる場合もある。
またメモリをダイレクトに共有することができない。
そのままだとプロセス間で変数などをやりとりする際にはpickle化されてコピーが生成され受け渡しがされる。
それだと、大きなデータを渡したりするとメモリを瞬間的に膨大に消費したり、pickle化などのオーバーヘッドで期待したほど速くならないケースもある。
これを改善するためには、ビルトインの共有メモリ(shared memory)
の機能を使ったり、共有メモリ用のライブラリなどを利用する。
同時に更新したりしないように注意は必要になるものの、pickle化を省けるので処理時間が短くなり省メモリで扱える。
Pythonの場合、正確に言えばCPythonの場合はマルチスレッド=並行処理
、マルチプロセス=並列処理
になる。
並行処理がGILによってCPUバウンドな処理の高速化面で制限がある一方、並列処理は基本的にGILの制限がかからないのでCPUバウンドな処理の高速化が期待できる。
並行処理の各モジュール
並列/並行処理
を実現するために以下のようなモジュールを提供。
- threading -
マルチスレッド処理を行うためのモジュール
- multiprocessing -
マルチプロセス処理を行うためのモジュール
- concurrent.futures -
並列タスク実行モジュール
- ThreadPoolExecutor -
マルチスレッド実装の並列タスク実行クラス
- ProcessPoolExecutor -
マルチプロセス実装の並列タスク実行クラス
- ThreadPoolExecutor -
concurrent.futures
はざっくりと説明するとthreading
及びmultiprocessing
をより扱いやすくしたようなモジュール。
大量の処理を効率よく並列に処理させたい場合、threading
やmultiprocessing
を使う場合よりもconcurrent.futures
の方が同じことを短いコード量で実現できるため、特に理由がない限りはconcurrent.futures
を使うのが良い。
thread
古いPython2のバージョンだとこのモジュールしかなかったりするものの、基本的には使い勝手が悪いので使わない。
Python3では間違って使わないように、_threadとアンダースコアがつけられている。
threading
thread上位互換。並行処理のベーシックなビルトインモジュール。
インターフェイスが大分親切になった。
Python3はもちろんのこと、2.7とかでももう使えるので、基本的にはthreadを使うくらいならthreadingを使うことになる。
concurrent.futures
Python3.2以降に登場。基本的にthreadingよりもさらに優秀。
なお、futureは並列処理のFutureパターンに由来。(1960~1970年代などに発展し、提案された結構昔からあるもの)
-
スレッド数の上限を指定して、スレッドの使いまわしなどをしたりしてくれる
-
また、
マルチスレッドとマルチプロセスの切り替えも1行変える程度
で、このモジュールで扱えるので、途中で変えたくなったり比較してみる際などにも便利。 -
マルチスレッドの際には
ThreadPoolExecutorクラス
を指定。マルチプロセスに切り替えたい場合にはこれをProcessPoolExecutor
に変更するだけで切り替えができる。 -
max_workersの引数に、最大スレッド数(もしくはプロセス数)を指定。
指定を省略した場合、CPU数から算出されたスレッド数が指定
される
import os
from concurrent import futures
max_workers = os.cpu_count()
futures.ThreadPoolExecutor(max_workers=max_workers)
並列処理の各モジュール
multiprocessing
Process
やPool
といった選択肢がある。
Process
のほうが生成コストが低く、並列化して実行する関数毎にオブジェクトが生成される。
そのため、呼び出す関数の数が膨大だとその生成コストが高くなったりメモリを食ったりする。
Pool
は基本的には引数でコア数などでworker数を指定して、各worker数ごとにタスクの処理が開始し、1つの処理が終わったらキューにある次のタスクの処理を開始する。
そのため、呼び出す関数が膨大でもProcess
のように膨大にオブジェクトが作られたりはしない。
使い分けの基準
呼び出す関数の数が膨大で、1つ1つのタスクがライトな場合 -> Processで大量にオブジェクトを生成するのはコストが高いのでPoolを使う。
呼び出す関数は少ないけど、1つ1つの関数の処理時間が長い場合 -> 単体の生成コストが低いProcessを使う。
※そもそも、一度に大量にProcessオブジェクトを走らせ始めるとエラーで怒られる。そういった制限も含め、呼び出す関数の数が多くなる場合にはPoolのほうが制御がシンプルになる。
他にもPool側が「前のタスクが終わるまで次のタスクに移らない」
という挙動なため、I/O関係の操作が重いものがあるとProcessのほうが有利な場合
がある。
例えとして、Processで20個同時にファイルを開いたりして操作ができたりする一方で、Poolでworkerの数を4にした場合には4個までしか同時
には開かれない。
そのため、I/O関係での待ち時間が結構ある場合にはProcessを使ったほうが速くなるケースがある
。
concurrent.futures
ProcessPoolExecutor
というクラスを指定する。
multiprocessing.Procesのコードのサンプル
返却値が必要な場合には、Manager
クラスを使って、プロセス間で共有される変数を使って、そちらに設定する必要がある。
このあたりはPoolの方がシンプルで直感的。
辞書の共有される変数を使用して結果をメインのプロセスで参照するサンプルは以下になる。
from multiprocessing import Manager, Process
def sample(initial_num: int, returned_dict, process_index: int):
for _ in range(100):
initial_num += 1
returned_dict[process_index] = initial_num
manager = Manager()
returned_dict = manager.dict()
process_list = []
for i in range(4):
kwargs = {
'initial_num': 50,
'returned_dict': returned_dict,
'process_index': i,
}
process = Process(
target=sample,
kwargs=kwargs
)
process.start()
process.join()
print(str(returned_dict))
# {0: 150, 1: 150, 2: 150, 3: 150}
メインプロセスで正常に別プロセスの結果が設定されていることが分かる。
multiprocessing.Poolのコードのサンプル
Pool
のコンストラクタで、最大のプロセス数を指定す。
使う環境のCPUのコア数などに応じて設定。concurrent.futures
と同じように、withステートメントで扱うとシンプルに利用できる。
処理の指定には以下の複数のメソッドが存在する
- map : 単体の関数を何度も実行するケースで、且つその関数が1つの引数のみを受け取る場合
- map_async : 複数の関数を何度も実行するケースで、且つその関数が1つの引数のみを受け取るケース
- starmap : 単体の関数を何度も実行するケースで、且つその関数が複数の引数を必要とする場合
- starmap_async : 複数の関数を何度も実行するケースで、且つその関数が複数の引数を必要とする場合
Process
のサンプルと比べて、呼び出し回数が多く1回の関数の処理がライトなケースを想定してサンプルを試す。
from multiprocessing import Pool
def sample(initial_num: int):
for _ in range(100):
initial_num += 1
return initial_num
result_list = None
initial_num_list = list(range(10))
with Pool(processes=4) as p:
result_list = p.map(func=sample, iterable=initial_num_list)
print(result_list)
共有メモリ
同時マルチスレッディング - SMT
従来、一つのコアは一度に一つのスレッドしか実行できなかった。
しかしスレッドを実行している間、CPUの回路全てを使うことはなく、空きが生じてしまう。
そのため、空いている部分利用するため、一つのコアに対して一度に複数のスレッドを実行させた仕組みが同時マルチスレッディング
になる。
同時に複数スレッドを処理できることから、従来より性能が上がる。
CPUの空きを利用してスレッドをこなしているので、単純に性能が2倍になるわけではない。
仮想メモリ空間、仮想アドレス
話をプロセスに戻して、プロセスにはOSからメモリ空間が割り当てられる。
メモリにはアドレスが割り振られているが、実際に使えるメモリのアドレスは飛び飛びになっている場合が多い。
しかし、プロセスからはさも一つのメモリ空間のようにみえている。
これを仮想メモリ空間といい、この仮想メモリのアドレスを仮想アドレス(または論理アドレス)という。
この仕組みにより、プロセスは実際のメモリのアドレス(物理アドレス)を意識する必要がなく、仮想アドレスのみ意識しておけばよい
MMU - Memory Management Unit
プロセスにメモリを用意し、物理アドレスと仮想アドレスの変換を行う部分。
スレッドセーフ
MMU
によって、プロセスにはプロセス単位でメモリが割り当てられ、他のプロセスのメモリには干渉できないようになっている。
しかし、スレッドはプロセス単位に割り当てられたメモリを利用する。
そのためマルチスレッドの場合、スレッド同士でメモリを共有することになる。
この仕様によって、意図せずデータが書き換えられてしまったり、互いに処理待ちとなるデッドロックが発生してしまったりする。
このような不具合が発生しないようになっていることを、スレッドセーフ
という。
マルチスレッド
では同じメモリを使う分、データの受け渡しが用意にできる、というメリットがある。
TLBフラッシュ
プロセスを切り替える際、MMUは以前のプロセスのキャッシュをクリアする。
そうしないと、以前のプロセスの領域に干渉できてしまう。
このキャッシュのクリアのことを、TLBフラッシュ
という。
マルチスレッドの場合、メモリの切り替えを必要としない
のでTLBフラッシュは発生しない。
コンテキストスイッチ
またまたプロセスを切り替える際の話。
コンテキストとはCPUの状態のこと。
これをメモリに保存しておくことで、プロセスを切り替えたとしても、戻ってきた際にプロセスをロードして復元できる。
コンテキストスイッチとは、この切り替えのことを指す。
asyncio
import asyncio
# コルーチン関数の定義
async def async_multi_sleep(periods: list[float]) -> None:
for period in periods:
await asyncio.sleep(period)
periods = [1.0, 2.0, 3.0]
# コルーチンの実行
asyncio.run(async_multi_sleep(periods))
上記の通り、大まかの手順としては以下の通りです。
- async defでコルーチン関数を定義する
- asyncioでコルーチンを実行する
ただし、上記を実行しても合計時間は6秒かかる。
コルーチンを並行処理するには Task を作成する必要がある。
import asyncio
# コルーチン関数の定義
async def async_multi_sleep(periods: list[float]) -> None:
tasks = []
# タスクの作成
for period in periods:
task = asyncio.create_task(asyncio.sleep(period))
tasks.append(task)
´# 他のタスクに順番を譲る
for task in tasks:
await task
periods = [1.0, 2.0, 3.0]
# コルーチンの実行
asyncio.run(async_multi_sleep(periods))
上記で実装した場合、3秒で処理が終了する。
また、上記の内容はasyncio.gatherを用いて以下のように実装することもできる。
async def async_multi_sleep(periods: list[float]) -> None:
await asyncio.gather(*[asyncio.sleep(period) for period in periods])
periods = [1.0, 2.0, 3.0]
# コルーチンの実行
asyncio.run(async_multi_sleep(periods))
- スリープ : asyncio.sleep
- File I/O : aiofiles
- Network I/O - HTTP : aiohttp
- Network I/O - DB : aiosqlite
Parsl
並列パイプライン