Open2
asyncioのライブラリの検証
wove:
asyncとsyncの書き方を混在して実装できる
初期の評価スレッド。
forkの検証
woveのforkは、各タスクが別プロセスでうごいて「並列処理 (Parallelism)」を行うものとは思えない。
import time
from wove import weave
import os
import threading
print('start... ', os.getpid())
def my_callback(result):
print(f"Background weave complete! Final result: {result.final}")
# Run in a background thread
with weave(background=True, fork=True, on_done=my_callback) as w:
@w.do
def long_running_task1():
print('start... long_running_task1', os.getpid(), threading.get_ident(), flush=True)
while True:
i = i + 1
if i > 1000000:
break
print('end... long_running_task1', os.getpid(), threading.get_ident(), flush=True)
return "Done!1"
@w.do
def long_running_task2():
print('start... long_running_task2', os.getpid(), threading.get_ident(), flush=True)
# time.sleep(2)
while True:
i = i + 1
if i > 1000000:
break
print('end... long_running_task2', os.getpid(), threading.get_ident(), flush=True)
return "Done!2"
print("Main program continues to run...")
結果:
main. 85052
start... count_primes 900000 85052 123145447804928
start... count_primes 800000 85052 123145431015424
end... count_primes 800000 5.017432355991332 85052 123145431015424
end... count_primes 900000 5.457055067003239 85052 123145447804928
result1=63951, result2=71274
elapsed=5.46s
プロセスIDは変わらず、long_running_task1とlong_running_task2は別スレッドでうごく
aiomultiprocess
マルチプロセスで非同期処理を動かす
サンプル
別プロセスでcount_primesが同時実行することが確認可能
# pip install aiomultiprocess
import asyncio, os, threading, math, time
from aiomultiprocess import Pool
from asyncio import gather, sleep
async def count_primes(limit: int) -> int:
start = time.perf_counter()
print('start... count_primes', limit, os.getpid(), threading.get_ident(), flush=True)
cnt = 0
for n in range(2, limit):
root = int(math.isqrt(n))
for d in range(2, root + 1):
if n % d == 0:
break
else:
cnt += 1
print('end... count_primes', limit, time.perf_counter() - start, os.getpid(), threading.get_ident(), flush=True)
return cnt
async def main():
async with Pool(processes=2, childconcurrency=1) as pool:
result = await gather(
pool.apply(count_primes, args=(800_000,)),
pool.apply(count_primes, args=(800_000,))
)
print(f"result1={result}")
if __name__ == "__main__":
asyncio.run(main())
start... count_primes 800000 51650 140704398269376
start... count_primes 800000 51651 140704398269376
end... count_primes 800000 2.2692830149899237 51650 140704398269376
end... count_primes 800000 2.3335574050142895 51651 140704398269376
result1=[63951, 63951]
別プロセスIDで動いていることが観測できる。