🎉
【Python】非同期処理イベントの実装
はじめに
これまで非同期処理が必用な実装はあまり行ってこなかった
そこで、今回は勉強も兼ねて簡単な非同期処理を含むコードを作ってみた
処理の概要は次の「実装する処理のイメージ」を参照
実装する処理のイメージ
-
適当なkey入力で非同期タスクを作成する
非同期タスクにはidをつけて、何番目のキー入力に対応したタスクかわかるようにしておく
また、各タスクの実行時間はランダム(今回は、タスクの部分はランダムなsleepで置き換えた)とした
→ タスクごとに処理負荷と実行時間が異なる状況を想定 -
タスクが完了したらprintで標準出力に完了を知らせるようにする
-
1,2をループ
→ ただし、非同期実行にしたいので、1を実行中でも別のタスクを受け付けられるようにする
当然だが、実行時間がバラバラなので、後に開始したタスクが先に終了することも考慮する -
1~3を処理しつつ、適当なキーでループを抜けて、すべてのタスク完了を待って終了
実装
最終的なコードはこちら
import random
import aioconsole
import asyncio
async def task_random_sleep(idx):
timer = random.uniform(0.1, 2)
await asyncio.sleep(timer)
result = "===== task id {} is awake! =====".format(idx)
return result
async def read_key_input(queue):
while True:
key = await aioconsole.ainput()
await queue.put(key)
if key == 'q':
break
async def watcher_task(tasks, idx, timeout):
new_task = asyncio.create_task(task_random_sleep(idx))
tasks.append(new_task)
try:
result = await asyncio.wait_for(new_task, timeout=timeout)
print(result)
tasks.remove(new_task)
return True
except asyncio.TimeoutError:
print("The task took too long and was cancelled.")
new_task.cancel()
return False
async def main_task_manager(queue):
delay = 0.5
timeout = 600
tasks = []
task_id = 0
while True:
if not queue.empty():
key = await queue.get()
print(f"Received key: {key}")
if key == 'q':
print("Exiting...")
break
if key == 'c':
print("Creaing Task...")
new_watcher_task = asyncio.create_task(watcher_task(tasks, task_id,timeout=timeout))
task_id += 1
await asyncio.sleep(delay)
print("task manager loop")
# すべてのタスク完了を待って終了
if tasks:
done, pending = await asyncio.wait(tasks)
print("Done tasks:")
for task in done:
print(task.result())
print("Pending tasks:")
for task in pending:
print(task)
print("All tasks completed")
async def main():
queue = asyncio.Queue()
input_task = asyncio.create_task(read_key_input(queue))
process_task = asyncio.create_task(main_task_manager(queue))
await asyncio.gather(input_task, process_task)
if __name__=="__main__":
asyncio.run(main())
所感
非同期処理をほぼ初めてゼロから実装したので、最初は思ってないところで実行が止まったりして苦戦した。。。ただ、なんとなく型がわかってくると、そこそこ応用もきくようになったように思う。
自分的にはどこで別タスクに別れて、どこでもとのタスクと合流させるか、を意識すると少し作りやすくなったように感じた
Discussion