🙌

Rustでタイムアウト付きリトライ

2024/03/12に公開

目的

外部APIを呼んでいると、駄目になることがあって、ちょっと待ってリトライするとうまくいくことがあります。
他にも呼び出したまま返ってこないなんてこともあります。
その両方をどうにかする関数を書いてみました。

コード

インターフェースはこのようになります。

pub async fn execute_retry<T, E, Fut>(
    max_try_count: u64,
    retry_duration: Duration,
    timeout_duration: Duration,
    inner: impl Fn(u64) -> Fut,
) -> RetryResult<T, E>
where
    Fut: std::future::Future<Output = Result<T, E>>,
{
  • max_try_count : 最大実行回数
  • retry_duration : リトライする時に待つ時間
  • timeout_duration : タイムアウトの時間
  • inner : 実際に行う処理

戻り値は以下のようになります。

pub struct RetryResult<T, E> {
    pub success: Option<T>,
    pub errors: Vec<E>,
    pub timeout_count: u64,
}
  • success : 実行結果。全て失敗ならNone
  • errors : 失敗したエラーの配列
  • timeout_count : タイムアウトを起こした数

実際のコードです。なんちゃってExponential backoffも実装しています。

use std::time::Duration;
use tokio::time::timeout;

pub struct RetryResult<T, E> {
    pub success: Option<T>,
    pub errors: Vec<E>,
    pub timeout_count: u64,
}

pub async fn execute_retry<T, E, Fut>(
    max_try_count: u64,
    retry_duration: Duration,
    timeout_duration: Duration,
    inner: impl Fn(u64) -> Fut,
) -> RetryResult<T, E>
where
    Fut: std::future::Future<Output = Result<T, E>>,
{
    execute_retry_with_exponential_backoff(
        max_try_count,
        retry_duration,
        timeout_duration,
        inner,
        false,
    )
    .await
}

pub async fn execute_retry_with_exponential_backoff<T, E, Fut>(
    max_try_count: u64,
    retry_duration: Duration,
    timeout_duration: Duration,
    inner: impl Fn(u64) -> Fut,
    exponential_backoff: bool,
) -> RetryResult<T, E>
where
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut try_count = 0;
    let mut timeout_count = 0;
    let mut errors = vec![];
    loop {
        try_count += 1;
        if timeout_duration.is_zero() {
            match inner(try_count).await {
                Ok(res) => {
                    return RetryResult {
                        success: Some(res),
                        errors,
                        timeout_count,
                    }
                }
                Err(err) => {
                    errors.push(err);
                }
            }
        } else {
            match timeout(timeout_duration, inner(try_count)).await {
                Ok(res) => match res {
                    Ok(res) => {
                        return RetryResult {
                            success: Some(res),
                            errors,
                            timeout_count,
                        }
                    }
                    Err(err) => {
                        errors.push(err);
                    }
                },
                Err(_) => {
                    timeout_count += 1;
                }
            }
        }
        if try_count >= max_try_count {
            return RetryResult {
                success: None,
                errors,
                timeout_count,
            };
        }
        if !retry_duration.is_zero() {
            let duration = if exponential_backoff {
                retry_duration.mul_f64(2_i32.pow(try_count as u32) as f64)
            } else {
                retry_duration
            };
            tokio::time::sleep(duration).await;
        }
    }
}

呼び出し方は以下のようになります。

#[cfg(test)]
mod tests {
    use std::vec;
    use tokio::time::sleep;
    use super::*;
    async fn inner_success() -> Result<usize, String> {
        Ok(1)
    }

    #[tokio::test]
    async fn test_retry() -> anyhow::Result<()> {
        // Success
        let res = execute_retry(
            3,
            Duration::from_secs(0),
            Duration::from_secs(0),
            |_n| async { inner_success().await },
        )
        .await;
        assert_eq!(res.success, Some(1));
        assert_eq!(res.errors.len(), 0);
        assert_eq!(res.timeout_count, 0);
    }
}

まとめ

今回のコードはcrate resident-utilsに入れました。

Discussion