Open8

手探りでFutureの理解を深める

Yoshiki KudoYoshiki Kudo

rustc: stable 1.48.0 (2020-11-16)
以下、検証しきれていない部分が多々あるが、自分の理解深める用のメモとして書き連ねていく。


Futureの定義

// std::future::Future

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
Yoshiki KudoYoshiki Kudo

Pin : ムーブしても内部保持のポインタのアドレスが変わらない。自己参照構造体を定義するために使う。

// std::pin::Pin

pub struct Pin<P> {
    pointer: P,
}

Unpin : Pinによるピン留めした後でもムーブできる (自動トレイト)
!Unpin : Pinによるピン留めした後にムーブできない

pub auto trait Unpin {}

参考) https://tech-blog.optim.co.jp/entry/2020/03/05/160000

Yoshiki KudoYoshiki Kudo

https://keens.github.io/blog/2019/07/07/rustnofuturetosonorunnerwotsukuttemita/

記事にならって、即時に値を返すFutureであるReturnFutureを作ってみる。
値をムーブして返すからOptionで包んでやる。

struct ReturnFuture<T>(Option<T>);

impl<T> ReturnFuture<T> {
    fn new(x: T) -> Self {
        ReturnFuture(Some(x))
    }
}

impl<T: Unpin> Future for ReturnFuture<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let elem = self
            .get_mut()
            .0
            .take()
            .expect("");

        Poll::Ready(elem)
    }
}

T: UnpinなのでPin::get_mutで中身の可変参照を取得。T: Unpinを満たさない場合はunsafeだけどPin::get_unchecked_mutで取得。

これでReturnFutureFutureとなったので、tokio等のruntimeを使えば実際に使うことができる。

#[tokio::main]
async fn main() {
    let val = ReturnFuture::new(1);
    let val = tokio::spawn(async {
        val.await
    }).await.unwrap();
    println!("val: {}", val); // => 1
}
Yoshiki KudoYoshiki Kudo

tokioasync-std等のruntime使わない場合、Contextを生成しFuture::pollを呼び出してやる必要がある。まずはContextの定義を見ていく。

impl<'a> Context {
   fn from_waker(waker: &'a Waker) -> Context<'a>;
}

impl Waker {
    unsafe fn from_raw(waker: RawWaker) -> Waker;
}

impl RawWaker {
    const fn new(data: *const (), vtable: &'static RawWakerVTable) -> RawWaker;
}

impl RawWakerVTable {
    const fn new(
        clone: unsafe fn(*const ()) -> RawWaker,
        wake: unsafe fn(*const ()),
        wake_by_ref: unsafe fn(*const ()),
        drop: unsafe fn(*const ())
    ) -> RawWakerVTable;
}

Context作るためにWakerを作る。
Wakerを作るためにはRawWakerを作る。
RawWakerを作るためにはRawWakerVTableを作る。

Yoshiki KudoYoshiki Kudo

RawWakerVTableはその名の通りvtableで、cloneClone::cloneとして使われ、dropDrop::dropとして使われる。wakewake_by_refWakerに同じ名前のメソッドが定義されているのでそれとして使われる。

ひとまず上っ面だけ作ってみる。

unsafe fn unsafe_clone(this: *const ()) -> RawWaker {
    todo!()
}
unsafe fn unsafe_wake(this: *const ()) {}
unsafe fn unsafe_wake_by_ref(this: *const ()) {}
unsafe fn unsafe_drop(this: *const ()) {}

static SPIN_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
    unsafe_clone,
    unsafe_wake,
    unsafe_wake_by_ref,
    unsafe_drop,
);

*const ()RawWakerに渡すraw pointerで、新しく定義したSpinWakerを使うことにする。

#[derive(Debug, Clone)]
struct SpinWaker;

impl SpinWaker {
    fn new() -> Self {
        SpinWaker
    }

    fn waker() -> Waker {
        unsafe { Waker::from_raw(Self::new().into_raw_waker()) }
    }

    fn into_raw_waker(self) -> RawWaker {
        let myself = Box::new(self);
        let ptr = Box::into_raw(myself) as *const ();
        RawWaker::new(ptr, &SPIN_WAKER_VTABLE)
    }
}

raw pointer を取得するためにBox::into_raw を用いる。

ドキュメントには

Consumes the Box, returning a wrapped raw pointer.

After calling this function, the caller is responsible for the memory previously managed by the Box. In particular, the caller should properly destroy T and release the memory, taking into account the memory layout used by Box. The easiest way to do this is to convert the raw pointer back into a Box with the Box::from_raw function, allowing the Box destructor to perform the cleanup.

