🐍

[Python] スレッドについて

2022/05/27に公開

はじめに

時間が掛かる処理がありどうにか速く出来ないかと、スレッドの処理について調べてみることにしました。

特に、

  • subprocessの呼び出しの時間を何とかしたい
  • 1 つ 1 つは、1 秒程度で終了する。
  • 条件を満たすまで実行を続ける。
  • 実行が終わったら値を表示する。

の場合に適した方法が知りたかったです。

環境

python
Python 3.10.4 (tags/v3.10.4:9d38120, Mar 23 2022, 23:13:41) [MSC v.1929 64 bit (AMD64)]

threding

threadingを使った場合から調べてみる。

threading を起動させる

Thread.start()

簡単な使い方。
thread = threading.Thread(target=None, args=(), deamon=False)
でスレッドを作成する。
targetには、スレッドで処理させる関数を指定する。
argsには、関数に与える引数を渡す。
thread.start()で、別のスレッドでtargetの関数が引数にargsを与えられて実行される。

thread1 = threading.Thread(target=func, args=("thread", i, 1))
thread1.start()

以下で簡単に動きを見ていく事にする。
後々のことを考えて汎用的な関数の定義もしておく。

import time
import random
import threading
import queue
import ctypes
import sys

import datetime
import functools


# 現在時刻の取得
def getTime():
  return datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3]


# 現在時刻付きprint
def printTime(string):
  print(f"{getTime()}: {string}")


# 関数名と起動時刻の表示するデコレータ
def printName(func):

  @functools.wraps(func)
  def wrapper(*args, **kwargs):
    print(f"{getTime()}: {func.__name__} start")
    result = func(*args, **kwargs)
    print(f"{getTime()}: {func.__name__} end")
    return result

  return wrapper


def check():
  return True if random.random() < 0.1 else False


def ff(name, i, n):
  time.sleep(n)
  result = {"name": name, "index": i, "sleep": f"{n:.2f}", "result": check()}
  printTime(result)
  return result


@printName
def test1():
  for i in range(3):
    printTime(f"test start {i}")
    thread1 = threading.Thread(target=ff, args=("thread1", i, 1))
    thread2 = threading.Thread(target=ff, args=("thread2", i, 2))
    thread1.start()
    thread2.start()


test1()

同時並行で実行される。
呼び出した順番と結果が表示されるのに多少の前後がある。

実行結果
18:28:21.281: test1 start
18:28:21.297: test start 0
18:28:21.359: test start 1
18:28:21.366: test start 2
18:28:21.367: test1 end
18:28:22.349: {'name': 'thread1', 'index': 0, 'sleep': 1, 'result': False}
18:28:22.381: {'name': 'thread1', 'index': 1, 'sleep': 1, 'result': False}
18:28:22.382: {'name': 'thread1', 'index': 2, 'sleep': 1, 'result': False}
18:28:23.369: {'name': 'thread2', 'index': 1, 'sleep': 2, 'result': False}
18:28:23.369: {'name': 'thread2', 'index': 0, 'sleep': 2, 'result': True}
18:28:23.369: {'name': 'thread2', 'index': 2, 'sleep': 2, 'result': False}

Thread.run()を直接呼んでみる

@printName
def test2():
  for i in range(3):
    printTime(f"test run {i}")
    thread1 = threading.Thread(target=ff, args=("thread1", i, 1))
    thread2 = threading.Thread(target=ff, args=("thread2", i, 2))
    thread1.run()
    thread2.run()

test2()

こうするとメインスレッドでtargetの関数を実行しているようだ。
間違って使ってしまったが価値はないと思う。

実行結果
18:29:49.284: test2 start
18:29:49.318: test run 0
18:29:50.320: {'name': 'thread1', 'index': 0, 'sleep': 1, 'result': False}
18:29:52.333: {'name': 'thread2', 'index': 0, 'sleep': 2, 'result': False}
18:29:52.335: test run 1
18:29:53.347: {'name': 'thread1', 'index': 1, 'sleep': 1, 'result': False}
18:29:55.384: {'name': 'thread2', 'index': 1, 'sleep': 2, 'result': False}
18:29:55.413: test run 2
18:29:56.436: {'name': 'thread1', 'index': 2, 'sleep': 1, 'result': False}
18:29:58.441: {'name': 'thread2', 'index': 2, 'sleep': 2, 'result': False}
18:29:58.459: test2 end

スレッドの終了まで待機する

join()

以下のような場合、メインスレッドの方がthread1より速く終了する。

@printName
def test3():
  thread1 = threading.Thread(target=ff, args=("thread1", 0, 5))
  thread2 = threading.Thread(target=ff, args=("thread2", 0, 1))
  printTime("start main")
  thread1.start()
  thread2.start()
  printTime("end main")

test3()
実行結果
18:35:16.626: test3 start
18:35:16.639: start main
18:35:16.661: end main
18:35:16.661: test3 end
18:35:17.674: {'name': 'thread2', 'index': 0, 'sleep': 1, 'result': False}
18:35:21.686: {'name': 'thread1', 'index': 0, 'sleep': 5, 'result': False}

特定のスレッドの終了を待ちたい時には、join()を使う。

@printName
def test4():
  thread1 = threading.Thread(target=ff, args=("thread1", 0, 5))
  thread2 = threading.Thread(target=ff, args=("thread2", 0, 1))
  printTime("start main")
  thread1.start()
  thread2.start()
  thread1.join()
  printTime("end main")

test4()

今度は、時間のかかるthread1の終了を待ってからメインスレッドが終了した。

実行結果
18:36:05.990: test4 start
18:36:05.999: start main
18:36:07.046: {'name': 'thread2', 'index': 0, 'sleep': 1, 'result': False}
18:36:11.044: {'name': 'thread1', 'index': 0, 'sleep': 5, 'result': False}
18:36:11.050: end main
18:36:11.050: test4 end

スレッドを終了させる

daemon=True

作成時にdaemon=Trueとして作成されたスレッドは、メインスレッドも含むdaemon=Falseのスレッドが全ての終了したら強制終了する。

def watch(name, n):
  while True:
    printTime(f"{name}: alive")
    time.sleep(n)


@printName
def test5():
  daemon = threading.Thread(target=watch, args=("daemon", 0.5), daemon=True)
  daemon.start()
  printTime("start main")
  test1()
  printTime("end main")

test5()

他のスレッドが終了したら、daemon は、勝手に終了している。

実行結果
18:37:33.974: test5 start
18:37:34.047: daemon: alive
18:37:34.047: start main
18:37:34.051: test1 start
18:37:34.054: test start 0
18:37:34.071: test start 1
18:37:34.106: test start 2
18:37:34.178: test1 end
18:37:34.185: end main
18:37:34.185: test5 end
18:37:34.564: daemon: alive
18:37:35.071: daemon: alive
18:37:35.071: {'name': 'thread1', 'index': 0, 'sleep': 1, 'result': False}
18:37:35.107: {'name': 'thread1', 'index': 1, 'sleep': 1, 'result': False}
18:37:35.168: {'name': 'thread1', 'index': 2, 'sleep': 1, 'result': False}
18:37:35.600: daemon: alive
18:37:36.081: {'name': 'thread2', 'index': 0, 'sleep': 2, 'result': False}
18:37:36.123: {'name': 'thread2', 'index': 1, 'sleep': 2, 'result': True}
18:37:36.142: daemon: alive
18:37:36.183: {'name': 'thread2', 'index': 2, 'sleep': 2, 'result': False}

daemon を 2 つ作っても、daemon 以外が終了したら強制終了される。

@printName
def test6():
  daemon1 = threading.Thread(target=watch, args=("daemon1", 0.5), daemon=True)
  daemon1.start()
  daemon2 = threading.Thread(target=watch, args=("daemon1", 0.5), daemon=True)
  daemon2.start()
  printTime("start main")
  test1()
  printTime("end main")

