Closed6

「Futures Explained in 200 Lines of Rust」を読む

ゴリラ@完全週休7日の仕事くださいゴリラ@完全週休7日の仕事ください

Futures in Rust

概要

Future は未来に完了する何らかの操作を表現した概念

フェーズ

Rust の async は poll ベースのアプローとをとっている、非同期タスクは3つのフェーズがある

  • poll phase
    • Future を poll し、タスクが進行不能になるまで進行させる
    • Future を pull する部分は executor と呼ぶ
  • wait phase
    • Future がイベント(I/Oとか?)発生していることを登録し、イベントの準備ができたら Future を起こす
  • wake phase
    • Future を再度 poll するようにスケジュールする
    • Future を poll して、再度タスクを進行させる

Leaf futures

  • Non-leaf-futures のコードと比べると、async キーワードがあるかどうかの違いになっている
  • 恐らく、非同期で処理できる最小限のタスクのことを指していると思われる
  • コード例を見ると、tcp connection を作る処理はアトミックであり、処理を中断することができないので、スレッドに逃しても stream を取得するため、結局待ちが発生して意味がない
// stream is a **leaf-future**
let mut stream = tokio::net::TcpStream::connect("127.0.0.1:3000");

Non leaf futures

  • aysnc キーワードを使って executor が実行できるタスクを作成する Future のこと
  • 要は async {} のこと
// Non-leaf-future
let non_leaf = async {
    let mut stream = TcpStream::connect("127.0.0.1:3000").await.unwrap();// <- yield
    println!("connected!");
    let result = stream.write(b"hello world\n").await; // <- yield
    println!("message sent!");
    ...
};

Runtimes

  • Rust で動作する非同期の仕組みで、以下のものが登場する
    • Reactor
    • Executor
    • Future
  • これら3つは Waker と呼ばれる共有されたオブジェクトで連携を行っている
    雑な図を示すとこんな感じ(あっていなさそうなので、完全理解したらまた更新する)
    • Executor が初めて Future を poll するとき、 Waker の参照クローンを渡す
    • Reactor が Future の準備ができたと判断したら、Waker を通じて Future をスケジュールして、 Executor が Future を poll できるようにする

簡単にまとめると Future を実行するのが Runtime で、そのデファクトスタンダードが tokio
他にもfuturesがある。

NOTE: futures で Future を実行するときの実装はここらへん。見た感じ
https://github.com/rust-lang/futures-rs/blob/2acba12354cf7c7a00dc1253cff9a5827fed2d45/futures-executor/src/local_pool.rs#L80-L103

CPU or I/O が重いタスクを処理するとき、別スレッドに逃がすための手段として spawn_blocking() のようなメソッドを提供している。
ちなみに、tokio はそれを提供していて、tokio::task::spawn_blocking() は同期処理しかできないので、
更に非同期処理をしたい場合は tokio::spawn() を使う必要がある。

ゴリラ@完全週休7日の仕事くださいゴリラ@完全週休7日の仕事ください

Waker and Context

The Waker & Understanding the Waker

まとめると

  • Waker は実際そのまま使われているのではなく、 Waker を包んだ Context が用意されている
  • Waker の実装は Executor に依存するが、どの Waker も同じようなインターフェイスを持つ
  • Waker は Trait のようなものだが、Trait として扱うと動的ディスパッチをする必要があり、パフォーマンスコストがかかる
  • パフォーマンスコストをユーザが受け入れたとしても、Arc<Box<dyn Waker>>のようにしてヒープ割当を必要とすることになる

Fat pointers in Rust

簡単にまとめると

  • トレイトオブジェクトのデータ構造は、データオブジェクトとそのオブジェクトに紐付けられている関数(要はメソッド)への参照を持つオブジェクト
  • Waker より柔軟にメモリを管理できるようにするため、通常のトレイトオブジェクトを使わずに実装している
ゴリラ@完全週休7日の仕事くださいゴリラ@完全週休7日の仕事ください

Generators and async/await

Why learn about generators?

  • generator/yield と async/await は同等
  • Rust が並行処理を行う方法を設定した際に、以下の3つを主に議論されたらしい
    • stackfull coroutine(グリーンスレッド、いつでも中断/再開できる)
    • stackless coroutine(ジェネレーター、中断/再開できるタイミングが限られる)
    • combinator
  • stackfull/stackless coroutine の違いの詳細についてはこちらを参照

