👌

Rustの非同期を浅く整理する

2022/05/09に公開

はじめに

自分の理解の整理も兼ねてRustの非同期を整理してみます。私はRust初心者かつ知能があまり高くないので誤りなどが多分に含まれていると思います。誤りはコメントでご指摘いただけると助かります。
非同期ランタイムはtokioを前提としています。

taskとruntime

Rustの非同期には2つの主要概念があります。taskruntimeです。

task

taskは処理の単位です。tasktokio::spawnという関数で起動されます。またtaskの実行はruntimeによって管理されます。
spaenのシグネチャを見ればわかりますが、tokio::spawnFutureトレイトの実装を要求します。(Send'staticも要求しますが、その理由は省略します。)
FutureトレイトについてはFutureセクションをご覧ください。

spawnのシグネチャ
pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static, 
spawnのコード例
use tokio::net::{TcpListener, TcpStream};

use std::io;

async fn process(socket: TcpStream) {
    // ...
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        let (socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            // Process each socket concurrently.
            process(socket).await
        });
    }
}

引用[1]

runtime

runtimetask群の実行を管理します。
task群のthread群への割当てはwork stealingという技術が用いられています[2]
Futureセクションで説明しますが、tokioの実行モデルはpoll(taskを進める処理)呼び出しがすぐ返ってくることを前提としています。つまり処理を進めることができないならとりあえずリターンしてこいということです。
ではいつpollを再度呼び出せばいいかという話になるのですが、それはContextセクションを参照ください。
runtimeを利用するためにはtokio::runtime::Runtimeblock_onを呼びます。
ただ以下のようにblock_on関数が見当たらないコードが多いです。
#[tokio::main]という属性が付与されていることに注目してください。これはRustの手続きマクロで作成されたもので、このマクロが展開されると以下のような形になります。

展開前
#[tokio::main]
async fn main() {
    println!("hello");
}
展開後
fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        println!("hello");
    })
}

引用[3]

Future

Futureトレイトは以下のように定義されています。
(おいおい、Pin<&mut self>ってなんやねん。。という方はPinセクションを見てください。)

Futureトレイト
pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

引用[4]

Futureはイメージ的にはまだ計算が終わっていないかもしれない値だと思ってください。poll関数を呼ぶことで処理を進めようとします。

poll関数はそのFututeがまだ準備ができていなければPoll::Pending、準備ができていればPoll::Ready(val)を返します。
poll関数はすぐに値を返すことが期待されています。またpoll関数はブロックすべきではありません。なぜならpollが処理をブロックしてしまうと、そのFutureを実行しているスレッドがブロックされてしまうからです。(このようにタスク自身がスケジューラに即座に制御を返すスケジューリング方式は協調的スケジューリングやノンプリエンプティブマルチタスクなどと言うそうです。)
runtimetask群のpollをいい感じに呼び出すことで処理を進めていきます。
ではどうやっていい感じに呼び出すのでしょうか?Contextがそれを助けます。

Context

poll関数はContextを引数に取っていることが分かると思います。runtimeFutureContextを渡して、そのFutureがPending後、再度処理を進めることができるという段階でそのContextからwakerを取得してwake関数を呼んでもらうことを期待します。
なのでruntimeとしてはwakeが呼ばれたらそのtaskを実行すると処理が進むんだなということが分かるわけです。
wakeを呼ぶ例としてはWebサーバーのプログラムででクライアントからの接続を待ち受けるtaskがあったとして、クライアントからのリクエストがあったときにwakeを呼び出すなどが考えられます。(このようなイベントの待ち受けはmioというライブラリで実現されているようです。)
ただこのような処理は大体ライブラリに用意されているので、一般ユーザがwakeを呼ぶ機会がどれだけあるのかは良く分かっていません。。

async/await

Futureトレイトを実装するにはpollを実装しなければいけないことが分かったと思いますが、どう実装するのでしょうか?
pollの実装としては手動でstate machineを実装するというのが考えられます。ただ手動でstate machineを実装するって嫌ですよね。。
他に策はないのでしょうか?安心してください。あります。Rustはasync/awaitという機構を提供しています。
関数にasyncをつけるとその関数がFutureを返すようになります。またasync関数内では任意のFutureawaitすることができます。
async関数はgeneratorに変換されて、generatorstate machineに変換されます。

generator

generatorは再開可能な計算です。関数内でyieldを呼ぶことで呼び元に制御を戻すことができます。Python民には馴染み深いかもしれません。計算が再開可能であるためには中断地点での情報を保持しておく必要があります。
ここで問題が発生します。generatorが保持するデータ構造には自己参照構造体が簡単にできてしまうのですが、もし自己参照構造体を持つgeneratorがムーブするとダングリングポインタができてしまいます。
この問題を解消するのがPinです。

Pin

Pinはその背後の値をムーブさせないための型です。Pinを作るにはその値の所有権を渡さなければいけない、かつSafe RustでPinから可変参照を取ることができないので、Pinに渡された値をムーブさせることができません。
generatorが保持するデータ構造は最初にyieldが呼ばれるまでは作られないので(怪しい)、それまでは値をムーブしても問題ありません、したがって、まずPin止めをして(ムーブしますが、前述の理由により問題ありません)、それからpollを呼べば、その後値がムーブされることは無いという訳です。

pin_project

SelfはPin止めされていますが、poll内でselfにアクセスしたいことがあると思います。そんな操作を簡単にするのがpin_projectクレートです。
使い方はドキュメント[5]を参照ください。

おわりに

だいぶあっさり書いたので、これを読むだけで理解するのは難しいと思います。
この記事は概ねRust for rusteaceansの内容ベースで書いているので、Rustの非同期処理に興味のある方を読んでみるといいかもしれません。

参考文献

脚注
  1. https://docs.rs/tokio/latest/tokio/fn.spawn.html ↩︎

  2. https://tokio.rs/blog/2019-10-scheduler ↩︎

  3. https://tokio.rs/tokio/tutorial/hello-tokio ↩︎

  4. https://doc.rust-lang.org/std/future/trait.Future.html ↩︎

  5. https://docs.rs/pin-project/latest/pin_project/ ↩︎

GitHubで編集を提案

Discussion