test6()

個人的には、daemon という名前から最後まで残って後処理をする役割なのかと思ってしまった。
しかし、daemon に全てのスレッドが終わった後の処理をさせようと思っても出来ない。
なぜならば、他のスレッドが終了した時点で強制終了されるので後処理が実行されないから。
そのような目的では使えないようだ。

余談だが、join()を使って daemon の終了を待つのが通常の使用法との記述を見たが、決してそんな事はないと思う。
それでは、何が目的か分からなくなってしまう。

終了させる仕組みを作る

スレッドに例外を送ると終了させることが出来る。
ctypes.pythonapi.PyThreadState_SetAsyncExc()を使うと例外を送ることが出来る。

class CustomThread1(threading.Thread):

  def __init__(self, name, interval):
    super().__init__()
    self.name = name
    self.interval = interval

  @printName
  def kill(self):
    ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(self.native_id, ctypes.py_object(SystemExit))
    if ret > 1:  # 状態が変更されたスレッドの数を返す。通常は、1。見つからなかった場合は、0。
      ctypes.pythonapi.PyThreadState_SetAsyncExc(self.native_id, None)  # たぶん必要ないが念のため。まだ送られていない例外を消去
      printTime("failed")

  @printName
  def run(self):
    while True:
      printTime(f"{self.name}: alive")
      time.sleep(self.interval)


@printName
def test7():
  thread1 = CustomThread1("thread1", 1)
  thread1.start()
  time.sleep(3)
  thread1.kill()

test7()

thread1は、何らかの手段で終了させないと無限ループで実行を続ける。
kill()を実行して例外を発生させると終了したのが分かる。

実行結果
18:44:04.264: test7 start
18:44:04.309: run start
18:44:04.333: thread1: alive
18:44:05.364: thread1: alive
18:44:06.374: thread1: alive
18:44:07.320: kill start
18:44:07.322: kill end
18:44:07.323: test7 end

ctypes.pythonapi.PyThreadState_SetAsyncExc()ctypes.py_object(SystemExit)以外のctypes.py_object(ValueError)なども送ってもスレッドは終了する。
その場合は、run()の中で適切に例外を処理してやらないとエラーメッセージが表示される。

上記のコードは、動作を調べてみると以下のようなコードと同じような動きとなる。

class CustomThread2(threading.Thread):

  def __init__(self, name, interval):
    super().__init__()
    self.name = name
    self.interval = interval
    self.killed = False

  @printName
  def kill(self):
    self.killed = True

  @printName
  def run(self):
    while True:
      if self.killed:
        sys.exit(0)
      printTime(f"{self.name}: alive")
      time.sleep(self.interval)

もっと簡単にすれば、以下の様になる。

class CustomThread3(threading.Thread):

  def __init__(self, name, interval):
    super().__init__()
    self.name = name
    self.interval = interval
    self.alive = True

  @printName
  def kill(self):
    self.alive = False

  @printName
  def run(self):
    while self.alive:
      printTime(f"{self.name}: alive")
      time.sleep(self.interval)

現実の問題としてこんな感じで終了させられる処理も珍しいかもしれないが、途中で終了させる意図があれば終了処理をしっかりと作った方が良いと思う。

ctypes.pythonapi.PyThreadState_SetAsyncExc()の利点は、特別な仕組みが無くてもほとんどのスレッドを終了させられる事にある。
ただし、SystemExitを処理しているスレッドや、"bare except"を使っているスレッドは、意図通りに終了しないかもしれない。

もっと汎用的に使うには、以下のようなコードが考えられる。
threading.enumerate()は、メインスレッドを含めたスレッドのリストを返す。
threading.main_thread()は、メインスレッドを返す。

@printName
def killThemAll():
  for thread in threading.enumerate():
    if thread != threading.main_thread():
      printTime(f"kill: {thread.name}")
      ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.native_id, ctypes.py_object(SystemExit))


def ff2(name, interval):
  while True:
    printTime(f"{name}: alive")
    time.sleep(interval)


@printName
def test8():
  thread1 = threading.Thread(target=ff2, args=("thread1", 1))
  thread2 = threading.Thread(target=ff2, args=("thread2", 1))
  thread1.start()
  thread2.start()
  time.sleep(3)
  killThemAll()

test8()
実行結果
18:48:49.684: test8 start
18:48:49.730: thread1: alive
18:48:49.731: thread2: alive
18:48:50.734: thread2: alive
18:48:50.734: thread1: alive
18:48:51.744: thread2: alive
18:48:51.752: thread1: alive
18:48:52.741: killThemAll start
18:48:52.753: kill: Thread-1 (ff2)
18:48:52.769: thread2: alive
18:48:52.773: kill: Thread-2 (ff2)
18:48:52.773: killThemAll end
18:48:52.774: test8 end

スレッドを一時停止させる

スレッドを終了する方法を応用すれば、スレッドを一時停止する仕組みを作れる。
そこで使うのが、threading.Event
Event.wait()は、内部の値がTrueになるまでスレッドの動作を停止させる。
停止状態では、全ての動作が停止するのでしっかりと管理しなければスレッドが終了しなくなる恐れがある。
詳しくは、後述する。

class CustomThread4(threading.Thread):

  def __init__(self, name, interval):
    super().__init__()
    self.name = name
    self.interval = interval
    self.alive = True
    self.moving = threading.Event()
    self.moving.clear()
    self.start()

  @printName
  def begin(self):
    self.moving.set()

  @printName
  def stop(self):
    self.moving.clear()

  @printName
  def end(self):
    self.alive = False
    self.moving.set()

  @printName
  def run(self):
    self.moving.wait()
    while self.alive:
      printTime(f"{self.name}: alive")
      time.sleep(self.interval)
      self.moving.wait()


@printName
def test9():
  thread = CustomThread4("thread", 1)
  thread.begin()
  time.sleep(3)
  thread.stop()
  time.sleep(3)
  thread.begin()
  time.sleep(3)
  thread.end()

test9()

開始、停止、終了全て上手くいっている。

実行結果
18:50:35.232: test9 start
18:50:35.233: run start
18:50:35.233: begin start
18:50:35.234: begin end
18:50:35.234: thread: alive
18:50:36.238: thread: alive
18:50:37.258: thread: alive
18:50:38.246: stop start
18:50:38.259: stop end
18:50:41.272: begin start
18:50:41.275: begin end
18:50:41.275: thread: alive
18:50:42.291: thread: alive
18:50:43.311: thread: alive
18:50:44.279: end start
18:50:44.307: end end
18:50:44.315: test9 end
18:50:44.326: run end

ここで、以前作ったkillThemAll()を使ってみる。

@printName
def test10():
  thread = CustomThread4("thread", 1)
  thread.begin()
  time.sleep(3)
  thread.stop()
  time.sleep(3)
  thread.begin()
  time.sleep(3)
  killThemAll()

test10()

この条件では、問題なく終了する。

実行結果
18:52:37.885: test10 start
18:52:37.930: run start
18:52:37.930: begin start
18:52:37.931: begin end
18:52:37.931: thread: alive
18:52:38.951: thread: alive
18:52:39.986: thread: alive
18:52:40.932: stop start
18:52:40.935: stop end
18:52:43.945: begin start
18:52:43.951: begin end
18:52:43.951: thread: alive
18:52:44.986: thread: alive
18:52:45.998: thread: alive
18:52:46.982: killThemAll start
18:52:46.985: kill: thread
18:52:46.988: killThemAll end
18:52:46.989: test10 end

一方、以下のようにすると"thread"が終了しなくなってしまう。
上記との違いは、thread.stop()を呼んだ後、つまりスレッドの停止状態でkillThemAll()を呼んでいる事。
これは、threading.Eventが例外の処理も停止してしまっているから。
これを回避するには、thread.begin()を呼んでself.moving.clear()を実行する必要がある。
この場合は、thread.end()を使うべきだろうけど。
threading.Eventは、しっかりと管理しなくてはいけない。
threading.Event.wati()にタイムアウトを設定するのも考えた方が良いかも。