Combinators

  • Futures 0.1 ではコンビネーターを使っていた
    let future = Connection::connect(conn_str).and_then(|conn| {
        conn.query("somerequest").map(|row|{
            SomeStruct::from(row)
        }).collect::<Vec<SomeStruct>>()
    });
    
    let rows: Result<Vec<SomeStruct>, SomeLibraryError> = block_on(future);
    
  • コンビネーターだといくつかデメリットがある
    • コンビネーター間での借用ができない
    • コールバック数の数だけ、メモリ使用量が増える
      • コールバックが増える = 関数の呼び出しが増える = キャプチャが増える

Stackless coroutines/generators

  • 現在Rustが使っているモデル
  • いくつかメリットがある
    • async/await を stackless coroutine に変換するには簡単
    • コンテキストスイッチやCPUのステートを保存・復元が必要ない
    • 動的スタック確保が不要
    • メモリ効率が良い
    • 中断ポイントを跨いで借用ができる
async fn myfn() {
    let text = String::from("Hello world");
    let borrowed = &text[0..5];
    somefuture.await;
    println!("{}", borrowed);
}

How generators work

  • yieldキーワードを使うと、ジェネレーターに変換される
  • このコード中断ポイントの間で借用が発生する
    let mut generator = move || {
            let to_borrow = String::from("Hello");
            let borrowed = &to_borrow;
            yield borrowed.len();
            println!("{} world!", borrowed);
    };
    
    これがステートマシンに変換すると次のように、ジェネレーターにステートを持たせることになる
    enum GeneratorA {
        Enter,
        Yield1 {
            to_borrow: String,
            borrowed: *const String, // NB! This is now a raw pointer!
        },
        Exit,
    }
    
    しかし、このやり方だとメモリが変化すると想定外の挙動になることがある
    ので、Pinを使ってメモリを固定する必要がある
    pub fn main() {
        let mut gen = GeneratorA::start();
        let mut gen2 = GeneratorA::start();
    
        if let GeneratorState::Yielded(n) = gen.resume() {
            println!("Got value {}", n);
        }
    
        std::mem::swap(&mut gen, &mut gen2); // <--- Big problem!
    
        if let GeneratorState::Yielded(n) = gen2.resume() {
            println!("Got value {}", n);
        }
    
        // This would now start gen2 since we swapped them.
        if let GeneratorState::Complete(()) = gen.resume() {
            ()
        };
    }
    

Async and generators

ジェネレーターとasyncとの関連性は次の通り

  • async ブロックは Future を返すが、Future はジェネレーターのように動く
  • Generator::resumeの代わりに、Future::pollを使う
  • Yieldedの代わりにPendingCompleteの代わりにReadyを返す
  • Futureの各awaitポイントはジェネレーターのYieldポイントのようなもの
ゴリラ@完全週休7日の仕事くださいゴリラ@完全週休7日の仕事ください

Pin

より詳細なことはPinのdocを参照
簡潔にまとめると次の通り

  • Pinはメモリの位置が変わらないことを保証する
    • 自己参照構造体を作るときに使う
    • どうやって保証しているか?
      • Pin::new(P)で渡せるPUnpinを実装している必要がある
        impl<P: Deref<Target: Unpin>> Pin<P> {
            // ...
            pub const fn new(pointer: P) -> Pin<P> {
                // ...
            }
        }
        
        // ...
        
        pub const fn get_mut(self) -> &'a mut T
        where
            T: Unpin,
        {
            // ...
        }
        
  • Unpinはメモリの位置が変わっても問題ない型に実装される
    • 問題ない型 = 全ての基礎型(booli32など)とそれらのみから成る型
    • なのでPinがあっても、Unipnが実装されている場合はメモリの位置を移動できる
    • UnpinPinしてもピン留め外せるよってくらいのもの
  • !Unpinはピン留めしたメモリの位置を移動できないように、型レベルで制限するためのTrait
    • 以下の例ではPinを使っているが、StringUnpinを実装しているため、メモリ位置を変更できてしまう
      use std::mem;
      use std::pin::Pin;
      
      fn main() {
          let mut string = "this".to_string();
          let mut pinned_string = Pin::new(&mut string);
          
          mem::replace(&mut *pinned_string, "other".to_string());
          println!("{}", *pinned_string); // other
      }
      
    • このような場合は!Unpinを実装したPhantomPinned埋め込むことで、型レベルで制限する事ができる
      次はエラーメッセージの例
      error[E0277]: `PhantomPinned` cannot be unpinned
         --> src/main.rs:49:20
          |
      49  |     std::mem::swap(test1.get_mut(), test2.get_mut());
          |                    ^^^^^ ------- required by a bound introduced by this call
          |                    |
          |                    within `Test`, the trait `Unpin` is not implemented for `PhantomPinned`
          - 
      

