Rustの非同期を浅く整理する
はじめに
自分の理解の整理も兼ねてRustの非同期を整理してみます。私はRust初心者かつ知能があまり高くないので誤りなどが多分に含まれていると思います。誤りはコメントでご指摘いただけると助かります。
非同期ランタイムはtokioを前提としています。
taskとruntime
Rustの非同期には2つの主要概念があります。task
とruntime
です。
task
task
は処理の単位です。task
はtokio::spawn
という関数で起動されます。またtask
の実行はruntime
によって管理されます。
spaen
のシグネチャを見ればわかりますが、tokio::spawn
はFuture
トレイトの実装を要求します。(Send
と'static
も要求しますが、その理由は省略します。)
Future
トレイトについてはFuture
セクションをご覧ください。
pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
use tokio::net::{TcpListener, TcpStream};
use std::io;
async fn process(socket: TcpStream) {
// ...
}
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (socket, _) = listener.accept().await?;
tokio::spawn(async move {
// Process each socket concurrently.
process(socket).await
});
}
}
引用[1]
runtime
runtime
はtask
群の実行を管理します。
task
群のthread
群への割当てはwork stealing
という技術が用いられています[2]。
Future
セクションで説明しますが、tokio
の実行モデルはpoll
(task
を進める処理)呼び出しがすぐ返ってくることを前提としています。つまり処理を進めることができないならとりあえずリターンしてこいということです。
ではいつpoll
を再度呼び出せばいいかという話になるのですが、それはContext
セクションを参照ください。
runtime
を利用するためにはtokio::runtime::Runtime
のblock_on
を呼びます。
ただ以下のようにblock_on
関数が見当たらないコードが多いです。
#[tokio::main]
という属性が付与されていることに注目してください。これはRustの手続きマクロで作成されたもので、このマクロが展開されると以下のような形になります。
#[tokio::main]
async fn main() {
println!("hello");
}
fn main() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("hello");
})
}
引用[3]
Future
Future
トレイトは以下のように定義されています。
(おいおい、Pin<&mut self>
ってなんやねん。。という方はPin
セクションを見てください。)
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
引用[4]
Future
はイメージ的にはまだ計算が終わっていないかもしれない値だと思ってください。poll
関数を呼ぶことで処理を進めようとします。
poll
関数はそのFutute
がまだ準備ができていなければPoll::Pending
、準備ができていればPoll::Ready(val)
を返します。
poll
関数はすぐに値を返すことが期待されています。またpoll
関数はブロックすべきではありません。なぜならpoll
が処理をブロックしてしまうと、そのFuture
を実行しているスレッドがブロックされてしまうからです。(このようにタスク自身がスケジューラに即座に制御を返すスケジューリング方式は協調的スケジューリングやノンプリエンプティブマルチタスクなどと言うそうです。)
runtime
はtask
群のpoll
をいい感じに呼び出すことで処理を進めていきます。
ではどうやっていい感じに呼び出すのでしょうか?Context
がそれを助けます。
Context
poll
関数はContext
を引数に取っていることが分かると思います。runtime
はFuture
にContext
を渡して、そのFuture
がPending後、再度処理を進めることができるという段階でそのContext
からwaker
を取得してwake
関数を呼んでもらうことを期待します。
なのでruntime
としてはwake
が呼ばれたらそのtask
を実行すると処理が進むんだなということが分かるわけです。
wake
を呼ぶ例としてはWebサーバーのプログラムででクライアントからの接続を待ち受けるtask
があったとして、クライアントからのリクエストがあったときにwake
を呼び出すなどが考えられます。(このようなイベントの待ち受けはmio
というライブラリで実現されているようです。)
ただこのような処理は大体ライブラリに用意されているので、一般ユーザがwake
を呼ぶ機会がどれだけあるのかは良く分かっていません。。
async/await
Future
トレイトを実装するにはpoll
を実装しなければいけないことが分かったと思いますが、どう実装するのでしょうか?
poll
の実装としては手動でstate machine
を実装するというのが考えられます。ただ手動でstate machine
を実装するって嫌ですよね。。
他に策はないのでしょうか?安心してください。あります。Rustはasync/await
という機構を提供しています。
関数にasync
をつけるとその関数がFuture
を返すようになります。またasync
関数内では任意のFuture
をawait
することができます。
async
関数はgenerator
に変換されて、generator
がstate machine
に変換されます。
generator
generator
は再開可能な計算です。関数内でyield
を呼ぶことで呼び元に制御を戻すことができます。Python民には馴染み深いかもしれません。計算が再開可能であるためには中断地点での情報を保持しておく必要があります。
ここで問題が発生します。generator
が保持するデータ構造には自己参照構造体が簡単にできてしまうのですが、もし自己参照構造体を持つgenerator
がムーブするとダングリングポインタができてしまいます。
この問題を解消するのがPin
です。
Pin
Pin
はその背後の値をムーブさせないための型です。Pin
を作るにはその値の所有権を渡さなければいけない、かつSafe RustでPin
から可変参照を取ることができないので、Pin
に渡された値をムーブさせることができません。
generator
が保持するデータ構造は最初にyield
が呼ばれるまでは作られないので(怪しい)、それまでは値をムーブしても問題ありません、したがって、まずPin止めをして(ムーブしますが、前述の理由により問題ありません)、それからpoll
を呼べば、その後値がムーブされることは無いという訳です。
pin_project
SelfはPin止めされていますが、poll
内でselfにアクセスしたいことがあると思います。そんな操作を簡単にするのがpin_project
クレートです。
使い方はドキュメント[5]を参照ください。
おわりに
だいぶあっさり書いたので、これを読むだけで理解するのは難しいと思います。
この記事は概ねRust for rusteaceansの内容ベースで書いているので、Rustの非同期処理に興味のある方を読んでみるといいかもしれません。
参考文献
- https://tokio.rs/tokio/tutorial
- https://www.oreilly.co.jp/books/9784873119786/
- https://nostarch.com/rust-rustaceans
Discussion