# 無限ループ
@printName
def test11():
  thread = CustomThread4("thread", 1)
  thread.begin()
  time.sleep(3)
  thread.stop()
  time.sleep(3)
  killThemAll()

スレッドとの値のやり取り

スレッド間で値をやり取りする場合を考える。

  • threading.RLock オブジェクト
  • threading.Condition オブジェクト
  • threading.Semaphore オブジェクト
  • threading.Barrier オブジェクト

などを使うと安全にやり取りが出来るが、ここではqueueだけを考える。

queue を使う

時間が掛かるスレッドが数個ある場合(join()が使える)

複数のスレッドの間で安全に値をやり取りする場合は、queueを使うと良いらしい。
大域変数としてresultQueueを用意して、それに結果を入れてみる。

今回は、時間が掛かるスレッドが数個ある場合を想定してテストコードを作ってみた。
それぞれランダムな時間待機した後にresultQueueに入れるスレッドを 2 つ用意する。
両方のスレッドが終了するまで待機してからresultQueueを確かめている。

resultQueue = queue.SimpleQueue()


@printName
def getResult():
  result = []
  while not resultQueue.empty():
    result.append(resultQueue.get())
  return result


def putResult(name):
  for i in range(3):
    printTime(f"{name} {i}")
    resultQueue.put(ff(name, i, random.uniform(1, 5)))


@printName
def test12():
  thread1 = threading.Thread(target=putResult, args=("thread1",))
  thread2 = threading.Thread(target=putResult, args=("thread2",))
  thread1.start()
  thread2.start()
  thread1.join()
  thread2.join()
  [printTime(f"result: {x}") for x in getResult()]

test12()

ランダムに待機しているので"thread1"と"thread2"が逆転することがある。
正常に格納されているようだ。

実行結果例
19:54:17.727: test12 start
19:54:17.751: thread1 0
19:54:17.771: thread2 0
19:54:20.409: {'name': 'thread2', 'index': 0, 'sleep': '2.60', 'result': False}
19:54:20.409: thread2 1
19:54:20.577: {'name': 'thread1', 'index': 0, 'sleep': '2.77', 'result': False}
19:54:20.578: thread1 1
19:54:21.910: {'name': 'thread1', 'index': 1, 'sleep': '1.29', 'result': False}
19:54:21.917: thread1 2
19:54:23.383: {'name': 'thread1', 'index': 2, 'sleep': '1.43', 'result': False}
19:54:24.920: {'name': 'thread2', 'index': 1, 'sleep': '4.50', 'result': False}
19:54:24.939: thread2 2
19:54:29.625: {'name': 'thread2', 'index': 2, 'sleep': '4.67', 'result': False}
19:54:29.629: getResult start
19:54:29.630: getResult end
19:54:29.630: result: {'name': 'thread2', 'index': 0, 'sleep': '2.60', 'result': False}
19:54:29.630: result: {'name': 'thread1', 'index': 0, 'sleep': '2.77', 'result': False}
19:54:29.631: result: {'name': 'thread1', 'index': 1, 'sleep': '1.29', 'result': False}
19:54:29.631: result: {'name': 'thread1', 'index': 2, 'sleep': '1.43', 'result': False}
19:54:29.631: result: {'name': 'thread2', 'index': 1, 'sleep': '4.50', 'result': False}
19:54:29.632: result: {'name': 'thread2', 'index': 2, 'sleep': '4.67', 'result': False}
19:54:29.633: test12 end
スレッドが無数にありjoin()が使えない場合

今回は、スレッドの中でスレッドを複数回呼んでいる。
そのため、join()を使うのも手間が掛かる。
(スレッドをリストに入れて後で全部join()するとか。)

全てのスレッドが終了を監視するwatcherスレッドを用意する。
watcherは、実行中のスレッドが 2 以下(メインスレッドとwatcher自身)でresultQueueに値が格納されていると値を出力して実行を停止する。
watcherからthreading.active_count()を実行すると、最低値は、メインスレッド(停止しているかに関わらない)とwatcher自身で 2 となる。

def ffQ(name, i, n):
  time.sleep(n)
  result = {"name": name, "index": i, "sleep": f"{n:.2f}", "result": check()}
  printTime(result)
  resultQueue.put(result)


@printName
def startThread(name, n):
  for i in range(n):
    printTime(f"{name} {i}")
    thread = threading.Thread(target=ffQ, args=(name, i, random.uniform(1, 5)))
    thread.start()


def watch2(n):
  while True:
    if threading.active_count() <= 2 and not resultQueue.empty():
      printTime("all end")
      [printTime(f"result: {x}") for x in getResult()]
      return
    time.sleep(n)


@printName
def test13():
  watcher = threading.Thread(target=watch2, args=(1,))
  watcher.start()
  thread1 = threading.Thread(target=startThread, args=("thread1", 3))
  thread2 = threading.Thread(target=startThread, args=("thread2", 3))
  thread1.start()
  thread2.start()

test12()

上手くいったようだ。

実行結果例
19:58:45.189: test13 start
19:58:45.245: startThread start
19:58:45.247: thread1 0
19:58:45.247: startThread start
19:58:45.248: thread2 0
19:58:45.248: test13 end
19:58:45.271: thread1 1
19:58:45.271: thread2 1
19:58:45.276: thread1 2
19:58:45.276: thread2 2
19:58:45.282: startThread end
19:58:45.282: startThread end
19:58:46.623: {'name': 'thread2', 'index': 1, 'sleep': '1.33', 'result': False}
19:58:47.109: {'name': 'thread1', 'index': 0, 'sleep': '1.83', 'result': False}
19:58:48.150: {'name': 'thread2', 'index': 2, 'sleep': '2.87', 'result': False}
19:58:49.133: {'name': 'thread2', 'index': 0, 'sleep': '3.85', 'result': False}
19:58:49.932: {'name': 'thread1', 'index': 2, 'sleep': '4.61', 'result': False}
19:58:50.215: {'name': 'thread1', 'index': 1, 'sleep': '4.93', 'result': False}
19:58:50.309: all end
19:58:50.317: getResult start
19:58:50.336: getResult end
19:58:50.341: result: {'name': 'thread2', 'index': 1, 'sleep': '1.33', 'result': False}
19:58:50.351: result: {'name': 'thread1', 'index': 0, 'sleep': '1.83', 'result': False}
19:58:50.365: result: {'name': 'thread2', 'index': 2, 'sleep': '2.87', 'result': False}
19:58:50.374: result: {'name': 'thread2', 'index': 0, 'sleep': '3.85', 'result': False}
19:58:50.389: result: {'name': 'thread1', 'index': 2, 'sleep': '4.61', 'result': False}
19:58:50.405: result: {'name': 'thread1', 'index': 1, 'sleep': '4.93', 'result': False}
特定の値が得られた時点で全てのスレッドの実行を停止する。daemon=True

今回は、条件に合った結果が得られるまで実行を続け、結果が得られたら全てのスレッドを停止する事を考えてみる。

監視するwatcherスレッド以外は、daemon=Trueとする。
こうすることで、メインスレッドとwatcherスレッドが終了した時点で他のスレッドは、終了する。

以下、2 つの理由でprint()は削除する。

  • deamon=Trueのスレッドからprint()で出力するとエラーメッセージが表示されることがある
  • 途中経過のメッセージと、実行終了時のメッセージと区別が付かなくなる。
    (メッセージが混ざってしまい、しばらく混乱しました。)
def ffQ2(name, i, n):
  time.sleep(n)
  result = {"name": name, "index": i, "sleep": f"{n:.2f}", "result": check()}
  resultQueue.put(result)


