Open9

Pythonでの並行処理

concurrent.futures.Futureは直接インスタンス化しないこと! 必ず、concurrent.futures.Executorを利用する

https://docs.python.org/ja/3/library/concurrent.futures.html#concurrent.futures.Future

呼び出し可能オブジェクトの非同期実行をカプセル化します。 Future インスタンスは Executor.submit() で生成され、テストを除いて直接生成すべきではありません。

class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        self._condition = threading.Condition()

future.result(): 引数なしなら、処理の完了を待つ

import time
from concurrent.futures import ThreadPoolExecutor


def fetch_html(url: str) -> str:
    """非同期で行いたい処理"""
    time.sleep(3)  # 処理時間の待機時間を表現
    return url


if __name__ == '__main__':
    executor = ThreadPoolExecutor()

    # 未来に行う(≒実行が先送りされる)からFutureクラスっぽい
    future = executor.submit(fetch_html, url='https://exmaple.com')

    print(f'{future=}')  # <Future at 0x1012f2910 state=running>

    print(f'{future.result()=}')  # 'https://exmaple.com'

future.result(timeout): 一定時間待って完了しないならconcurrent.futures.TimeoutError例外送出

import time
from concurrent.futures import ThreadPoolExecutor


def fetch_html(url: str) -> str:
    """非同期で行いたい処理"""
    time.sleep(3)  # 処理時間の待機時間を表現
    return url


if __name__ == '__main__':
    executor = ThreadPoolExecutor()

    # 未来に行う(≒実行が先送りされる)からFutureクラスっぽい
    future = executor.submit(fetch_html, url='https://exmaple.com')

    print(f'{future.result(timeout=1)}')
Traceback (most recent call last):
  File "/Users/mohira/PycharmProjects/concurrency-samples/10.1/1.py", line 16, in <module>
    print(f'{future.result(timeout=1)}')
  File "/usr/local/Cellar/python@3.9/3.9.4/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 447, in result
    raise TimeoutError()
concurrent.futures._base.TimeoutError

`futre

import concurrent.futures
import time
from concurrent.futures import ThreadPoolExecutor


def fetch_html(url: str) -> str:
    """非同期で行いたい処理"""
    time.sleep(3)  # 処理時間の待機時間を表現
    return url


if __name__ == '__main__':
    executor = ThreadPoolExecutor()
    future = executor.submit(fetch_html, url='https://exmaple.com')

    try:
        print(f'{future.result(timeout=1)}')
    except concurrent.futures.TimeoutError:
        print('timeout')
        print(future.running())  # True
        print(future.done())  # False
        print(future.cancelled())  # False

でも、待機している

cancelってどうやるん? 実行前ならおkってどういうこと?

逐次処理 vs マルチスレッド

import time
from concurrent.futures import as_completed, ThreadPoolExecutor
from typing import Callable

import requests


def fetch(url: str) -> str:
    response = requests.get(url)

    time.sleep(1)

    return f'{url}: {response.status_code} {response.reason}'


def elapsed_time(f: Callable):
    def wrapper(*args, **kwargs):
        start = time.time()
        v = f(*args, **kwargs)
        elapsed = time.time() - start

        print(f'{f.__name__}: {elapsed}s')

        return v

    return wrapper


@elapsed_time
def get_sequential(urls: list[str]) -> None:
    for url in urls:
        print(fetch(url))


@elapsed_time
def get_multi_thread(urls: list[str]) -> None:
    with ThreadPoolExecutor(max_workers=5) as e:
        futures = [e.submit(fetch, url) for url in urls]

        for f in as_completed(futures):
            print(f.result())


def main():
    urls = [
        'https://example.com',
        'https://python.org',
        'https://google.com',
    ]

    get_sequential(urls)
    get_multi_thread(urls)


if __name__ == '__main__':
    main()
https://example.com: 200 OK
https://python.org: 200 OK
https://google.com: 200 OK
get_sequential: 4.676729917526245s

https://google.com: 200 OK
https://example.com: 200 OK
https://python.org: 200 OK
get_multi_thread: 1.738647222518921s

Profileをみればわかる

「並行性は性質」の意見みつけた

“並列処理の同義語だと考えている人は驚くかもしれませんが、並行性と並列性は同じものではありません。並行性はアプリケーションの実装に関するものではなく、プログラムやアルゴリズム、問題が持つ性質です。そして並列処理は、並行問題に対するアプローチの1つにすぎません。”

抜粋:: Michal Jaworski、Tarek Ziade 著 稲田直哉、芝田 将、渋川よしき、清水川貴之、森本哲也 翻訳 “エキスパートPythonプログラミング 改訂2版(for shukatsudaison@gmail.com)”。 Apple Books

import asyncio
from pprint import pprint
from typing import Coroutine


async def print_number(n: int) -> None:
    print(n)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    coroutines: list[Coroutine] = []
    for i in range(10):
        coroutines.append(print_number(i))

    pprint(coroutines)

    for c in coroutines:
        print(isinstance(c, Coroutine))

    loop.run_until_complete(asyncio.wait(coroutines))

    loop.close()

ログインするとコメントできます