[Python] スレッドについて
はじめに
時間が掛かる処理がありどうにか速く出来ないかと、スレッドの処理について調べてみることにしました。
特に、
-
subprocess
の呼び出しの時間を何とかしたい - 1 つ 1 つは、1 秒程度で終了する。
- 条件を満たすまで実行を続ける。
- 実行が終わったら値を表示する。
の場合に適した方法が知りたかったです。
環境
Python 3.10.4 (tags/v3.10.4:9d38120, Mar 23 2022, 23:13:41) [MSC v.1929 64 bit (AMD64)]
threding
threading
を使った場合から調べてみる。
threading を起動させる
Thread.start()
簡単な使い方。
thread = threading.Thread(target=None, args=(), deamon=False)
でスレッドを作成する。
target
には、スレッドで処理させる関数を指定する。
args
には、関数に与える引数を渡す。
thread.start()
で、別のスレッドでtarget
の関数が引数にargs
を与えられて実行される。
thread1 = threading.Thread(target=func, args=("thread", i, 1))
thread1.start()
以下で簡単に動きを見ていく事にする。
後々のことを考えて汎用的な関数の定義もしておく。
import time
import random
import threading
import queue
import ctypes
import sys
import datetime
import functools
# 現在時刻の取得
def getTime():
return datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3]
# 現在時刻付きprint
def printTime(string):
print(f"{getTime()}: {string}")
# 関数名と起動時刻の表示するデコレータ
def printName(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
print(f"{getTime()}: {func.__name__} start")
result = func(*args, **kwargs)
print(f"{getTime()}: {func.__name__} end")
return result
return wrapper
def check():
return True if random.random() < 0.1 else False
def ff(name, i, n):
time.sleep(n)
result = {"name": name, "index": i, "sleep": f"{n:.2f}", "result": check()}
printTime(result)
return result
@printName
def test1():
for i in range(3):
printTime(f"test start {i}")
thread1 = threading.Thread(target=ff, args=("thread1", i, 1))
thread2 = threading.Thread(target=ff, args=("thread2", i, 2))
thread1.start()
thread2.start()
test1()
同時並行で実行される。
呼び出した順番と結果が表示されるのに多少の前後がある。
18:28:21.281: test1 start
18:28:21.297: test start 0
18:28:21.359: test start 1
18:28:21.366: test start 2
18:28:21.367: test1 end
18:28:22.349: {'name': 'thread1', 'index': 0, 'sleep': 1, 'result': False}
18:28:22.381: {'name': 'thread1', 'index': 1, 'sleep': 1, 'result': False}
18:28:22.382: {'name': 'thread1', 'index': 2, 'sleep': 1, 'result': False}
18:28:23.369: {'name': 'thread2', 'index': 1, 'sleep': 2, 'result': False}
18:28:23.369: {'name': 'thread2', 'index': 0, 'sleep': 2, 'result': True}
18:28:23.369: {'name': 'thread2', 'index': 2, 'sleep': 2, 'result': False}
Thread.run()を直接呼んでみる
@printName
def test2():
for i in range(3):
printTime(f"test run {i}")
thread1 = threading.Thread(target=ff, args=("thread1", i, 1))
thread2 = threading.Thread(target=ff, args=("thread2", i, 2))
thread1.run()
thread2.run()
test2()
こうするとメインスレッドでtarget
の関数を実行しているようだ。
間違って使ってしまったが価値はないと思う。
18:29:49.284: test2 start
18:29:49.318: test run 0
18:29:50.320: {'name': 'thread1', 'index': 0, 'sleep': 1, 'result': False}
18:29:52.333: {'name': 'thread2', 'index': 0, 'sleep': 2, 'result': False}
18:29:52.335: test run 1
18:29:53.347: {'name': 'thread1', 'index': 1, 'sleep': 1, 'result': False}
18:29:55.384: {'name': 'thread2', 'index': 1, 'sleep': 2, 'result': False}
18:29:55.413: test run 2
18:29:56.436: {'name': 'thread1', 'index': 2, 'sleep': 1, 'result': False}
18:29:58.441: {'name': 'thread2', 'index': 2, 'sleep': 2, 'result': False}
18:29:58.459: test2 end
スレッドの終了まで待機する
join()
以下のような場合、メインスレッドの方がthread1
より速く終了する。
@printName
def test3():
thread1 = threading.Thread(target=ff, args=("thread1", 0, 5))
thread2 = threading.Thread(target=ff, args=("thread2", 0, 1))
printTime("start main")
thread1.start()
thread2.start()
printTime("end main")
test3()
18:35:16.626: test3 start
18:35:16.639: start main
18:35:16.661: end main
18:35:16.661: test3 end
18:35:17.674: {'name': 'thread2', 'index': 0, 'sleep': 1, 'result': False}
18:35:21.686: {'name': 'thread1', 'index': 0, 'sleep': 5, 'result': False}
特定のスレッドの終了を待ちたい時には、join()
を使う。
@printName
def test4():
thread1 = threading.Thread(target=ff, args=("thread1", 0, 5))
thread2 = threading.Thread(target=ff, args=("thread2", 0, 1))
printTime("start main")
thread1.start()
thread2.start()
thread1.join()
printTime("end main")
test4()
今度は、時間のかかるthread1
の終了を待ってからメインスレッドが終了した。
18:36:05.990: test4 start
18:36:05.999: start main
18:36:07.046: {'name': 'thread2', 'index': 0, 'sleep': 1, 'result': False}
18:36:11.044: {'name': 'thread1', 'index': 0, 'sleep': 5, 'result': False}
18:36:11.050: end main
18:36:11.050: test4 end
スレッドを終了させる
daemon=True
作成時にdaemon=True
として作成されたスレッドは、メインスレッドも含むdaemon=False
のスレッドが全ての終了したら強制終了する。
def watch(name, n):
while True:
printTime(f"{name}: alive")
time.sleep(n)
@printName
def test5():
daemon = threading.Thread(target=watch, args=("daemon", 0.5), daemon=True)
daemon.start()
printTime("start main")
test1()
printTime("end main")
test5()
他のスレッドが終了したら、daemon は、勝手に終了している。
18:37:33.974: test5 start
18:37:34.047: daemon: alive
18:37:34.047: start main
18:37:34.051: test1 start
18:37:34.054: test start 0
18:37:34.071: test start 1
18:37:34.106: test start 2
18:37:34.178: test1 end
18:37:34.185: end main
18:37:34.185: test5 end
18:37:34.564: daemon: alive
18:37:35.071: daemon: alive
18:37:35.071: {'name': 'thread1', 'index': 0, 'sleep': 1, 'result': False}
18:37:35.107: {'name': 'thread1', 'index': 1, 'sleep': 1, 'result': False}
18:37:35.168: {'name': 'thread1', 'index': 2, 'sleep': 1, 'result': False}
18:37:35.600: daemon: alive
18:37:36.081: {'name': 'thread2', 'index': 0, 'sleep': 2, 'result': False}
18:37:36.123: {'name': 'thread2', 'index': 1, 'sleep': 2, 'result': True}
18:37:36.142: daemon: alive
18:37:36.183: {'name': 'thread2', 'index': 2, 'sleep': 2, 'result': False}
daemon を 2 つ作っても、daemon 以外が終了したら強制終了される。
@printName
def test6():
daemon1 = threading.Thread(target=watch, args=("daemon1", 0.5), daemon=True)
daemon1.start()
daemon2 = threading.Thread(target=watch, args=("daemon1", 0.5), daemon=True)
daemon2.start()
printTime("start main")
test1()
printTime("end main")
test6()
個人的には、daemon という名前から最後まで残って後処理をする役割なのかと思ってしまった。
しかし、daemon に全てのスレッドが終わった後の処理をさせようと思っても出来ない。
なぜならば、他のスレッドが終了した時点で強制終了されるので後処理が実行されないから。
そのような目的では使えないようだ。
余談だが、join()
を使って daemon の終了を待つのが通常の使用法との記述を見たが、決してそんな事はないと思う。
それでは、何が目的か分からなくなってしまう。
終了させる仕組みを作る
スレッドに例外を送ると終了させることが出来る。
ctypes.pythonapi.PyThreadState_SetAsyncExc()
を使うと例外を送ることが出来る。
class CustomThread1(threading.Thread):
def __init__(self, name, interval):
super().__init__()
self.name = name
self.interval = interval
@printName
def kill(self):
ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(self.native_id, ctypes.py_object(SystemExit))
if ret > 1: # 状態が変更されたスレッドの数を返す。通常は、1。見つからなかった場合は、0。
ctypes.pythonapi.PyThreadState_SetAsyncExc(self.native_id, None) # たぶん必要ないが念のため。まだ送られていない例外を消去
printTime("failed")
@printName
def run(self):
while True:
printTime(f"{self.name}: alive")
time.sleep(self.interval)
@printName
def test7():
thread1 = CustomThread1("thread1", 1)
thread1.start()
time.sleep(3)
thread1.kill()
test7()
thread1
は、何らかの手段で終了させないと無限ループで実行を続ける。
kill()
を実行して例外を発生させると終了したのが分かる。
18:44:04.264: test7 start
18:44:04.309: run start
18:44:04.333: thread1: alive
18:44:05.364: thread1: alive
18:44:06.374: thread1: alive
18:44:07.320: kill start
18:44:07.322: kill end
18:44:07.323: test7 end
ctypes.pythonapi.PyThreadState_SetAsyncExc()
でctypes.py_object(SystemExit)
以外のctypes.py_object(ValueError)
なども送ってもスレッドは終了する。
その場合は、run()
の中で適切に例外を処理してやらないとエラーメッセージが表示される。
上記のコードは、動作を調べてみると以下のようなコードと同じような動きとなる。
class CustomThread2(threading.Thread):
def __init__(self, name, interval):
super().__init__()
self.name = name
self.interval = interval
self.killed = False
@printName
def kill(self):
self.killed = True
@printName
def run(self):
while True:
if self.killed:
sys.exit(0)
printTime(f"{self.name}: alive")
time.sleep(self.interval)
もっと簡単にすれば、以下の様になる。
class CustomThread3(threading.Thread):
def __init__(self, name, interval):
super().__init__()
self.name = name
self.interval = interval
self.alive = True
@printName
def kill(self):
self.alive = False
@printName
def run(self):
while self.alive:
printTime(f"{self.name}: alive")
time.sleep(self.interval)
現実の問題としてこんな感じで終了させられる処理も珍しいかもしれないが、途中で終了させる意図があれば終了処理をしっかりと作った方が良いと思う。
ctypes.pythonapi.PyThreadState_SetAsyncExc()
の利点は、特別な仕組みが無くてもほとんどのスレッドを終了させられる事にある。
ただし、SystemExit
を処理しているスレッドや、"bare except"を使っているスレッドは、意図通りに終了しないかもしれない。
もっと汎用的に使うには、以下のようなコードが考えられる。
threading.enumerate()
は、メインスレッドを含めたスレッドのリストを返す。
threading.main_thread()
は、メインスレッドを返す。
@printName
def killThemAll():
for thread in threading.enumerate():
if thread != threading.main_thread():
printTime(f"kill: {thread.name}")
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.native_id, ctypes.py_object(SystemExit))
def ff2(name, interval):
while True:
printTime(f"{name}: alive")
time.sleep(interval)
@printName
def test8():
thread1 = threading.Thread(target=ff2, args=("thread1", 1))
thread2 = threading.Thread(target=ff2, args=("thread2", 1))
thread1.start()
thread2.start()
time.sleep(3)
killThemAll()
test8()
18:48:49.684: test8 start
18:48:49.730: thread1: alive
18:48:49.731: thread2: alive
18:48:50.734: thread2: alive
18:48:50.734: thread1: alive
18:48:51.744: thread2: alive
18:48:51.752: thread1: alive
18:48:52.741: killThemAll start
18:48:52.753: kill: Thread-1 (ff2)
18:48:52.769: thread2: alive
18:48:52.773: kill: Thread-2 (ff2)
18:48:52.773: killThemAll end
18:48:52.774: test8 end
スレッドを一時停止させる
スレッドを終了する方法を応用すれば、スレッドを一時停止する仕組みを作れる。
そこで使うのが、threading.Event
Event.wait()
は、内部の値がTrue
になるまでスレッドの動作を停止させる。
停止状態では、全ての動作が停止するのでしっかりと管理しなければスレッドが終了しなくなる恐れがある。
詳しくは、後述する。
class CustomThread4(threading.Thread):
def __init__(self, name, interval):
super().__init__()
self.name = name
self.interval = interval
self.alive = True
self.moving = threading.Event()
self.moving.clear()
self.start()
@printName
def begin(self):
self.moving.set()
@printName
def stop(self):
self.moving.clear()
@printName
def end(self):
self.alive = False
self.moving.set()
@printName
def run(self):
self.moving.wait()
while self.alive:
printTime(f"{self.name}: alive")
time.sleep(self.interval)
self.moving.wait()
@printName
def test9():
thread = CustomThread4("thread", 1)
thread.begin()
time.sleep(3)
thread.stop()
time.sleep(3)
thread.begin()
time.sleep(3)
thread.end()
test9()
開始、停止、終了全て上手くいっている。
18:50:35.232: test9 start
18:50:35.233: run start
18:50:35.233: begin start
18:50:35.234: begin end
18:50:35.234: thread: alive
18:50:36.238: thread: alive
18:50:37.258: thread: alive
18:50:38.246: stop start
18:50:38.259: stop end
18:50:41.272: begin start
18:50:41.275: begin end
18:50:41.275: thread: alive
18:50:42.291: thread: alive
18:50:43.311: thread: alive
18:50:44.279: end start
18:50:44.307: end end
18:50:44.315: test9 end
18:50:44.326: run end
ここで、以前作ったkillThemAll()
を使ってみる。
@printName
def test10():
thread = CustomThread4("thread", 1)
thread.begin()
time.sleep(3)
thread.stop()
time.sleep(3)
thread.begin()
time.sleep(3)
killThemAll()
test10()
この条件では、問題なく終了する。
18:52:37.885: test10 start
18:52:37.930: run start
18:52:37.930: begin start
18:52:37.931: begin end
18:52:37.931: thread: alive
18:52:38.951: thread: alive
18:52:39.986: thread: alive
18:52:40.932: stop start
18:52:40.935: stop end
18:52:43.945: begin start
18:52:43.951: begin end
18:52:43.951: thread: alive
18:52:44.986: thread: alive
18:52:45.998: thread: alive
18:52:46.982: killThemAll start
18:52:46.985: kill: thread
18:52:46.988: killThemAll end
18:52:46.989: test10 end
一方、以下のようにすると"thread"が終了しなくなってしまう。
上記との違いは、thread.stop()
を呼んだ後、つまりスレッドの停止状態でkillThemAll()
を呼んでいる事。
これは、threading.Event
が例外の処理も停止してしまっているから。
これを回避するには、thread.begin()
を呼んでself.moving.clear()
を実行する必要がある。
この場合は、thread.end()
を使うべきだろうけど。
threading.Event
は、しっかりと管理しなくてはいけない。
threading.Event.wati()
にタイムアウトを設定するのも考えた方が良いかも。
# 無限ループ
@printName
def test11():
thread = CustomThread4("thread", 1)
thread.begin()
time.sleep(3)
thread.stop()
time.sleep(3)
killThemAll()
スレッドとの値のやり取り
スレッド間で値をやり取りする場合を考える。
-
threading.RLock
オブジェクト -
threading.Condition
オブジェクト -
threading.Semaphore
オブジェクト -
threading.Barrier
オブジェクト
などを使うと安全にやり取りが出来るが、ここではqueue
だけを考える。
queue を使う
時間が掛かるスレッドが数個ある場合(join()
が使える)
複数のスレッドの間で安全に値をやり取りする場合は、queue
を使うと良いらしい。
大域変数としてresultQueue
を用意して、それに結果を入れてみる。
今回は、時間が掛かるスレッドが数個ある場合を想定してテストコードを作ってみた。
それぞれランダムな時間待機した後にresultQueue
に入れるスレッドを 2 つ用意する。
両方のスレッドが終了するまで待機してからresultQueue
を確かめている。
resultQueue = queue.SimpleQueue()
@printName
def getResult():
result = []
while not resultQueue.empty():
result.append(resultQueue.get())
return result
def putResult(name):
for i in range(3):
printTime(f"{name} {i}")
resultQueue.put(ff(name, i, random.uniform(1, 5)))
@printName
def test12():
thread1 = threading.Thread(target=putResult, args=("thread1",))
thread2 = threading.Thread(target=putResult, args=("thread2",))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
[printTime(f"result: {x}") for x in getResult()]
test12()
ランダムに待機しているので"thread1"と"thread2"が逆転することがある。
正常に格納されているようだ。
19:54:17.727: test12 start
19:54:17.751: thread1 0
19:54:17.771: thread2 0
19:54:20.409: {'name': 'thread2', 'index': 0, 'sleep': '2.60', 'result': False}
19:54:20.409: thread2 1
19:54:20.577: {'name': 'thread1', 'index': 0, 'sleep': '2.77', 'result': False}
19:54:20.578: thread1 1
19:54:21.910: {'name': 'thread1', 'index': 1, 'sleep': '1.29', 'result': False}
19:54:21.917: thread1 2
19:54:23.383: {'name': 'thread1', 'index': 2, 'sleep': '1.43', 'result': False}
19:54:24.920: {'name': 'thread2', 'index': 1, 'sleep': '4.50', 'result': False}
19:54:24.939: thread2 2
19:54:29.625: {'name': 'thread2', 'index': 2, 'sleep': '4.67', 'result': False}
19:54:29.629: getResult start
19:54:29.630: getResult end
19:54:29.630: result: {'name': 'thread2', 'index': 0, 'sleep': '2.60', 'result': False}
19:54:29.630: result: {'name': 'thread1', 'index': 0, 'sleep': '2.77', 'result': False}
19:54:29.631: result: {'name': 'thread1', 'index': 1, 'sleep': '1.29', 'result': False}
19:54:29.631: result: {'name': 'thread1', 'index': 2, 'sleep': '1.43', 'result': False}
19:54:29.631: result: {'name': 'thread2', 'index': 1, 'sleep': '4.50', 'result': False}
19:54:29.632: result: {'name': 'thread2', 'index': 2, 'sleep': '4.67', 'result': False}
19:54:29.633: test12 end
スレッドが無数にありjoin()
が使えない場合
今回は、スレッドの中でスレッドを複数回呼んでいる。
そのため、join()
を使うのも手間が掛かる。
(スレッドをリストに入れて後で全部join()
するとか。)
全てのスレッドが終了を監視するwatcher
スレッドを用意する。
watcher
は、実行中のスレッドが 2 以下(メインスレッドとwatcher
自身)でresultQueue
に値が格納されていると値を出力して実行を停止する。
watcher
からthreading.active_count()
を実行すると、最低値は、メインスレッド(停止しているかに関わらない)とwatcher
自身で 2 となる。
def ffQ(name, i, n):
time.sleep(n)
result = {"name": name, "index": i, "sleep": f"{n:.2f}", "result": check()}
printTime(result)
resultQueue.put(result)
@printName
def startThread(name, n):
for i in range(n):
printTime(f"{name} {i}")
thread = threading.Thread(target=ffQ, args=(name, i, random.uniform(1, 5)))
thread.start()
def watch2(n):
while True:
if threading.active_count() <= 2 and not resultQueue.empty():
printTime("all end")
[printTime(f"result: {x}") for x in getResult()]
return
time.sleep(n)
@printName
def test13():
watcher = threading.Thread(target=watch2, args=(1,))
watcher.start()
thread1 = threading.Thread(target=startThread, args=("thread1", 3))
thread2 = threading.Thread(target=startThread, args=("thread2", 3))
thread1.start()
thread2.start()
test12()
上手くいったようだ。
19:58:45.189: test13 start
19:58:45.245: startThread start
19:58:45.247: thread1 0
19:58:45.247: startThread start
19:58:45.248: thread2 0
19:58:45.248: test13 end
19:58:45.271: thread1 1
19:58:45.271: thread2 1
19:58:45.276: thread1 2
19:58:45.276: thread2 2
19:58:45.282: startThread end
19:58:45.282: startThread end
19:58:46.623: {'name': 'thread2', 'index': 1, 'sleep': '1.33', 'result': False}
19:58:47.109: {'name': 'thread1', 'index': 0, 'sleep': '1.83', 'result': False}
19:58:48.150: {'name': 'thread2', 'index': 2, 'sleep': '2.87', 'result': False}
19:58:49.133: {'name': 'thread2', 'index': 0, 'sleep': '3.85', 'result': False}
19:58:49.932: {'name': 'thread1', 'index': 2, 'sleep': '4.61', 'result': False}
19:58:50.215: {'name': 'thread1', 'index': 1, 'sleep': '4.93', 'result': False}
19:58:50.309: all end
19:58:50.317: getResult start
19:58:50.336: getResult end
19:58:50.341: result: {'name': 'thread2', 'index': 1, 'sleep': '1.33', 'result': False}
19:58:50.351: result: {'name': 'thread1', 'index': 0, 'sleep': '1.83', 'result': False}
19:58:50.365: result: {'name': 'thread2', 'index': 2, 'sleep': '2.87', 'result': False}
19:58:50.374: result: {'name': 'thread2', 'index': 0, 'sleep': '3.85', 'result': False}
19:58:50.389: result: {'name': 'thread1', 'index': 2, 'sleep': '4.61', 'result': False}
19:58:50.405: result: {'name': 'thread1', 'index': 1, 'sleep': '4.93', 'result': False}
特定の値が得られた時点で全てのスレッドの実行を停止する。daemon=True
今回は、条件に合った結果が得られるまで実行を続け、結果が得られたら全てのスレッドを停止する事を考えてみる。
監視するwatcher
スレッド以外は、daemon=True
とする。
こうすることで、メインスレッドとwatcher
スレッドが終了した時点で他のスレッドは、終了する。
以下、2 つの理由でprint()
は削除する。
-
deamon=True
のスレッドからprint()
で出力するとエラーメッセージが表示されることがある - 途中経過のメッセージと、実行終了時のメッセージと区別が付かなくなる。
(メッセージが混ざってしまい、しばらく混乱しました。)
def ffQ2(name, i, n):
time.sleep(n)
result = {"name": name, "index": i, "sleep": f"{n:.2f}", "result": check()}
resultQueue.put(result)
@printName
def startThread2(name, n):
for i in range(n):
thread = threading.Thread(target=ffQ2, args=(name, i, random.uniform(1, 5)))
thread.start()
def watch3(n):
while True:
if not resultQueue.empty():
result = [x for x in getResult() if x["result"] == True]
if len(result) > 0:
printTime("all end")
[printTime(f"result: {x}") for x in result]
return
time.sleep(n)
@printName
def test14():
watcher = threading.Thread(target=watch3, args=(0.5,))
watcher.start()
thread1 = threading.Thread(target=startThread2, args=("thread1", 1000), daemon=True)
thread2 = threading.Thread(target=startThread2, args=("thread2", 1000), daemon=True)
thread1.start()
thread2.start()
test14()
問題ないようだ。
20:01:07.734: test14 start
20:01:07.734: startThread2 start
20:01:07.749: startThread2 start
20:01:07.749: test14 end
20:01:09.315: getResult start
20:01:09.359: getResult end
20:01:09.911: getResult start
20:01:09.961: getResult end
20:01:09.984: all end
20:01:09.985: result: {'name': 'thread1', 'index': 45, 'sleep': '1.42', 'result': True}
20:01:09.986: result: {'name': 'thread2', 'index': 248, 'sleep': '1.17', 'result': True}
20:01:09.986: result: {'name': 'thread2', 'index': 159, 'sleep': '1.32', 'result': True}
concurrent.futures
concurrent.futures
について調べてみる。
ThreadPoolExecutor
のみを扱うこととする
ThreadPoolExecutor
の基本
ThreadPoolExecutor()
でスレッドの作成。(ワーカーと言った方が良いか)
submit()
で実行。
shutdown()
でスレッドの解放。
shutdown()
は、デフォルトではwait=True
となり、実行が完了するまで待機してから解放する。
待機したくなければ、shutdown(wait=False)
とする。
まずは、threading.Thread
と同等の動きをさせてみる。
def ff(name, i, n):
time.sleep(n)
result = {"name": name, "index": i, "sleep": f"{n:.2f}", "result": check()}
printTime(result)
return result
@printName
def test1():
for i in range(3):
printTime(f"test submit {i}")
thread1 = ThreadPoolExecutor()
thread1.submit(ff, *("thread1", i, 1))
thread1.shutdown(wait=False)
thread2 = ThreadPoolExecutor()
thread2.submit(ff, *("thread2", i, 2))
thread2.shutdown(wait=False)
test1()
同じような結果になった。
20:05:28.011: test1 start
20:05:28.011: test submit 0
20:05:28.127: test submit 1
20:05:28.145: test submit 2
20:05:28.149: test1 end
20:05:29.113: {'name': 'thread1', 'index': 0, 'sleep': '1.00', 'result': False}
20:05:29.157: {'name': 'thread1', 'index': 1, 'sleep': '1.00', 'result': False}
20:05:29.158: {'name': 'thread1', 'index': 2, 'sleep': '1.00', 'result': False}
20:05:30.154: {'name': 'thread2', 'index': 1, 'sleep': '2.00', 'result': False}
20:05:30.167: {'name': 'thread2', 'index': 0, 'sleep': '2.00', 'result': False}
20:05:30.169: {'name': 'thread2', 'index': 2, 'sleep': '2.00', 'result': False}
shutdown(wait=False)
を試してみる
@printName
def test2():
for i in range(3):
printTime(f"test submit {i}")
thread1 = ThreadPoolExecutor()
thread1.submit(ff, *("thread1", i, 1))
thread1.shutdown(wait=True)
thread2 = ThreadPoolExecutor()
thread2.submit(ff, *("thread2", i, 2))
thread2.shutdown(wait=True)
test2()
終了を待ってから、次の処理に行っているのが分かる。
20:07:32.468: test2 start
20:07:32.469: test submit 0
20:07:33.562: {'name': 'thread1', 'index': 0, 'sleep': '1.00', 'result': False}
20:07:35.620: {'name': 'thread2', 'index': 0, 'sleep': '2.00', 'result': False}
20:07:35.629: test submit 1
20:07:36.634: {'name': 'thread1', 'index': 1, 'sleep': '1.00', 'result': False}
20:07:38.654: {'name': 'thread2', 'index': 1, 'sleep': '2.00', 'result': False}
20:07:38.665: test submit 2
20:07:39.677: {'name': 'thread1', 'index': 2, 'sleep': '1.00', 'result': False}
20:07:41.699: {'name': 'thread2', 'index': 2, 'sleep': '2.00', 'result': False}
20:07:41.701: test2 end
値の取得
submit()
は、Future
オブジェクトを返す。
Future
からは、result()
でsubmit()
で指定した関数からの戻り値を取得できる。
@printName
def test3():
for i in range(3):
printTime(f"test submit {i}")
thread1 = ThreadPoolExecutor()
future = thread1.submit(ff, *("thread1", i, 1))
printTime(f"test: {future.result()}")
thread1.shutdown(wait=False)
thread2 = ThreadPoolExecutor()
future = thread2.submit(ff, *("thread2", i, 2))
printTime(f"test: {future.result()}")
thread1.shutdown(wait=False)
test3()
result()
は、実行完了まで待つようだ。
20:08:48.826: test3 start
20:08:48.828: test submit 0
20:08:49.869: {'name': 'thread1', 'index': 0, 'sleep': '1.00', 'result': False}
20:08:49.882: result: {'name': 'thread1', 'index': 0, 'sleep': '1.00', 'result': False}
20:08:51.889: {'name': 'thread2', 'index': 0, 'sleep': '2.00', 'result': False}
20:08:51.889: result: {'name': 'thread2', 'index': 0, 'sleep': '2.00', 'result': False}
20:08:51.889: test submit 1
20:08:52.905: {'name': 'thread1', 'index': 1, 'sleep': '1.00', 'result': False}
20:08:52.905: result: {'name': 'thread1', 'index': 1, 'sleep': '1.00', 'result': False}
20:08:54.926: {'name': 'thread2', 'index': 1, 'sleep': '2.00', 'result': False}
20:08:54.931: result: {'name': 'thread2', 'index': 1, 'sleep': '2.00', 'result': False}
20:08:54.931: test submit 2
20:08:55.946: {'name': 'thread1', 'index': 2, 'sleep': '1.00', 'result': False}
20:08:55.953: result: {'name': 'thread1', 'index': 2, 'sleep': '1.00', 'result': False}
20:08:57.968: {'name': 'thread2', 'index': 2, 'sleep': '2.00', 'result': False}
20:08:57.981: result: {'name': 'thread2', 'index': 2, 'sleep': '2.00', 'result': False}
20:08:57.981: test3 end
with
文
with
文を使えば、shutdown()
を省略出来る。
@printName
def test4():
for i in range(3):
printTime(f"test submit {i}")
with ThreadPoolExecutor() as thread1:
thread1.submit(ff, *("thread1", i, 1))
with ThreadPoolExecutor() as thread2:
thread2.submit(ff, *("thread2", i, 2))
test4()
with
文では、デフォルトのshutdown(wait=True)
が実行される。
20:09:45.531: test4 start
20:09:45.547: test submit 0
20:09:46.602: {'name': 'thread1', 'index': 0, 'sleep': '1.00', 'result': False}
20:09:48.611: {'name': 'thread2', 'index': 0, 'sleep': '2.00', 'result': False}
20:09:48.611: test submit 1
20:09:49.628: {'name': 'thread1', 'index': 1, 'sleep': '1.00', 'result': False}
20:09:51.667: {'name': 'thread2', 'index': 1, 'sleep': '2.00', 'result': False}
20:09:51.696: test submit 2
20:09:52.700: {'name': 'thread1', 'index': 2, 'sleep': '1.00', 'result': False}
20:09:54.709: {'name': 'thread2', 'index': 2, 'sleep': '2.00', 'result': False}
20:09:54.710: test4 end
蛇足だが、with
文で、shutdown(wait=False)
にするには以下の様にする。
class CustomThreadPoolExecutor(ThreadPoolExecutor):
def __init__(self, max_workers=None, thread_name_prefix="", initializer=None, initargs=()):
super().__init__(max_workers, thread_name_prefix, initializer, initargs)
# def __enter__(self):
# return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown(wait=False)
return False
@printName
def test5():
for i in range(3):
printTime(f"test submit {i}")
with CustomThreadPoolExecutor() as thread1:
thread1.submit(ff, *("thread1", i, 1))
with CustomThreadPoolExecutor() as thread2:
thread2.submit(ff, *("thread2", i, 2))
test5()
shutdown(wait=False)
と同等になった。
20:10:30.910: test5 start
20:10:30.910: test submit 0
20:10:30.919: test submit 1
20:10:30.925: test submit 2
20:10:30.939: test5 end
20:10:31.929: {'name': 'thread1', 'index': 0, 'sleep': '1.00', 'result': False}
20:10:31.937: {'name': 'thread1', 'index': 2, 'sleep': '1.00', 'result': False}
20:10:31.950: {'name': 'thread1', 'index': 1, 'sleep': '1.00', 'result': False}
20:10:32.923: {'name': 'thread2', 'index': 0, 'sleep': '2.00', 'result': False}
20:10:32.941: {'name': 'thread2', 'index': 1, 'sleep': '2.00', 'result': False}
20:10:32.955: {'name': 'thread2', 'index': 2, 'sleep': '2.00', 'result': False}
map()
ThreadPoolExecutor()
では、map()
で複数の処理を実行できる。
1 つの処理を 1 つのスレッド処理するイメージなようだ。
定義を見てみるとsubmit()
を組み込み関数のmap()
で呼んでいるだけ。
def ff2(v):
return ff(*v)
@printName
def test6():
args = []
for i in range(3):
args.append(("thread1", i, 1))
for i in range(3):
args.append(("thread2", i, 2))
with CustomThreadPoolExecutor() as thread:
future = thread.map(ff2, args)
for i in future:
printTime(f"result: {i}")
test6()
result()
と同じように、処理が終わってから値を取り出している。
20:15:27.643: test6 start
20:15:28.680: {'name': 'thread1', 'index': 2, 'sleep': '1.00', 'result': False}
20:15:28.680: {'name': 'thread1', 'index': 1, 'sleep': '1.00', 'result': False}
20:15:28.690: {'name': 'thread1', 'index': 0, 'sleep': '1.00', 'result': False}
20:15:28.696: result: {'name': 'thread1', 'index': 0, 'sleep': '1.00', 'result': False}
20:15:28.697: result: {'name': 'thread1', 'index': 1, 'sleep': '1.00', 'result': False}
20:15:28.697: result: {'name': 'thread1', 'index': 2, 'sleep': '1.00', 'result': False}
20:15:29.688: {'name': 'thread2', 'index': 1, 'sleep': '2.00', 'result': False}
20:15:29.688: {'name': 'thread2', 'index': 0, 'sleep': '2.00', 'result': False}
20:15:29.688: result: {'name': 'thread2', 'index': 0, 'sleep': '2.00', 'result': False}
20:15:29.688: result: {'name': 'thread2', 'index': 1, 'sleep': '2.00', 'result': False}
20:15:29.704: {'name': 'thread2', 'index': 2, 'sleep': '2.00', 'result': False}
20:15:29.706: result: {'name': 'thread2', 'index': 2, 'sleep': '2.00', 'result': False}
20:15:29.707: test6 end
下記の様にも引数を渡せる。
map()
に渡している関数は、ff()
である事に注意。
result = thread.map(ff, args1, args2, args3)
は、result= thread.map(ff2, zip(args1, args2, args3))
と同等
@printName
def test7():
args1 = ["thread1"] * 3 + ["thread2"] * 3
args2 = list(range(3)) * 2
args3 = [1] * 3 + [2] * 3
with CustomThreadPoolExecutor() as thread:
results = thread.map(ff, args1, args2, args3)
for result in results:
printTime(f"result: {result}")
test7()
20:20:11.325: test7 start
20:20:12.401: {'name': 'thread1', 'index': 2, 'sleep': '1.00', 'result': False}
20:20:12.401: {'name': 'thread1', 'index': 1, 'sleep': '1.00', 'result': False}
20:20:12.401: {'name': 'thread1', 'index': 0, 'sleep': '1.00', 'result': False}
20:20:12.436: result: {'name': 'thread1', 'index': 0, 'sleep': '1.00', 'result': False}
20:20:12.437: result: {'name': 'thread1', 'index': 1, 'sleep': '1.00', 'result': False}
20:20:12.437: result: {'name': 'thread1', 'index': 2, 'sleep': '1.00', 'result': False}
20:20:13.409: {'name': 'thread2', 'index': 2, 'sleep': '2.00', 'result': False}
20:20:13.411: {'name': 'thread2', 'index': 1, 'sleep': '2.00', 'result': True}
20:20:13.411: {'name': 'thread2', 'index': 0, 'sleep': '2.00', 'result': False}
20:20:13.414: result: {'name': 'thread2', 'index': 0, 'sleep': '2.00', 'result': False}
20:20:13.415: result: {'name': 'thread2', 'index': 1, 'sleep': '2.00', 'result': True}
20:20:13.415: result: {'name': 'thread2', 'index': 2, 'sleep': '2.00', 'result': False}
20:20:13.415: test7 end
Future
について
submit()
は、Future
オブジェクトを返す。
そのFuture
についてもう少し詳しく調べる。
add_cone_callback()
add_cone_callback()
で、実行後に呼び出されるコールバックを指定できる。
def done(future):
printTime(f"done: {future.result()}")
@printName
def test8():
with ThreadPoolExecutor() as thread:
for i in range(6):
printTime(f"test submit {i}")
future = thread.submit(ff, *("thread", i, random.uniform(1, 5)))
future.add_done_callback(done)
test8()
実行後に呼び出されている。
実行完了をわざわざ待たなくてよいので色々使えそう。
20:21:00.987: test8 start
20:21:01.033: test submit 0
20:21:01.074: test submit 1
20:21:01.075: test submit 2
20:21:01.076: test submit 3
20:21:01.096: test submit 4
20:21:01.102: test submit 5
20:21:02.354: {'name': 'thread', 'index': 0, 'sleep': '1.27', 'result': False}
20:21:02.354: done: {'name': 'thread', 'index': 0, 'sleep': '1.27', 'result': False}
20:21:02.696: {'name': 'thread', 'index': 5, 'sleep': '1.59', 'result': False}
20:21:02.705: done: {'name': 'thread', 'index': 5, 'sleep': '1.59', 'result': False}
20:21:03.063: {'name': 'thread', 'index': 3, 'sleep': '1.96', 'result': False}
20:21:03.063: done: {'name': 'thread', 'index': 3, 'sleep': '1.96', 'result': False}
20:21:03.209: {'name': 'thread', 'index': 1, 'sleep': '2.12', 'result': False}
20:21:03.212: done: {'name': 'thread', 'index': 1, 'sleep': '2.12', 'result': False}
20:21:03.854: {'name': 'thread', 'index': 4, 'sleep': '2.74', 'result': False}
20:21:03.857: done: {'name': 'thread', 'index': 4, 'sleep': '2.74', 'result': False}
20:21:05.584: {'name': 'thread', 'index': 2, 'sleep': '4.50', 'result': False}
20:21:05.584: done: {'name': 'thread', 'index': 2, 'sleep': '4.50', 'result': False}
20:21:05.584: test8 end
cancel()
cancel()
で、実行前の処理をキャンセル出来る。
実行途中のものを、強制終了させるわけではない。
@printName
def test9():
with ThreadPoolExecutor() as thread:
for i in range(10000):
future = thread.submit(ff, *("thread", i, random.uniform(1, 5)))
future.cancel()
test9()
10000 個のうち 20 個だけで実行され、それ以外はキャンセルされた。
論理プロセッサ数(16)の値が影響しているのか、偶然か。
20:21:47.488: test9 start
20:21:48.790: {'name': 'thread', 'index': 15, 'sleep': '1.17', 'result': False}
20:21:48.978: {'name': 'thread', 'index': 16, 'sleep': '1.35', 'result': False}
20:21:49.022: {'name': 'thread', 'index': 19, 'sleep': '1.39', 'result': False}
20:21:49.284: {'name': 'thread', 'index': 8, 'sleep': '1.68', 'result': False}
20:21:49.362: {'name': 'thread', 'index': 11, 'sleep': '1.75', 'result': False}
20:21:49.383: {'name': 'thread', 'index': 6, 'sleep': '1.81', 'result': False}
20:21:49.435: {'name': 'thread', 'index': 9, 'sleep': '1.82', 'result': False}
20:21:49.464: {'name': 'thread', 'index': 5, 'sleep': '1.90', 'result': False}
20:21:49.843: {'name': 'thread', 'index': 13, 'sleep': '2.23', 'result': False}
20:21:50.006: {'name': 'thread', 'index': 0, 'sleep': '2.47', 'result': False}
20:21:50.054: {'name': 'thread', 'index': 18, 'sleep': '2.43', 'result': False}
20:21:50.081: {'name': 'thread', 'index': 4, 'sleep': '2.52', 'result': False}
20:21:50.176: {'name': 'thread', 'index': 17, 'sleep': '2.53', 'result': False}
20:21:50.812: {'name': 'thread', 'index': 10, 'sleep': '3.19', 'result': False}
20:21:51.125: {'name': 'thread', 'index': 14, 'sleep': '3.51', 'result': False}
20:21:51.800: {'name': 'thread', 'index': 3, 'sleep': '4.23', 'result': False}
20:21:52.176: {'name': 'thread', 'index': 2, 'sleep': '4.62', 'result': False}
20:21:52.224: {'name': 'thread', 'index': 12, 'sleep': '4.61', 'result': False}
20:21:52.405: {'name': 'thread', 'index': 1, 'sleep': '4.86', 'result': False}
20:21:52.525: {'name': 'thread', 'index': 7, 'sleep': '4.94', 'result': False}
20:21:52.545: test9 end
Future
を扱うための関数
as_completed()
as_completed()
は、処理が終わったものを返してくれるジェネレータを返す。
@printName
def test10():
with ThreadPoolExecutor() as thread:
futures = []
for i in range(6):
futures.append(thread.submit(ff, *("thread", i, random.uniform(1, 5))))
printTime("as_completed start")
for future in as_completed(futures):
printTime(f"completed: {future.result()}")
printTime("as_completed end")
test10()
as_completed()
で実行完了まで待機し、完了順に返されているのが分かる。
20:26:08.072: test10 start
20:26:08.123: as_completed start
20:26:10.421: {'name': 'thread', 'index': 2, 'sleep': '2.31', 'result': False}
20:26:10.421: completed: {'name': 'thread', 'index': 2, 'sleep': '2.31', 'result': False}
20:26:10.908: {'name': 'thread', 'index': 3, 'sleep': '2.78', 'result': False}
20:26:10.913: completed: {'name': 'thread', 'index': 3, 'sleep': '2.78', 'result': False}
20:26:11.737: {'name': 'thread', 'index': 0, 'sleep': '3.62', 'result': False}
20:26:11.764: completed: {'name': 'thread', 'index': 0, 'sleep': '3.62', 'result': False}
20:26:11.972: {'name': 'thread', 'index': 4, 'sleep': '3.85', 'result': False}
20:26:11.978: completed: {'name': 'thread', 'index': 4, 'sleep': '3.85', 'result': False}
20:26:12.412: {'name': 'thread', 'index': 1, 'sleep': '4.30', 'result': False}
20:26:12.412: completed: {'name': 'thread', 'index': 1, 'sleep': '4.30', 'result': False}
20:26:12.835: {'name': 'thread', 'index': 5, 'sleep': '4.70', 'result': False}
20:26:12.838: completed: {'name': 'thread', 'index': 5, 'sleep': '4.70', 'result': False}
20:26:12.838: as_completed end
20:26:12.839: test10 end
一方、以下の様にresult()
を使うと呼び出し順で返される
@printName
def test11():
with ThreadPoolExecutor() as thread:
futures = []
for i in range(6):
futures.append(thread.submit(ff, *("thread", i, random.uniform(1, 5))))
printTime("result start")
for future in futures:
printTime(f"result: {future.result()}")
printTime("result end")
test11()
20:28:05.979: test11 start
20:28:06.197: result start
20:28:07.120: {'name': 'thread', 'index': 1, 'sleep': '1.02', 'result': False}
20:28:07.358: {'name': 'thread', 'index': 4, 'sleep': '1.19', 'result': False}
20:28:07.668: {'name': 'thread', 'index': 0, 'sleep': '1.59', 'result': False}
20:28:07.693: result: {'name': 'thread', 'index': 0, 'sleep': '1.59', 'result': False}
20:28:07.693: result: {'name': 'thread', 'index': 1, 'sleep': '1.02', 'result': False}
20:28:08.531: {'name': 'thread', 'index': 2, 'sleep': '2.42', 'result': True}
20:28:08.539: result: {'name': 'thread', 'index': 2, 'sleep': '2.42', 'result': True}
20:28:10.116: {'name': 'thread', 'index': 3, 'sleep': '3.96', 'result': False}
20:28:10.116: result: {'name': 'thread', 'index': 3, 'sleep': '3.96', 'result': False}
20:28:10.116: result: {'name': 'thread', 'index': 4, 'sleep': '1.19', 'result': False}
20:28:10.197: {'name': 'thread', 'index': 5, 'sleep': '4.00', 'result': False}
20:28:10.198: result: {'name': 'thread', 'index': 5, 'sleep': '4.00', 'result': False}
20:28:10.199: result end
20:28:10.199: test11 end
wait()
wait()
は、指定時間まで待機した後に、完了したものと完了していないものを含むnamedtuple
を返す。
完了したものがdone
で、完了していないものがnot_done
.
@printName
def test12():
with ThreadPoolExecutor() as thread:
futures = []
for i in range(6):
futures.append(thread.submit(ff, *("thread", i, random.uniform(1, 5))))
printTime("wait start")
for future in wait(futures, timeout=3).done:
printTime(f"completed: {future.result()}")
printTime("wait end")
test12()
待機時間内に終了したものだけが返っているのが分かる。
20:21:25.871: test12 start
20:21:25.872: wait start
20:21:27.416: {'name': 'thread', 'index': 3, 'sleep': '1.5', 'result': True}
20:21:27.447: {'name': 'thread', 'index': 4, 'sleep': '1.6', 'result': False}
20:21:28.878: completed: {'name': 'thread', 'index': 3, 'sleep': '1.5', 'result': True}
20:21:28.878: completed: {'name': 'thread', 'index': 4, 'sleep': '1.6', 'result': False}
20:21:28.878: wait end
20:21:29.130: {'name': 'thread', 'index': 5, 'sleep': '3.2', 'result': False}
20:21:29.518: {'name': 'thread', 'index': 1, 'sleep': '3.6', 'result': False}
20:21:29.782: {'name': 'thread', 'index': 2, 'sleep': '3.9', 'result': False}
20:21:30.488: {'name': 'thread', 'index': 0, 'sleep': '4.6', 'result': False}
20:21:30.488: test12 end
注意点が1つある。
以下の様にwait()
の指定時間前に完了している処理が複数あると取り出される順番が不定になるという事。
@printName
def test12_():
with ThreadPoolExecutor() as thread:
futures = []
for i in range(6):
futures.append(thread.submit(ff, *("thread", i, random.uniform(1, 3))))
printTime("wait start")
for future in wait(futures, timeout=3).done:
printTime(f"completed: {future.result()}")
printTime("wait end")
test12_()
実行完了順と取り出し順が異なっている。
20:31:05.074: test12_ start
20:31:05.177: wait start
20:31:06.360: {'name': 'thread', 'index': 4, 'sleep': '1.16', 'result': False}
20:31:06.534: {'name': 'thread', 'index': 3, 'sleep': '1.34', 'result': False}
20:31:06.878: {'name': 'thread', 'index': 1, 'sleep': '1.71', 'result': False}
20:31:06.893: {'name': 'thread', 'index': 2, 'sleep': '1.73', 'result': False}
20:31:07.411: {'name': 'thread', 'index': 5, 'sleep': '2.22', 'result': True}
20:31:07.912: {'name': 'thread', 'index': 0, 'sleep': '2.76', 'result': False}
20:31:07.918: completed: {'name': 'thread', 'index': 3, 'sleep': '1.34', 'result': False}
20:31:07.918: completed: {'name': 'thread', 'index': 0, 'sleep': '2.76', 'result': False}
20:31:07.921: completed: {'name': 'thread', 'index': 4, 'sleep': '1.16', 'result': False}
20:31:07.921: completed: {'name': 'thread', 'index': 2, 'sleep': '1.73', 'result': False}
20:31:07.922: completed: {'name': 'thread', 'index': 1, 'sleep': '1.71', 'result': False}
20:31:07.923: completed: {'name': 'thread', 'index': 5, 'sleep': '2.22', 'result': True}
20:31:07.923: wait end
20:31:07.924: test12_ end
wait()
にreturn_when=FIRST_COMPLETED
を渡すと最初に完了したものだけが取り出される。
@printName
def test13():
with ThreadPoolExecutor() as thread:
futures = []
for i in range(6):
futures.append(thread.submit(ff, *("thread", i, random.uniform(1, 5))))
printTime("wait start")
for future in wait(futures, return_when=FIRST_COMPLETED).done:
printTime(f"completed: {future.result()}")
printTime("wait end")
20:35:32.865: test13 start
20:35:33.056: wait start
20:35:34.457: {'name': 'thread', 'index': 4, 'sleep': '1.40', 'result': False}
20:35:34.471: completed: {'name': 'thread', 'index': 4, 'sleep': '1.40', 'result': False}
20:35:34.471: wait end
20:35:34.476: {'name': 'thread', 'index': 2, 'sleep': '1.46', 'result': False}
20:35:34.534: {'name': 'thread', 'index': 3, 'sleep': '1.49', 'result': True}
20:35:35.309: {'name': 'thread', 'index': 0, 'sleep': '2.34', 'result': True}
20:35:37.304: {'name': 'thread', 'index': 1, 'sleep': '4.33', 'result': True}
20:35:37.444: {'name': 'thread', 'index': 5, 'sleep': '4.37', 'result': False}
20:35:37.454: test13 end
特定の値が得られた時点で全てのスレッドの実行を停止する。その 1
想定している値を見つけたら残りの処理をキャンセルする事を考えてみる。
@printName
def watch(futures):
for future in as_completed(futures):
result = future.result()
if result["result"]:
printTime(f"success: {result}")
break
for future in futures:
future.cancel()
@printName
def test14():
with ThreadPoolExecutor() as thread:
futures = []
for i in range(1000):
futures.append(thread.submit(ff, *("thread", i, random.uniform(1, 5))))
watch(futures)
test14()
1000 個の処理をsubmit()
して、その後に値を調べている。
しかし、大部分がキャンセルされて実行されていない。
20:49:10.823: test14 start
20:49:10.975: watch start
20:49:12.283: {'name': 'thread', 'index': 11, 'sleep': '1.32', 'result': False}
20:49:12.345: {'name': 'thread', 'index': 7, 'sleep': '1.43', 'result': False}
20:49:12.409: {'name': 'thread', 'index': 2, 'sleep': '1.53', 'result': False}
20:49:12.431: {'name': 'thread', 'index': 13, 'sleep': '1.47', 'result': False}
20:49:12.461: {'name': 'thread', 'index': 14, 'sleep': '1.50', 'result': False}
20:49:12.642: {'name': 'thread', 'index': 18, 'sleep': '1.67', 'result': True}
20:49:12.645: success: {'name': 'thread', 'index': 18, 'sleep': '1.67', 'result': True}
20:49:12.651: watch end
20:49:13.359: {'name': 'thread', 'index': 16, 'sleep': '2.38', 'result': False}
20:49:13.732: {'name': 'thread', 'index': 5, 'sleep': '2.84', 'result': False}
20:49:13.763: {'name': 'thread', 'index': 24, 'sleep': '1.27', 'result': False}
20:49:13.960: {'name': 'thread', 'index': 9, 'sleep': '2.99', 'result': False}
20:49:14.124: {'name': 'thread', 'index': 8, 'sleep': '3.19', 'result': False}
20:49:14.156: {'name': 'thread', 'index': 17, 'sleep': '3.17', 'result': False}
20:49:14.203: {'name': 'thread', 'index': 0, 'sleep': '3.33', 'result': False}
20:49:14.255: {'name': 'thread', 'index': 1, 'sleep': '3.37', 'result': False}
20:49:14.279: {'name': 'thread', 'index': 4, 'sleep': '3.38', 'result': False}
20:49:14.324: {'name': 'thread', 'index': 22, 'sleep': '1.89', 'result': False}
20:49:14.885: {'name': 'thread', 'index': 20, 'sleep': '2.60', 'result': True}
20:49:14.931: {'name': 'thread', 'index': 10, 'sleep': '3.97', 'result': False}
20:49:15.140: {'name': 'thread', 'index': 15, 'sleep': '4.18', 'result': False}
20:49:15.176: {'name': 'thread', 'index': 12, 'sleep': '4.21', 'result': True}
20:49:15.219: {'name': 'thread', 'index': 23, 'sleep': '2.78', 'result': False}
20:49:15.254: {'name': 'thread', 'index': 3, 'sleep': '4.37', 'result': False}
20:49:15.283: {'name': 'thread', 'index': 25, 'sleep': '2.63', 'result': False}
20:49:15.481: {'name': 'thread', 'index': 6, 'sleep': '4.59', 'result': False}
20:49:15.892: {'name': 'thread', 'index': 19, 'sleep': '4.90', 'result': False}
20:49:16.486: {'name': 'thread', 'index': 21, 'sleep': '4.12', 'result': True}
20:49:16.489: test14 end
蛇足だが、以下の様にも出来る。
@printName
def test14_1():
with ThreadPoolExecutor() as thread:
futures = []
for i in range(1000):
futures.append(thread.submit(ff, *("thread", i, random.uniform(1, 5))))
with ThreadPoolExecutor() as watcher:
watcher.submit(watch, futures)
@printName
def test14_2():
with ThreadPoolExecutor() as thread:
futures = []
for i in range(1000):
futures.append(thread.submit(ff, *("thread", i, random.uniform(1, 5))))
watcher = threading.Thread(target=watch, args=(futures,))
watcher.start()
特定の値が得られた時点で全てのスレッドの実行を停止する。その 2
その 1 だと 1000 個のリストを作っているので無駄が多い。
もう少し効率が良さそうなものを考えてみる。
以下の様に、10 個ずつ判定してみる。
@printName
def watch2(futures):
finish = False
for future in as_completed(futures):
result = future.result()
if result["result"]:
printTime(f"success: {result}")
finish = True
for future in futures:
future.cancel()
break
return finish
@printName
def test15():
with ThreadPoolExecutor() as thread:
futures = []
for i in range(1000):
futures.append(thread.submit(ff, *("thread", i, random.uniform(1, 5))))
if (i + 1) % 10 == 0:
if watch2(futures):
futures = []
break
futures = []
watch2(futures)
メモリ効率が良くなったが、as_completed()
による待機が気になる。
21:15:05.143: test15 start
21:15:05.187: watch2 start
21:15:06.661: {'name': 'thread', 'index': 7, 'sleep': '1.46', 'result': False}
21:15:07.093: {'name': 'thread', 'index': 4, 'sleep': '1.91', 'result': False}
21:15:07.500: {'name': 'thread', 'index': 9, 'sleep': '2.29', 'result': True}
21:15:07.521: success: {'name': 'thread', 'index': 9, 'sleep': '2.29', 'result': True}
21:15:07.522: watch2 end
21:15:07.522: watch2 start
21:15:07.522: watch2 end
21:15:07.692: {'name': 'thread', 'index': 0, 'sleep': '2.52', 'result': False}
21:15:07.828: {'name': 'thread', 'index': 5, 'sleep': '2.65', 'result': False}
21:15:08.341: {'name': 'thread', 'index': 6, 'sleep': '3.16', 'result': False}
21:15:08.588: {'name': 'thread', 'index': 1, 'sleep': '3.42', 'result': False}
21:15:09.251: {'name': 'thread', 'index': 3, 'sleep': '4.06', 'result': False}
21:15:09.472: {'name': 'thread', 'index': 2, 'sleep': '4.27', 'result': False}
21:15:09.879: {'name': 'thread', 'index': 8, 'sleep': '4.68', 'result': True}
21:15:09.879: test15 end
特定の値が得られた時点で全てのスレッドの実行を停止する。その 3
値を監視するwatcher
スレッドを追加してそれで監視する。
値のやり取りには、queue
を使う。
time.sleep()
を入れないと、処理が速すぎてbreak
せずに 1000 回thread.submit()
を実行してしまう。
まあ、メモリの消費を考えなければ問題ないかも。
現実のプログラムでは、もっと時間が掛かってbreak
するかもしれない。
def getQueue(queue):
results = []
while not queue.empty():
results.append(queue.get())
return results
def getFlag(queue):
flag = queue.get()
queue.put(flag)
return flag
def putFlag(queue, flag):
queue.get()
queue.put(flag)
def allCancel(futures):
for future in futures:
if not future.done():
future.cancel()
@printName
def watch4(queue, endFlag):
try:
flag = getFlag(endFlag)
while not flag:
futures = getQueue(queue)
for future in as_completed(futures):
result = future.result()
if result["result"]:
printTime(f"success: {result}")
putFlag(endFlag, True)
flag = True
allCancel(futures)
break
time.sleep(0.05)
except BaseException as e:
print(f"except {type(e)}, {e}")
@printName
def test17():
futuresQueue = queue.SimpleQueue()
endFlag = queue.Queue(1)
endFlag.put(False)
with ThreadPoolExecutor() as watcher:
with ThreadPoolExecutor() as thread:
watcher.submit(watch4, futuresQueue, endFlag)
for i in range(1000):
if getFlag(endFlag):
printTime("break")
break
futuresQueue.put(thread.submit(ff, *("thread", i, random.uniform(1, 5))))
time.sleep(0.1)
printTime(f"qsize: {futuresQueue.qsize()}")
if getFlag(endFlag):
futures = getQueue(futuresQueue)
allCancel(futures)
test17()
23:41:31.648: test17 start
23:41:31.655: watch4 start
23:41:33.651: {'name': 'thread', 'index': 2, 'sleep': '1.72', 'result': False}
23:41:34.369: {'name': 'thread', 'index': 9, 'sleep': '1.50', 'result': False}
23:41:34.670: {'name': 'thread', 'index': 1, 'sleep': '2.85', 'result': False}
23:41:34.702: {'name': 'thread', 'index': 0, 'sleep': '3.03', 'result': False}
23:41:34.871: {'name': 'thread', 'index': 10, 'sleep': '1.88', 'result': False}
23:41:34.947: {'name': 'thread', 'index': 15, 'sleep': '1.33', 'result': False}
23:41:34.984: {'name': 'thread', 'index': 14, 'sleep': '1.50', 'result': False}
23:41:35.198: {'name': 'thread', 'index': 19, 'sleep': '1.14', 'result': False}
23:41:35.436: {'name': 'thread', 'index': 21, 'sleep': '1.05', 'result': False}
23:41:35.456: {'name': 'thread', 'index': 4, 'sleep': '3.29', 'result': False}
23:41:35.602: {'name': 'thread', 'index': 12, 'sleep': '2.36', 'result': False}
23:41:35.637: {'name': 'thread', 'index': 13, 'sleep': '2.27', 'result': False}
23:41:35.878: {'name': 'thread', 'index': 3, 'sleep': '3.81', 'result': False}
23:41:36.083: {'name': 'thread', 'index': 22, 'sleep': '1.39', 'result': True}
23:41:36.083: success: {'name': 'thread', 'index': 22, 'sleep': '1.39', 'result': True}
23:41:36.136: break
23:41:36.143: qsize: 12
23:41:36.161: watch4 end
23:41:36.167: {'name': 'thread', 'index': 6, 'sleep': '3.69', 'result': False}
23:41:36.276: {'name': 'thread', 'index': 7, 'sleep': '3.66', 'result': False}
23:41:36.295: {'name': 'thread', 'index': 5, 'sleep': '4.00', 'result': False}
23:41:36.304: {'name': 'thread', 'index': 25, 'sleep': '1.30', 'result': False}
23:41:36.335: {'name': 'thread', 'index': 26, 'sleep': '1.34', 'result': False}
23:41:36.626: {'name': 'thread', 'index': 24, 'sleep': '1.75', 'result': False}
23:41:36.720: {'name': 'thread', 'index': 17, 'sleep': '2.88', 'result': True}
23:41:37.176: {'name': 'thread', 'index': 18, 'sleep': '3.23', 'result': False}
23:41:37.191: {'name': 'thread', 'index': 8, 'sleep': '4.44', 'result': False}
23:41:37.247: {'name': 'thread', 'index': 23, 'sleep': '2.52', 'result': False}
23:41:37.693: {'name': 'thread', 'index': 20, 'sleep': '3.53', 'result': False}
23:41:38.039: {'name': 'thread', 'index': 11, 'sleep': '4.91', 'result': False}
23:41:38.415: {'name': 'thread', 'index': 31, 'sleep': '2.76', 'result': False}
23:41:38.699: {'name': 'thread', 'index': 16, 'sleep': '4.98', 'result': False}
23:41:38.809: {'name': 'thread', 'index': 32, 'sleep': '2.90', 'result': False}
23:41:39.374: {'name': 'thread', 'index': 28, 'sleep': '3.93', 'result': False}
23:41:39.764: {'name': 'thread', 'index': 27, 'sleep': '4.55', 'result': False}
23:41:39.828: {'name': 'thread', 'index': 30, 'sleep': '4.21', 'result': False}
23:41:39.891: {'name': 'thread', 'index': 29, 'sleep': '4.43', 'result': False}
23:41:40.187: {'name': 'thread', 'index': 33, 'sleep': '4.08', 'result': False}
23:41:40.187: test17 end
Discussion