🙌

reqwestでリトライをする

に公開

目的

reqwestは本番で利用するにはタイムアウトとリトライは必須です。開発中は問題無く動いたとしても、サーバーの調子が悪くなった時に、正常に動かなくなります。

タイムアウトはreqwestにtimeoutメソッドで指定できるので簡単に設定できます。リトライはcrateを利用するか自前で実装する必要があります。

crates

reqwest-retry

reqwest-middlewareを利用したretryのcrateです。ポリシーを設定した後の使い心地はreqwestと同じです。本家のドキュメントにあるサンプルを引用します。

use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};

async fn run_retries() {
    // Retry up to 3 times with increasing intervals between attempts.
    let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
    let client = ClientBuilder::new(reqwest::Client::new())
        .with(RetryTransientMiddleware::new_with_policy(retry_policy))
        .build();

    client
        .get("https://truelayer.com")
        .header("foo", "bar")
        .send()
        .await
        .unwrap();
}

ただしreqwesest-middlewareのstructで動くので純粋なreqwestのstructではなくなります。reqwestしか想定していないようなAPI Bindingなcrateでは使えません。

ちなみにこの問題を解消すべく本家にmiddlewareの実装のIsuueがあります。

reqwest-builder-retry

自作のリトライのcrateになります。最初reqwest-retryの存在に気が付かなくてこの名前でcargo publishしたら名前がぶつかったエラーが出て慌てて中身を見に行きました。
自分は𝕏、LINE、TikTok、PinterestのAPIライブラリを書いていて、これが使えるリトライを作りたかったのでreqwest-retryでは実現できませんでした。
ただ、middlewareという考え方は素晴らしいので本家がmiddlewareをサポートしたら、それに乗っかっていこうと思っています。

インターフェース

リトライのする関数のインターフェースは以下の通りです。

pub enum RetryType {
    Stop,
    Retry,
    RetryAfter(Duration),
}

pub async fn execute<
    SuccessResponse,
    ErrorResponse,
    MakerBuilder,
    CheckDone,
    JITTER,
    SLEEPER,
    FutCheckDone,
    FutSLEEPER,
>(
    make_builder: MakerBuilder,
    check_done: CheckDone,
    try_count: u8,
    retry_duration: Duration,
    jitter: JITTER,
    sleeper: SLEEPER,
) -> Result<SuccessResponse, Error<ErrorResponse>>
where
    MakerBuilder: Fn(u8) -> RequestBuilder,
    CheckDone: Fn(Result<Response, reqwest::Error>) -> FutCheckDone,
    JITTER: Fn() -> Duration,
    SLEEPER: Fn(Duration) -> FutSLEEPER,
    FutCheckDone: Future<Output = Result<SuccessResponse, (RetryType, ErrorResponse)>>,
    FutSLEEPER: Future<Output = ()>,

なんか複雑ですね。すこし簡単にした便利版も作りました。

pub async fn execute<SuccessResponse, ErrorResponse, MakerBuilder, CheckDone, FutCheckDone>(
    make_builder: MakerBuilder,
    check_done: CheckDone,
    try_count: u8,
    retry_duration: Duration,
) -> Result<SuccessResponse, Error<ErrorResponse>>
where
    MakerBuilder: Fn(u8) -> RequestBuilder,
    CheckDone: Fn(Result<Response, reqwest::Error>) -> FutCheckDone,
    FutCheckDone: Future<Output = Result<SuccessResponse, (RetryType, ErrorResponse)>>,
  • make_buliderは引数がトライ数、戻り値がRequestBuliderになります。RequestBuliderにcloneがあれば関数にする必要はなかったんですが、try_cloneしか無くって、リトライするたびにRquestBuilderを作り直すことにしました。ちなみにtry_cloneで失敗するのはmultipartの時がそうでした。

  • check_doneはreqwestを実行した結果、正常とするのか、リトライするのか、リトライを中止するのかを決める関数です。APIによってはステータスコードだけでリトライかそうでないかを判断できないものがあるので、呼び出し側に確認してもらうようにしています。エラー時にはRetryTypeを返します。Stopはリトライをしない、Retryはリトライをする。そしてもう一つRetryAfter(Duration)があります。これはAPIのレートリミットなどで停止する時間がAPI側から示された時にその時間を指定します。

  • try_countは最初の実行を含めたAPIの実行回数です。

  • retry_durationは次のAPIを実行するまでの停止時間です。ただし実行回数に応じてExponestial Backoffが発動してもっと長く停止します。

