Async Rustの読書メモ

ch03: Building Our Own Async Queues
メモ: https://github.com/ose20/async-rust/tree/main/ch03/examples
難易度: ★★
概要
以下の3つのクレートに頼った上で、簡単な非同期ランタイムを作成する(標準ライブラリだけを使って自作するのは10章らしい。楽しみだね)。
- async-task
- futures-lite
- flume
最も単純な実装から、少しずつそのモチベーションを共有しながら機能を追加していく構成になっていて楽しい。
構成
- 最も基本的な実装
- タスクを管理するキューもそれを実行するスレッドも1つ
- タスクを実行するスレッドを複数に増やす
- タスクを管理するキューを複数に増やす
- 実行したいタスクに優先度などがある場合、キューによって区別できるようになる
- タスクを他のキューから奪う
- 片方のキューでタスクが空な時に、別のキューからタスクを取ってこれる
- spawn関数のリファクタ
- マクロを使ってデフォルト引数のようなものを実現する
- 初期化処理
-
LazyLock
で実装しているタスクキューを起動するための初期化処理
-
- バックグラウンドで常に走らせ続けるタスクの実装
- 例えばログローテートとかで常に非同期ランタイム上で走らせておきたいタスクの実現

ch04 Integrating Networking into Our Own Async Runtime
メモ: https://github.com/ose20/async-rust/tree/main/ch04/examples
難易度: ★★★★
概要
3章で作成したAsync Runtimeを使って、ネット越しの通信を実現する。主に2つのパートに分かれており、前半はhyperをラップしてHTTPクライアントをAsync Runtime上で動かす。後半OSのasync部分を最低限ラップしたmioを使い、localhost上にサーバーを立ててリクエストの送受信を行う。
構成詳細
前半(hyperをラップ)
hyper
クレートのClient
やRequest<T>
、Response<T>
といった、hyperをつかったHTTP通信をやろうと思うと、3章で自作したAsync Runtimeだけだと不十分。
やりたいことを端的に表すと以下のasync fnの実行になる。
use hyper::{Body, Client, Request, Response};
async fn fetch(req: Request<Body>) -> Result<Response<Body>> {
Ok(Client::builder()
.executor(CustomExecutor)
.build::<_, Body>(CustomConnector)
.request(req)
.await?)
}
これを見るとわかるが、引数としてCustomExecutor
とCustomConnector
を使っていて、これが追加で必要になるよというのを説明するのがおそらく主題。
この2つの構造体は自作するが、実態はhyper
に既にある機能のラッパーで、トレイトの実装関係は以下のようになってる。
-
CustomExecutor
impl<F: Future+Send+'static> hyper::rt::Excector<F>
-
CustomStream
(CustomConnector
の設定に使う)impl tokio::io::AsyncRead
impl tokio::io::AsyncWrite
impl hyper::client::connect::Connection
-
CustomConnector
impl hyper::service::Service<Uri>
後半(mioを使って実装)
他の有名Async Runtimeの基盤としてもつかわれている、限りなく低レイヤーなOSラッパーを提供するmio
というクレートを用いる。
epoll
をラップしたようなインターフェースを使って、サーバー側で動く非同期タスクを作って動かすのがゴール。
気になったところ
p77の説明に違和感
p77に以下のような記述がある。
Because we are using the Service trait for a client call, we will always return Ready for the poll_ready. If we were implementing the Service trait for a server, we could have the following poll_ready function:
fn poll_ready(&mut self, cx: &mut Context<'_>)
-> Poll<Result<(), Error>> {
Poll::Ready(Ok(()))
}
「クライアント用の実装だから常にPoll::Ready
を返せばいい、サーバー用なら以下のように書く」として示されている例が同じように常にPoll::Ready
を返す実装になっているのは文意に合わないので間違っていそう。今のところErattaにも記載されてなかったけど、7月に出るらしい邦訳版ではどうなってるか気になる。
ErrorKing::WouldBlock対処の説明
mio
クレートを使って定義したサーバー側の非同期タスクが以下のようになっている(多少改変している)のだが、
const SERVER: Token = Token(0);
const CLIENT: Token = Token(1);
struct ServerFuture {
server: TcpListener,
// to pool the socket and tell the future when the socket is readable.
poll: MioPoll,
}
impl Future for ServerFuture {
type Output = String;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut events = Events::with_capacity(1);
// 200ms 間で event に Event をため込む
self.poll
.poll(&mut events, Some(Duration::from_millis(200)))
.unwrap();
for event in events.iter() {
// このイベントには token が SERVER じゃないものも来ることを示唆している
// このイベントって、OS 全体のやつなのかな?それともこの Rust プログラムに関するものだけ?
// Grok3曰く、この Rust プログラムに関するものだけらしい
if event.token() == SERVER && event.is_readable() {
let (mut stream, _) = self.server.accept().unwrap();
let mut buffer = [0u8; 1024];
let mut received_data = Vec::new();
loop {
match stream.read(&mut buffer) {
Ok(n) if n > 0 => {
received_data.extend_from_slice(&buffer[..n]);
}
Ok(_) => {
break;
}
// これ本に書いてあるけど実行すると何も出力されなくなる
// cx..., return..., じゃなくて break にすると正常に動く(実質的に次の節と同じなのでそれはそう)
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// WouldBlock エラーは、読み取り可能なデータがないことを示す
println!("No more data to read, waiting for more...");
break;
}
Err(e) => {
eprintln!("Error reading from stream: {}", e);
break;
}
}
}
if !received_data.is_empty() {
let received_str = String::from_utf8_lossy(&received_data);
return Poll::Ready(received_str.to_string());
// もしプログラム終了まで socket が poll され続けてほしいなら以下のようにする
// spawn_task!(some_async_handle_function(&received_data)).detach();
// return Poll::Pending;
}
// ここに辿り着いてしまうと hang するように見える
// この実装は self に状態を保存していないから
// poll はステートマシンを一歩進めるみたいな直観をもっていたいっぽい(だから状態の更新をしたい)
// 本の例でも実はここには辿り着いてない
// ↓ ChatGPT の説明
// ただし ハングの直接原因 は TcpStream と受信バッファを self に保持していないこと。
// 末尾に到達したこと自体が悪いのではなく、次回 poll() が呼ばれたときに 続きのステートが無い ので、
// 毎回ほぼ同じ処理(accept()→WouldBlock)を繰り返し、永久に Poll::Pending を返し続ける状態に陥る。
cx.waker().wake_by_ref();
println!("poll function ended, waiting for more data...");
return Poll::Pending;
}
}
cx.waker().wake_by_ref();
Poll::Pending
}
}
本書には、stream.read
の結果がWouldBlock
の場合の節を追加するなら
cx.waker().wake_by_ref();
return Poll::Pending;
をすればいいと書いてあるが、これをやるとクライアントのリクエストが処理されず、プログラム全体も終了しない状態になる。そもそもこの関数の最後のreturn Poll::Pending
に辿り着くと同じ状況になるので、元々の関数が特定の実行パスの場合でしか意図通りに動かない関数であると思う。
LLMに聞く限りでは、このpoll
という関数は背後にあるステートマシンを1歩進める直観を当てはめるとよくて、self
に対して状態を保存しないで処理を終えてることが問題らしいんだけど、ちゃんとわかってない。ここら辺の知識を深めたい場合はどうすればいいんだろうか。考えられる選択肢は
- この本を読み進めていけば部分的に解決する
-
epoll
のような、もっとプリミティブなAPIを使ったトイプログラミングをして直観をつかむ- これは「並行プログラミング入門」に対応する箇所があった気がする
- 非同期プログラミングに関するもっと低レイヤーに詳しい文献をあたる
とかがありそう。そのうちやりたい。

