Open9

Async Rustの読書メモ

ose20ose20

ch03: Building Our Own Async Queues

メモ: https://github.com/ose20/async-rust/tree/main/ch03/examples
難易度: ★★

https://www.oreilly.com/library/view/async-rust/9781098149086/

概要

以下の3つのクレートに頼った上で、簡単な非同期ランタイムを作成する(標準ライブラリだけを使って自作するのは10章らしい。楽しみだね)。

  • async-task
  • futures-lite
  • flume

最も単純な実装から、少しずつそのモチベーションを共有しながら機能を追加していく構成になっていて楽しい。

構成

  • 最も基本的な実装
    • タスクを管理するキューもそれを実行するスレッドも1つ
  • タスクを実行するスレッドを複数に増やす
  • タスクを管理するキューを複数に増やす
    • 実行したいタスクに優先度などがある場合、キューによって区別できるようになる
  • タスクを他のキューから奪う
    • 片方のキューでタスクが空な時に、別のキューからタスクを取ってこれる
  • spawn関数のリファクタ
    • マクロを使ってデフォルト引数のようなものを実現する
  • 初期化処理
    • LazyLockで実装しているタスクキューを起動するための初期化処理
  • バックグラウンドで常に走らせ続けるタスクの実装
    • 例えばログローテートとかで常に非同期ランタイム上で走らせておきたいタスクの実現
ose20ose20

ch04 Integrating Networking into Our Own Async Runtime

メモ: https://github.com/ose20/async-rust/tree/main/ch04/examples
難易度: ★★★★

概要

3章で作成したAsync Runtimeを使って、ネット越しの通信を実現する。主に2つのパートに分かれており、前半はhyperをラップしてHTTPクライアントをAsync Runtime上で動かす。後半OSのasync部分を最低限ラップしたmioを使い、localhost上にサーバーを立ててリクエストの送受信を行う。

構成詳細

前半(hyperをラップ)

hyperクレートのClientRequest<T>Response<T>といった、hyperをつかったHTTP通信をやろうと思うと、3章で自作したAsync Runtimeだけだと不十分。

やりたいことを端的に表すと以下のasync fnの実行になる。

use hyper::{Body, Client, Request, Response};

async fn fetch(req: Request<Body>) -> Result<Response<Body>> {
    Ok(Client::builder()
        .executor(CustomExecutor)
        .build::<_, Body>(CustomConnector)
        .request(req)
        .await?)
}

これを見るとわかるが、引数としてCustomExecutorCustomConnectorを使っていて、これが追加で必要になるよというのを説明するのがおそらく主題。

この2つの構造体は自作するが、実態はhyperに既にある機能のラッパーで、トレイトの実装関係は以下のようになってる。

  • CustomExecutor
    • impl<F: Future+Send+'static> hyper::rt::Excector<F>
  • CustomStream(CustomConnectorの設定に使う)
    • impl tokio::io::AsyncRead
    • impl tokio::io::AsyncWrite
    • impl hyper::client::connect::Connection
  • CustomConnector
    • impl hyper::service::Service<Uri>

後半(mioを使って実装)

他の有名Async Runtimeの基盤としてもつかわれている、限りなく低レイヤーなOSラッパーを提供するmioというクレートを用いる。

epollをラップしたようなインターフェースを使って、サーバー側で動く非同期タスクを作って動かすのがゴール。

気になったところ

p77の説明に違和感

p77に以下のような記述がある。

Because we are using the Service trait for a client call, we will always return Ready for the poll_ready. If we were implementing the Service trait for a server, we could have the following poll_ready function:

fn poll_ready(&mut self, cx: &mut Context<'_>)
    -> Poll<Result<(), Error>> {
    Poll::Ready(Ok(()))
}

「クライアント用の実装だから常にPoll::Readyを返せばいい、サーバー用なら以下のように書く」として示されている例が同じように常にPoll::Readyを返す実装になっているのは文意に合わないので間違っていそう。今のところErattaにも記載されてなかったけど、7月に出るらしい邦訳版ではどうなってるか気になる。

ErrorKing::WouldBlock対処の説明

mioクレートを使って定義したサーバー側の非同期タスクが以下のようになっている(多少改変している)のだが、


const SERVER: Token = Token(0);
const CLIENT: Token = Token(1);

struct ServerFuture {
    server: TcpListener,
    // to pool the socket and tell the future when the socket is readable.
    poll: MioPoll,
}

impl Future for ServerFuture {
    type Output = String;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut events = Events::with_capacity(1);

        // 200ms 間で event に Event をため込む
        self.poll
            .poll(&mut events, Some(Duration::from_millis(200)))
            .unwrap();

        for event in events.iter() {
            // このイベントには token が SERVER じゃないものも来ることを示唆している
            // このイベントって、OS 全体のやつなのかな?それともこの Rust プログラムに関するものだけ?
            // Grok3曰く、この Rust プログラムに関するものだけらしい
            if event.token() == SERVER && event.is_readable() {
                let (mut stream, _) = self.server.accept().unwrap();
                let mut buffer = [0u8; 1024];
                let mut received_data = Vec::new();

                loop {
                    match stream.read(&mut buffer) {
                        Ok(n) if n > 0 => {
                            received_data.extend_from_slice(&buffer[..n]);
                        }
                        Ok(_) => {
                            break;
                        }
                        // これ本に書いてあるけど実行すると何も出力されなくなる
                        // cx..., return..., じゃなくて break にすると正常に動く(実質的に次の節と同じなのでそれはそう)
                        Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                            // WouldBlock エラーは、読み取り可能なデータがないことを示す
                            println!("No more data to read, waiting for more...");
                            break;
                        }
                        Err(e) => {
                            eprintln!("Error reading from stream: {}", e);
                            break;
                        }
                    }
                }

                if !received_data.is_empty() {
                    let received_str = String::from_utf8_lossy(&received_data);
                    return Poll::Ready(received_str.to_string());

                    // もしプログラム終了まで socket が poll され続けてほしいなら以下のようにする
                    // spawn_task!(some_async_handle_function(&received_data)).detach();
                    // return Poll::Pending;
                }

                // ここに辿り着いてしまうと hang するように見える
                // この実装は self に状態を保存していないから
                // poll はステートマシンを一歩進めるみたいな直観をもっていたいっぽい(だから状態の更新をしたい)
                // 本の例でも実はここには辿り着いてない
                // ↓ ChatGPT の説明
                // ただし ハングの直接原因 は TcpStream と受信バッファを self に保持していないこと。
                // 末尾に到達したこと自体が悪いのではなく、次回 poll() が呼ばれたときに 続きのステートが無い ので、
                // 毎回ほぼ同じ処理(accept()→WouldBlock)を繰り返し、永久に Poll::Pending を返し続ける状態に陥る。
                cx.waker().wake_by_ref();
                println!("poll function ended, waiting for more data...");
                return Poll::Pending;
            }
        }

        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

本書には、stream.readの結果がWouldBlockの場合の節を追加するなら

cx.waker().wake_by_ref();
return Poll::Pending;

をすればいいと書いてあるが、これをやるとクライアントのリクエストが処理されず、プログラム全体も終了しない状態になる。そもそもこの関数の最後のreturn Poll::Pendingに辿り着くと同じ状況になるので、元々の関数が特定の実行パスの場合でしか意図通りに動かない関数であると思う。

LLMに聞く限りでは、このpollという関数は背後にあるステートマシンを1歩進める直観を当てはめるとよくて、selfに対して状態を保存しないで処理を終えてることが問題らしいんだけど、ちゃんとわかってない。ここら辺の知識を深めたい場合はどうすればいいんだろうか。考えられる選択肢は

  • この本を読み進めていけば部分的に解決する
  • epollのような、もっとプリミティブなAPIを使ったトイプログラミングをして直観をつかむ
    • これは「並行プログラミング入門」に対応する箇所があった気がする
  • 非同期プログラミングに関するもっと低レイヤーに詳しい文献をあたる
    とかがありそう。そのうちやりたい。
ose20ose20

ch5 Coroutines

メモ: https://github.com/ose20/async-rust/tree/main/ch05/examples
難易度: ★

概要

コルーチンを、async/awaitとの関係を念頭に置きながら様々な例を実装して理解を深める構成。特に躓くような箇所はなく、平易でわかりやすい。async/await関係なく、コルーチンの入門としても十分なくらい。ただ簡単めによりすぎていてあまり踏み込んだ例がない。たとえばRustのI/Fではコルーチンはresume()関数で処理を再開することができるが、このとき引数には任意の型の値を渡すことができる。しかし本書ではこれをほとんど活用していない。

ハイライト

  • コルーチンのYieldedCompletedが、async/awaitのPendingReadyに対応しているという直観を得られてよかった
  • コルーチンはFutureを直観的に実装でき、それをつかえばasync/awaitシステムに組み込める
    • コルーチンではいつ処理を止めて、いつ再開するのか(はたまたしないのか)を自由に決められるが、 async/awaitシステムではランタイムがスケジューリングを行うので細かい制御ができない
ose20ose20

ch6 Reactive Programming

メモ: https://github.com/ose20/async-rust/tree/main/ch06/examples
難易度: ★

概要

  • リアクティブプログラミングとはプログラミングパラダイムのことで、データの値の変化やイベントに対してコードが反応するという側面に注目したパラダイム
  • リアルタイムで刻一刻と変化する何らかの状態に反応するシステムなどはこれを援用しやすい

構成

  • プログラムは大きく分けて二つ
    • 1つめ
      • staticなアトミック変数を状態とし、それに対して以下のようなタスクを動かす
        • 適当に変化させる非同期タスク
        • その変化に応じて適当なロジックでさらに状態を変化させる非同期タスク
    • 2つめ
      • 本書ではeventバスと呼ばれる、リアクティブプログラミングの関心の的であるイベントのキューをもちいたmulti consumer singile producer型のチャンネルを用いてリアクティブプログラミングを実現する

余談

『詳解Rustアトミック操作とロック』でメモリオーダリングについてたくさん読んで書いたので、本書ででてくる不必要に強くパフォーマンスを悪化させるSeqCstを、その場に応じて最低限の強さのRelaxedReleaseAcquireなどに置き換えることができてかなり達成感があった。あとOreillyレベルのちゃんとした技術書でもここらへんの扱いはそんなに厳密じゃないんだなという学びがあった。

https://www.oreilly.co.jp/books/9784814400515/

ose20ose20

ch7 Customizing Tokio

メモ: https://github.com/ose20/async-rust/tree/main/ch07/examples
難易度: ★★

概要

Rustで非同期プログラミングする時のデファクトスタンダードになっている、tokio(非同期ランタイム)の設定をいろいろいじる章。枝葉末節の話になるかと思いきや、tokioのカスタマイズを通じて、非同期ランタイムの責任範囲や、しくみについても多少理解が広がる構成になっていた。

内容

  • ランタイムの設定
    • スレッドの数
    • 時間のかかるタスクの制御方法
    • 各ステップでのフック
  • thread_localという機能
  • いろんな非同期タスクからUnsafeCellに安全にアクセスする方法
  • グレースフルシャットダウン
    • シグナルへのフック
      • それを確実に機能させる方法

余談

このセクションに直接書いてあったわけじゃないけど、写経したり内容から連想したりしてasync/awaitに対していくつか気づきがあった。

  • awaitはスケジューラ全体を止めるという直観が自分の中に残っていたがこれは間違い
    • なんでこんな誤解をしたのか
      • 簡単なプログラムだと、トップレベルの関数内で、非同期関数であっても結局直列に呼びだすような例しかかかないから
        • つまり自分が定義した複数の非同期関数を並列に実行して、それら全部の結果を一斉に待ち合わせるみたいなことをあんまりやってない
          • 事実これを書いた時に自分の直観との矛盾に気づいた
  • async/awaitが協調的なシステムというのは、非同期タスクがawaitをして制御をスケジューラーに返さなければ、スケジューラーは一生その非同期タスクを実行し続けることになる
    • なので、非同期関数を提供する側にはこまめにawaitしてスケジューラーに制御を返す責任がある
      • これってasync/awaitのめちゃくちゃ重要な要素だと思うけどあんまり知らなかった
        • なぜ?
          • おそらく、そういう、スケジューラーにこまめに制御を返す責任があるような非同期関数は大体ライブラリとして提供されているので自分では書かない

引用しているツイートはツリーになっていて、関連する思考の跡が続いている。
https://x.com/ose20_/status/1934579984221528491
https://x.com/ose20_/status/1934997606343827485

ose20ose20

ch8 The Actor Model

メモ: https://github.com/ose20/async-rust/tree/main/ch08/examples
難易度: ★★

概要

  • アクターモデルを使って async/await プログラミングをやる章
  • 場合によっては Mutex などの同期機構がいらなくなる可能性を見る
  • ルーターパターンというデザインパターンでアクターモデル志向の構造を組み立てる
  • アクターを組み合わせる

アクターモデルの勘所

  • アクターというのは、メッセージのやり取りによって外部と協調する分離されたコード片
    • 分離されているのでモジュール性が高かったり、テストがやりやすかったりする
  • この章で紹介されていたアクターは全部非同期関数であり、その引数としてchannelReceiverをもらうことがポイント
    • 外部からアクターへの命令はこのチャンネルを通して行われる
      • アクターはこのReceiverreceive().awaitし続けて、命令が来たら対応するという形になる
  • Mutexなどの同期機構へのアクセスをこの関数の押し込めることで、同期機構がいらなくなり、(スコープが関数内で、その関数の中からしかアクセスしないから)ロックなどのオーバーヘッドがなくせる可能性がある

使われてるコードが微妙かも①

Working with Actors Versus Mutexes のセクションで、Mutexを使うのやめてかわりにアクターを使うとパフォーマンスが向上するというのを説明しているコードがあるんだけど、Mutexを使う方の次のコードが恐らく無意味に2重の非同期タスクに包んでいて、無駄なオーバーヘッドがある。

let future = async move {
    let handle = tokio::spawn(async move {
        actor_replacement(state_ref, i).await
    });
     let _ = handle.await.unwrap();
};
handles.push(tokio::spawn(future));

これをやめて次のようにすると、自分の手元ではMutexを使う方が実行時間が短くて趣旨が崩壊していた。

let future = async move {
    actor_replacement(state_ref, i).await;
};
handles.push(tokio::spawn(future));

ただまぁ本文でも常にアクターの方が優れているわけではなく、優れている場合があるといってるので本筋は崩れていないはず。

使われてるコードが微妙かも②

こっちはより重要度が大きいかも。Creating Actor Supervision のセクションで他のアクターを監視して、不健康だと判断したらそのアクターをリセットするアクターであるheatbeat_actorを実装している。このheatbeat_actorには2つの問題点があると思う。

問題点1 そもそもヘルスチェックしない

本セクションではこのheatbeat_actor以外にkey_value_actorwriter_actorの2つのアクターがおり、さらにそれらへルーティングをするものとしてrouterがいる。heatbeat_actorのコードはこうなっている。

async fn heatbeat_actor(mut receiver: Receiver<ActorType>) {
    let mut map = HashMap::new();
    let timeout_duration = Duration::from_millis(200);
    loop {
        match time::timeout(timeout_duration, receiver.recv()).await {
            Ok(Some(actor_name)) => map.insert(actor_name, Instant::now()),
            Ok(None) => break,
            Err(_) => {
                continue;
            }
        };
    }

    let half_second_ago = Instant::now() - Duration::from_millis(500);
    for (key, &value) in map.iter() {
        if value < half_second_ago {
            match key {
                ActorType::KeyValue | ActorType::Writer => {
                    println!("Actor {:?} is not responding, resetting...", key);
                    ROUTER_SENDER
                        .get()
                        .unwrap()
                        .send(RoutingMessage::Reset(ActorType::KeyValue))
                        .await
                        .unwrap();

                    map.remove(&ActorType::KeyValue);
                    map.remove(&ActorType::Writer);

                    break;
                }
            }
        }
    }
}

loopを抜けないと後半のヘルスチェックに行かないのだが、これを抜けるのは、receiverが閉じたときだけである。そしてreceiverが閉じるのは、大元のrouterで保持している対応したsenderが閉じられた時である。しかしrouterはデーモンとして動いているのでsenderはドロップされないし、明示的に破棄されたりもしない。なのでheatbeat_actorはヘルスチェックをしていない。

問題点2 ハートビートを送るタイミングが不適切

key_value_actorwriter_actorは同じロジックでrouter経由でheatbeat_actorにハートビートを送っている。そのコードが以下の部分である。

async fn key_value_actor(mut receiver: Receiver<KeyValueMessage>) {
    let (writer_key_value_sender, writer_key_value_receiver) = channel(32);
    let _writer_handle = tokio::spawn(writer_actor(writer_key_value_receiver));
    let (get_sender, get_receiver) = oneshot::channel();
    writer_key_value_sender
        .send(WriterLogMessage::Get(get_sender))
        .await;
    let mut map = get_receiver.await.unwrap();

    let timeout_duration = Duration::from_millis(200);
    let router_sender = ROUTER_SENDER.get().unwrap().clone();
    loop {
        match time::timeout(timeout_duration, receiver.recv()).await {
            Ok(Some(message)) => {
                if let Some(write_message) = WriterLogMessage::from_key_value_message(&message) {
                    writer_key_value_sender.send(write_message).await;
                }

                match message {
                    KeyValueMessage::Get(GetKeyValueMessage { key, response }) => {
                        response.send(map.get(&key).cloned());
                    }
                    KeyValueMessage::Delete(DeleteKeyValueMessage { key, response }) => {
                        map.remove(&key);
                        response.send(());
                    }
                    KeyValueMessage::Set(SetKeyValueMessage {
                        key,
                        value,
                        response,
                    }) => {
                        map.insert(key, value);
                        response.send(());
                    }
                }
            }
            Ok(None) => break,
            Err(_) => {
                router_sender
                    .send(RoutingMessage::Heartbeat(ActorType::KeyValue))
                    .await
                    .unwrap();
            }
        }
    }
}

ハートビートを送るのは receiver からの受信が遅延した場合のみである。これだと、遅延せずに滞りなくメッセージを捌いている間はまったくハートビートが送られなくなる。前述のheartbeat_actorでは前回のハートビートから0.5秒すぎてたら死んでると判断してリセットするようになっているので、これだと生きているものをリセットしてしまう問題がある。

上記2つの間違い(?)は、本書で動作確認やテストをしていないから気づかれなかったのだと思われる。他のコードは最低限実行例があるが、このコードはheatbeat_actorは起動せず、手動でハートビートリクエストを送ることでリセットを引き起こしている。アクターの良さとして、そのモジュール性からくるテスト容易性をあげていただけに、ここもテストが欲しかったところ。

ose20ose20

ch9 Design Patterns

メモ: https://github.com/ose20/async-rust/tree/main/ch09
難易度: ★

概要

非同期プログラミングを書く上で役に立ちそうなデザインパターンの紹介。全体的に平易かつ薄味でちょっとだけ期待を下回ってしまった。

内容

  • Building an Isolated Module
    • すでにコードが存在していて、さらにそれが非同期を全然考慮していない場合にどう非同期システムを組み込むかという話
    • 個人的にここが一番面白かった
    • 非同期関数のspawnも、joinHandleの待ち合わせも同期的な文脈で実行できるので実は難しくない
    • 本質的な違いは、タスクスケジューリングを呼び出し側がうまくやらないといけないという限界があるということ
  • Waterfal Design Pattern
    • 多分素朴に書くとこうなるんじゃないかなというありふれたパターンだった
  • The Decorator Pattern
    • JavaのAOPとちょっと近いと思った
    • traitをうまくつかって、既存のtype signatureを変えず、既存のコードをいじらずにラッパーを用いて振る舞いを変える
  • The State Machine Pattern
    • 本書はかなり消化不良な感じで終わっている気がする
    • ようは状態とその入力をenumで定義して、(状態, 入力) -> 状態という関数でシステムを表現しようねという話
      • よく代数的データ型とその上のパターンマッチがいい言語の条件として挙げられていることが多いけど、その理由の一つは、このパターンが異様に書きやすいからだと思う。網羅性のチェックもコンパイラに任せられるし、認知負荷がかなり減って自分みたいな人間でもコード書くの楽になる
  • The Retry Pattern
    • わざわざ名前をつけるほど大したことはしてないように感じた
  • The Circuit-Breaker Pattern
    • これも同様
    • 複数スレッドからアクセスできるAtomicな状態を用意して
      • ある条件でその状態を「閉じる」
      • タスクはその実行の前に状態をチェックして「閉じて」いたら実行せずエラーを返す
ose20ose20

ch10 Building an Async Server with No Dependencies

メモ: https://github.com/ose20/async-rust/tree/main/ch10
難易度: ★★

概要

標準ライブラリしかつかわずに、シンプルな非同期ランタイムと、その上でサーバー/クライアント間通信をする。といってもstd::taskstd::futureあたりがあるので期待したほど低レイヤーではなかった。

気になったところ

wake周りの挙動が結局よくわからない

VTABLEにダミーの関数を登録してなかったことにしていたので、結局本来のランタイムがこれでなにをしているのかを理解することができなかった。

非同期関数を実装する際の勘所がよくわからなかった

async/await非同期システムは協調的システムなので、非同期関数側が制御を返さないとランタイムはその関数をずっと実行してしまうという問題がある。つまり非同期関数には適切なタイミングで制御を返すロジックを持っていないといけない。これは汎用的なライブラリであればあるほどそうなのだが、それをどうやってるかはわからなかった。例えば非同期のI/Oとかはどういったロジックで制御を返すんだろう。

streamの扱い方が多分間違ってる

sender.rsserverモジュールのmain.rsでの非同期の書き込み、読み込みがおそらく間違っている。ここではErrorKind::WouldBlockをデータの終端として扱っているけど、WouldBlockはそれを保証しないはずで、例えばデータの終端ではないけど滞っているみたいな時にもWouldBlockが返るはずなので、まだデータの途中なのに終わったと判断してデシリアライズに進む(そして失敗する)というケースが起き得る。

プログラムがそのままだと意図通りに動かない

clientserverを動かすときはそれぞれの前で例えばulimit -n 4096を実行しないといけない(少なくともmacはそうだと思う)。おそらく一般的に1プロセスが同時に保持できるファイルディスクリプタの数は制限されていて、macだとデフォルトは256になっている。今回のコードは4000件のソケット(ファイルディスクリプタを使う)を同時に扱う可能性があり、ulimit -n 4096で上限をあげておかないと制限を超えてソケットが作れなくなる。

雑感

まだあと1章あるけど、この章を一番楽しみにしてたのでひと段落。入門としてはいい本で非同期システムに対するいろんな直観が養われたことは確かだけど、いまいち深いところまで踏み込んでくれなかったので、そこはまた別の文献とかをあたろうと思う。何がいいんだろう。

と思いながら調べていたら自分よりずっと踏み込んでいそうな先駆者がいたので、この方とその引用先をまずは頼りに進んでみようと思う。
https://zenn.dev/toru3/articles/9bbb620e0637cd

ose20ose20

ch11 Testing

メモ: https://github.com/ose20/async-rust/tree/main/ch11
難易度: ★

概要と詳細

非同期システムまでを対象としたテストのやりかたを紹介する章。

  • Performing Basic Sync Testing
    • mockallクレートを使ったmockテスト
    • traitを挟むことでモックができる
    • これはasyncシステムのコードでもほぼ同様にできる
  • Testing For Deadlocks ~ Testing Network Interactions
    • 「テスト手法」と呼ばれるほど大掛かりなことはしていない
    • 例えば、それぞれの概念を(テストじゃなく)紹介する際に、その動作確認として自然にやるようなものが紹介されている
  • Fine-Grained Future Testing
    • tokio_testクレートを使った非同期タスクの細かい挙動テスト
    • 作成しただけでは一切計算がすすまないtokio_test::task::spawnを使って非同期タスクを作成する
      • pollを使って次のawaitまで計算ステップを進めることができる
      • 複数の非同期タスクを、「どこまで、どの順番でpollするか」によって各状態に対して assertion ができる