reqwestでリトライをする
目的
reqwestは本番で利用するにはタイムアウトとリトライは必須です。開発中は問題無く動いたとしても、サーバーの調子が悪くなった時に、正常に動かなくなります。
タイムアウトはreqwestにtimeoutメソッドで指定できるので簡単に設定できます。リトライはcrateを利用するか自前で実装する必要があります。
crates
reqwest-retry
reqwest-middlewareを利用したretryのcrateです。ポリシーを設定した後の使い心地はreqwestと同じです。本家のドキュメントにあるサンプルを引用します。
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
async fn run_retries() {
// Retry up to 3 times with increasing intervals between attempts.
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
let client = ClientBuilder::new(reqwest::Client::new())
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build();
client
.get("https://truelayer.com")
.header("foo", "bar")
.send()
.await
.unwrap();
}
ただしreqwesest-middlewareのstructで動くので純粋なreqwestのstructではなくなります。reqwestしか想定していないようなAPI Bindingなcrateでは使えません。
ちなみにこの問題を解消すべく本家にmiddlewareの実装のIsuueがあります。
reqwest-builder-retry
自作のリトライのcrateになります。最初reqwest-retryの存在に気が付かなくてこの名前でcargo publishしたら名前がぶつかったエラーが出て慌てて中身を見に行きました。
自分は𝕏、LINE、TikTok、PinterestのAPIライブラリを書いていて、これが使えるリトライを作りたかったのでreqwest-retryでは実現できませんでした。
ただ、middlewareという考え方は素晴らしいので本家がmiddlewareをサポートしたら、それに乗っかっていこうと思っています。
インターフェース
リトライのする関数のインターフェースは以下の通りです。
pub enum RetryType {
Stop,
Retry,
RetryAfter(Duration),
}
pub async fn execute<
SuccessResponse,
ErrorResponse,
MakerBuilder,
CheckDone,
JITTER,
SLEEPER,
FutCheckDone,
FutSLEEPER,
>(
make_builder: MakerBuilder,
check_done: CheckDone,
try_count: u8,
retry_duration: Duration,
jitter: JITTER,
sleeper: SLEEPER,
) -> Result<SuccessResponse, Error<ErrorResponse>>
where
MakerBuilder: Fn(u8) -> RequestBuilder,
CheckDone: Fn(Result<Response, reqwest::Error>) -> FutCheckDone,
JITTER: Fn() -> Duration,
SLEEPER: Fn(Duration) -> FutSLEEPER,
FutCheckDone: Future<Output = Result<SuccessResponse, (RetryType, ErrorResponse)>>,
FutSLEEPER: Future<Output = ()>,
なんか複雑ですね。すこし簡単にした便利版も作りました。
pub async fn execute<SuccessResponse, ErrorResponse, MakerBuilder, CheckDone, FutCheckDone>(
make_builder: MakerBuilder,
check_done: CheckDone,
try_count: u8,
retry_duration: Duration,
) -> Result<SuccessResponse, Error<ErrorResponse>>
where
MakerBuilder: Fn(u8) -> RequestBuilder,
CheckDone: Fn(Result<Response, reqwest::Error>) -> FutCheckDone,
FutCheckDone: Future<Output = Result<SuccessResponse, (RetryType, ErrorResponse)>>,
-
make_buliderは引数がトライ数、戻り値がRequestBuliderになります。RequestBuliderにcloneがあれば関数にする必要はなかったんですが、try_cloneしか無くって、リトライするたびにRquestBuilderを作り直すことにしました。ちなみにtry_cloneで失敗するのはmultipartの時がそうでした。
-
check_doneはreqwestを実行した結果、正常とするのか、リトライするのか、リトライを中止するのかを決める関数です。APIによってはステータスコードだけでリトライかそうでないかを判断できないものがあるので、呼び出し側に確認してもらうようにしています。エラー時にはRetryTypeを返します。Stopはリトライをしない、Retryはリトライをする。そしてもう一つRetryAfter(Duration)があります。これはAPIのレートリミットなどで停止する時間がAPI側から示された時にその時間を指定します。
-
try_countは最初の実行を含めたAPIの実行回数です。
-
retry_durationは次のAPIを実行するまでの停止時間です。ただし実行回数に応じてExponestial Backoffが発動してもっと長く停止します。
サンプル
𝕏APIを利用した例は以下になります。
[package]
name = "x-sample"
version = "0.1.0"
edition = "2024"
[dependencies]
anyhow = "1"
reqwest-builder-retry = { path = "../..", features = ["rustls-tls", "convenience"], default-features = false }
serde = "1"
serde_json = "1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tracing = "0.1"
tracing-bunyan-formatter = "0.3"
tracing-subscriber = "0.3"
twapi-v2 = { version = "0.20.0", features = ["oauth10a"] }
use std::time::Duration;
use reqwest_builder_retry::{
RetryType,
convenience::check_status_code,
reqwest::{Error, Response, StatusCode, header::HeaderMap},
};
use tracing::Level;
use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer};
use tracing_subscriber::{Registry, filter::Targets, layer::SubscriberExt};
use twapi_v2::{api::get_2_users_me, oauth10a::OAuthAuthentication};
// Tracingの準備
pub fn setup_tracing(name: &str) {
let formatting_layer = BunyanFormattingLayer::new(name.into(), std::io::stdout);
let filter = Targets::new()
.with_target(name, Level::TRACE)
.with_target("twapi_v2", Level::TRACE);
let subscriber = Registry::default()
.with(filter)
.with(JsonStorageLayer)
.with(formatting_layer);
tracing::subscriber::set_global_default(subscriber).unwrap();
}
// エラー時のレスポンスのデータ
#[derive(Debug)]
pub struct ResponseData {
pub status_code: StatusCode,
pub body: String,
pub headers: HeaderMap,
}
// エラー情報、reqwestのエラーかそれ以外
#[derive(Debug)]
pub struct ResponseError {
pub error: Option<reqwest_builder_retry::reqwest::Error>,
pub response_data: Option<ResponseData>,
}
// レスポンスのチェック
async fn check_done<T>(
response: Result<Response, Error>,
retryable_status_codes: &[StatusCode],
) -> Result<T, (RetryType, ResponseError)>
where
T: serde::de::DeserializeOwned,
{
let response = response.map_err(|err| {
(
RetryType::Retry,
ResponseError {
error: Some(err),
response_data: None,
},
)
})?;
let status_code = response.status();
let headers = response.headers().clone();
let body = response.text().await.unwrap_or_else(|_| "".to_string());
let response_data = ResponseData {
status_code,
body,
headers,
};
if let Some(retry_type) = check_status_code(status_code, retryable_status_codes).await {
return Err((
retry_type,
ResponseError {
error: None,
response_data: Some(response_data),
},
));
}
match serde_json::from_str::<T>(&response_data.body) {
Ok(result) => Ok(result),
Err(_) => Err((
RetryType::Retry,
ResponseError {
error: None,
response_data: Some(response_data),
},
)),
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
setup_tracing("x_sample");
tracing::trace!("start");
let auth = OAuthAuthentication::new(
std::env::var("CONSUMER_KEY").unwrap_or_default(),
std::env::var("CONSUMER_SECRET").unwrap_or_default(),
std::env::var("ACCESS_KEY").unwrap_or_default(),
std::env::var("ACCESS_SECRET").unwrap_or_default(),
);
// スレッドで利用可能化チェック
let handle = tokio::spawn({
async move {
let result = reqwest_builder_retry::convenience::execute(
|_| {
let api = get_2_users_me::Api::open();
// APIの実行には必ずタイムアウトをつけましょう
let builder = api.build(&auth).timeout(Duration::from_secs(3));
// リクエストのログ
tracing::trace!(?builder, "api request");
builder
},
|response| {
// レスポンスのログ
tracing::trace!(?response, "api response");
// レスポンスのチェック
check_done::<get_2_users_me::Response>(
response,
&[StatusCode::TOO_MANY_REQUESTS, StatusCode::FORBIDDEN],
)
},
3, // トライ回数
Duration::from_secs(2), // リトライ間隔
)
.await;
println!("Result: {:?}", result);
}
});
handle.await?;
Ok(())
}
まとめ
APIの実行で必要なタイムアウト、リトライができました。またリクエスト、レスポンスのログも出しました。ここまでやってやっと安心して本番で利用できるようになります。
Discussion