ch5 Coroutines
メモ: https://github.com/ose20/async-rust/tree/main/ch05/examples
難易度: ★
概要
コルーチンを、async/awaitとの関係を念頭に置きながら様々な例を実装して理解を深める構成。特に躓くような箇所はなく、平易でわかりやすい。async/await関係なく、コルーチンの入門としても十分なくらい。ただ簡単めによりすぎていてあまり踏み込んだ例がない。たとえばRustのI/Fではコルーチンはresume()
関数で処理を再開することができるが、このとき引数には任意の型の値を渡すことができる。しかし本書ではこれをほとんど活用していない。
ハイライト
- コルーチンの
Yielded
とCompleted
が、async/awaitのPending
とReady
に対応しているという直観を得られてよかった - コルーチンは
Future
を直観的に実装でき、それをつかえばasync/awaitシステムに組み込める- コルーチンではいつ処理を止めて、いつ再開するのか(はたまたしないのか)を自由に決められるが、 async/awaitシステムではランタイムがスケジューリングを行うので細かい制御ができない

ch6 Reactive Programming
メモ: https://github.com/ose20/async-rust/tree/main/ch06/examples
難易度: ★
概要
- リアクティブプログラミングとはプログラミングパラダイムのことで、データの値の変化やイベントに対してコードが反応するという側面に注目したパラダイム
- リアルタイムで刻一刻と変化する何らかの状態に反応するシステムなどはこれを援用しやすい
構成
- プログラムは大きく分けて二つ
- 1つめ
-
static
なアトミック変数を状態とし、それに対して以下のようなタスクを動かす- 適当に変化させる非同期タスク
- その変化に応じて適当なロジックでさらに状態を変化させる非同期タスク
-
- 2つめ
- 本書ではeventバスと呼ばれる、リアクティブプログラミングの関心の的であるイベントのキューをもちいたmulti consumer singile producer型のチャンネルを用いてリアクティブプログラミングを実現する
- 1つめ
余談
『詳解Rustアトミック操作とロック』でメモリオーダリングについてたくさん読んで書いたので、本書ででてくる不必要に強くパフォーマンスを悪化させるSeqCst
を、その場に応じて最低限の強さのRelaxed
、Release
、Acquire
などに置き換えることができてかなり達成感があった。あとOreillyレベルのちゃんとした技術書でもここらへんの扱いはそんなに厳密じゃないんだなという学びがあった。