Pinning and self-referential structs

メモリが変わると参照先が予想外の場所になる例

struct Test {
    a: String,
    b: *const String,
}

// ...

fn main() {
    let mut test1 = Test::new("test1");
    test1.init();
    let mut test2 = Test::new("test2");
    test2.init();

    println!("a: {}, b: {}", test1.a(), test1.b()); // a: test1, b: test1
    std::mem::swap(&mut test1, &mut test2);
    println!("a: {}, b: {}", test2.a(), test2.b()); // a: test1, b: test2
}

Pinning to the stack

  • !Unpinを使って、スタックに構造体をピン留めする例
  • 個人的に初見でわからなかった場所はコメントで追記
use std::marker::PhantomPinned;
use std::pin::Pin;

#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
    _marker: PhantomPinned,
}

impl Test {
    fn new(txt: &str) -> Self {
        Test {
            a: String::from(txt),
            b: std::ptr::null(),
            _marker: PhantomPinned, // This makes our type `!Unpin`
        }
    }
    fn init<'a>(self: Pin<&'a mut Self>) {
        let self_ptr: *const String = &self.a;

        // NOTE: `self.get_mut()`を使わない理由
        // 戻り値は`Unpin`を要求するので、`!Unpin`を実装しているオブジェクトの参照を取り出せない

        // NOTE: `self.get_unchecked_mut()`がなぜ`unsafe`なのか
        // `get_mut()`の戻り値は`&'a mut T: Unpin`だが、
        // `get_unchecked_mut()`の戻り値は`&'a mut T: ?Sized`なので、
        // `Unpin`または`!Unpin`のどちらなのかは型上だとわからない
        // 戻り値の型は`&'a mut: Test`なので`!Unpin`でも`mem::swap()`などでアドレスが変わってしまう
        // このように、安全じゃない操作ができてしまうという意図の`unsafe`
        let this = unsafe { self.get_unchecked_mut() };
        this.b = self_ptr;
    }

    fn a<'a>(self: Pin<&'a Self>) -> &'a str {
        &self.get_ref().a
    }

    fn b<'a>(self: Pin<&'a Self>) -> &'a String {
        unsafe { &*(self.b) }
    }
}

pub fn main() {
    let mut test1 = Test::new("test1");
    // NOTE: `Pin::new(P)`を使わない理由
    // 引数の型が`P: Deref<Target: Unpin>`のため、
    // デリファレンスした値は`Unpin`を実装している必要があるため

    // NOTE: `Pin::new_unchecked(P)`が`unsafe`の理由
    // 引数の型が`P: Deref`のため、`!Unpin`なオブジェクトでも受け取れる
    // 戻り値は`Pin<P>`なので、`!Unpin`か`Unpin`か型上ではわからない
    // そのため、実態は`!Unpin`なオブジェクトでも`mem::swap()`などでアドレスが変わってしまう
    // 意図しないアドレスの変更ができてしまう可能性があるので、`unsafe`になっている
    let mut test1 = unsafe { Pin::new_unchecked(&mut test1) };
    // NOTE: `as_mut()`の挙動と使っている理由
    // `Pin<&mut T>`をそのまま関数にわたすと、所有権が移動してしまうため
    // `Pin<&mut T>`が保持している`&mut T`を包んだ新しい`Pin`を返すようにしている
    // 要は`Clone`していると同等なことが起きている
    Test::init(test1.as_mut());

    let mut test2 = Test::new("test2");
    let mut test2 = unsafe { Pin::new_unchecked(&mut test2) };
    Test::init(test2.as_mut());

    println!(
        "a: {}, b: {}",
        Test::a(test1.as_ref()),
        Test::b(test1.as_ref())
    );
    println!(
        "a: {}, b: {}",
        Test::a(test2.as_ref()),
        Test::b(test2.as_ref())
    );
}

Pinning to the heap

  • スタックではなくヒープにピン留めをする例
use std::pin::Pin;
use std::marker::PhantomPinned;

#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
    _marker: PhantomPinned,
}

impl Test {
    fn new(txt: &str) -> Pin<Box<Self>> {
        let t = Test {
            a: String::from(txt),
            b: std::ptr::null(),
            _marker: PhantomPinned,
        };
        let mut boxed = Box::pin(t);
        let self_ptr: *const String = &boxed.as_ref().a;
        unsafe { boxed.as_mut().get_unchecked_mut().b = self_ptr };

        boxed
    }

