Open2

asyncioのライブラリの検証

mima_itamima_ita

wove:

https://github.com/curvedinf/wove/

asyncとsyncの書き方を混在して実装できる

初期の評価スレッド。
https://www.reddit.com/r/Python/comments/1mv4tyb/wove_beautiful_python_async/

forkの検証

woveのforkは、各タスクが別プロセスでうごいて「並列処理 (Parallelism)」を行うものとは思えない。

import time
from wove import weave
import os
import threading

print('start... ', os.getpid())
def my_callback(result):
    print(f"Background weave complete! Final result: {result.final}")

# Run in a background thread
with weave(background=True, fork=True, on_done=my_callback) as w:
    @w.do
    def long_running_task1():
        print('start... long_running_task1', os.getpid(), threading.get_ident(), flush=True)
        while True:
            i = i + 1
            if i > 1000000:
                break
        print('end... long_running_task1', os.getpid(), threading.get_ident(), flush=True)
        return "Done!1"
    @w.do
    def long_running_task2():
        print('start... long_running_task2', os.getpid(), threading.get_ident(), flush=True)
        # time.sleep(2)
        while True:
            i = i + 1
            if i > 1000000:
                break
        print('end... long_running_task2', os.getpid(), threading.get_ident(), flush=True)
        return "Done!2"

print("Main program continues to run...")

結果:

main. 85052
start... count_primes 900000 85052 123145447804928
start... count_primes 800000 85052 123145431015424
end... count_primes 800000 5.017432355991332 85052 123145431015424
end... count_primes 900000 5.457055067003239 85052 123145447804928
result1=63951, result2=71274
elapsed=5.46s

プロセスIDは変わらず、long_running_task1とlong_running_task2は別スレッドでうごく

mima_itamima_ita

aiomultiprocess

https://github.com/omnilib/aiomultiprocess/tree/main
マルチプロセスで非同期処理を動かす

サンプル

別プロセスでcount_primesが同時実行することが確認可能

# pip install aiomultiprocess
import asyncio, os, threading, math, time
from aiomultiprocess import Pool
from asyncio import gather, sleep

async def count_primes(limit: int) -> int:
    start = time.perf_counter()
    print('start... count_primes', limit, os.getpid(), threading.get_ident(), flush=True)
    cnt = 0
    for n in range(2, limit):
        root = int(math.isqrt(n))
        for d in range(2, root + 1):
            if n % d == 0:
                break
        else:
            cnt += 1
    print('end... count_primes', limit, time.perf_counter() - start, os.getpid(), threading.get_ident(), flush=True)
    return cnt

async def main():
    async with Pool(processes=2, childconcurrency=1) as pool:
        result = await gather(
            pool.apply(count_primes, args=(800_000,)),
            pool.apply(count_primes, args=(800_000,))
        )
        print(f"result1={result}")

if __name__ == "__main__":
    asyncio.run(main())
start... count_primes 800000 51650 140704398269376
start... count_primes 800000 51651 140704398269376
end... count_primes 800000 2.2692830149899237 51650 140704398269376
end... count_primes 800000 2.3335574050142895 51651 140704398269376
result1=[63951, 63951]

別プロセスIDで動いていることが観測できる。