ch7 Customizing Tokio
メモ: https://github.com/ose20/async-rust/tree/main/ch07/examples
難易度: ★★
概要
Rustで非同期プログラミングする時のデファクトスタンダードになっている、tokio(非同期ランタイム)の設定をいろいろいじる章。枝葉末節の話になるかと思いきや、tokioのカスタマイズを通じて、非同期ランタイムの責任範囲や、しくみについても多少理解が広がる構成になっていた。
内容
- ランタイムの設定
- スレッドの数
- 時間のかかるタスクの制御方法
- 各ステップでのフック
-
thread_local
という機能 - いろんな非同期タスクから
UnsafeCell
に安全にアクセスする方法 - グレースフルシャットダウン
- シグナルへのフック
- それを確実に機能させる方法
- シグナルへのフック
余談
このセクションに直接書いてあったわけじゃないけど、写経したり内容から連想したりしてasync/awaitに対していくつか気づきがあった。
-
await
はスケジューラ全体を止めるという直観が自分の中に残っていたがこれは間違い- なんでこんな誤解をしたのか
- 簡単なプログラムだと、トップレベルの関数内で、非同期関数であっても結局直列に呼びだすような例しかかかないから
- つまり自分が定義した複数の非同期関数を並列に実行して、それら全部の結果を一斉に待ち合わせるみたいなことをあんまりやってない
- 事実これを書いた時に自分の直観との矛盾に気づいた
- つまり自分が定義した複数の非同期関数を並列に実行して、それら全部の結果を一斉に待ち合わせるみたいなことをあんまりやってない
- 簡単なプログラムだと、トップレベルの関数内で、非同期関数であっても結局直列に呼びだすような例しかかかないから
- なんでこんな誤解をしたのか
-
async/await
が協調的なシステムというのは、非同期タスクがawait
をして制御をスケジューラーに返さなければ、スケジューラーは一生その非同期タスクを実行し続けることになる- なので、非同期関数を提供する側にはこまめに
await
してスケジューラーに制御を返す責任がある- これって
async/await
のめちゃくちゃ重要な要素だと思うけどあんまり知らなかった- なぜ?
- おそらく、そういう、スケジューラーにこまめに制御を返す責任があるような非同期関数は大体ライブラリとして提供されているので自分では書かない
- なぜ?
- これって
- なので、非同期関数を提供する側にはこまめに
引用しているツイートはツリーになっていて、関連する思考の跡が続いている。