サンプル

𝕏APIを利用した例は以下になります。

Cargo.toml
[package]
name = "x-sample"
version = "0.1.0"
edition = "2024"

[dependencies]
anyhow = "1"
reqwest-builder-retry = { path = "../..", features = ["rustls-tls", "convenience"], default-features = false }
serde = "1"
serde_json = "1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tracing = "0.1"
tracing-bunyan-formatter = "0.3"
tracing-subscriber = "0.3"
twapi-v2 = { version = "0.20.0", features = ["oauth10a"] }
main.rs
use std::time::Duration;

use reqwest_builder_retry::{
    RetryType,
    convenience::check_status_code,
    reqwest::{Error, Response, StatusCode, header::HeaderMap},
};
use tracing::Level;
use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer};
use tracing_subscriber::{Registry, filter::Targets, layer::SubscriberExt};
use twapi_v2::{api::get_2_users_me, oauth10a::OAuthAuthentication};

// Tracingの準備
pub fn setup_tracing(name: &str) {
    let formatting_layer = BunyanFormattingLayer::new(name.into(), std::io::stdout);
    let filter = Targets::new()
        .with_target(name, Level::TRACE)
        .with_target("twapi_v2", Level::TRACE);

    let subscriber = Registry::default()
        .with(filter)
        .with(JsonStorageLayer)
        .with(formatting_layer);
    tracing::subscriber::set_global_default(subscriber).unwrap();
}

// エラー時のレスポンスのデータ
#[derive(Debug)]
pub struct ResponseData {
    pub status_code: StatusCode,
    pub body: String,
    pub headers: HeaderMap,
}

// エラー情報、reqwestのエラーかそれ以外
#[derive(Debug)]
pub struct ResponseError {
    pub error: Option<reqwest_builder_retry::reqwest::Error>,
    pub response_data: Option<ResponseData>,
}

// レスポンスのチェック
async fn check_done<T>(
    response: Result<Response, Error>,
    retryable_status_codes: &[StatusCode],
) -> Result<T, (RetryType, ResponseError)>
where
    T: serde::de::DeserializeOwned,
{
    let response = response.map_err(|err| {
        (
            RetryType::Retry,
            ResponseError {
                error: Some(err),
                response_data: None,
            },
        )
    })?;

    let status_code = response.status();
    let headers = response.headers().clone();
    let body = response.text().await.unwrap_or_else(|_| "".to_string());
    let response_data = ResponseData {
        status_code,
        body,
        headers,
    };

    if let Some(retry_type) = check_status_code(status_code, retryable_status_codes).await {
        return Err((
            retry_type,
            ResponseError {
                error: None,
                response_data: Some(response_data),
            },
        ));
    }

    match serde_json::from_str::<T>(&response_data.body) {
        Ok(result) => Ok(result),
        Err(_) => Err((
            RetryType::Retry,
            ResponseError {
                error: None,
                response_data: Some(response_data),
            },
        )),
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    setup_tracing("x_sample");
    tracing::trace!("start");

    let auth = OAuthAuthentication::new(
        std::env::var("CONSUMER_KEY").unwrap_or_default(),
        std::env::var("CONSUMER_SECRET").unwrap_or_default(),
        std::env::var("ACCESS_KEY").unwrap_or_default(),
        std::env::var("ACCESS_SECRET").unwrap_or_default(),
    );

    // スレッドで利用可能化チェック
    let handle = tokio::spawn({
        async move {
            let result = reqwest_builder_retry::convenience::execute(
                |_| {
                    let api = get_2_users_me::Api::open();
                    // APIの実行には必ずタイムアウトをつけましょう
                    let builder = api.build(&auth).timeout(Duration::from_secs(3));
                    // リクエストのログ
                    tracing::trace!(?builder, "api request");
                    builder
                },
                |response| {
                    // レスポンスのログ
                    tracing::trace!(?response, "api response");
                    // レスポンスのチェック
                    check_done::<get_2_users_me::Response>(
                        response,
                        &[StatusCode::TOO_MANY_REQUESTS, StatusCode::FORBIDDEN],
                    )
                },
                3,                      // トライ回数
                Duration::from_secs(2), // リトライ間隔
            )
            .await;
            println!("Result: {:?}", result);
        }
    });

    handle.await?;

    Ok(())
}

まとめ

APIの実行で必要なタイムアウト、リトライができました。またリクエスト、レスポンスのログも出しました。ここまでやってやっと安心して本番で利用できるようになります。

Discussion