🎉

【Python】非同期処理イベントの実装

2024/07/07に公開

はじめに

これまで非同期処理が必用な実装はあまり行ってこなかった
そこで、今回は勉強も兼ねて簡単な非同期処理を含むコードを作ってみた

処理の概要は次の「実装する処理のイメージ」を参照

実装する処理のイメージ

  1. 適当なkey入力で非同期タスクを作成する
    非同期タスクにはidをつけて、何番目のキー入力に対応したタスクかわかるようにしておく
    また、各タスクの実行時間はランダム(今回は、タスクの部分はランダムなsleepで置き換えた)とした
    → タスクごとに処理負荷と実行時間が異なる状況を想定

  2. タスクが完了したらprintで標準出力に完了を知らせるようにする

  3. 1,2をループ
    → ただし、非同期実行にしたいので、1を実行中でも別のタスクを受け付けられるようにする
    当然だが、実行時間がバラバラなので、後に開始したタスクが先に終了することも考慮する

  4. 1~3を処理しつつ、適当なキーでループを抜けて、すべてのタスク完了を待って終了

実装

最終的なコードはこちら

import random
import aioconsole
import asyncio

async def task_random_sleep(idx):
    timer = random.uniform(0.1, 2)
    await asyncio.sleep(timer)
    result = "===== task id {} is awake! =====".format(idx)
    return result

async def read_key_input(queue):
    while True:
        key = await aioconsole.ainput()
        await queue.put(key)
        if key == 'q':
            break

async def watcher_task(tasks, idx, timeout):
    
    new_task = asyncio.create_task(task_random_sleep(idx))
    tasks.append(new_task)
    
    try:
        result = await asyncio.wait_for(new_task, timeout=timeout)
        print(result)
        tasks.remove(new_task)
        return True
        
    except asyncio.TimeoutError:
        print("The task took too long and was cancelled.")
        new_task.cancel()
        return False
        
        

async def main_task_manager(queue):
    
    delay = 0.5
    timeout = 600
    tasks = []
    task_id = 0
    
    while True:        
        
        if not queue.empty():
            key = await queue.get()
            print(f"Received key: {key}")
            if key == 'q':
                print("Exiting...")
                break
            
            if key == 'c':
                print("Creaing Task...")
                new_watcher_task = asyncio.create_task(watcher_task(tasks, task_id,timeout=timeout))
                task_id += 1
                         
        
        await asyncio.sleep(delay) 
        print("task manager loop")

    # すべてのタスク完了を待って終了
    if tasks:
        done, pending = await asyncio.wait(tasks) 
 
        print("Done tasks:")
        for task in done:
            print(task.result())
        
        print("Pending tasks:")
        for task in pending:
            print(task)
    
    print("All tasks completed")



async def main():
    
    queue = asyncio.Queue()

    input_task = asyncio.create_task(read_key_input(queue))
    process_task = asyncio.create_task(main_task_manager(queue))
    
    await asyncio.gather(input_task, process_task)

if __name__=="__main__":  
    asyncio.run(main())

所感

非同期処理をほぼ初めてゼロから実装したので、最初は思ってないところで実行が止まったりして苦戦した。。。ただ、なんとなく型がわかってくると、そこそこ応用もきくようになったように思う。
自分的にはどこで別タスクに別れて、どこでもとのタスクと合流させるか、を意識すると少し作りやすくなったように感じた

Discussion