ch8 The Actor Model
メモ: https://github.com/ose20/async-rust/tree/main/ch08/examples
難易度: ★★
概要
- アクターモデルを使って async/await プログラミングをやる章
- 場合によっては
Mutex
などの同期機構がいらなくなる可能性を見る - ルーターパターンというデザインパターンでアクターモデル志向の構造を組み立てる
- アクターを組み合わせる
アクターモデルの勘所
- アクターというのは、メッセージのやり取りによって外部と協調する分離されたコード片
- 分離されているのでモジュール性が高かったり、テストがやりやすかったりする
- この章で紹介されていたアクターは全部非同期関数であり、その引数として
channel
のReceiver
をもらうことがポイント- 外部からアクターへの命令はこのチャンネルを通して行われる
- アクターはこの
Receiver
へreceive().await
し続けて、命令が来たら対応するという形になる
- アクターはこの
- 外部からアクターへの命令はこのチャンネルを通して行われる
-
Mutex
などの同期機構へのアクセスをこの関数の押し込めることで、同期機構がいらなくなり、(スコープが関数内で、その関数の中からしかアクセスしないから)ロックなどのオーバーヘッドがなくせる可能性がある
使われてるコードが微妙かも①
Working with Actors Versus Mutexes のセクションで、Mutex
を使うのやめてかわりにアクターを使うとパフォーマンスが向上するというのを説明しているコードがあるんだけど、Mutex
を使う方の次のコードが恐らく無意味に2重の非同期タスクに包んでいて、無駄なオーバーヘッドがある。
let future = async move {
let handle = tokio::spawn(async move {
actor_replacement(state_ref, i).await
});
let _ = handle.await.unwrap();
};
handles.push(tokio::spawn(future));
これをやめて次のようにすると、自分の手元ではMutex
を使う方が実行時間が短くて趣旨が崩壊していた。
let future = async move {
actor_replacement(state_ref, i).await;
};
handles.push(tokio::spawn(future));
ただまぁ本文でも常にアクターの方が優れているわけではなく、優れている場合があるといってるので本筋は崩れていないはず。
使われてるコードが微妙かも②
こっちはより重要度が大きいかも。Creating Actor Supervision のセクションで他のアクターを監視して、不健康だと判断したらそのアクターをリセットするアクターであるheatbeat_actor
を実装している。このheatbeat_actor
には2つの問題点があると思う。
問題点1 そもそもヘルスチェックしない
本セクションではこのheatbeat_actor
以外にkey_value_actor
とwriter_actor
の2つのアクターがおり、さらにそれらへルーティングをするものとしてrouter
がいる。heatbeat_actor
のコードはこうなっている。
async fn heatbeat_actor(mut receiver: Receiver<ActorType>) {
let mut map = HashMap::new();
let timeout_duration = Duration::from_millis(200);
loop {
match time::timeout(timeout_duration, receiver.recv()).await {
Ok(Some(actor_name)) => map.insert(actor_name, Instant::now()),
Ok(None) => break,
Err(_) => {
continue;
}
};
}
let half_second_ago = Instant::now() - Duration::from_millis(500);
for (key, &value) in map.iter() {
if value < half_second_ago {
match key {
ActorType::KeyValue | ActorType::Writer => {
println!("Actor {:?} is not responding, resetting...", key);
ROUTER_SENDER
.get()
.unwrap()
.send(RoutingMessage::Reset(ActorType::KeyValue))
.await
.unwrap();
map.remove(&ActorType::KeyValue);
map.remove(&ActorType::Writer);
break;
}
}
}
}
}
loop
を抜けないと後半のヘルスチェックに行かないのだが、これを抜けるのは、receiver
が閉じたときだけである。そしてreceiver
が閉じるのは、大元のrouter
で保持している対応したsender
が閉じられた時である。しかしrouter
はデーモンとして動いているのでsender
はドロップされないし、明示的に破棄されたりもしない。なのでheatbeat_actor
はヘルスチェックをしていない。
問題点2 ハートビートを送るタイミングが不適切
key_value_actor
とwriter_actor
は同じロジックでrouter
経由でheatbeat_actor
にハートビートを送っている。そのコードが以下の部分である。
async fn key_value_actor(mut receiver: Receiver<KeyValueMessage>) {
let (writer_key_value_sender, writer_key_value_receiver) = channel(32);
let _writer_handle = tokio::spawn(writer_actor(writer_key_value_receiver));
let (get_sender, get_receiver) = oneshot::channel();
writer_key_value_sender
.send(WriterLogMessage::Get(get_sender))
.await;
let mut map = get_receiver.await.unwrap();
let timeout_duration = Duration::from_millis(200);
let router_sender = ROUTER_SENDER.get().unwrap().clone();
loop {
match time::timeout(timeout_duration, receiver.recv()).await {
Ok(Some(message)) => {
if let Some(write_message) = WriterLogMessage::from_key_value_message(&message) {
writer_key_value_sender.send(write_message).await;
}
match message {
KeyValueMessage::Get(GetKeyValueMessage { key, response }) => {
response.send(map.get(&key).cloned());
}
KeyValueMessage::Delete(DeleteKeyValueMessage { key, response }) => {
map.remove(&key);
response.send(());
}
KeyValueMessage::Set(SetKeyValueMessage {
key,
value,
response,
}) => {
map.insert(key, value);
response.send(());
}
}
}
Ok(None) => break,
Err(_) => {
router_sender
.send(RoutingMessage::Heartbeat(ActorType::KeyValue))
.await
.unwrap();
}
}
}
}
ハートビートを送るのは receiver
からの受信が遅延した場合のみである。これだと、遅延せずに滞りなくメッセージを捌いている間はまったくハートビートが送られなくなる。前述のheartbeat_actor
では前回のハートビートから0.5秒すぎてたら死んでると判断してリセットするようになっているので、これだと生きているものをリセットしてしまう問題がある。
上記2つの間違い(?)は、本書で動作確認やテストをしていないから気づかれなかったのだと思われる。他のコードは最低限実行例があるが、このコードはheatbeat_actor
は起動せず、手動でハートビートリクエストを送ることでリセットを引き起こしている。アクターの良さとして、そのモジュール性からくるテスト容易性をあげていただけに、ここもテストが欲しかったところ。