@printName
def startThread2(name, n):
  for i in range(n):
    thread = threading.Thread(target=ffQ2, args=(name, i, random.uniform(1, 5)))
    thread.start()


def watch3(n):
  while True:
    if not resultQueue.empty():
      result = [x for x in getResult() if x["result"] == True]
      if len(result) > 0:
        printTime("all end")
        [printTime(f"result: {x}") for x in result]
        return
    time.sleep(n)


@printName
def test14():
  watcher = threading.Thread(target=watch3, args=(0.5,))
  watcher.start()
  thread1 = threading.Thread(target=startThread2, args=("thread1", 1000), daemon=True)
  thread2 = threading.Thread(target=startThread2, args=("thread2", 1000), daemon=True)
  thread1.start()
  thread2.start()

test14()

問題ないようだ。

実行結果
20:01:07.734: test14 start
20:01:07.734: startThread2 start
20:01:07.749: startThread2 start
20:01:07.749: test14 end
20:01:09.315: getResult start
20:01:09.359: getResult end
20:01:09.911: getResult start
20:01:09.961: getResult end
20:01:09.984: all end
20:01:09.985: result: {'name': 'thread1', 'index': 45, 'sleep': '1.42', 'result': True}
20:01:09.986: result: {'name': 'thread2', 'index': 248, 'sleep': '1.17', 'result': True}
20:01:09.986: result: {'name': 'thread2', 'index': 159, 'sleep': '1.32', 'result': True}

concurrent.futures

concurrent.futuresについて調べてみる。
ThreadPoolExecutorのみを扱うこととする

ThreadPoolExecutorの基本

ThreadPoolExecutor()でスレッドの作成。(ワーカーと言った方が良いか)
submit()で実行。
shutdown()でスレッドの解放。
shutdown()は、デフォルトではwait=Trueとなり、実行が完了するまで待機してから解放する。
待機したくなければ、shutdown(wait=False)とする。

まずは、threading.Threadと同等の動きをさせてみる。

def ff(name, i, n):
  time.sleep(n)
  result = {"name": name, "index": i, "sleep": f"{n:.2f}", "result": check()}
  printTime(result)
  return result


@printName
def test1():
  for i in range(3):
    printTime(f"test submit {i}")
    thread1 = ThreadPoolExecutor()
    thread1.submit(ff, *("thread1", i, 1))
    thread1.shutdown(wait=False)
    thread2 = ThreadPoolExecutor()
    thread2.submit(ff, *("thread2", i, 2))
    thread2.shutdown(wait=False)

test1()

同じような結果になった。

実行結果
20:05:28.011: test1 start
20:05:28.011: test submit 0
20:05:28.127: test submit 1
20:05:28.145: test submit 2
20:05:28.149: test1 end
20:05:29.113: {'name': 'thread1', 'index': 0, 'sleep': '1.00', 'result': False}
20:05:29.157: {'name': 'thread1', 'index': 1, 'sleep': '1.00', 'result': False}
20:05:29.158: {'name': 'thread1', 'index': 2, 'sleep': '1.00', 'result': False}
20:05:30.154: {'name': 'thread2', 'index': 1, 'sleep': '2.00', 'result': False}
20:05:30.167: {'name': 'thread2', 'index': 0, 'sleep': '2.00', 'result': False}
20:05:30.169: {'name': 'thread2', 'index': 2, 'sleep': '2.00', 'result': False}

shutdown(wait=False)を試してみる

@printName
def test2():
  for i in range(3):
    printTime(f"test submit {i}")
    thread1 = ThreadPoolExecutor()
    thread1.submit(ff, *("thread1", i, 1))
    thread1.shutdown(wait=True)
    thread2 = ThreadPoolExecutor()
    thread2.submit(ff, *("thread2", i, 2))
    thread2.shutdown(wait=True)

test2()

終了を待ってから、次の処理に行っているのが分かる。

実行結果
20:07:32.468: test2 start
20:07:32.469: test submit 0
20:07:33.562: {'name': 'thread1', 'index': 0, 'sleep': '1.00', 'result': False}
20:07:35.620: {'name': 'thread2', 'index': 0, 'sleep': '2.00', 'result': False}
20:07:35.629: test submit 1
20:07:36.634: {'name': 'thread1', 'index': 1, 'sleep': '1.00', 'result': False}
20:07:38.654: {'name': 'thread2', 'index': 1, 'sleep': '2.00', 'result': False}
20:07:38.665: test submit 2
20:07:39.677: {'name': 'thread1', 'index': 2, 'sleep': '1.00', 'result': False}
20:07:41.699: {'name': 'thread2', 'index': 2, 'sleep': '2.00', 'result': False}
20:07:41.701: test2 end

値の取得

submit()は、Future オブジェクトを返す。
Futureからは、result()submit()で指定した関数からの戻り値を取得できる。

@printName
def test3():
  for i in range(3):
    printTime(f"test submit {i}")
    thread1 = ThreadPoolExecutor()
    future = thread1.submit(ff, *("thread1", i, 1))
    printTime(f"test: {future.result()}")
    thread1.shutdown(wait=False)
    thread2 = ThreadPoolExecutor()
    future = thread2.submit(ff, *("thread2", i, 2))
    printTime(f"test: {future.result()}")
    thread1.shutdown(wait=False)

test3()

result()は、実行完了まで待つようだ。

実行結果
20:08:48.826: test3 start
20:08:48.828: test submit 0
20:08:49.869: {'name': 'thread1', 'index': 0, 'sleep': '1.00', 'result': False}
20:08:49.882: result: {'name': 'thread1', 'index': 0, 'sleep': '1.00', 'result': False}
20:08:51.889: {'name': 'thread2', 'index': 0, 'sleep': '2.00', 'result': False}
20:08:51.889: result: {'name': 'thread2', 'index': 0, 'sleep': '2.00', 'result': False}
20:08:51.889: test submit 1
20:08:52.905: {'name': 'thread1', 'index': 1, 'sleep': '1.00', 'result': False}
20:08:52.905: result: {'name': 'thread1', 'index': 1, 'sleep': '1.00', 'result': False}
20:08:54.926: {'name': 'thread2', 'index': 1, 'sleep': '2.00', 'result': False}
20:08:54.931: result: {'name': 'thread2', 'index': 1, 'sleep': '2.00', 'result': False}
20:08:54.931: test submit 2
20:08:55.946: {'name': 'thread1', 'index': 2, 'sleep': '1.00', 'result': False}
20:08:55.953: result: {'name': 'thread1', 'index': 2, 'sleep': '1.00', 'result': False}
20:08:57.968: {'name': 'thread2', 'index': 2, 'sleep': '2.00', 'result': False}
20:08:57.981: result: {'name': 'thread2', 'index': 2, 'sleep': '2.00', 'result': False}
20:08:57.981: test3 end

with

with文を使えば、shutdown()を省略出来る。

@printName
def test4():
  for i in range(3):
    printTime(f"test submit {i}")
    with ThreadPoolExecutor() as thread1:
      thread1.submit(ff, *("thread1", i, 1))
    with ThreadPoolExecutor() as thread2:
      thread2.submit(ff, *("thread2", i, 2))

test4()

with文では、デフォルトのshutdown(wait=True)が実行される。

実行結果
20:09:45.531: test4 start
20:09:45.547: test submit 0
20:09:46.602: {'name': 'thread1', 'index': 0, 'sleep': '1.00', 'result': False}
20:09:48.611: {'name': 'thread2', 'index': 0, 'sleep': '2.00', 'result': False}
20:09:48.611: test submit 1
20:09:49.628: {'name': 'thread1', 'index': 1, 'sleep': '1.00', 'result': False}
20:09:51.667: {'name': 'thread2', 'index': 1, 'sleep': '2.00', 'result': False}
20:09:51.696: test submit 2
20:09:52.700: {'name': 'thread1', 'index': 2, 'sleep': '1.00', 'result': False}
20:09:54.709: {'name': 'thread2', 'index': 2, 'sleep': '2.00', 'result': False}
20:09:54.710: test4 end

