🐍

Python でキューを使ってリトライ処理する

2024/10/14に公開

はじめに

指定された回数だけ処理を繰り返し、成功した場合はそのまま処理を続行し、規定回数内に成功しない場合はエラーを出力するような実装が必要なことがありました。これは一般的に『リトライ処理』と呼ばれるものです。特にキューを使った方法をまた使う機会ありそうなので、ここにメモとして残しておきます。

検証環境

% python --version
Python 3.10.13

% pip show tenacity | grep "Version"
Version: 9.0.0

検証準備

複数のタスクを処理し、それぞれのタスクが成功または失敗するかをランダムに決定するシンプルなタスク処理シミュレーションです

import time
import random

# 複数の文字列セットを持つタスク
tasks = [
    ('task1', 'step1', 'parameterA'),
    ('task2', 'step2', 'parameterB'),
    ('task3', 'step3', 'parameterC')
]

# タスクを処理する関数
def process_task(task):
    task_name, step, parameter = task
    print(f"Processing {task_name}: {step}, {parameter}")
    # ランダムに成功または失敗
    return random.choice([True, False])

リトライ処理①

for-else構文を使ってリトライ処理する方法

複数のタスクを処理し、各タスクが失敗した場合に最大で3回(デフォルト値)までリトライを行うプロセスを実現しています。サンプルタスクはランダムに成功するか失敗するかが決定され、リトライの仕組みを備えています。

import time
import random

# 複数の文字列セットを持つタスク
tasks = [
    ('task1', 'step1', 'parameterA'),
    ('task2', 'step2', 'parameterB'),
    ('task3', 'step3', 'parameterC')
]

# タスクを処理する関数
def process_task(task):
    task_name, step, parameter = task
    print(f"Processing {task_name}: {step}, {parameter}")
    # ランダムに成功または失敗
    return random.choice([True, False])

# タスクを再試行するメインループ
def execute_tasks(tasks, retry_limit=3):
    for task in tasks:
        retries = 0
        while retries < retry_limit:
            if process_task(task):
                print(f"Task {task[0]} succeeded.")
                break
            else:
                retries += 1
                print(f"Task {task[0]} failed. Retrying... ({retries}/{retry_limit})")
                time.sleep(2)  # 再試行前に少し待つ
        else:
            print(f"Task {task[0]} reached retry limit and failed.")

# タスクを実行
execute_tasks(tasks)

実行結果1

Processing task1: step1, parameterA
Task task1 failed. Retrying... (1/3)
Processing task1: step1, parameterA
Task task1 succeeded.
Processing task2: step2, parameterB
Task task2 succeeded.
Processing task3: step3, parameterC
Task task3 failed. Retrying... (1/3)
Processing task3: step3, parameterC
Task task3 failed. Retrying... (2/3)
Processing task3: step3, parameterC
Task task3 succeeded.

実行結果2

Processing task1: step1, parameterA
Task task1 failed. Retrying... (1/3)
Processing task1: step1, parameterA
Task task1 failed. Retrying... (2/3)
Processing task1: step1, parameterA
Task task1 failed. Retrying... (3/3)
Task task1 reached retry limit and failed.
Processing task2: step2, parameterB
Task task2 failed. Retrying... (1/3)
Processing task2: step2, parameterB
Task task2 succeeded.
Processing task3: step3, parameterC
Task task3 succeeded.

リトライ処理②

tenacityで同様の実装

tenacity ライブラリを使ってタスクのリトライ処理を行っています。サンプルタスクがランダムに成功または失敗し、失敗した場合は最大3回まで再試行されます。

https://github.com/jd/tenacity
Pythonでリトライ処理を簡単に導入するためのライブラリ

pip install tenacity
from tenacity import retry, stop_after_attempt, wait_fixed
import random

@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
def process_task(task):
    task_name, step, parameter = task
    print(f"Processing {task_name}: {step}, {parameter}")
    # ランダムに成功(True)または失敗(False)
    if not random.choice([True, False]):
        raise Exception(f"Task {task_name} failed.")
    print(f"Task {task_name} succeeded.")

tasks = [
    ('task1', 'step1', 'parameterA'),
    ('task2', 'step2', 'parameterB'),
    ('task3', 'step3', 'parameterC')
]

for task in tasks:
    try:
        process_task(task)
    except Exception as e:
        print(e)

実行結果1

Processing task1: step1, parameterA
Task task1 succeeded.
Processing task2: step2, parameterB
Task task2 succeeded.
Processing task3: step3, parameterC
Task task3 succeeded.