とある。終了後にunsafe_dropが呼ばれるのでそこでBox::from_raw使って開放するようにする。すると、unsafe_dropは以下のように修正する。

unsafe fn unsafe_drop(this: *const ()) {
    let ptr = this as *mut SpinWaker;
    Box::from_raw(ptr);
}

ついでにunsafe_cloneも実装しておく。といってもSpinWaker::into_raw_wakerを使うだけ。

unsafe fn unsafe_clone(this: *const ()) -> RawWaker {
    let ptr = this as *const SpinWaker;
    let waker = ptr
        .as_ref()
        .unwrap()
        .clone();

    Box::new(waker).into_raw_waker()
}

ここまででSpinWakerからWakerが生成できる。
SpinWaker
*const ()としてraw pointerを生成
RawWaker
Waker

このWakerからContextを生成すればFuture::pollが実行できる。

Yoshiki KudoYoshiki Kudo

あとは実行するだけ。

fn run<F>(mut fut: Pint<F>) -> <<F as Deref>::Target as Future>::Output
where
    F: DerefMut,
    <F as Deref>::Target: Future
{
    let waker = SpinWaker::waker();
    let mut ctx = Context::from_waker(&waker);

    loop {
        match fut.as_mut().poll(&mut ctx) {
            Poll::Ready(x) => {
                return x
            }
            Poll::Pending => continue
        }
    }
}

fn main() {
    let val = ReturnFuture::new(1u8); // ReturnFuture<u8>
    let mut val = Pin::from(Box::new(val)); // Pin<Box<ReturnFuture<u8>>>
    let val = run(val);
    println!("{}", val); // 1
}
Yoshiki KudoYoshiki Kudo

次はfuturesを触ってみてみる。

https://github.com/rust-lang/futures-rs

futures::futureに定義されている関数から見ていく。
まずはfutures::future::join

pub fn join<Fut1, Fut2>(future1: Fut1, future2: Fut2) -> Join<Fut1, Fut2>where
    Fut1: Future,
    Fut2: Future, 

2つのFutureを平行に走らせて両方が終わるまで待つFutureを生成する。
こんな感じで使う。

#[tokio::main]
async fn main() {
    let a = async { 1 };
    let b = async { 2 };

    let (a, b) = futures::future::join(a, b).await;
    println!("a: {}, b: {}", a, b);
}

// a: 1, b: 2

tokio::time::sleep使ってタイミングをずらすとわかりやすい。

use futures::prelude::*;
use tokio::time::{Duration, sleep};

#[tokio::main]
async fn main() {
    let a = async {
        println!("called a");
        sleep(Duration::from_secs(3)).await;
        println!("wake up a");
        1u8
    };
    let b = async { 
        println!("called b");
        sleep(Duration::from_secs(1)).await;
        println!("wake up b");
        2u8
    };

    let (a, b) = futures::future::join(a, b).await;
    println!("a: {}, b: {}", a, b);
}

// called a
// called b
// <-----------wait 1 sec
// wake up b
// <-----------wait 2 sec
// wake up a
// a: 1, b: 2
Yoshiki KudoYoshiki Kudo

次はfutures::future::select

pub fn select<A, B>(future1: A, future2: B) -> Select<A, B>where
    B: Future + Unpin,
    A: Future + Unpin, 

future1とfuture2で早く終わったら処理を止めるFutureを返す。
処理を止めると言っても終わってない方のFutureも返ってくるので再度処理可能。

複数回走る可能性があるのか調べたくて以下コードで確認してみたけど1度しか走っていなそう。

use std::sync::Arc;
use futures::{future::Either, prelude::*};
use tokio::{sync::Mutex, time::{Duration, sleep}};

#[tokio::main]
async fn main() {
    let data1 = Arc::new(Mutex::new(0));
    let data2 = Arc::clone(&data1);

    let a = async {
        let mut lock = data2.lock().await;
        *lock += 1;

        sleep(Duration::from_secs(2)).await;
        1
    };
    let b = async {
        2
    };

    futures::pin_mut!(a);
    futures::pin_mut!(b);

    let val = match futures::future::select(a, b).await {
        Either::Left((val1, _)) => {
            // aが先に終わった場合
            val1
        }
        Either::Right((val2, a)) => {
            // bが先に終わった場合
            let sum = a
                .map(|x| x + val2)
                .await;

            sum
        }
    };
    
    println!("val: {}", val);
    println!("data: {:?}", data1);
}

// val: 3
// data: Mutex { data: 1 }