蛇足だが、with文で、shutdown(wait=False)にするには以下の様にする。

class CustomThreadPoolExecutor(ThreadPoolExecutor):

  def __init__(self, max_workers=None, thread_name_prefix="", initializer=None, initargs=()):
    super().__init__(max_workers, thread_name_prefix, initializer, initargs)

  # def __enter__(self):
  #   return self

  def __exit__(self, exc_type, exc_val, exc_tb):
    self.shutdown(wait=False)
    return False


@printName
def test5():
  for i in range(3):
    printTime(f"test submit {i}")
    with CustomThreadPoolExecutor() as thread1:
      thread1.submit(ff, *("thread1", i, 1))
    with CustomThreadPoolExecutor() as thread2:
      thread2.submit(ff, *("thread2", i, 2))

test5()

shutdown(wait=False)と同等になった。

実行結果
20:10:30.910: test5 start
20:10:30.910: test submit 0
20:10:30.919: test submit 1
20:10:30.925: test submit 2
20:10:30.939: test5 end
20:10:31.929: {'name': 'thread1', 'index': 0, 'sleep': '1.00', 'result': False}
20:10:31.937: {'name': 'thread1', 'index': 2, 'sleep': '1.00', 'result': False}
20:10:31.950: {'name': 'thread1', 'index': 1, 'sleep': '1.00', 'result': False}
20:10:32.923: {'name': 'thread2', 'index': 0, 'sleep': '2.00', 'result': False}
20:10:32.941: {'name': 'thread2', 'index': 1, 'sleep': '2.00', 'result': False}
20:10:32.955: {'name': 'thread2', 'index': 2, 'sleep': '2.00', 'result': False}

map()

ThreadPoolExecutor()では、map()で複数の処理を実行できる。
1 つの処理を 1 つのスレッド処理するイメージなようだ。
定義を見てみるとsubmit()を組み込み関数のmap()で呼んでいるだけ。

def ff2(v):
  return ff(*v)


@printName
def test6():
  args = []
  for i in range(3):
    args.append(("thread1", i, 1))
  for i in range(3):
    args.append(("thread2", i, 2))

  with CustomThreadPoolExecutor() as thread:
    future = thread.map(ff2, args)
  for i in future:
    printTime(f"result: {i}")

test6()

result()と同じように、処理が終わってから値を取り出している。

実行結果
20:15:27.643: test6 start
20:15:28.680: {'name': 'thread1', 'index': 2, 'sleep': '1.00', 'result': False}
20:15:28.680: {'name': 'thread1', 'index': 1, 'sleep': '1.00', 'result': False}
20:15:28.690: {'name': 'thread1', 'index': 0, 'sleep': '1.00', 'result': False}
20:15:28.696: result: {'name': 'thread1', 'index': 0, 'sleep': '1.00', 'result': False}
20:15:28.697: result: {'name': 'thread1', 'index': 1, 'sleep': '1.00', 'result': False}
20:15:28.697: result: {'name': 'thread1', 'index': 2, 'sleep': '1.00', 'result': False}
20:15:29.688: {'name': 'thread2', 'index': 1, 'sleep': '2.00', 'result': False}
20:15:29.688: {'name': 'thread2', 'index': 0, 'sleep': '2.00', 'result': False}
20:15:29.688: result: {'name': 'thread2', 'index': 0, 'sleep': '2.00', 'result': False}
20:15:29.688: result: {'name': 'thread2', 'index': 1, 'sleep': '2.00', 'result': False}
20:15:29.704: {'name': 'thread2', 'index': 2, 'sleep': '2.00', 'result': False}
20:15:29.706: result: {'name': 'thread2', 'index': 2, 'sleep': '2.00', 'result': False}
20:15:29.707: test6 end

下記の様にも引数を渡せる。
map()に渡している関数は、ff()である事に注意。
result = thread.map(ff, args1, args2, args3)は、result= thread.map(ff2, zip(args1, args2, args3))と同等

@printName
def test7():
  args1 = ["thread1"] * 3 + ["thread2"] * 3
  args2 = list(range(3)) * 2
  args3 = [1] * 3 + [2] * 3

  with CustomThreadPoolExecutor() as thread:
    results = thread.map(ff, args1, args2, args3)
  for result in results:
    printTime(f"result: {result}")

test7()
実行結果
20:20:11.325: test7 start
20:20:12.401: {'name': 'thread1', 'index': 2, 'sleep': '1.00', 'result': False}
20:20:12.401: {'name': 'thread1', 'index': 1, 'sleep': '1.00', 'result': False}
20:20:12.401: {'name': 'thread1', 'index': 0, 'sleep': '1.00', 'result': False}
20:20:12.436: result: {'name': 'thread1', 'index': 0, 'sleep': '1.00', 'result': False}
20:20:12.437: result: {'name': 'thread1', 'index': 1, 'sleep': '1.00', 'result': False}
20:20:12.437: result: {'name': 'thread1', 'index': 2, 'sleep': '1.00', 'result': False}
20:20:13.409: {'name': 'thread2', 'index': 2, 'sleep': '2.00', 'result': False}
20:20:13.411: {'name': 'thread2', 'index': 1, 'sleep': '2.00', 'result': True}
20:20:13.411: {'name': 'thread2', 'index': 0, 'sleep': '2.00', 'result': False}
20:20:13.414: result: {'name': 'thread2', 'index': 0, 'sleep': '2.00', 'result': False}
20:20:13.415: result: {'name': 'thread2', 'index': 1, 'sleep': '2.00', 'result': True}
20:20:13.415: result: {'name': 'thread2', 'index': 2, 'sleep': '2.00', 'result': False}
20:20:13.415: test7 end

Futureについて

submit()は、Future オブジェクトを返す。
そのFutureについてもう少し詳しく調べる。

add_cone_callback()

add_cone_callback()で、実行後に呼び出されるコールバックを指定できる。

def done(future):
  printTime(f"done: {future.result()}")


@printName
def test8():
  with ThreadPoolExecutor() as thread:
    for i in range(6):
      printTime(f"test submit {i}")
      future = thread.submit(ff, *("thread", i, random.uniform(1, 5)))
      future.add_done_callback(done)

test8()

実行後に呼び出されている。
実行完了をわざわざ待たなくてよいので色々使えそう。

実行結果
20:21:00.987: test8 start
20:21:01.033: test submit 0
20:21:01.074: test submit 1
20:21:01.075: test submit 2
20:21:01.076: test submit 3
20:21:01.096: test submit 4
20:21:01.102: test submit 5
20:21:02.354: {'name': 'thread', 'index': 0, 'sleep': '1.27', 'result': False}
20:21:02.354: done: {'name': 'thread', 'index': 0, 'sleep': '1.27', 'result': False}
20:21:02.696: {'name': 'thread', 'index': 5, 'sleep': '1.59', 'result': False}
20:21:02.705: done: {'name': 'thread', 'index': 5, 'sleep': '1.59', 'result': False}
20:21:03.063: {'name': 'thread', 'index': 3, 'sleep': '1.96', 'result': False}
20:21:03.063: done: {'name': 'thread', 'index': 3, 'sleep': '1.96', 'result': False}
20:21:03.209: {'name': 'thread', 'index': 1, 'sleep': '2.12', 'result': False}
20:21:03.212: done: {'name': 'thread', 'index': 1, 'sleep': '2.12', 'result': False}
20:21:03.854: {'name': 'thread', 'index': 4, 'sleep': '2.74', 'result': False}
20:21:03.857: done: {'name': 'thread', 'index': 4, 'sleep': '2.74', 'result': False}
20:21:05.584: {'name': 'thread', 'index': 2, 'sleep': '4.50', 'result': False}
20:21:05.584: done: {'name': 'thread', 'index': 2, 'sleep': '4.50', 'result': False}
20:21:05.584: test8 end