実行結果2

Processing task1: step1, parameterA
Processing task1: step1, parameterA
Processing task1: step1, parameterA
RetryError[<Future at 0x102ccf520 state=finished raised Exception>]
Processing task2: step2, parameterB
Task task2 succeeded.
Processing task3: step3, parameterC
Processing task3: step3, parameterC
Processing task3: step3, parameterC
RetryError[<Future at 0x1026278b0 state=finished raised Exception>]

リトライ処理③

キューの使ってリトライ処理する方法

タスクの実行とリトライ処理を管理しています。Queue を使って失敗したタスクを管理し、一定の回数までリトライを行います。サンプルタスクが成功するか失敗するかはランダムに決まり、失敗した場合は再度試行されます。

タスク 処理 キュー
A,B,C,D 処理開始
B,C,D タスクA成功
C,D タスクB失敗 B
D タスクC失敗 B,C
タスクD成功 B,C
リトライ処理1回目:タスクB失敗 B,C
リトライ処理1回目:タスクC失敗 B,C
リトライ処理2回目:タスクB成功 C
リトライ処理2回目:タスクC失敗 C
リトライ処理3回目:タスクC失敗

下記の実装だと、失敗したタスクを一旦キューに格納し、全てのタスクが終わった段階でキューに格納順に再度タスクを実行する処理を行う。この様な方法だと、一旦成功するタスクを全て処理して、失敗するタスクは後回しにできる。

import time
import random
from queue import Queue

# 複数の文字列セットを持つタスク
tasks = [
    ('task1', 'step1', 'parameterA'),
    ('task2', 'step2', 'parameterB'),
    ('task3', 'step3', 'parameterC')
]

# タスクを処理する関数
def process_task(task):
    task_name, step, parameter = task
    print(f"Processing {task_name}: {step}, {parameter}")
    # ランダムに成功(True)または失敗(False)
    return random.choice([True, False])

# 失敗したタスクをキューに入れて再試行
def retry_failed_tasks(failed_tasks, retry_limit=3):
    while not failed_tasks.empty():
        task = failed_tasks.get()
        retries = 0
        while retries < retry_limit:
            if process_task(task):
                print(f"Task {task[0]} succeeded on retry.")
                break
            else:
                retries += 1
                print(f"Task {task[0]} failed. Retrying... ({retries}/{retry_limit})")
                time.sleep(2)  # 再試行前に待機
        else:
            print(f"Task {task[0]} reached retry limit and failed.")

# メイン処理
def execute_tasks(tasks):
    failed_tasks = Queue()  # 失敗したタスクを管理するキュー

    # まず成功したタスクを先に処理し、失敗したものはキューに入れる
    for task in tasks:
        if process_task(task):
            print(f"Task {task[0]} succeeded.")
        else:
            print(f"Task {task[0]} failed. Adding to retry queue.")
            failed_tasks.put(task)
    
    # 失敗したタスクを再試行
    print("\nRetrying failed tasks...")
    retry_failed_tasks(failed_tasks)

# タスクを実行
execute_tasks(tasks)

実行結果1

Processing task1: step1, parameterA
Task task1 failed. Adding to retry queue.
Processing task2: step2, parameterB
Task task2 succeeded.
Processing task3: step3, parameterC
Task task3 succeeded.

Retrying failed tasks...
Processing task1: step1, parameterA
Task task1 failed. Retrying... (1/3)
Processing task1: step1, parameterA
Task task1 failed. Retrying... (2/3)
Processing task1: step1, parameterA
Task task1 succeeded on retry.

実行結果2

Processing task1: step1, parameterA
Task task1 failed. Adding to retry queue.
Processing task2: step2, parameterB
Task task2 succeeded.
Processing task3: step3, parameterC
Task task3 failed. Adding to retry queue.

Retrying failed tasks...
Processing task1: step1, parameterA
Task task1 failed. Retrying... (1/3)
Processing task1: step1, parameterA
Task task1 succeeded on retry.
Processing task3: step3, parameterC
Task task3 failed. Retrying... (1/3)
Processing task3: step3, parameterC
Task task3 failed. Retrying... (2/3)
Processing task3: step3, parameterC
Task task3 succeeded on retry.

おわりに

失敗タスクのリトライ処理を後回しにしたい場合は、Queueが使えるかも。

参考

https://dev.classmethod.jp/articles/try-tenacity/
https://qiita.com/pm00/items/105aa33339bcd5147c51
https://zenn.dev/taroman_zenn/articles/dd0b33a3a37d1e

Discussion