    fn a<'a>(self: Pin<&'a Self>) -> &'a str {
        &self.get_ref().a
    }

    fn b<'a>(self: Pin<&'a Self>) -> &'a String {
        unsafe { &*(self.b) }
    }
}

pub fn main() {
    let mut test1 = Test::new("test1");
    let mut test2 = Test::new("test2");

    println!("a: {}, b: {}", test1.as_ref().a(), test1.as_ref().b());
    println!("a: {}, b: {}", test2.as_ref().a(), test2.as_ref().b());
}

Practical rules for Pinning

  • T: Upinの場合Pin<'a, T>は実質&'a mut Tと同様
    • なぜならUnpinは値の移動をOKとしているため
  • T: !Unpinの場合、&mut Tを取得するにはunsafeな操作が必要
    • 言い換えれば、!Unpinは値の移動をunsafeな操作でもしない限りできないということになる
  • スタックへのピン留めはunsafeが必要
    • !Unpinを実装したオブジェクトをピン留めするにはunsafeなAPI(Pin::new_unchecked())を使う必要があるから
  • ヒープへのピン留めはunsafeは不要
    • !Unpinを実装したオブジェクトをBoxで包むことで、Boxを無条件でPinできるAPI設計になっている

補足

よりPin詳細はこちらを参照

ゴリラ@完全週休7日の仕事くださいゴリラ@完全週休7日の仕事ください

Our finished code

コードが長いので、リンク先参照
超雑にメモ書きでまとめる

処理の流れまとめ

  • reactor(Arc<Mutext<Box<reactor>>>) を生成
    • スレッドを生成し、タスクを起こすためのイベントループを実行
      • Event::Closeを受信した場合は、イベントループを終了
      • Event::Timeoutを受信した場合は、スレッドを生成して、受け取ったidのタスクを起こす
    • NOTE
      • reactor 自身は tasks を持っていて、これは各 Future が持っている TaskState のスライス
      • NotReady(Waker) の場合 waker を持っているので、その waker を使って Future を起こす
        • FuturePending には std::thread::park()、 wake には std::thread::unpark() が使われている
  • Future を生成する
    • fut1fut2 をまとめた mainfut を生成
    • async {}Task(独自のFuture) を生成し、それに reactor の参照を渡している
    • NOTE:
      • Taskreactor の参照を渡している理由
        • Task(Future) は reactor に対して、タスクの状態を更新(作成、wake)するため
      • これがないと Task を起こせないため
  • block_on()
    • packer を生成
      • packer(Arc<Packer>) はスレッドの条件停止/再開の役割を持つ
      • Poll::Pendingのときは packer を使ってスレッドを一時停止する
    • Waker を生成
      • packer を包んだ waker(Arc<MyWaker>) を生成
      • waker は独自の MyWaker を定義していて、それを core::task::wake::Waker に変換したもの
    • Context を生成
      • waker を包んだ core::task::wake::Context を生成
    • future をピン留め
      • 受け取った Task(Future) をピン留めして、アドレスが変わらないようにする
    • Future::poll()Future のイベントループを実行
    • Task::poll() を実行
      • ループするたびに reactorTaskState を更新
        • 初回は tasksTaskState::NotReady(Waker) 追加し、チャンネルを使って reactor のイベントループに送信し、Pendingを返し、Futureのイベントループを停止
        • reactorwake()する
          • TaskState::NotReady(Waker)の場合はTaskState::Readyにしたあと、wake()して、Futureのイベントループを再開
        • TaskState::Readyの場合はTaskState::Finishedにして、Poll::Readyを返し、Futureのイベントループを抜ける
        • Ready: 値を返す
        • Pending: parker.park()block_on を動かしているスレッドを条件停止する
          • parker 最終的に Task(Future) 内で reactorwake してもらうために使っている

データ構造

  • Packer // スレッドの停止/再開の役割を持つ
    • park() // スレッドが再開できるまで停止(pthreadを使っている)させる(CPUを使わないようにするため)
    • unpark() // スレッドを再開させる
  • MyWaker // 自作waker、packerを持つ
    • packer
  • Reactor // タスクを保持し、 wake する役割を持つ
    • dispatcher // チャンネルの送信側
    • handle // イベントループのスレッドのhandler
    • tasks // タスクの状態を保持するマップ
  • TaskState // タスクの状態
    • Ready // 再開可能
    • NotReady(Waker) // 再開待ち(wakerで起こしてもらう)
    • Finished // 終了
  • Task
    • id // タスクのid、タスクを wake するときに使う
    • reactor
    • data // タスクの持つデータ(今回の例では数値)
このスクラップは2022/12/20にクローズされました