cancel()

cancel()で、実行前の処理をキャンセル出来る。
実行途中のものを、強制終了させるわけではない。

@printName
def test9():
  with ThreadPoolExecutor() as thread:
    for i in range(10000):
      future = thread.submit(ff, *("thread", i, random.uniform(1, 5)))
      future.cancel()

test9()

10000 個のうち 20 個だけで実行され、それ以外はキャンセルされた。
論理プロセッサ数(16)の値が影響しているのか、偶然か。

実行結果
20:21:47.488: test9 start
20:21:48.790: {'name': 'thread', 'index': 15, 'sleep': '1.17', 'result': False}
20:21:48.978: {'name': 'thread', 'index': 16, 'sleep': '1.35', 'result': False}
20:21:49.022: {'name': 'thread', 'index': 19, 'sleep': '1.39', 'result': False}
20:21:49.284: {'name': 'thread', 'index': 8, 'sleep': '1.68', 'result': False}
20:21:49.362: {'name': 'thread', 'index': 11, 'sleep': '1.75', 'result': False}
20:21:49.383: {'name': 'thread', 'index': 6, 'sleep': '1.81', 'result': False}
20:21:49.435: {'name': 'thread', 'index': 9, 'sleep': '1.82', 'result': False}
20:21:49.464: {'name': 'thread', 'index': 5, 'sleep': '1.90', 'result': False}
20:21:49.843: {'name': 'thread', 'index': 13, 'sleep': '2.23', 'result': False}
20:21:50.006: {'name': 'thread', 'index': 0, 'sleep': '2.47', 'result': False}
20:21:50.054: {'name': 'thread', 'index': 18, 'sleep': '2.43', 'result': False}
20:21:50.081: {'name': 'thread', 'index': 4, 'sleep': '2.52', 'result': False}
20:21:50.176: {'name': 'thread', 'index': 17, 'sleep': '2.53', 'result': False}
20:21:50.812: {'name': 'thread', 'index': 10, 'sleep': '3.19', 'result': False}
20:21:51.125: {'name': 'thread', 'index': 14, 'sleep': '3.51', 'result': False}
20:21:51.800: {'name': 'thread', 'index': 3, 'sleep': '4.23', 'result': False}
20:21:52.176: {'name': 'thread', 'index': 2, 'sleep': '4.62', 'result': False}
20:21:52.224: {'name': 'thread', 'index': 12, 'sleep': '4.61', 'result': False}
20:21:52.405: {'name': 'thread', 'index': 1, 'sleep': '4.86', 'result': False}
20:21:52.525: {'name': 'thread', 'index': 7, 'sleep': '4.94', 'result': False}
20:21:52.545: test9 end

Futureを扱うための関数

as_completed()

as_completed()は、処理が終わったものを返してくれるジェネレータを返す。

@printName
def test10():
  with ThreadPoolExecutor() as thread:
    futures = []
    for i in range(6):
      futures.append(thread.submit(ff, *("thread", i, random.uniform(1, 5))))

    printTime("as_completed start")
    for future in as_completed(futures):
      printTime(f"completed: {future.result()}")
    printTime("as_completed end")

test10()

as_completed()で実行完了まで待機し、完了順に返されているのが分かる。

実行結果
20:26:08.072: test10 start
20:26:08.123: as_completed start
20:26:10.421: {'name': 'thread', 'index': 2, 'sleep': '2.31', 'result': False}
20:26:10.421: completed: {'name': 'thread', 'index': 2, 'sleep': '2.31', 'result': False}
20:26:10.908: {'name': 'thread', 'index': 3, 'sleep': '2.78', 'result': False}
20:26:10.913: completed: {'name': 'thread', 'index': 3, 'sleep': '2.78', 'result': False}
20:26:11.737: {'name': 'thread', 'index': 0, 'sleep': '3.62', 'result': False}
20:26:11.764: completed: {'name': 'thread', 'index': 0, 'sleep': '3.62', 'result': False}
20:26:11.972: {'name': 'thread', 'index': 4, 'sleep': '3.85', 'result': False}
20:26:11.978: completed: {'name': 'thread', 'index': 4, 'sleep': '3.85', 'result': False}
20:26:12.412: {'name': 'thread', 'index': 1, 'sleep': '4.30', 'result': False}
20:26:12.412: completed: {'name': 'thread', 'index': 1, 'sleep': '4.30', 'result': False}
20:26:12.835: {'name': 'thread', 'index': 5, 'sleep': '4.70', 'result': False}
20:26:12.838: completed: {'name': 'thread', 'index': 5, 'sleep': '4.70', 'result': False}
20:26:12.838: as_completed end
20:26:12.839: test10 end

一方、以下の様にresult()を使うと呼び出し順で返される

@printName
def test11():
  with ThreadPoolExecutor() as thread:
    futures = []
    for i in range(6):
      futures.append(thread.submit(ff, *("thread", i, random.uniform(1, 5))))

    printTime("result start")
    for future in futures:
      printTime(f"result: {future.result()}")
    printTime("result end")

test11()
実行結果
20:28:05.979: test11 start
20:28:06.197: result start
20:28:07.120: {'name': 'thread', 'index': 1, 'sleep': '1.02', 'result': False}
20:28:07.358: {'name': 'thread', 'index': 4, 'sleep': '1.19', 'result': False}
20:28:07.668: {'name': 'thread', 'index': 0, 'sleep': '1.59', 'result': False}
20:28:07.693: result: {'name': 'thread', 'index': 0, 'sleep': '1.59', 'result': False}
20:28:07.693: result: {'name': 'thread', 'index': 1, 'sleep': '1.02', 'result': False}
20:28:08.531: {'name': 'thread', 'index': 2, 'sleep': '2.42', 'result': True}
20:28:08.539: result: {'name': 'thread', 'index': 2, 'sleep': '2.42', 'result': True}
20:28:10.116: {'name': 'thread', 'index': 3, 'sleep': '3.96', 'result': False}
20:28:10.116: result: {'name': 'thread', 'index': 3, 'sleep': '3.96', 'result': False}
20:28:10.116: result: {'name': 'thread', 'index': 4, 'sleep': '1.19', 'result': False}
20:28:10.197: {'name': 'thread', 'index': 5, 'sleep': '4.00', 'result': False}
20:28:10.198: result: {'name': 'thread', 'index': 5, 'sleep': '4.00', 'result': False}
20:28:10.199: result end
20:28:10.199: test11 end

wait()

wait()は、指定時間まで待機した後に、完了したものと完了していないものを含むnamedtupleを返す。
完了したものがdoneで、完了していないものがnot_done.

@printName
def test12():
  with ThreadPoolExecutor() as thread:
    futures = []
    for i in range(6):
      futures.append(thread.submit(ff, *("thread", i, random.uniform(1, 5))))

    printTime("wait start")
    for future in wait(futures, timeout=3).done:
      printTime(f"completed: {future.result()}")
    printTime("wait end")

test12()

待機時間内に終了したものだけが返っているのが分かる。

実行結果
20:21:25.871: test12 start
20:21:25.872: wait start
20:21:27.416: {'name': 'thread', 'index': 3, 'sleep': '1.5', 'result': True}
20:21:27.447: {'name': 'thread', 'index': 4, 'sleep': '1.6', 'result': False}
20:21:28.878: completed: {'name': 'thread', 'index': 3, 'sleep': '1.5', 'result': True}
20:21:28.878: completed: {'name': 'thread', 'index': 4, 'sleep': '1.6', 'result': False}
20:21:28.878: wait end
20:21:29.130: {'name': 'thread', 'index': 5, 'sleep': '3.2', 'result': False}
20:21:29.518: {'name': 'thread', 'index': 1, 'sleep': '3.6', 'result': False}
20:21:29.782: {'name': 'thread', 'index': 2, 'sleep': '3.9', 'result': False}
20:21:30.488: {'name': 'thread', 'index': 0, 'sleep': '4.6', 'result': False}
20:21:30.488: test12 end

