Closed6
「Futures Explained in 200 Lines of Rust」を読む
Futures Explained in 200 Lines of Rust
- Rust の Future について詳細な説明をしている小さなbook
- 内容が少し古いが、Future を理解する上で大事そうなことが書かれている
- このスクラップは読んだときのメモ
Futures in Rust
概要
Future は未来に完了する何らかの操作を表現した概念
フェーズ
Rust の async は poll ベースのアプローとをとっている、非同期タスクは3つのフェーズがある
- poll phase
- Future を poll し、タスクが進行不能になるまで進行させる
- Future を pull する部分は executor と呼ぶ
- wait phase
- Future がイベント(I/Oとか?)発生していることを登録し、イベントの準備ができたら Future を起こす
- wake phase
- Future を再度 poll するようにスケジュールする
- Future を poll して、再度タスクを進行させる
Leaf futures
- Non-leaf-futures のコードと比べると、
asyncキーワードがあるかどうかの違いになっている - 恐らく、非同期で処理できる最小限のタスクのことを指していると思われる
- コード例を見ると、tcp connection を作る処理はアトミックであり、処理を中断することができないので、スレッドに逃しても stream を取得するため、結局待ちが発生して意味がない
// stream is a **leaf-future**
let mut stream = tokio::net::TcpStream::connect("127.0.0.1:3000");
Non leaf futures
-
aysncキーワードを使って executor が実行できるタスクを作成する Future のこと - 要は
async {}のこと
// Non-leaf-future
let non_leaf = async {
let mut stream = TcpStream::connect("127.0.0.1:3000").await.unwrap();// <- yield
println!("connected!");
let result = stream.write(b"hello world\n").await; // <- yield
println!("message sent!");
...
};
Runtimes
- Rust で動作する非同期の仕組みで、以下のものが登場する
- Reactor
- Executor
- Future
- これら3つは
Wakerと呼ばれる共有されたオブジェクトで連携を行っている
雑な図を示すとこんな感じ(あっていなさそうなので、完全理解したらまた更新する)
- Executor が初めて Future を poll するとき、 Waker の参照クローンを渡す
- Reactor が Future の準備ができたと判断したら、Waker を通じて Future をスケジュールして、 Executor が Future を poll できるようにする
簡単にまとめると Future を実行するのが Runtime で、そのデファクトスタンダードが tokio。
他にもfuturesがある。
NOTE: futures で Future を実行するときの実装はここらへん。見た感じ
CPU or I/O が重いタスクを処理するとき、別スレッドに逃がすための手段として spawn_blocking() のようなメソッドを提供している。
ちなみに、tokio はそれを提供していて、tokio::task::spawn_blocking() は同期処理しかできないので、
更に非同期処理をしたい場合は tokio::spawn() を使う必要がある。
Waker and Context
The Waker & Understanding the Waker
まとめると
- Waker は実際そのまま使われているのではなく、 Waker を包んだ Context が用意されている
- Waker の実装は Executor に依存するが、どの Waker も同じようなインターフェイスを持つ
- Waker は Trait のようなものだが、Trait として扱うと動的ディスパッチをする必要があり、パフォーマンスコストがかかる
- パフォーマンスコストをユーザが受け入れたとしても、
Arc<Box<dyn Waker>>のようにしてヒープ割当を必要とすることになる
Fat pointers in Rust
簡単にまとめると
- トレイトオブジェクトのデータ構造は、データオブジェクトとそのオブジェクトに紐付けられている関数(要はメソッド)への参照を持つオブジェクト
- Waker より柔軟にメモリを管理できるようにするため、通常のトレイトオブジェクトを使わずに実装している
Generators and async/await
Why learn about generators?
- generator/yield と async/await は同等
- Rust が並行処理を行う方法を設定した際に、以下の3つを主に議論されたらしい
- stackfull coroutine(グリーンスレッド、いつでも中断/再開できる)
- stackless coroutine(ジェネレーター、中断/再開できるタイミングが限られる)
- combinator
- stackfull/stackless coroutine の違いの詳細についてはこちらを参照
Combinators
- Futures 0.1 ではコンビネーターを使っていた
let future = Connection::connect(conn_str).and_then(|conn| { conn.query("somerequest").map(|row|{ SomeStruct::from(row) }).collect::<Vec<SomeStruct>>() }); let rows: Result<Vec<SomeStruct>, SomeLibraryError> = block_on(future); - コンビネーターだといくつかデメリットがある
- コンビネーター間での借用ができない
- コールバック数の数だけ、メモリ使用量が増える
- コールバックが増える = 関数の呼び出しが増える = キャプチャが増える
Stackless coroutines/generators
- 現在Rustが使っているモデル
- いくつかメリットがある
- async/await を stackless coroutine に変換するには簡単
- コンテキストスイッチやCPUのステートを保存・復元が必要ない
- 動的スタック確保が不要
- メモリ効率が良い
- 中断ポイントを跨いで借用ができる
async fn myfn() {
let text = String::from("Hello world");
let borrowed = &text[0..5];
somefuture.await;
println!("{}", borrowed);
}
How generators work
-
yieldキーワードを使うと、ジェネレーターに変換される - このコード中断ポイントの間で借用が発生するこれがステートマシンに変換すると次のように、ジェネレーターにステートを持たせることになる
let mut generator = move || { let to_borrow = String::from("Hello"); let borrowed = &to_borrow; yield borrowed.len(); println!("{} world!", borrowed); };しかし、このやり方だとメモリが変化すると想定外の挙動になることがあるenum GeneratorA { Enter, Yield1 { to_borrow: String, borrowed: *const String, // NB! This is now a raw pointer! }, Exit, }
ので、Pinを使ってメモリを固定する必要があるpub fn main() { let mut gen = GeneratorA::start(); let mut gen2 = GeneratorA::start(); if let GeneratorState::Yielded(n) = gen.resume() { println!("Got value {}", n); } std::mem::swap(&mut gen, &mut gen2); // <--- Big problem! if let GeneratorState::Yielded(n) = gen2.resume() { println!("Got value {}", n); } // This would now start gen2 since we swapped them. if let GeneratorState::Complete(()) = gen.resume() { () }; }
Async and generators
ジェネレーターとasyncとの関連性は次の通り
- async ブロックは Future を返すが、Future はジェネレーターのように動く
-
Generator::resumeの代わりに、Future::pollを使う -
Yieldedの代わりにPending、Completeの代わりにReadyを返す - Futureの各awaitポイントはジェネレーターのYieldポイントのようなもの
Pin
より詳細なことはPinのdocを参照
簡潔にまとめると次の通り
-
Pinはメモリの位置が変わらないことを保証する- 自己参照構造体を作るときに使う
- どうやって保証しているか?
-
Pin::new(P)で渡せるPはUnpinを実装している必要があるimpl<P: Deref<Target: Unpin>> Pin<P> { // ... pub const fn new(pointer: P) -> Pin<P> { // ... } } // ... pub const fn get_mut(self) -> &'a mut T where T: Unpin, { // ... }
-
-
Unpinはメモリの位置が変わっても問題ない型に実装される- 問題ない型 = 全ての基礎型(
bool、i32など)とそれらのみから成る型 - なので
Pinがあっても、Unipnが実装されている場合はメモリの位置を移動できる -
UnpinはPinしてもピン留め外せるよってくらいのもの
- 問題ない型 = 全ての基礎型(
-
!Unpinはピン留めしたメモリの位置を移動できないように、型レベルで制限するためのTrait
- 以下の例では
Pinを使っているが、StringはUnpinを実装しているため、メモリ位置を変更できてしまうuse std::mem; use std::pin::Pin; fn main() { let mut string = "this".to_string(); let mut pinned_string = Pin::new(&mut string); mem::replace(&mut *pinned_string, "other".to_string()); println!("{}", *pinned_string); // other } - このような場合は
!Unpinを実装したPhantomPinned埋め込むことで、型レベルで制限する事ができる
次はエラーメッセージの例error[E0277]: `PhantomPinned` cannot be unpinned --> src/main.rs:49:20 | 49 | std::mem::swap(test1.get_mut(), test2.get_mut()); | ^^^^^ ------- required by a bound introduced by this call | | | within `Test`, the trait `Unpin` is not implemented for `PhantomPinned` -
- 以下の例では
Pinning and self-referential structs
メモリが変わると参照先が予想外の場所になる例
struct Test {
a: String,
b: *const String,
}
// ...
fn main() {
let mut test1 = Test::new("test1");
test1.init();
let mut test2 = Test::new("test2");
test2.init();
println!("a: {}, b: {}", test1.a(), test1.b()); // a: test1, b: test1
std::mem::swap(&mut test1, &mut test2);
println!("a: {}, b: {}", test2.a(), test2.b()); // a: test1, b: test2
}

Pinning to the stack
-
!Unpinを使って、スタックに構造体をピン留めする例 - 個人的に初見でわからなかった場所はコメントで追記
use std::marker::PhantomPinned;
use std::pin::Pin;
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
_marker: PhantomPinned,
}
impl Test {
fn new(txt: &str) -> Self {
Test {
a: String::from(txt),
b: std::ptr::null(),
_marker: PhantomPinned, // This makes our type `!Unpin`
}
}
fn init<'a>(self: Pin<&'a mut Self>) {
let self_ptr: *const String = &self.a;
// NOTE: `self.get_mut()`を使わない理由
// 戻り値は`Unpin`を要求するので、`!Unpin`を実装しているオブジェクトの参照を取り出せない
// NOTE: `self.get_unchecked_mut()`がなぜ`unsafe`なのか
// `get_mut()`の戻り値は`&'a mut T: Unpin`だが、
// `get_unchecked_mut()`の戻り値は`&'a mut T: ?Sized`なので、
// `Unpin`または`!Unpin`のどちらなのかは型上だとわからない
// 戻り値の型は`&'a mut: Test`なので`!Unpin`でも`mem::swap()`などでアドレスが変わってしまう
// このように、安全じゃない操作ができてしまうという意図の`unsafe`
let this = unsafe { self.get_unchecked_mut() };
this.b = self_ptr;
}
fn a<'a>(self: Pin<&'a Self>) -> &'a str {
&self.get_ref().a
}
fn b<'a>(self: Pin<&'a Self>) -> &'a String {
unsafe { &*(self.b) }
}
}
pub fn main() {
let mut test1 = Test::new("test1");
// NOTE: `Pin::new(P)`を使わない理由
// 引数の型が`P: Deref<Target: Unpin>`のため、
// デリファレンスした値は`Unpin`を実装している必要があるため
// NOTE: `Pin::new_unchecked(P)`が`unsafe`の理由
// 引数の型が`P: Deref`のため、`!Unpin`なオブジェクトでも受け取れる
// 戻り値は`Pin<P>`なので、`!Unpin`か`Unpin`か型上ではわからない
// そのため、実態は`!Unpin`なオブジェクトでも`mem::swap()`などでアドレスが変わってしまう
// 意図しないアドレスの変更ができてしまう可能性があるので、`unsafe`になっている
let mut test1 = unsafe { Pin::new_unchecked(&mut test1) };
// NOTE: `as_mut()`の挙動と使っている理由
// `Pin<&mut T>`をそのまま関数にわたすと、所有権が移動してしまうため
// `Pin<&mut T>`が保持している`&mut T`を包んだ新しい`Pin`を返すようにしている
// 要は`Clone`していると同等なことが起きている
Test::init(test1.as_mut());
let mut test2 = Test::new("test2");
let mut test2 = unsafe { Pin::new_unchecked(&mut test2) };
Test::init(test2.as_mut());
println!(
"a: {}, b: {}",
Test::a(test1.as_ref()),
Test::b(test1.as_ref())
);
println!(
"a: {}, b: {}",
Test::a(test2.as_ref()),
Test::b(test2.as_ref())
);
}
Pinning to the heap
- スタックではなくヒープにピン留めをする例
use std::pin::Pin;
use std::marker::PhantomPinned;
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
_marker: PhantomPinned,
}
impl Test {
fn new(txt: &str) -> Pin<Box<Self>> {
let t = Test {
a: String::from(txt),
b: std::ptr::null(),
_marker: PhantomPinned,
};
let mut boxed = Box::pin(t);
let self_ptr: *const String = &boxed.as_ref().a;
unsafe { boxed.as_mut().get_unchecked_mut().b = self_ptr };
boxed
}
fn a<'a>(self: Pin<&'a Self>) -> &'a str {
&self.get_ref().a
}
fn b<'a>(self: Pin<&'a Self>) -> &'a String {
unsafe { &*(self.b) }
}
}
pub fn main() {
let mut test1 = Test::new("test1");
let mut test2 = Test::new("test2");
println!("a: {}, b: {}", test1.as_ref().a(), test1.as_ref().b());
println!("a: {}, b: {}", test2.as_ref().a(), test2.as_ref().b());
}
Practical rules for Pinning
-
T: Upinの場合Pin<'a, T>は実質&'a mut Tと同様- なぜなら
Unpinは値の移動をOKとしているため
- なぜなら
-
T: !Unpinの場合、&mut Tを取得するにはunsafeな操作が必要- 言い換えれば、
!Unpinは値の移動をunsafeな操作でもしない限りできないということになる
- 言い換えれば、
- スタックへのピン留めはunsafeが必要
-
!Unpinを実装したオブジェクトをピン留めするにはunsafeなAPI(Pin::new_unchecked())を使う必要があるから
-
- ヒープへのピン留めはunsafeは不要
-
!Unpinを実装したオブジェクトをBoxで包むことで、Boxを無条件でPinできるAPI設計になっている
-
補足
よりPin詳細はこちらを参照
Our finished code
コードが長いので、リンク先参照
超雑にメモ書きでまとめる
処理の流れまとめ
-
reactor(Arc<Mutext<Box<reactor>>>) を生成- スレッドを生成し、タスクを起こすためのイベントループを実行
-
Event::Closeを受信した場合は、イベントループを終了 -
Event::Timeoutを受信した場合は、スレッドを生成して、受け取ったidのタスクを起こす
-
- NOTE
-
reactor自身はtasksを持っていて、これは各Futureが持っているTaskStateのスライス -
NotReady(Waker)の場合wakerを持っているので、そのwakerを使ってFutureを起こす-
FutureのPendingにはstd::thread::park()、 wake にはstd::thread::unpark()が使われている
-
-
- スレッドを生成し、タスクを起こすためのイベントループを実行
-
Futureを生成する-
fut1とfut2をまとめたmainfutを生成 - 各
async {}でTask(独自のFuture) を生成し、それにreactorの参照を渡している - NOTE:
-
Taskにreactorの参照を渡している理由-
Task(Future) はreactorに対して、タスクの状態を更新(作成、wake)するため
-
- これがないと
Taskを起こせないため
-
-
-
block_on()-
packerを生成-
packer(Arc<Packer>) はスレッドの条件停止/再開の役割を持つ -
Poll::Pendingのときはpackerを使ってスレッドを一時停止する
-
-
Wakerを生成-
packerを包んだwaker(Arc<MyWaker>) を生成 -
wakerは独自のMyWakerを定義していて、それをcore::task::wake::Wakerに変換したもの
-
-
Contextを生成-
wakerを包んだcore::task::wake::Contextを生成
-
-
futureをピン留め- 受け取った
Task(Future) をピン留めして、アドレスが変わらないようにする
- 受け取った
-
Future::poll()でFutureのイベントループを実行 -
Task::poll()を実行- ループするたびに
reactorのTaskStateを更新- 初回は
tasksにTaskState::NotReady(Waker)追加し、チャンネルを使ってreactorのイベントループに送信し、Pendingを返し、Futureのイベントループを停止 -
reactorがwake()する-
TaskState::NotReady(Waker)の場合はTaskState::Readyにしたあと、wake()して、Futureのイベントループを再開
-
-
TaskState::Readyの場合はTaskState::Finishedにして、Poll::Readyを返し、Futureのイベントループを抜ける - Ready: 値を返す
- Pending:
parker.park()でblock_onを動かしているスレッドを条件停止する-
parker最終的にTask(Future) 内でreactorにwakeしてもらうために使っている
-
- 初回は
- ループするたびに
-
データ構造
- Packer // スレッドの停止/再開の役割を持つ
- park() // スレッドが再開できるまで停止(pthreadを使っている)させる(CPUを使わないようにするため)
- unpark() // スレッドを再開させる
- MyWaker // 自作waker、packerを持つ
- packer
- Reactor // タスクを保持し、 wake する役割を持つ
- dispatcher // チャンネルの送信側
- handle // イベントループのスレッドのhandler
- tasks // タスクの状態を保持するマップ
- TaskState // タスクの状態
- Ready // 再開可能
- NotReady(Waker) // 再開待ち(wakerで起こしてもらう)
- Finished // 終了
- Task
- id // タスクのid、タスクを wake するときに使う
- reactor
- data // タスクの持つデータ(今回の例では数値)
このスクラップは2022/12/20にクローズされました