ch9 Design Patterns
メモ: https://github.com/ose20/async-rust/tree/main/ch09
難易度: ★
概要
非同期プログラミングを書く上で役に立ちそうなデザインパターンの紹介。全体的に平易かつ薄味でちょっとだけ期待を下回ってしまった。
内容
- Building an Isolated Module
- すでにコードが存在していて、さらにそれが非同期を全然考慮していない場合にどう非同期システムを組み込むかという話
- 個人的にここが一番面白かった
- 非同期関数のspawnも、joinHandleの待ち合わせも同期的な文脈で実行できるので実は難しくない
- 本質的な違いは、タスクスケジューリングを呼び出し側がうまくやらないといけないという限界があるということ
- Waterfal Design Pattern
- 多分素朴に書くとこうなるんじゃないかなというありふれたパターンだった
- The Decorator Pattern
- JavaのAOPとちょっと近いと思った
- traitをうまくつかって、既存のtype signatureを変えず、既存のコードをいじらずにラッパーを用いて振る舞いを変える
- The State Machine Pattern
- 本書はかなり消化不良な感じで終わっている気がする
- ようは状態とその入力をenumで定義して、(状態, 入力) -> 状態という関数でシステムを表現しようねという話
- よく代数的データ型とその上のパターンマッチがいい言語の条件として挙げられていることが多いけど、その理由の一つは、このパターンが異様に書きやすいからだと思う。網羅性のチェックもコンパイラに任せられるし、認知負荷がかなり減って自分みたいな人間でもコード書くの楽になる
- The Retry Pattern
- わざわざ名前をつけるほど大したことはしてないように感じた
- The Circuit-Breaker Pattern
- これも同様
- 複数スレッドからアクセスできるAtomicな状態を用意して
- ある条件でその状態を「閉じる」
- タスクはその実行の前に状態をチェックして「閉じて」いたら実行せずエラーを返す