注意点が1つある。
以下の様にwait()の指定時間前に完了している処理が複数あると取り出される順番が不定になるという事。

@printName
def test12_():
  with ThreadPoolExecutor() as thread:
    futures = []
    for i in range(6):
      futures.append(thread.submit(ff, *("thread", i, random.uniform(1, 3))))

    printTime("wait start")
    for future in wait(futures, timeout=3).done:
      printTime(f"completed: {future.result()}")
    printTime("wait end")

test12_()

実行完了順と取り出し順が異なっている。

実行結果
20:31:05.074: test12_ start
20:31:05.177: wait start
20:31:06.360: {'name': 'thread', 'index': 4, 'sleep': '1.16', 'result': False}
20:31:06.534: {'name': 'thread', 'index': 3, 'sleep': '1.34', 'result': False}
20:31:06.878: {'name': 'thread', 'index': 1, 'sleep': '1.71', 'result': False}
20:31:06.893: {'name': 'thread', 'index': 2, 'sleep': '1.73', 'result': False}
20:31:07.411: {'name': 'thread', 'index': 5, 'sleep': '2.22', 'result': True}
20:31:07.912: {'name': 'thread', 'index': 0, 'sleep': '2.76', 'result': False}
20:31:07.918: completed: {'name': 'thread', 'index': 3, 'sleep': '1.34', 'result': False}
20:31:07.918: completed: {'name': 'thread', 'index': 0, 'sleep': '2.76', 'result': False}
20:31:07.921: completed: {'name': 'thread', 'index': 4, 'sleep': '1.16', 'result': False}
20:31:07.921: completed: {'name': 'thread', 'index': 2, 'sleep': '1.73', 'result': False}
20:31:07.922: completed: {'name': 'thread', 'index': 1, 'sleep': '1.71', 'result': False}
20:31:07.923: completed: {'name': 'thread', 'index': 5, 'sleep': '2.22', 'result': True}
20:31:07.923: wait end
20:31:07.924: test12_ end

wait()return_when=FIRST_COMPLETEDを渡すと最初に完了したものだけが取り出される。

@printName
def test13():
  with ThreadPoolExecutor() as thread:
    futures = []
    for i in range(6):
      futures.append(thread.submit(ff, *("thread", i, random.uniform(1, 5))))

    printTime("wait start")
    for future in wait(futures, return_when=FIRST_COMPLETED).done:
      printTime(f"completed: {future.result()}")
    printTime("wait end")
実行結果
20:35:32.865: test13 start
20:35:33.056: wait start
20:35:34.457: {'name': 'thread', 'index': 4, 'sleep': '1.40', 'result': False}
20:35:34.471: completed: {'name': 'thread', 'index': 4, 'sleep': '1.40', 'result': False}
20:35:34.471: wait end
20:35:34.476: {'name': 'thread', 'index': 2, 'sleep': '1.46', 'result': False}
20:35:34.534: {'name': 'thread', 'index': 3, 'sleep': '1.49', 'result': True}
20:35:35.309: {'name': 'thread', 'index': 0, 'sleep': '2.34', 'result': True}
20:35:37.304: {'name': 'thread', 'index': 1, 'sleep': '4.33', 'result': True}
20:35:37.444: {'name': 'thread', 'index': 5, 'sleep': '4.37', 'result': False}
20:35:37.454: test13 end

特定の値が得られた時点で全てのスレッドの実行を停止する。その 1

想定している値を見つけたら残りの処理をキャンセルする事を考えてみる。

@printName
def watch(futures):
  for future in as_completed(futures):
    result = future.result()
    if result["result"]:
      printTime(f"success: {result}")
      break
  for future in futures:
    future.cancel()


@printName
def test14():
  with ThreadPoolExecutor() as thread:
    futures = []
    for i in range(1000):
      futures.append(thread.submit(ff, *("thread", i, random.uniform(1, 5))))
    watch(futures)

test14()

1000 個の処理をsubmit()して、その後に値を調べている。
しかし、大部分がキャンセルされて実行されていない。

実行結果
20:49:10.823: test14 start
20:49:10.975: watch start
20:49:12.283: {'name': 'thread', 'index': 11, 'sleep': '1.32', 'result': False}
20:49:12.345: {'name': 'thread', 'index': 7, 'sleep': '1.43', 'result': False}
20:49:12.409: {'name': 'thread', 'index': 2, 'sleep': '1.53', 'result': False}
20:49:12.431: {'name': 'thread', 'index': 13, 'sleep': '1.47', 'result': False}
20:49:12.461: {'name': 'thread', 'index': 14, 'sleep': '1.50', 'result': False}
20:49:12.642: {'name': 'thread', 'index': 18, 'sleep': '1.67', 'result': True}
20:49:12.645: success: {'name': 'thread', 'index': 18, 'sleep': '1.67', 'result': True}
20:49:12.651: watch end
20:49:13.359: {'name': 'thread', 'index': 16, 'sleep': '2.38', 'result': False}
20:49:13.732: {'name': 'thread', 'index': 5, 'sleep': '2.84', 'result': False}
20:49:13.763: {'name': 'thread', 'index': 24, 'sleep': '1.27', 'result': False}
20:49:13.960: {'name': 'thread', 'index': 9, 'sleep': '2.99', 'result': False}
20:49:14.124: {'name': 'thread', 'index': 8, 'sleep': '3.19', 'result': False}
20:49:14.156: {'name': 'thread', 'index': 17, 'sleep': '3.17', 'result': False}
20:49:14.203: {'name': 'thread', 'index': 0, 'sleep': '3.33', 'result': False}
20:49:14.255: {'name': 'thread', 'index': 1, 'sleep': '3.37', 'result': False}
20:49:14.279: {'name': 'thread', 'index': 4, 'sleep': '3.38', 'result': False}
20:49:14.324: {'name': 'thread', 'index': 22, 'sleep': '1.89', 'result': False}
20:49:14.885: {'name': 'thread', 'index': 20, 'sleep': '2.60', 'result': True}
20:49:14.931: {'name': 'thread', 'index': 10, 'sleep': '3.97', 'result': False}
20:49:15.140: {'name': 'thread', 'index': 15, 'sleep': '4.18', 'result': False}
20:49:15.176: {'name': 'thread', 'index': 12, 'sleep': '4.21', 'result': True}
20:49:15.219: {'name': 'thread', 'index': 23, 'sleep': '2.78', 'result': False}
20:49:15.254: {'name': 'thread', 'index': 3, 'sleep': '4.37', 'result': False}
20:49:15.283: {'name': 'thread', 'index': 25, 'sleep': '2.63', 'result': False}
20:49:15.481: {'name': 'thread', 'index': 6, 'sleep': '4.59', 'result': False}
20:49:15.892: {'name': 'thread', 'index': 19, 'sleep': '4.90', 'result': False}
20:49:16.486: {'name': 'thread', 'index': 21, 'sleep': '4.12', 'result': True}
20:49:16.489: test14 end

蛇足だが、以下の様にも出来る。

@printName
def test14_1():
  with ThreadPoolExecutor() as thread:
    futures = []
    for i in range(1000):
      futures.append(thread.submit(ff, *("thread", i, random.uniform(1, 5))))
    with ThreadPoolExecutor() as watcher:
      watcher.submit(watch, futures)

@printName
def test14_2():
  with ThreadPoolExecutor() as thread:
    futures = []
    for i in range(1000):
      futures.append(thread.submit(ff, *("thread", i, random.uniform(1, 5))))
    watcher = threading.Thread(target=watch, args=(futures,))
    watcher.start()

特定の値が得られた時点で全てのスレッドの実行を停止する。その 2

