Open9
Pythonでの並行処理
README
参考
この記事がたぶんいちばんよくまとまっている
concurrent.futures.Future
は直接インスタンス化しないこと! 必ず、concurrent.futures.Executor
を利用する
呼び出し可能オブジェクトの非同期実行をカプセル化します。 Future インスタンスは Executor.submit() で生成され、テストを除いて直接生成すべきではありません。
class Future(object):
"""Represents the result of an asynchronous computation."""
def __init__(self):
"""Initializes the future. Should not be called by clients."""
self._condition = threading.Condition()
future.result()
: 引数なしなら、処理の完了を待つ
import time
from concurrent.futures import ThreadPoolExecutor
def fetch_html(url: str) -> str:
"""非同期で行いたい処理"""
time.sleep(3) # 処理時間の待機時間を表現
return url
if __name__ == '__main__':
executor = ThreadPoolExecutor()
# 未来に行う(≒実行が先送りされる)からFutureクラスっぽい
future = executor.submit(fetch_html, url='https://exmaple.com')
print(f'{future=}') # <Future at 0x1012f2910 state=running>
print(f'{future.result()=}') # 'https://exmaple.com'
future.result(timeout)
: 一定時間待って完了しないならconcurrent.futures.TimeoutError
例外送出
import time
from concurrent.futures import ThreadPoolExecutor
def fetch_html(url: str) -> str:
"""非同期で行いたい処理"""
time.sleep(3) # 処理時間の待機時間を表現
return url
if __name__ == '__main__':
executor = ThreadPoolExecutor()
# 未来に行う(≒実行が先送りされる)からFutureクラスっぽい
future = executor.submit(fetch_html, url='https://exmaple.com')
print(f'{future.result(timeout=1)}')
Traceback (most recent call last):
File "/Users/mohira/PycharmProjects/concurrency-samples/10.1/1.py", line 16, in <module>
print(f'{future.result(timeout=1)}')
File "/usr/local/Cellar/python@3.9/3.9.4/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 447, in result
raise TimeoutError()
concurrent.futures._base.TimeoutError
`futre
import concurrent.futures
import time
from concurrent.futures import ThreadPoolExecutor
def fetch_html(url: str) -> str:
"""非同期で行いたい処理"""
time.sleep(3) # 処理時間の待機時間を表現
return url
if __name__ == '__main__':
executor = ThreadPoolExecutor()
future = executor.submit(fetch_html, url='https://exmaple.com')
try:
print(f'{future.result(timeout=1)}')
except concurrent.futures.TimeoutError:
print('timeout')
print(future.running()) # True
print(future.done()) # False
print(future.cancelled()) # False
でも、待機している
cancelってどうやるん? 実行前ならおkってどういうこと?
逐次処理 vs マルチスレッド
import time
from concurrent.futures import as_completed, ThreadPoolExecutor
from typing import Callable
import requests
def fetch(url: str) -> str:
response = requests.get(url)
time.sleep(1)
return f'{url}: {response.status_code} {response.reason}'
def elapsed_time(f: Callable):
def wrapper(*args, **kwargs):
start = time.time()
v = f(*args, **kwargs)
elapsed = time.time() - start
print(f'{f.__name__}: {elapsed}s')
return v
return wrapper
@elapsed_time
def get_sequential(urls: list[str]) -> None:
for url in urls:
print(fetch(url))
@elapsed_time
def get_multi_thread(urls: list[str]) -> None:
with ThreadPoolExecutor(max_workers=5) as e:
futures = [e.submit(fetch, url) for url in urls]
for f in as_completed(futures):
print(f.result())
def main():
urls = [
'https://example.com',
'https://python.org',
'https://google.com',
]
get_sequential(urls)
get_multi_thread(urls)
if __name__ == '__main__':
main()
https://example.com: 200 OK
https://python.org: 200 OK
https://google.com: 200 OK
get_sequential: 4.676729917526245s
https://google.com: 200 OK
https://example.com: 200 OK
https://python.org: 200 OK
get_multi_thread: 1.738647222518921s
Profileをみればわかる
「並行性は性質」の意見みつけた
“並列処理の同義語だと考えている人は驚くかもしれませんが、並行性と並列性は同じものではありません。並行性はアプリケーションの実装に関するものではなく、プログラムやアルゴリズム、問題が持つ性質です。そして並列処理は、並行問題に対するアプローチの1つにすぎません。”
抜粋:: Michal Jaworski、Tarek Ziade 著 稲田直哉、芝田 将、渋川よしき、清水川貴之、森本哲也 翻訳 “エキスパートPythonプログラミング 改訂2版(for shukatsudaison@gmail.com)”。 Apple Books
import asyncio
from pprint import pprint
from typing import Coroutine
async def print_number(n: int) -> None:
print(n)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
coroutines: list[Coroutine] = []
for i in range(10):
coroutines.append(print_number(i))
pprint(coroutines)
for c in coroutines:
print(isinstance(c, Coroutine))
loop.run_until_complete(asyncio.wait(coroutines))
loop.close()