ch10 Building an Async Server with No Dependencies
メモ: https://github.com/ose20/async-rust/tree/main/ch10
難易度: ★★
概要
標準ライブラリしかつかわずに、シンプルな非同期ランタイムと、その上でサーバー/クライアント間通信をする。といってもstd::task
、std::future
あたりがあるので期待したほど低レイヤーではなかった。
気になったところ
wake周りの挙動が結局よくわからない
VTABLE
にダミーの関数を登録してなかったことにしていたので、結局本来のランタイムがこれでなにをしているのかを理解することができなかった。
非同期関数を実装する際の勘所がよくわからなかった
async/await非同期システムは協調的システムなので、非同期関数側が制御を返さないとランタイムはその関数をずっと実行してしまうという問題がある。つまり非同期関数には適切なタイミングで制御を返すロジックを持っていないといけない。これは汎用的なライブラリであればあるほどそうなのだが、それをどうやってるかはわからなかった。例えば非同期のI/Oとかはどういったロジックで制御を返すんだろう。
streamの扱い方が多分間違ってる
sender.rs
とserver
モジュールのmain.rs
での非同期の書き込み、読み込みがおそらく間違っている。ここではErrorKind::WouldBlock
をデータの終端として扱っているけど、WouldBlock
はそれを保証しないはずで、例えばデータの終端ではないけど滞っているみたいな時にもWouldBlock
が返るはずなので、まだデータの途中なのに終わったと判断してデシリアライズに進む(そして失敗する)というケースが起き得る。
プログラムがそのままだと意図通りに動かない
client
とserver
を動かすときはそれぞれの前で例えばulimit -n 4096
を実行しないといけない(少なくともmacはそうだと思う)。おそらく一般的に1プロセスが同時に保持できるファイルディスクリプタの数は制限されていて、macだとデフォルトは256になっている。今回のコードは4000件のソケット(ファイルディスクリプタを使う)を同時に扱う可能性があり、ulimit -n 4096
で上限をあげておかないと制限を超えてソケットが作れなくなる。
雑感
まだあと1章あるけど、この章を一番楽しみにしてたのでひと段落。入門としてはいい本で非同期システムに対するいろんな直観が養われたことは確かだけど、いまいち深いところまで踏み込んでくれなかったので、そこはまた別の文献とかをあたろうと思う。何がいいんだろう。
と思いながら調べていたら自分よりずっと踏み込んでいそうな先駆者がいたので、この方とその引用先をまずは頼りに進んでみようと思う。

ch11 Testing
メモ: https://github.com/ose20/async-rust/tree/main/ch11
難易度: ★
概要と詳細
非同期システムまでを対象としたテストのやりかたを紹介する章。
-
Performing Basic Sync Testing
-
mockall
クレートを使ったmockテスト - traitを挟むことでモックができる
- これはasyncシステムのコードでもほぼ同様にできる
-
-
Testing For Deadlocks
~Testing Network Interactions
- 「テスト手法」と呼ばれるほど大掛かりなことはしていない
- 例えば、それぞれの概念を(テストじゃなく)紹介する際に、その動作確認として自然にやるようなものが紹介されている
-
Fine-Grained Future Testing
-
tokio_test
クレートを使った非同期タスクの細かい挙動テスト - 作成しただけでは一切計算がすすまない
tokio_test::task::spawn
を使って非同期タスクを作成する-
poll
を使って次のawait
まで計算ステップを進めることができる - 複数の非同期タスクを、「どこまで、どの順番で
poll
するか」によって各状態に対して assertion ができる
-
-