その 1 だと 1000 個のリストを作っているので無駄が多い。
もう少し効率が良さそうなものを考えてみる。
以下の様に、10 個ずつ判定してみる。

@printName
def watch2(futures):
  finish = False
  for future in as_completed(futures):
    result = future.result()
    if result["result"]:
      printTime(f"success: {result}")
      finish = True
      for future in futures:
        future.cancel()
      break
  return finish


@printName
def test15():
  with ThreadPoolExecutor() as thread:
    futures = []
    for i in range(1000):
      futures.append(thread.submit(ff, *("thread", i, random.uniform(1, 5))))
      if (i + 1) % 10 == 0:
        if watch2(futures):
          futures = []
          break
        futures = []
    watch2(futures)

メモリ効率が良くなったが、as_completed()による待機が気になる。

実行結果
21:15:05.143: test15 start
21:15:05.187: watch2 start
21:15:06.661: {'name': 'thread', 'index': 7, 'sleep': '1.46', 'result': False}
21:15:07.093: {'name': 'thread', 'index': 4, 'sleep': '1.91', 'result': False}
21:15:07.500: {'name': 'thread', 'index': 9, 'sleep': '2.29', 'result': True}
21:15:07.521: success: {'name': 'thread', 'index': 9, 'sleep': '2.29', 'result': True}
21:15:07.522: watch2 end
21:15:07.522: watch2 start
21:15:07.522: watch2 end
21:15:07.692: {'name': 'thread', 'index': 0, 'sleep': '2.52', 'result': False}
21:15:07.828: {'name': 'thread', 'index': 5, 'sleep': '2.65', 'result': False}
21:15:08.341: {'name': 'thread', 'index': 6, 'sleep': '3.16', 'result': False}
21:15:08.588: {'name': 'thread', 'index': 1, 'sleep': '3.42', 'result': False}
21:15:09.251: {'name': 'thread', 'index': 3, 'sleep': '4.06', 'result': False}
21:15:09.472: {'name': 'thread', 'index': 2, 'sleep': '4.27', 'result': False}
21:15:09.879: {'name': 'thread', 'index': 8, 'sleep': '4.68', 'result': True}
21:15:09.879: test15 end

特定の値が得られた時点で全てのスレッドの実行を停止する。その 3

値を監視するwatcherスレッドを追加してそれで監視する。
値のやり取りには、queueを使う。
time.sleep()を入れないと、処理が速すぎてbreakせずに 1000 回thread.submit()を実行してしまう。
まあ、メモリの消費を考えなければ問題ないかも。
現実のプログラムでは、もっと時間が掛かってbreakするかもしれない。

def getQueue(queue):
  results = []
  while not queue.empty():
    results.append(queue.get())
  return results


def getFlag(queue):
  flag = queue.get()
  queue.put(flag)
  return flag


def putFlag(queue, flag):
  queue.get()
  queue.put(flag)


def allCancel(futures):
  for future in futures:
    if not future.done():
      future.cancel()


@printName
def watch4(queue, endFlag):
  try:
    flag = getFlag(endFlag)
    while not flag:
      futures = getQueue(queue)
      for future in as_completed(futures):
        result = future.result()
        if result["result"]:
          printTime(f"success: {result}")
          putFlag(endFlag, True)
          flag = True
          allCancel(futures)
          break
    time.sleep(0.05)
  except BaseException as e:
    print(f"except {type(e)}, {e}")


@printName
def test17():
  futuresQueue = queue.SimpleQueue()
  endFlag = queue.Queue(1)
  endFlag.put(False)
  with ThreadPoolExecutor() as watcher:
    with ThreadPoolExecutor() as thread:
      watcher.submit(watch4, futuresQueue, endFlag)
      for i in range(1000):
        if getFlag(endFlag):
          printTime("break")
          break
        futuresQueue.put(thread.submit(ff, *("thread", i, random.uniform(1, 5))))
        time.sleep(0.1)
      printTime(f"qsize: {futuresQueue.qsize()}")
      if getFlag(endFlag):
        futures = getQueue(futuresQueue)
        allCancel(futures)

test17()
実行結果
23:41:31.648: test17 start
23:41:31.655: watch4 start
23:41:33.651: {'name': 'thread', 'index': 2, 'sleep': '1.72', 'result': False}
23:41:34.369: {'name': 'thread', 'index': 9, 'sleep': '1.50', 'result': False}
23:41:34.670: {'name': 'thread', 'index': 1, 'sleep': '2.85', 'result': False}
23:41:34.702: {'name': 'thread', 'index': 0, 'sleep': '3.03', 'result': False}
23:41:34.871: {'name': 'thread', 'index': 10, 'sleep': '1.88', 'result': False}
23:41:34.947: {'name': 'thread', 'index': 15, 'sleep': '1.33', 'result': False}
23:41:34.984: {'name': 'thread', 'index': 14, 'sleep': '1.50', 'result': False}
23:41:35.198: {'name': 'thread', 'index': 19, 'sleep': '1.14', 'result': False}
23:41:35.436: {'name': 'thread', 'index': 21, 'sleep': '1.05', 'result': False}
23:41:35.456: {'name': 'thread', 'index': 4, 'sleep': '3.29', 'result': False}
23:41:35.602: {'name': 'thread', 'index': 12, 'sleep': '2.36', 'result': False}
23:41:35.637: {'name': 'thread', 'index': 13, 'sleep': '2.27', 'result': False}
23:41:35.878: {'name': 'thread', 'index': 3, 'sleep': '3.81', 'result': False}
23:41:36.083: {'name': 'thread', 'index': 22, 'sleep': '1.39', 'result': True}
23:41:36.083: success: {'name': 'thread', 'index': 22, 'sleep': '1.39', 'result': True}
23:41:36.136: break
23:41:36.143: qsize: 12
23:41:36.161: watch4 end
23:41:36.167: {'name': 'thread', 'index': 6, 'sleep': '3.69', 'result': False}
23:41:36.276: {'name': 'thread', 'index': 7, 'sleep': '3.66', 'result': False}
23:41:36.295: {'name': 'thread', 'index': 5, 'sleep': '4.00', 'result': False}
23:41:36.304: {'name': 'thread', 'index': 25, 'sleep': '1.30', 'result': False}
23:41:36.335: {'name': 'thread', 'index': 26, 'sleep': '1.34', 'result': False}
23:41:36.626: {'name': 'thread', 'index': 24, 'sleep': '1.75', 'result': False}
23:41:36.720: {'name': 'thread', 'index': 17, 'sleep': '2.88', 'result': True}
23:41:37.176: {'name': 'thread', 'index': 18, 'sleep': '3.23', 'result': False}
23:41:37.191: {'name': 'thread', 'index': 8, 'sleep': '4.44', 'result': False}
23:41:37.247: {'name': 'thread', 'index': 23, 'sleep': '2.52', 'result': False}
23:41:37.693: {'name': 'thread', 'index': 20, 'sleep': '3.53', 'result': False}
23:41:38.039: {'name': 'thread', 'index': 11, 'sleep': '4.91', 'result': False}
23:41:38.415: {'name': 'thread', 'index': 31, 'sleep': '2.76', 'result': False}
23:41:38.699: {'name': 'thread', 'index': 16, 'sleep': '4.98', 'result': False}
23:41:38.809: {'name': 'thread', 'index': 32, 'sleep': '2.90', 'result': False}
23:41:39.374: {'name': 'thread', 'index': 28, 'sleep': '3.93', 'result': False}
23:41:39.764: {'name': 'thread', 'index': 27, 'sleep': '4.55', 'result': False}
23:41:39.828: {'name': 'thread', 'index': 30, 'sleep': '4.21', 'result': False}
23:41:39.891: {'name': 'thread', 'index': 29, 'sleep': '4.43', 'result': False}
23:41:40.187: {'name': 'thread', 'index': 33, 'sleep': '4.08', 'result': False}
23:41:40.187: test17 end

Discussion