ISUCON14 をベンチマーカーの限界を超えて最適化した話
はじめに
2024-12-08 に開催された ISUCON 14 にチーム最上川(@kawaemon, @shun-shobon, @re-taro)で参加しました。今回が ISUCON 初参加でした。本番では悔しくもデータ保持違反[1]で失格でしたが、スコア順位としては 37,127 点で全体 8 / 834 位、学生 2 / 99 位 でした。
また、その後 1 ヶ月程度に渡って開催された感想戦[2]においては、850,573 点で全体 2 位、学生 1 位でした。
この点数は、感想戦ベンチマーカー(インスタンス)のほぼ測定限界となっています。
筆者ローカルでは約 180 万点まで、後述するベンチマーカーの最適化を行うことで約 400 万点まで確認しています。
この記事では、どうやってそこまで点数を出したかやその経緯について書きます。
本戦の話はこちら:
経緯
本戦において、
- トップが 5.8 万点に対して自チームのスコアが 3.7 万と 1.5 倍の差がある
- データ保持違反で失格
と悔しい要因があったのに加えて、終了後の Discord で「運営チーム側では 27 万点まで確認している」とアナウンスがあり、悔しすぎて気が狂いそうになりました。
感想戦なくてもやってたと思いますが、せっかく感想戦があることだし、やるところまでやると決意しました。
方針
作業を始める前に頭の中に大体あった方針です。
- 今回の問題では外部要因が DB しかない
- 以前の問題では DNS 水攻めとかもあったようですが。。
- CPU に厳しい処理がない
- マッチングと付近の椅子取得が若干きついが、なんとでもなる範囲
- DB 剥がしすれば律速要因がネットワーク IO ぐらいしかなくなりそう?
- DB の内容を起動時に全部読み込んで、定期的に bulk insert/update すればデータ保持違反にはならなさそう
- バックエンド API 自体の分割がかなり難しくなるけど、上記の理由で問題なさそう
- 低レイヤが問題になった場合に備えて、一番低レイヤまで触りやすい Rust を使う
- CPU-bound な処理も C 並に早いし
- nginx とかはやってみないと分からん!
時系列
ざっくりとした時系列です。
- 2024-12-08: 本戦
- 2024-12-09: 帰宅 悔しすぎて狂う
- 2024-12-09~11: リファクタ抜きで最適化をするが失敗。コードベースリセットして再トライ。
- 2024-12-13: リファクタを大体終わらせる
- 2024-12-14~17: DB 剥がし途中まで
- 2024-12-17: 31万点で2位
- 2024-12-18: 完全に DB 剥がし完了
- 2024-12-31: async 剥がし
- 2025-01-01: 55万点で1位
- 2025-01-02: 63万点に更新 マッチングアルゴリズムを貪欲のまま改良
- 2025-01-02: ちゃんチームさんに 75 万点で抜かれる
- 2025-01-04: string interning, 自作 http フレームワーク, nginx チューン
- 2025-01-05: ハンガリアン法マッチング実装、ベンチマーカーが壊れ始める
- 2025-01-06: 84万点で1位 スコア目標とリミッター実装
- 2025-01-06: 86万点で抜かれる
- 2025-01-07: 85万点に更新するも抜ききれず
特にちゃんチームさんと抜きつ抜かれつで戦えたのはすごくモチベーションになりました。本当にありがとうございました!
ちゃんチームさんの感想戦の記事も参考になりますのでぜひ。特に nginx の L4/L7 分割は思いつきもしませんでした。
注意
- 掲載しているコードは読みやすさのために多くを省略しており、厳密ではありません。
- 色んなことを同時に試したりしていたので、明確に「これをやったら N% 早くなった」を見せることができません。すみません。読みやすいように、時系列を無視してトピックごとにまとめて示します。
リファクタリング
初期コードから愚直に最適化をやっていくと、コード品質が下がりすぎて身動きが取れなくなってしまうので、まず最適化に耐えられるコードベースを作るところから始めました。
- 型を強くする
pub struct Id<T>(String, PhantomData<fn() -> T>);
pub struct Chair {
- pub id: String,
+ pub id: Id<Chair>,
pub enum RideStatusEnum { Matching, Enroute, ... }
- "CARRYING" => {
+ RideStatusEnum::Carrying => {
let status = crate::get_latest_ride_status(&mut *tx, &ride.id).await?;
- if status != "PICKUP" {
+ if status != RideStatusEnum::Pickup {
- DB 操作の切り出し
DB を剥がす上で、このアプリケーションが DB に対して何を行っているかを整理します。意外と使いまわせるコードや、明らかに無駄な DB 呼び出しなどもここで気づけます。
pub struct Repository { ... }
impl Repository {
pub async fn ride_status_update(
&self,
may_tx: impl Into<Option<&mut Tx>>,
ride_id: &Id<Ride>,
status: RideStatusEnum,
) -> Result<(), Error> {
sqlx::query("INSERT INTO ride_statuses (id, ride_id, status) VALUES (?, ?, ?)")
// ...
}
}
現在時間を知るためだけの DB 呼び出し🤦♂️や
- let retrieved_at: chrono::DateTime<chrono::Utc> =
- sqlx::query_scalar("SELECT CURRENT_TIMESTAMP(6)")
// ...
Ok(axum::Json(AppGetNearbyChairsResponse {
chairs: nearby_chairs,
- retrieved_at: retrieved_at.timestamp(),
+ retrieved_at: Utc::now().timestamp(),
}))
updated_at
を取得するためだけのクエリなど、意外と本番中に気づけなかった無駄が。。
- let result = sqlx::query("UPDATE rides SET evaluation = ? WHERE id = ?")
+ let updated_at = Utc::now();
+ let result = sqlx::query("UPDATE rides SET evaluation = ?, updated_at = ? WHERE id = ?")
// ....
- let Some(ride): Option<Ride> = sqlx::query_as("SELECT * FROM rides WHERE id = ?")
// ...
Ok(axum::Json(AppPostRideEvaluationResponse {
fare,
- completed_at: ride.updated_at.timestamp_millis(),
+ completed_at: updated_at.timestamp_millis(),
}))
クエリ自体の最適化などはなるべく後回しにして、コード品質を上げることに専念しました。
DB 剥がし
結果として全 table に対して行いました。一部を紹介します。
基本的にデータベース構造自体は変更していません。
read
read に関しては、
- 起動時に全レコードを読み込みキャッシュを構築する
- 配列, Queue, HashMap, 二分木などを使い分けて最も適したものを使う
- 動作中は DB を読みに行くことなくキャッシュから答えを得る
-
/api/initialize
呼び出し時(DB 初期化時)にはキャッシュも一緒に破棄して再初期化する
例えば chair_locations
の場合は、総移動距離を答える必要があるので、専用の struct を各 chair ごとに作り管理します。
/// DB にあるデータからキャッシュを構築する
pub(super) async fn init_chair_location_cache(
init: &mut CacheInit,
) -> ChairLocationCache {
// init.locations には chair_locations テーブルのすべての行が入っている
// このキャッシュでは created_at でソートされていて欲しいのでソートする
init.locations.sort_unstable_by_key(|x| x.created_at);
let mut cache: HashMap<Id<Chair>, TrackLocation> = HashMap::new();
for loc in &init.locations {
if let Some(c) = cache.get_mut(&loc.chair_id) {
// 総移動距離とその更新日付を更新する
c.update(loc.coord(), loc.created_at);
} else {
// 初めてのレポートの場合は 0 初期化する
cache.insert(
loc.chair_id.clone(),
TrackLocation::new(loc.coord(), loc.created_at),
);
}
}
Arc::new(ChairLocationCacheInner {
cache: RwLock::new(cache),
})
}
write
書き込みに関しては、キャッシュに対しての更新は即座に行う必要がありますが、read で DB を使わないため、DB に対しての更新を遅延可能です。トランザクション数を減らすことが DB 負荷の削減につながるので、一度にまとめて行いたいです。
ここで考えられるシナリオが大きく二つあり、別々に処理しています。
insert だけで update をしない場合
chair_locations
などは insert するだけで行の更新を行いません。こういう場合はそのまま貯めて一気に書き込むだけで問題ありません。
pub struct SimplyDeferred<D: SimplyDeferrable> {
// go で言う chan
// 内部バッファが必要に応じて伸びることで、tx で絶対に block しない便利なやつ。
// これをキュー替わりに使う。
tx: mpsc::UnboundedSender<D::Insert>,
}
impl<D: SimplyDeferrable> SimplyDeferred<D> {
pub fn new(pool: &Pool<MySql>) -> Self {
let (tx, rx) = mpsc::unbounded_channel();
// ... 一定期間ごとに commit を呼ぶタスクを作る処理 ...
Self { tx }
}
pub fn insert(&self, i: D::Insert) {
self.tx.send(i).unwrap();
}
async fn commit(inserts: &[D::Insert], pool: &Pool<MySql>) {
if inserts.is_empty() {
return;
}
let mut tx = pool.begin().await.unwrap();
// 一気に全部 insert するとパラメータが多すぎて失敗することがあるので
// 500 行ごとに insert する
for i in inserts.chunks(500) {
D::exec_insert(&mut tx, i).await;
}
tx.commit().await.unwrap();
}
}
insert も update もする場合
こちらは少し複雑です。なるべくクエリ数を減らすために、遅延された insert に対する update は送信前にまとめてしまいたいです。
例えば
insert into ride_statuses
(id, app_sent_at, chair_sent_at)
values ('1', null, null);
update ride_statuses
set app_sent_at = '...'
where id = '1';
update ride_statuses
set chair_sent_at = '...'
where id = '1';
は
insert into ride_statuses
(id, app_sent_at, chair_sent_at)
values ('1', '...', '...');
にできますし、実際こういうのが殆どです。(時間的局所性)
こういった insert と update の合成を行えるようにします。
Rust の型の強さがこういう所(関連型: D::Update
とか)で特に生きてきて嬉しいです。
Rust もっと流行れ。
pub struct UpdatableDeferred<D: DeferrableMayUpdated> {
insert_tx: mpsc::UnboundedSender<D::Insert>,
update_tx: mpsc::UnboundedSender<D::Update>,
}
impl<D: DeferrableMayUpdated> UpdatableDeferred<D> {
pub fn new(...) { /* ... 一定期間ごとに commit を呼ぶタスクを作る処理 ... */ }
pub fn insert(&self, i: D::Insert) {
self.insert_tx.send(i).unwrap();
}
pub fn update(&self, u: D::Update) {
self.update_tx.send(u).unwrap();
}
async fn commit(mut inserts: Vec<..>, updates: Vec<..>, pool: &Pool<MySql>) {
// insert と update の合成を行い、
// 合成できた場合は inserts を直接更新し該当 update を無かった事にする
// 合成できなかったものは actual_updates として返す
let actual_updates = D::summarize(&mut inserts, updates);
let mut tx = pool.begin().await.unwrap();
for i in inserts.chunks(500) {
D::exec_insert(&mut tx, i).await;
}
for update in actual_updates {
D::exec_update(&mut tx, &update).await;
}
tx.commit().await.unwrap();
}
}
通知 SSE
仕様を整理すると、
- 接続されたら
- もし未送信の通知があればそれらを送信
- 無ければ最新のライドの通知を送信
- これが無ければ簡単だった
- それ以降は、イベントが起こる毎にその通知を送信
- 基本的には
ride_statuses
に insert するついでに送信すればいい - 但しライドが作成されたときの
MATCHING
イベントだけ送信するタイミングが違う- 乗客にはライドが作成されたとき (≒
ride_statuses
に insert するとき) - 椅子にはマッチングしたとき (≒
rides
にchair_id
をセットする時)- これが無ければ簡単だった
- 乗客にはライドが作成されたとき (≒
- 基本的には
これを満たすために、各椅子とユーザーごとにキューを作り管理します。
pub struct NotificationQueue {
/// 最後に送信した通知
last_sent: Option<NotificationBody>,
/// 未送信通知キュー
queue: VecDeque<NotificationBody>,
/// SSE コネクションが生きている場合のみセットされるチャンネル
tx: Option<Sender<Option<NotificationBody>>>,
}
impl NotificationQueue {
/// 通知が発生したら、送信先に紐づいた struct のこのメソッドを呼ぶ
#[must_use = "true が返却された場合、即座に通知が送信されたので sent_at をセットすること"]
pub fn push(&mut self, b: NotificationBody) -> bool {
// .. ここにデータベースから通知の状態を復元する処理 ..
// .. (上記で説明したデータベース剥がしの副作用) ..
// もし SSE 接続がすでにあったら?
if let Some(tx) = self.tx.as_ref() {
// 即座に送る
let sent = tx.send(Some(b.clone())).is_ok();
if sent {
// 送信できた!
self.last_sent = Some(b);
return true;
}
// 接続が切れていて送信できなかった
self.tx = None;
}
// 次接続された時送るべき通知を登録
self.last_sent = None;
self.queue.push_back(b);
false
}
/// SSE 通知が接続された時、送信済みの通知が出てくるまで、このメソッドから
/// 出てくる通知を送信し続ける
/// None が帰ってきた場合、通知を1個も送ったことがないので何もできない
pub fn get_next(&mut self) -> Option<NotificationEntry> {
// 未送信の通知がある?
if self.queue.is_empty() {
// 未送信の通知がないので、すでに送信済みの通知を送る
let last = self.last_sent.clone();
return last.map(|body| NotificationEntry { sent: true, body });
}
// 未送信の通知を一個取り出して返却する
let e = self.queue.pop_front().unwrap();
self.last_sent = Some(e.clone());
Some(NotificationEntry {
sent: false,
body: e,
})
}
}
ハマったこと
椅子への COMPLETED
の通知をベンチマーカーが受け取る前に nearby-chairs エンドポイントの結果に含まれてしまうと、以下の警告が出てしまいます。
しかし、SSE ではコネクションに書き込んでも、相手にいつ届いたかどうかを知るのが難しいです。TCP レイヤで見ればわかるのだと思いますが、ユーザースペースからそれを知る方法が分かりませんでした。
現状は 20ms 程度の遅延を入れて対処していますが、ベンチ終盤になって色々がパンクしてくるとどうしても出てしまいます。これが SSE じゃなくて WebSocket で、ACK に該当するメッセージを返してくれるようなプロトコルだったら嬉しかった。
(最初は警告メッセージをマッチングの反映が遅いのかと読み取り結構ハマりました。逆でした。)
payment gateway
最適化を進めると、やはりボトルネックになるのはライドの評価時に発生する決済です。
決済のためには決済サーバーにリクエストを送る必要がありますが、どうやら不安定なようです。
// 失敗したらとりあえずリトライ
// FIXME: 社内決済マイクロサービスのインフラに異常が発生していて、同時にたくさんリクエストすると変なことになる可能性あり
クエリ最適化
決済処理が失敗すると自動的にリトライする仕組みになっていますが、そのリトライ毎にユーザーの持つライドをすべて取得し、その個数が決済回数と同じになっているか比較しています。
まずライドの個数にしか興味がないのでクエリを簡単にできます。
- sqlx::query_as("SELECT * FROM rides WHERE user_id = ? ORDER BY created_at ASC")
+ sqlx::query_scalar("SELECT count(*) FROM rides WHERE user_id = ? ORDER BY created_at ASC")
また決済処理中に同じユーザーに対して新たなライドが発生することはありえませんから、リトライ毎に取得する必要もないです。
+ let expected_payments = retrieve_rides_order_by_created_at_asc().await?;
loop {
// ...
- let rides = retrieve_rides_order_by_created_at_asc().await?;
- if rides.len() != payments.len() { return Error; }
+ if expected_payments != payments.len() { return Error; }
}
Idempotency-Key
マニュアルにしれっと記載がある Idempotency-Key を使うとそもそも決済回数比較の必要がなくなります。本番中に気づきたかった。
同時に複数リクエスト(失敗)
負荷が少ないときは大体1発でリクエストが通るのですが、負荷がそれなりに大きくなると、1回の決済を通すのに大体 3.3 リクエスト必要になります。
135万点負荷のときの決済状態 (5秒おき)
一方でマニュアルには
Idempotency-Key
ヘッダを利用して決済の重複を防ぐことができます。... 同じIdempotency-Keyヘッダを持つリクエストが複数回送信されたとしても冪等に処理されます。
とあります。最初から失敗を見越して同時に 4 つぐらいリクエストを送ってしまえば、1回分の時間で済んでお得なのでは。。?
type Param = { idemKey: string, amount: number };
async function mySuperPayment(p: Param) {
// 失敗を見越して同時に 4 つリクエストする
const reqs = [doRequest(p), doRequest(p), doRequest(p), doRequest(p)];
try {
// 4 つのうちどれか成功すれば ok !
await Promise.any(reqs);
} catch {
// 4 つ全部失敗したらリトライ
return mySuperPayment(p);
}
}
と思い実験したところ、どうやら一回でも 200 が帰った後に追加でリクエストが送られるとクリティカルエラー決済サーバーに誤った支払いがリクエストされました (CODE=35): 既に支払い済みです。
となるようです。悲しい。
マッチング
初期実装がかなり酷く、問題の大きな修正ポイントとして設計してあるんだろうなと思いました。
お手軽修正
私のチームが本番中にやったのはこれです。初期実装はインターバルごとに1個のライドしかマッチングしませんが、これを全てに対して行います。
- let Some(ride): Option<Ride> = sqlx::query_as(".. limit 1");
+ let waiting_rides: Vec<Ride> = sqlx::query_as("..");
+ for ride in waiting_rides {
- for _ in 0..10 {
+ for _ in 0..10 {
方針
この世界には座標 0, 0 付近と 300, 300 付近にサービス展開地域が二つあります。
公式ブログより引用
また、スコア計算式によれば、迎車距離のスコア配分がかなり小さく、これを抑えるべきだとわかります。
以上を踏まえて迎車距離を抑えるため、同じ地域内の椅子とライドをマッチングするようにした上で、以下のようなスコア関数を用意しました。
// 小さいスコア = いいマッチング
fn score(chair: &Chair, ride: &Ride) -> i32 {
// 迎車距離
let pickup = chair.coord.distance(ride.pickup);
// 客の移動距離
let distance = ride.pickup.distance(ride.destination);
// 迎車距離にペナルティをつけて、椅子の移動速度も考慮する
return (pickup * 10 + distance) / chair.speed;
}
貪欲法
愚直実装ですがまずまずの結果は出せます。
// 各地域について
for region in regions {
// 地域内の一番待たせているライドから
for ride in region.waiting_rides {
// もっとも評価がいい椅子とマッチングする
let Some(chair) = region.chairs
.iter()
.min_by_key(|chair| score(chair, ride))
else { break };
region.chairs.remove(chair);
repository.make_match(chair, ride);
}
}
二部マッチング(ハンガリアン法)
もちろん貪欲法ではベストを狙えないので、ハンガリアン法というアルゴリズムを用いてベストなマッチングを求めます。実装はpathfindingライブラリのものを用いました。
貪欲と比較すると、スコア平均が大体 20% ぐらいになっています。すごい。。。
貪欲法の計算量が
ライドが長時間マッチングされないとクリティカルエラーとなってしまうので、あまりにも選択されなかったライドは強制的にマッチングする仕組みを仕込んであります。
for region in regions {
// 15 秒以上マッチングされなかった場合緊急扱いする
let urgent_threshold = TimeDelta::seconds(15);
let (normal_rides, urgent_rides) =
仕分け(region.waiting_rides.drain(..), urgent_threshold);
// 先に緊急ライドのマッチングを行う
let (mut rides_left, mut chairs_left, mut matches) =
match_using_hungarian(urgent_rides, region.chairs.drain(..));
// ハンガリアン法では、椅子かライドのどちらかを必ず使い切るまでマッチングが行われる
// この時点で考えられるパターンは
// - 緊急ライドを使い切って椅子が余った → 余った椅子を緊急じゃないライドに割り当てる
// - 緊急ライドが余って椅子を使い切ったか、両方使い切った → 何もできない
if !chairs_left.is_empty() {
// 緊急ライドマッチングで余った椅子を用いて残りのライドのマッチングをする
let (rides_left2, chairs_left2, matches2) =
match_using_hungarian(normal_rides, chairs_left);
// マッチ結果を合成する
matches.extend(matches2);
}
// ... 最終的に余った椅子/ライドは保存して次のマッチング処理で使う ...
for m in matches {
repository.make_match(m.chair, m.ride);
}
}
最適を狙わないマッチング
ハンガリアン法での実装に切り替えたところ、ローカルでは動作するものの、公式ベンチマーカーサーバーで実行するとベンチマーカーがクラッシュして完走できない事態が発生しました。
ベンチマーカーのクラッシュをローカルでどうしても再現できず、クラッシュ原因は不明なままです。
現在ライド距離を短く抑えることで、ライドをなるべく早く完了させ、ベンチマーク中に完了するライド数を最大化することに全力を注ぐスコア関数になっています。
この結果、ユーザーやライド数が爆発的に増えた結果、ベンチマーカーの何かがおかしくなっていると予想し、わざと最適を狙わずライド数を減らす方向にマッチングを改良することにしました。
下記二つを実装したところ、クラッシュすることなくそこそこ安定して完走できるようになりました。
スコア目標
スコアがなるべく小さくなることを目指す実装となっていましたが、あるスコア目標値を外部から設定し、そこになるべく近づけることを目指す実装に切り替えます。
スコア目標を大きくすればするほど、1ライド当たりで走る距離が長くなり時間がかかるようになります。
// 小さいスコア = いいマッチング
fn score_with_target(chair: &Chair, ride: &Ride) -> i32 {
let score = score(chair, ride);
// 目標スコア
let target = /* 環境変数で設定 */;
// 実際のスコアと目標スコアとの距離をスコアとする
return score.abs_diff(target) as i32;
}
マッチング数リミッター
より直接的な方法として、1秒ごとにマッチングできるライド数を制限します。
クリティカルエラーを出さないために、長時間マッチングされなかったライドはリミッターの制限を受けずマッチングされるように実装しました。
nginx
nginx は本当に詳しくなく、今回とても勉強になりました。
解説できるほどまだ自信がないので、ざっくりとしか書けないことをお許しください。。
nginx が今回行う仕事は主に2つあり、静的ファイルの配信と、コネクションの TLS (https) 化です。
静的ファイルの配信については Cache-Control
ヘッダと gzip_static
を使用しました。
TLS が遅い問題に関しては主に以下の二つを行ったところ、ベンチ走行中に CPU 使用率が飽和しなくなりました。
keep_alive
コネクションを貼る数を抑えるために、keep_alive
します。
http {
upstream backend {
server localhost:8080;
keepalive 65536;
keepalive_requests 100000;
}
keepalive_timeout 120s;
keepalive_requests 100000;
}
TLS のセッションをキャッシュします。
http {
server {
ssl_protocols TLSv1.3;
ssl_session_cache shared:SSL:50m;
ssl_session_timeout 5m;
ssl_session_tickets on;
}
}
証明書差し替え
最初から設定してあった Let's Encrypt の証明書ではなく、暗号化方式を高速な楕円曲線暗号に設定したオレオレ証明書に書き換えます。
# 最初からあった証明書
$ openssl x509 -in _.xiv.isucon.net.crt -text -noout
# ...
Signature Algorithm: sha256WithRSAEncryption
Issuer: C=US, O=Let's Encrypt, CN=R10
Subject Public Key Info:
Public Key Algorithm: rsaEncryption
Public-Key: (2048 bit)
# ...
# オレオレ証明書
$ openssl x509 -in lightcert.pem -text -noout
# ...
Signature Algorithm: ecdsa-with-SHA256
Subject Public Key Info:
Public Key Algorithm: id-ecPublicKey
Public-Key: (256 bit)
# ...
非同期剥がし
最初はメモリ上データ更新のための排他制御 (Mutex, RwLock など)に非同期処理を行っていました。そのほうがデッドロック等をやってしまった場合のデバッグが簡単だったからです。
実際にデッドロックをやることはありませんでしたが、それでもうっかり長時間ロックを保持しっぱなしにして他の処理を止めてしまっていたことはそこそこあり、かなり役立ちました。
(後から知ったことですが、別ライブラリ(parking_lot)を使えば非同期にしなくても lock のタイムアウトを設定できます。が、精神衛生上 async の方が楽かも。。w)
pub struct MyMutex<T>(tokio::sync::Mutex<T>);
impl<D: Deref> Deref for TrackedRef<D>;
impl<D: DerefMut> DerefMut for TrackedRef<D>;
impl<T> MyMutex<T> {
pub async fn lock(&self) -> impl DerefMut<Target = T> + '_ {
let Ok(lock) = tokio::time::timeout(lock_timeout, self.0.lock()).await else {
// lock を取得するのにめちゃくちゃ時間がかかった!
panic!("lock cannot acquired");
};
TrackedRef(lock, Instant::now())
}
}
impl<T> Drop for TrackedRef<T> {
/// TrackedRef<T> がスコープを抜けて lock が解放される時に呼ばれる
fn drop(&mut self) {
if self.1.elapsed() > warn_threshold {
// 長時間ロックが保持された!
eprintln!("lock held too long\n{backtrace}");
}
}
}
が、更なる高速化を狙った場合これはかなりの無駄です。
なぜ無駄なのか
これは筆者の予測で、実際検証したわけではないのですが。。
async 版 Mutex は lock の度に .await
をします。
プリエンプティブにタスクスイッチする Go 等とは大きく異なり、Rust では基本的には .await
ポイントでのみタスクスイッチする可能性があります。(多分ここで判断している)
今回は、Mutex が保護している対象が単なるデータで、仮にある時点でロックされていたとしてもすぐ解放されるケースが大半です。(HashMap でエントリ検索してるだけ、とか)
それなのにタスクスイッチを行なってしまうと、そのコストの方が高くつき、結果レスポンスの生成に時間がかかるようになっていた。。と思われます。因みにドキュメントにもその旨が記載されています。
これらを Sync 版の Mutex に置き換えたところかなり早くなりました。
before (async mutex)
after (sync mutex)
ここまでやると、全てのリクエストハンドラから await
キーワードが消えます。こんなコードを見たのは正直初めてで面白かったです。
ConcurrentHashMap
リクエストのたびに HashMap の検索が何回も走るので、HashMap の高速化は重要です。今までは RwLock<HashMap<K,V>>
のようにして使っていましたが、HashMap の内部構造を考えると、本当に必要な部分だけ排他制御できた方が効率がいいです。
これを実現する Rust のライブラリは幾つかあるのですが、今回は dashmap を採用しました。
この辺りからオーバーエンジニアリング感がさらに強くなっていきます。が、やれることは全てやります。
String Interning
HashMap のキーは大体何かしらの Id です。この検索の早さが全体の早さに直結します。
いつかやってみたかったというのもあり、String Interning を行いました。半分ロマンです。
今回は新規 String が来た場合はそのまま String::leak し、それへのポインタをキーとしました。長時間動くプログラムでもないので。
pub struct Symbol(&'static str);
impl PartialEq<Symbol> for Symbol {
fn eq(&self, other: &Symbol) -> bool {
// 文字列を見なくても、同じ場所を指す参照かどうかを見ればいい!
std::ptr::eq(self.0, other.0)
}
}
impl Hash for Symbol {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
// アドレス(をスクランブルしたもの)をそのままハッシュに使える!
let addr = self.0.as_ptr() as usize as u64;
state.write_u64(scramble(addr));
}
}
impl<'de> serde::Deserialize<'de> for Symbol {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
// JSON デシリアライズを String の追加のコピーなしで行える!(場合が多い)
let s = <Cow<'_, str>>::deserialize(deserializer)?;
Ok(Self::new_from_ref(&s))
}
}
impl Symbol {
pub fn new(s: Cow<'_, str>) -> Self {
static STRING_TABLE: LazyLock<ConcurrentHashSet<&'static str>> =
LazyLock::new(Default::default);
// もし既に確保している文字列ならそれを返す
if let Some(found) = STRING_TABLE.get(s.as_ref()) {
return Symbol(found);
}
// ...細かい排他制御は省略しています...
// まだ確保していない文字列なら、それをテーブルに追加する
let stored: &'static str = String::leak(s.into_owned());
STRING_TABLE.insert(stored);
return Self(stored);
}
}
メリット
- String コピーがほぼ 0 コストになる
-
malloc
してmemcpy
が、文字列ポインタ1つコピーするだけになります
-
- 単純なポインタ比較で同値判定を行える
- ポインタの値をハッシュ値に使える
- そのまま使ってしまうと、アライメントの問題で mod 取った時の値が似通ってしまい実質的にハッシュ衝突を引き起こしたので、いい感じにスクランブルしています
- JSON のデシリアライズ時に割と高い確率で追加の文字列コピーが不要になる
デメリット
- (今回の実装では)メモリリークする
- DoS に対して脆弱になります
- 今回の場合は短い文字列しか来ませんし、最悪ベンチ 1 回持てばいいので問題ありません
- ローカルでスコア 100 万超の負荷かけて使用量 400MB 程度です
http フレームワーク自作
flamegraph を眺めていると、tower
ライブラリ(の malloc)がそこそこの時間を使っていることが分かりました。
これは現在使っている http
フレームワーク axum
の依存ライブラリで、ミドルウェアの提供などに使われています。
ミドルウェアがあると拡張性があり便利ですが、仕様が変わることのない今回の場合ミドルウェアを犠牲にして剥がせそうなものです。どうせ tower
を剥がすなら axum
もついでに剥がしちゃいましょう。
hyper
と http
を直接使い、ISURIDE のためだけの http フレームワーク?を用意します。
主に必要な機能は、ルーティング、Cookie、SSE のみで、基本的に axum の実装を ISURIDE 専用に特殊化して用いました。
ルーティング
ISURIDE 専用ライブラリですので、汎化せず筋肉で実装してしまいます。ハードコードしているため、愚直に実装しても問題になる程遅くなりません。
let (code, body) = {
if let Some(app_path) = uri.path().strip_prefix("/api/app") {
use isuride::app_handlers::*;
match (method, app_path) {
(Method::POST, "/users") => {
let (code, res) = app_post_users(&mut context).await?;
(code, json(res))
}
(Method::POST, "/payment-methods") => {
let code = app_post_payment_methods(&mut context).await?;
(code, empty())
}
// ...
json シリアライザ自作
ここまで詰めると JSON のシリアライズが問題になってきます。(デシリアライズはそこまで問題になりませんでした)
先程と同じく汎用性を捨てて速度を得ます。今回使えそうな制約は、String のエスケープです。
今回の問題で登場するすべての文字列は JSON 上でエスケープシーケンスを使う必要がありません。
// こういうのが発生せず
{ "key": "Hello \n World" }
// こういうのしかない
{ "key": "Hello World" }
エスケープシーケンスの必要性確認や、エスケープシーケンスの挿入をするのにも時間がかかりますので、今回の場合何も考えずにプレースホルダーにコピーすることで早くできそうです。
fn serialize_app_post_users_response(data) -> String {
const JSON_TEMPLATE = [
r#"{"id":"#,
r#","invitation_code":"#,
r#"}"#,
];
// テンプレートを格納するのに必要なサイズ
const JSON_TEMPLATE_SIZE: usize = 26usize;
// あらかじめ必要なバイト数を計算しておくことで realloc を減らす
let mut buf = String::with_capacity(
JSON_TEMPLATE_SIZE
+ data.id.len()
+ data.invitation_code.len()
);
buf.push_str(JSON_TEMPLATE[0]);
buf.push_str(data.id); // エスケープシーケンスが必要ないことは保証済み
buf.push_str(JSON_TEMPLATE[1]);
buf.push_str(data.invitation_code); // エスケープシーケンスが必要ないことは保証済み
buf.push_str(JSON_TEMPLATE[2]);
return buf;
}
これをマクロやトレイトを使って実装したところ、律速要因が memcpy になり、だいぶ限界に近づいたのではないかと思います。
当たり前ですが、現実の業務でこんなことをしたら大きな脆弱性になります。
http ライブラリのバグ
flamegraph をまた眺めていると、HTTP ヘッダのパース時の realloc
呼び出しが append
からだと気づきました。
他の言語でもよくある最適化として、先に使う量を一気に確保しておくことで realloc
を減らすテクニックがあります。Go では make([]int, 0, 32)
とか書きますよね。これが正しく行われていれば、realloc
は発生しないか、するにしても append
からは発生しないはずです。
ライブラリのコードを読んでいくと、hyper
では必要な領域を先に用意する関数 reserve
を呼んでいるにも関わらず、何故か append
で realloc
が起こっていました。
さらに調べると、原因が reserve
のバグで、実際に必要な領域よりも少ない領域を確保してしまっていることがわかり、修正 PR を送り無事マージされました。
このライブラリは Rust の http 実装の多くの場所で使われているので、ISUCON のおかげで Rust の http が少し早くなったと言っても過言ではないかもしれません。ありがとう ISUCON。
結果
ざっくりになりますが、String Interning + http フレームワーク自作 をする前と後を比較します。
スタックが深すぎるので雑に縦に潰してます。
before
after
注目すべきは、一番上がオレンジ (= カーネル)の処理の割合です。カーネル時間の割合が増えていることが読み取れるかと思います。今回カーネルが行っている仕事は主にネットワークデータ転送処理ですので、それが増えることは、本当にざっくりですがいい方向に進んでいると言えます。
サーバー構成
サーバー構成はほぼ最初から最後まで以下の通りでした。ここまで最適化したところ、ベンチマーク走行中にどのサーバーも CPU 使用率が飽和することはありませんでした。
終盤の経緯
80万点以上は本当にベンチマーカーの限界と言った感じで、何回もトライして運が良ければ出るといった感じでした。連続実行するとスコアがどんどん下がっていく傾向もあり、クラッシュもローカルで再現できないため、暗闇の中でパラメータ調整をしてお祈りを繰り返すような感じでした。
最終的には以下のスクリーンショットのベストスコアで、2位フィニッシュとなりました。画像引用元ツイート
ベンチマーカーを早くする
おまけです。ベンチマーカーが限界ならベンチマーカーを最適化してみようというお話。
競技者が採点者であるベンチマーカーを変更する時点で競技として成り立たたないので、エンタメとしてどうぞ。
ここから先は全てローカルマシンを使って実験しています。
- ベンチマーカー: デスクトップ i7-8700K
- バックエンド: ラップトップ i7-13700H
- ネットワーク: Ethernet 1000BASE-T
- OS: Linux
変更前
何も変更しない状態だとこんな感じです。
186万点は出ますが、通知 SSE の章で説明した、大量の ID:~の椅子はすでにライド中です
エラーで pass はできません。
TCP と TIME_WAIT
TCP 通信では 1 IP アドレス毎に最大 65535 ポートまでしか使用できません。しかし今回のベンチマークでは SSE で大量のコネクションが開きっぱなしで占有されます。ユーザーだけで1.5万超えコネクションが保持されますし、結構切断もされます。
TCP では一度コネクションが切断されてしまうとそのポートが暫く使えなります。
Linux ではこれは TIME_WAIT と呼ばれる状態で、60 秒持続します。ベンチマーク時間は 60 秒ですから、一回ベンチマーク中にコネクションが切断されるとそのポートは同じベンチマーク中で二度と使えません。条件によっては再利用されるようですが。。。
ベンチマーク中の TCP 接続数を見るとこのように飽和していることがわかります。飽和するとカーネル時間(赤色)が非常に増えます。多分バインドできるまでリトライしてるのかな。
(もしかして、ベンチマーカーがクラッシュしたり、連続実行するとスコアが下がったりしてたのってこれのせい?)
これを解決するために、接続元/接続先の IP アドレスをそれぞれ分散します。色々方法は調べてみたのですが、あまり情報が出てこず、DNS ラウンドロビンぐらいしか手軽に試せる方法が無い上、今回の問題では競技者側からベンチマーカーの使う DNS サーバーを操作できないので、結局ベンチマーカーの修正が必要です。
今回は、ベンチマーカーコードをいじれるなら一番単純明快な、ソフトウェア側で接続先 IP をラウンドロビンする実装とします。
割り当て IP を増やす
1 つのインターフェースに複数の IP アドレスを割り当てます。これをベンチマーカーとバックエンドサーバーの両方に行います。
$ ip addr add 192.168.0.102/24 dev eno1
$ ip addr add 192.168.0.103/24 dev eno1
$ ip a
1: eno1: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc fq_codel state UP group default qlen 1000
inet 192.168.0.101/24 metric 1024 brd 192.168.0.255 scope global dynamic eno1
valid_lft 6716sec preferred_lft 6716sec
inet 192.168.0.102/24 scope global secondary eno1
valid_lft forever preferred_lft forever
inet 192.168.0.103/24 scope global secondary eno1
valid_lft forever preferred_lft forever
リクエスト元/先の IP をラウンドロビンする
接続元/先の IP をラウンドロビンします。net/http
の DialContext
をいじることで可能です。
type roundRobin[T any] struct {
data []T
counter uint64
}
func newRR[T any](s []T) roundRobin[T] {
return roundRobin[T]{data: s, counter: 0}
}
func (r *roundRobin[T]) next() T {
idx := atomic.AddUint64(&r.counter, 1) % uint64(len(r.data))
return r.data[idx]
}
var (
local = newRR([]net.IP{ net.ParseIP("192.168.0.102"), ... }),
dest = newRR([]string{ "192.168.0.249:8080", ... }),
)
transport.DialContext = func(ctx context.Context, network, _ string) (net.Conn, error) {
local := net.TCPAddr{ IP: local.next(), Port: 0 },
dialer := net.Dialer{ LocalAddr: &local }
return dialer.DialContext(ctx, network, dest.next())
}
結果
無事6万コネクション以上を保持できました。
プロファイリング
現状ベンチマーカーのCPU使用率が異常に高いので、何が重いのか見てみます。
とりあえず、pprof を刺して cpuprofile を見てみます。
一番左端の ServeHTTP は決済処理を請け負っている部分です。memeqbody
(string比較するランタイム関数らしい)が見え、改善の余地がありそうです。
Agent.Do
などは、http リクエストを送っている部分で、ここはライブラリの変更(fasthttpとか)とかをしないと一筋縄では行かなそうです。
net/http
は内部で channel を多用した並行処理を行っており、これが大きなオーバーヘッドとなっているようです。詳しくはよく分かりませんが。。。
PostPaymentsHandler
決済処理を行っている部分ですが、やけに IO 以外の処理時間が長いです。
pprof によれば、大きなボトルネックが2つあります。(こことここ)
L72-76 の for
これは直近3秒で処理した決済を数え上げる処理です。ベンチ終盤になるととんでもない数の決済が走りますので、ループ数が増えて問題になります。
var recentProcessedCount int
+ now := time.Now()
for _, processed := range s.processedPayments.BackwardIter() {
+ // time.Since は内部で time.Now() を呼び
+ // for ループごとに現在時刻の取得をしてしまい遅いので
+ // Now を一回だけ取得して使いまわす
- if time.Since(processed.processedAt) > 3*time.Second {
+ if now.Sub(processed.processedAt) > 3*time.Second {
break
}
recentProcessedCount++
+ // この下の percentage の処理を見ると、50以上数える必要がない
+ if recentProcessedCount >= 50 {
+ break
+ }
}
failurePercentage := recentProcessedCount
- if failurePercentage > 50 {
- failurePercentage = 50
- }
L90-95 の for
これは過去に同じ Idempotency-Key
を持つ決済処理が完了したかどうかを確認しています。ただの線形探索ですので明らかに遅いです。set を使っちゃいましょう。
alreadyProcessed := false
- if !newPayment {
- for _, processed := range s.processedPayments.ToSlice() {
- if processed.payment.IdempotencyKey == p.IdempotencyKey {
- alreadyProcessed = true
- break
- }
- }
+ if !newPayment && len(p.IdempotencyKey) > 0 {
+ alreadyProcessed = s.processedPaymentsByIdemKey.Has(p.IdempotencyKey)
+ }
if !alreadyProcessed {
s.processedPayments.Append(&processedPayment{payment: p, processedAt: time.Now()})
+ if len(p.IdempotencyKey) > 0 {
+ s.processedPaymentsByIdemKey.Add(p.IdempotencyKey)
+ }
runtime.newobject
Go がヒープにデータを確保するときに呼ぶ関数です。今回はこいつが全体の 18% も時間を使っていたので、なるべく減らします。こいつを減らすと、gc の負荷も減ってお得です。
json.Unmarshal と sync.Pool
Go では json.Unmarshal
時に any
(= interface{}
) を介すため自動的にデコード対象変数がヒープに逃されるようですが、特に通知は凄まじい勢いで送るのでこのヒープアロケーションコストが大きいです。これを sync.Pool
を使い、一度確保した struct を使いまわすことで早くします。
スタック変数に対して Unmarshal できればこんなことしなくて良いのですが。。
+ var appNotificationPool = sync.Pool{
+ New: func() any {
+ return new(AppNotificationData)
+ },
+ }
// ...
for scanner.Scan() {
- request := &AppNotificationData{}
line := scanner.Text()
if strings.HasPrefix(line, "data:") {
+ request := appNotificationPool.Get().(*AppNotificationData)
jsonText := []byte(line[len("data:"):]
err := json.Unmarshal(jsonText), &request)
+ // デコードしたデータをスタックにコピーしてから pool に返却する
+ res := *request
+ appNotificationPool.Put(request)
if !yield(res, err) {
return
}
}
}
BackwardIter
スライスの末尾の要素を取ってくるためだけに、最近追加されたイテレータを使っている箇所があります。イテレータは便利ですが、クロージャを多用するため今回の用途ではあまりにもオーバーヘッドが大きいです。
+ func (s *SimpleSlice[V]) Last() (value V, ok bool) {
+ s.lock.RLock()
+ defer s.lock.RUnlock()
+ if len(s.s) == 0 { return }
+ return s.s[len(s.s)-1], true
+ }
- for _, r := range chair.RequestHistory.BackwardIter() {
+ if r, hasLast := chair.RequestHistory.Last(); hasLast {
req = r
// baseTimeよりも3秒前以降に完了状態に遷移している場合は、含まれていなくても許容する
if r.ServerCompletedAt.After(baseTime.Add(-3 * time.Second)) {
ok = true
}
- break
}
結果
その他無駄にポインタが引き回されているところなども修正し、flamegraph はこのようになりました。
http ライブラリを載せ替えられればおそらく他のボトルネックも見えて面白そうなのですが、一旦ここまでとしておきます。
気になるベストスコアは、、、
うひょ〜〜〜
※ベンチマーカーを、ロジックは変えないように注意はしていますが、改造しているのは事実ですので、参考にもならないスコアとなります。
381万点が出た時の録画がありますので気になる方はどうぞ。
TCPコネクション数 6 万とか 13 万とか初めて見ました。。
バックエンド側の CPU 使用率を見ると、ピークでも 300% とかなの、Rustの凄さを感じます。
おわりに
ISUCON 運営様、および一緒に戦ってくださった他の参加者の皆様、ありがとうございました!
個人的には、本戦の短い時間でどう早くするかという緊張感も好きでしたが、やはり感想戦でじっくり考えながらやるのがすごく楽しかったです。
非常に楽しめましたし、多くの学びがありました。
本番最終スコアのほぼ 100 倍に当たる 400 万点を出せて私は満足です。
-
負荷走行によって作成されたデータが再起動後に取得できなかったことを意味 ↩︎
-
本番終了後も競技サーバーが開かれたままにされました。本当に感謝。総競技参加者は不明です。が、個人的にブログに書かれたスコア と 2024-01-12 早朝時点での最新スコアを比較してみたところ、実に 740 チームに差がありました。そのうち 81 チームは感想戦からの参加でした。本当に適当計測なので信憑性はありませんが、もし合っているとすれば、結構な数参加されていてびっくりです。 ↩︎
Discussion