Closed6
「Futures Explained in 200 Lines of Rust」を読む
Futures Explained in 200 Lines of Rust
- Rust の Future について詳細な説明をしている小さなbook
- 内容が少し古いが、Future を理解する上で大事そうなことが書かれている
- このスクラップは読んだときのメモ
Futures in Rust
概要
Future は未来に完了する何らかの操作を表現した概念
フェーズ
Rust の async は poll ベースのアプローとをとっている、非同期タスクは3つのフェーズがある
- poll phase
- Future を poll し、タスクが進行不能になるまで進行させる
- Future を pull する部分は executor と呼ぶ
- wait phase
- Future がイベント(I/Oとか?)発生していることを登録し、イベントの準備ができたら Future を起こす
- wake phase
- Future を再度 poll するようにスケジュールする
- Future を poll して、再度タスクを進行させる
Leaf futures
- Non-leaf-futures のコードと比べると、
async
キーワードがあるかどうかの違いになっている - 恐らく、非同期で処理できる最小限のタスクのことを指していると思われる
- コード例を見ると、tcp connection を作る処理はアトミックであり、処理を中断することができないので、スレッドに逃しても stream を取得するため、結局待ちが発生して意味がない
// stream is a **leaf-future**
let mut stream = tokio::net::TcpStream::connect("127.0.0.1:3000");
Non leaf futures
-
aysnc
キーワードを使って executor が実行できるタスクを作成する Future のこと - 要は
async {}
のこと
// Non-leaf-future
let non_leaf = async {
let mut stream = TcpStream::connect("127.0.0.1:3000").await.unwrap();// <- yield
println!("connected!");
let result = stream.write(b"hello world\n").await; // <- yield
println!("message sent!");
...
};
Runtimes
- Rust で動作する非同期の仕組みで、以下のものが登場する
- Reactor
- Executor
- Future
- これら3つは
Waker
と呼ばれる共有されたオブジェクトで連携を行っている
雑な図を示すとこんな感じ(あっていなさそうなので、完全理解したらまた更新する)
- Executor が初めて Future を poll するとき、 Waker の参照クローンを渡す
- Reactor が Future の準備ができたと判断したら、Waker を通じて Future をスケジュールして、 Executor が Future を poll できるようにする
簡単にまとめると Future を実行するのが Runtime で、そのデファクトスタンダードが tokio。
他にもfuturesがある。
NOTE: futures で Future を実行するときの実装はここらへん。見た感じ
CPU or I/O が重いタスクを処理するとき、別スレッドに逃がすための手段として spawn_blocking()
のようなメソッドを提供している。
ちなみに、tokio はそれを提供していて、tokio::task::spawn_blocking()
は同期処理しかできないので、
更に非同期処理をしたい場合は tokio::spawn()
を使う必要がある。
Waker and Context
The Waker & Understanding the Waker
まとめると
- Waker は実際そのまま使われているのではなく、 Waker を包んだ Context が用意されている
- Waker の実装は Executor に依存するが、どの Waker も同じようなインターフェイスを持つ
- Waker は Trait のようなものだが、Trait として扱うと動的ディスパッチをする必要があり、パフォーマンスコストがかかる
- パフォーマンスコストをユーザが受け入れたとしても、
Arc<Box<dyn Waker>>
のようにしてヒープ割当を必要とすることになる
Fat pointers in Rust
簡単にまとめると
- トレイトオブジェクトのデータ構造は、データオブジェクトとそのオブジェクトに紐付けられている関数(要はメソッド)への参照を持つオブジェクト
- Waker より柔軟にメモリを管理できるようにするため、通常のトレイトオブジェクトを使わずに実装している
Generators and async/await
Why learn about generators?
- generator/yield と async/await は同等
- Rust が並行処理を行う方法を設定した際に、以下の3つを主に議論されたらしい
- stackfull coroutine(グリーンスレッド、いつでも中断/再開できる)
- stackless coroutine(ジェネレーター、中断/再開できるタイミングが限られる)
- combinator
- stackfull/stackless coroutine の違いの詳細についてはこちらを参照
Combinators
- Futures 0.1 ではコンビネーターを使っていた
let future = Connection::connect(conn_str).and_then(|conn| { conn.query("somerequest").map(|row|{ SomeStruct::from(row) }).collect::<Vec<SomeStruct>>() }); let rows: Result<Vec<SomeStruct>, SomeLibraryError> = block_on(future);
- コンビネーターだといくつかデメリットがある
- コンビネーター間での借用ができない
- コールバック数の数だけ、メモリ使用量が増える
- コールバックが増える = 関数の呼び出しが増える = キャプチャが増える
Stackless coroutines/generators
- 現在Rustが使っているモデル
- いくつかメリットがある
- async/await を stackless coroutine に変換するには簡単
- コンテキストスイッチやCPUのステートを保存・復元が必要ない
- 動的スタック確保が不要
- メモリ効率が良い
- 中断ポイントを跨いで借用ができる
async fn myfn() {
let text = String::from("Hello world");
let borrowed = &text[0..5];
somefuture.await;
println!("{}", borrowed);
}
How generators work
-
yield
キーワードを使うと、ジェネレーターに変換される - このコード中断ポイントの間で借用が発生するこれがステートマシンに変換すると次のように、ジェネレーターにステートを持たせることになる
let mut generator = move || { let to_borrow = String::from("Hello"); let borrowed = &to_borrow; yield borrowed.len(); println!("{} world!", borrowed); };
しかし、このやり方だとメモリが変化すると想定外の挙動になることがあるenum GeneratorA { Enter, Yield1 { to_borrow: String, borrowed: *const String, // NB! This is now a raw pointer! }, Exit, }
ので、Pin
を使ってメモリを固定する必要があるpub fn main() { let mut gen = GeneratorA::start(); let mut gen2 = GeneratorA::start(); if let GeneratorState::Yielded(n) = gen.resume() { println!("Got value {}", n); } std::mem::swap(&mut gen, &mut gen2); // <--- Big problem! if let GeneratorState::Yielded(n) = gen2.resume() { println!("Got value {}", n); } // This would now start gen2 since we swapped them. if let GeneratorState::Complete(()) = gen.resume() { () }; }
Async and generators
ジェネレーターとasyncとの関連性は次の通り
- async ブロックは Future を返すが、Future はジェネレーターのように動く
-
Generator::resume
の代わりに、Future::poll
を使う -
Yielded
の代わりにPending
、Complete
の代わりにReady
を返す - Futureの各awaitポイントはジェネレーターのYieldポイントのようなもの
Pin
より詳細なことはPinのdocを参照
簡潔にまとめると次の通り
-
Pin
はメモリの位置が変わらないことを保証する- 自己参照構造体を作るときに使う
- どうやって保証しているか?
-
Pin::new(P)
で渡せるP
はUnpin
を実装している必要があるimpl<P: Deref<Target: Unpin>> Pin<P> { // ... pub const fn new(pointer: P) -> Pin<P> { // ... } } // ... pub const fn get_mut(self) -> &'a mut T where T: Unpin, { // ... }
-
-
Unpin
はメモリの位置が変わっても問題ない型に実装される- 問題ない型 = 全ての基礎型(
bool
、i32
など)とそれらのみから成る型 - なので
Pin
があっても、Unipn
が実装されている場合はメモリの位置を移動できる -
Unpin
はPin
してもピン留め外せるよってくらいのもの
- 問題ない型 = 全ての基礎型(
-
!Unpinはピン留めしたメモリの位置を移動できないように、型レベルで制限するためのTrait
- 以下の例では
Pin
を使っているが、String
はUnpin
を実装しているため、メモリ位置を変更できてしまうuse std::mem; use std::pin::Pin; fn main() { let mut string = "this".to_string(); let mut pinned_string = Pin::new(&mut string); mem::replace(&mut *pinned_string, "other".to_string()); println!("{}", *pinned_string); // other }
- このような場合は
!Unpin
を実装したPhantomPinned埋め込むことで、型レベルで制限する事ができる
次はエラーメッセージの例error[E0277]: `PhantomPinned` cannot be unpinned --> src/main.rs:49:20 | 49 | std::mem::swap(test1.get_mut(), test2.get_mut()); | ^^^^^ ------- required by a bound introduced by this call | | | within `Test`, the trait `Unpin` is not implemented for `PhantomPinned` -
- 以下の例では
Pinning and self-referential structs
メモリが変わると参照先が予想外の場所になる例
struct Test {
a: String,
b: *const String,
}
// ...
fn main() {
let mut test1 = Test::new("test1");
test1.init();
let mut test2 = Test::new("test2");
test2.init();
println!("a: {}, b: {}", test1.a(), test1.b()); // a: test1, b: test1
std::mem::swap(&mut test1, &mut test2);
println!("a: {}, b: {}", test2.a(), test2.b()); // a: test1, b: test2
}
Pinning to the stack
-
!Unpin
を使って、スタックに構造体をピン留めする例 - 個人的に初見でわからなかった場所はコメントで追記
use std::marker::PhantomPinned;
use std::pin::Pin;
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
_marker: PhantomPinned,
}
impl Test {
fn new(txt: &str) -> Self {
Test {
a: String::from(txt),
b: std::ptr::null(),
_marker: PhantomPinned, // This makes our type `!Unpin`
}
}
fn init<'a>(self: Pin<&'a mut Self>) {
let self_ptr: *const String = &self.a;
// NOTE: `self.get_mut()`を使わない理由
// 戻り値は`Unpin`を要求するので、`!Unpin`を実装しているオブジェクトの参照を取り出せない
// NOTE: `self.get_unchecked_mut()`がなぜ`unsafe`なのか
// `get_mut()`の戻り値は`&'a mut T: Unpin`だが、
// `get_unchecked_mut()`の戻り値は`&'a mut T: ?Sized`なので、
// `Unpin`または`!Unpin`のどちらなのかは型上だとわからない
// 戻り値の型は`&'a mut: Test`なので`!Unpin`でも`mem::swap()`などでアドレスが変わってしまう
// このように、安全じゃない操作ができてしまうという意図の`unsafe`
let this = unsafe { self.get_unchecked_mut() };
this.b = self_ptr;
}
fn a<'a>(self: Pin<&'a Self>) -> &'a str {
&self.get_ref().a
}
fn b<'a>(self: Pin<&'a Self>) -> &'a String {
unsafe { &*(self.b) }
}
}
pub fn main() {
let mut test1 = Test::new("test1");
// NOTE: `Pin::new(P)`を使わない理由
// 引数の型が`P: Deref<Target: Unpin>`のため、
// デリファレンスした値は`Unpin`を実装している必要があるため
// NOTE: `Pin::new_unchecked(P)`が`unsafe`の理由
// 引数の型が`P: Deref`のため、`!Unpin`なオブジェクトでも受け取れる
// 戻り値は`Pin<P>`なので、`!Unpin`か`Unpin`か型上ではわからない
// そのため、実態は`!Unpin`なオブジェクトでも`mem::swap()`などでアドレスが変わってしまう
// 意図しないアドレスの変更ができてしまう可能性があるので、`unsafe`になっている
let mut test1 = unsafe { Pin::new_unchecked(&mut test1) };
// NOTE: `as_mut()`の挙動と使っている理由
// `Pin<&mut T>`をそのまま関数にわたすと、所有権が移動してしまうため
// `Pin<&mut T>`が保持している`&mut T`を包んだ新しい`Pin`を返すようにしている
// 要は`Clone`していると同等なことが起きている
Test::init(test1.as_mut());
let mut test2 = Test::new("test2");
let mut test2 = unsafe { Pin::new_unchecked(&mut test2) };
Test::init(test2.as_mut());
println!(
"a: {}, b: {}",
Test::a(test1.as_ref()),
Test::b(test1.as_ref())
);
println!(
"a: {}, b: {}",
Test::a(test2.as_ref()),
Test::b(test2.as_ref())
);
}
Pinning to the heap
- スタックではなくヒープにピン留めをする例
use std::pin::Pin;
use std::marker::PhantomPinned;
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
_marker: PhantomPinned,
}
impl Test {
fn new(txt: &str) -> Pin<Box<Self>> {
let t = Test {
a: String::from(txt),
b: std::ptr::null(),
_marker: PhantomPinned,
};
let mut boxed = Box::pin(t);
let self_ptr: *const String = &boxed.as_ref().a;
unsafe { boxed.as_mut().get_unchecked_mut().b = self_ptr };
boxed
}
fn a<'a>(self: Pin<&'a Self>) -> &'a str {
&self.get_ref().a
}
fn b<'a>(self: Pin<&'a Self>) -> &'a String {
unsafe { &*(self.b) }
}
}
pub fn main() {
let mut test1 = Test::new("test1");
let mut test2 = Test::new("test2");
println!("a: {}, b: {}", test1.as_ref().a(), test1.as_ref().b());
println!("a: {}, b: {}", test2.as_ref().a(), test2.as_ref().b());
}
Practical rules for Pinning
-
T: Upin
の場合Pin<'a, T>
は実質&'a mut T
と同様- なぜなら
Unpin
は値の移動をOKとしているため
- なぜなら
-
T: !Unpin
の場合、&mut T
を取得するにはunsafe
な操作が必要- 言い換えれば、
!Unpin
は値の移動をunsafe
な操作でもしない限りできないということになる
- 言い換えれば、
- スタックへのピン留めはunsafeが必要
-
!Unpin
を実装したオブジェクトをピン留めするにはunsafe
なAPI(Pin::new_unchecked()
)を使う必要があるから
-
- ヒープへのピン留めはunsafeは不要
-
!Unpin
を実装したオブジェクトをBox
で包むことで、Box
を無条件でPin
できるAPI設計になっている
-
補足
よりPin
詳細はこちらを参照
Our finished code
コードが長いので、リンク先参照
超雑にメモ書きでまとめる
処理の流れまとめ
-
reactor
(Arc<Mutext<Box<reactor>>>
) を生成- スレッドを生成し、タスクを起こすためのイベントループを実行
-
Event::Close
を受信した場合は、イベントループを終了 -
Event::Timeout
を受信した場合は、スレッドを生成して、受け取ったidのタスクを起こす
-
- NOTE
-
reactor
自身はtasks
を持っていて、これは各Future
が持っているTaskState
のスライス -
NotReady(Waker)
の場合waker
を持っているので、そのwaker
を使ってFuture
を起こす-
Future
のPending
にはstd::thread::park()
、 wake にはstd::thread::unpark()
が使われている
-
-
- スレッドを生成し、タスクを起こすためのイベントループを実行
-
Future
を生成する-
fut1
とfut2
をまとめたmainfut
を生成 - 各
async {}
でTask
(独自のFuture
) を生成し、それにreactor
の参照を渡している - NOTE:
-
Task
にreactor
の参照を渡している理由-
Task
(Future
) はreactor
に対して、タスクの状態を更新(作成、wake)するため
-
- これがないと
Task
を起こせないため
-
-
-
block_on()
-
packer
を生成-
packer
(Arc<Packer>
) はスレッドの条件停止/再開の役割を持つ -
Poll::Pending
のときはpacker
を使ってスレッドを一時停止する
-
-
Waker
を生成-
packer
を包んだwaker
(Arc<MyWaker>
) を生成 -
waker
は独自のMyWaker
を定義していて、それをcore::task::wake::Waker
に変換したもの
-
-
Context
を生成-
waker
を包んだcore::task::wake::Context
を生成
-
-
future
をピン留め- 受け取った
Task
(Future
) をピン留めして、アドレスが変わらないようにする
- 受け取った
-
Future::poll()
でFuture
のイベントループを実行 -
Task::poll()
を実行- ループするたびに
reactor
のTaskState
を更新- 初回は
tasks
にTaskState::NotReady(Waker)
追加し、チャンネルを使ってreactor
のイベントループに送信し、Pending
を返し、Future
のイベントループを停止 -
reactor
がwake()
する-
TaskState::NotReady(Waker)
の場合はTaskState::Ready
にしたあと、wake()
して、Future
のイベントループを再開
-
-
TaskState::Ready
の場合はTaskState::Finished
にして、Poll::Ready
を返し、Future
のイベントループを抜ける - Ready: 値を返す
- Pending:
parker.park()
でblock_on
を動かしているスレッドを条件停止する-
parker
最終的にTask
(Future
) 内でreactor
にwake
してもらうために使っている
-
- 初回は
- ループするたびに
-
データ構造
- Packer // スレッドの停止/再開の役割を持つ
- park() // スレッドが再開できるまで停止(pthreadを使っている)させる(CPUを使わないようにするため)
- unpark() // スレッドを再開させる
- MyWaker // 自作waker、packerを持つ
- packer
- Reactor // タスクを保持し、 wake する役割を持つ
- dispatcher // チャンネルの送信側
- handle // イベントループのスレッドのhandler
- tasks // タスクの状態を保持するマップ
- TaskState // タスクの状態
- Ready // 再開可能
- NotReady(Waker) // 再開待ち(wakerで起こしてもらう)
- Finished // 終了
- Task
- id // タスクのid、タスクを wake するときに使う
- reactor
- data // タスクの持つデータ(今回の例では数値)
このスクラップは2022/12/20にクローズされました