手探りでFutureの理解を深める
rustc: stable 1.48.0 (2020-11-16)
以下、検証しきれていない部分が多々あるが、自分の理解深める用のメモとして書き連ねていく。
Futureの定義
// std::future::Future
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
Pin : ムーブしても内部保持のポインタのアドレスが変わらない。自己参照構造体を定義するために使う。
// std::pin::Pin
pub struct Pin<P> {
pointer: P,
}
Unpin : Pinによるピン留めした後でもムーブできる (自動トレイト)
!Unpin : Pinによるピン留めした後にムーブできない
pub auto trait Unpin {}
記事にならって、即時に値を返すFuture
であるReturnFuture
を作ってみる。
値をムーブして返すからOptionで包んでやる。
struct ReturnFuture<T>(Option<T>);
impl<T> ReturnFuture<T> {
fn new(x: T) -> Self {
ReturnFuture(Some(x))
}
}
impl<T: Unpin> Future for ReturnFuture<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let elem = self
.get_mut()
.0
.take()
.expect("");
Poll::Ready(elem)
}
}
T: Unpin
なのでPin::get_mut
で中身の可変参照を取得。T: Unpin
を満たさない場合はunsafeだけどPin::get_unchecked_mut
で取得。
これでReturnFuture
はFuture
となったので、tokio
等のruntimeを使えば実際に使うことができる。
#[tokio::main]
async fn main() {
let val = ReturnFuture::new(1);
let val = tokio::spawn(async {
val.await
}).await.unwrap();
println!("val: {}", val); // => 1
}
tokio
やasync-std
等のruntime使わない場合、Context
を生成しFuture::poll
を呼び出してやる必要がある。まずはContext
の定義を見ていく。
impl<'a> Context {
fn from_waker(waker: &'a Waker) -> Context<'a>;
}
impl Waker {
unsafe fn from_raw(waker: RawWaker) -> Waker;
}
impl RawWaker {
const fn new(data: *const (), vtable: &'static RawWakerVTable) -> RawWaker;
}
impl RawWakerVTable {
const fn new(
clone: unsafe fn(*const ()) -> RawWaker,
wake: unsafe fn(*const ()),
wake_by_ref: unsafe fn(*const ()),
drop: unsafe fn(*const ())
) -> RawWakerVTable;
}
Context
作るためにWaker
を作る。
Waker
を作るためにはRawWaker
を作る。
RawWaker
を作るためにはRawWakerVTable
を作る。
RawWakerVTable
はその名の通りvtableで、clone
はClone::clone
として使われ、drop
はDrop::drop
として使われる。wake
とwake_by_ref
はWaker
に同じ名前のメソッドが定義されているのでそれとして使われる。
ひとまず上っ面だけ作ってみる。
unsafe fn unsafe_clone(this: *const ()) -> RawWaker {
todo!()
}
unsafe fn unsafe_wake(this: *const ()) {}
unsafe fn unsafe_wake_by_ref(this: *const ()) {}
unsafe fn unsafe_drop(this: *const ()) {}
static SPIN_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
unsafe_clone,
unsafe_wake,
unsafe_wake_by_ref,
unsafe_drop,
);
*const ()
はRawWaker
に渡すraw pointerで、新しく定義したSpinWaker
を使うことにする。
#[derive(Debug, Clone)]
struct SpinWaker;
impl SpinWaker {
fn new() -> Self {
SpinWaker
}
fn waker() -> Waker {
unsafe { Waker::from_raw(Self::new().into_raw_waker()) }
}
fn into_raw_waker(self) -> RawWaker {
let myself = Box::new(self);
let ptr = Box::into_raw(myself) as *const ();
RawWaker::new(ptr, &SPIN_WAKER_VTABLE)
}
}
raw pointer を取得するためにBox::into_raw
を用いる。
ドキュメントには
Consumes the Box, returning a wrapped raw pointer.
After calling this function, the caller is responsible for the memory previously managed by the Box. In particular, the caller should properly destroy T and release the memory, taking into account the memory layout used by Box. The easiest way to do this is to convert the raw pointer back into a Box with the Box::from_raw function, allowing the Box destructor to perform the cleanup.
とある。終了後にunsafe_drop
が呼ばれるのでそこでBox::from_raw
使って開放するようにする。すると、unsafe_drop
は以下のように修正する。
unsafe fn unsafe_drop(this: *const ()) {
let ptr = this as *mut SpinWaker;
Box::from_raw(ptr);
}
ついでにunsafe_clone
も実装しておく。といってもSpinWaker::into_raw_waker
を使うだけ。
unsafe fn unsafe_clone(this: *const ()) -> RawWaker {
let ptr = this as *const SpinWaker;
let waker = ptr
.as_ref()
.unwrap()
.clone();
Box::new(waker).into_raw_waker()
}
ここまででSpinWaker
からWaker
が生成できる。
SpinWaker
→*const ()
としてraw pointerを生成
→RawWaker
→Waker
このWaker
からContext
を生成すればFuture::poll
が実行できる。
あとは実行するだけ。
fn run<F>(mut fut: Pint<F>) -> <<F as Deref>::Target as Future>::Output
where
F: DerefMut,
<F as Deref>::Target: Future
{
let waker = SpinWaker::waker();
let mut ctx = Context::from_waker(&waker);
loop {
match fut.as_mut().poll(&mut ctx) {
Poll::Ready(x) => {
return x
}
Poll::Pending => continue
}
}
}
fn main() {
let val = ReturnFuture::new(1u8); // ReturnFuture<u8>
let mut val = Pin::from(Box::new(val)); // Pin<Box<ReturnFuture<u8>>>
let val = run(val);
println!("{}", val); // 1
}
次はfutures
を触ってみてみる。
futures::future
に定義されている関数から見ていく。
まずはfutures::future::join
pub fn join<Fut1, Fut2>(future1: Fut1, future2: Fut2) -> Join<Fut1, Fut2>ⓘ
where
Fut1: Future,
Fut2: Future,
2つのFuture
を平行に走らせて両方が終わるまで待つFuture
を生成する。
こんな感じで使う。
#[tokio::main]
async fn main() {
let a = async { 1 };
let b = async { 2 };
let (a, b) = futures::future::join(a, b).await;
println!("a: {}, b: {}", a, b);
}
// a: 1, b: 2
tokio::time::sleep
使ってタイミングをずらすとわかりやすい。
use futures::prelude::*;
use tokio::time::{Duration, sleep};
#[tokio::main]
async fn main() {
let a = async {
println!("called a");
sleep(Duration::from_secs(3)).await;
println!("wake up a");
1u8
};
let b = async {
println!("called b");
sleep(Duration::from_secs(1)).await;
println!("wake up b");
2u8
};
let (a, b) = futures::future::join(a, b).await;
println!("a: {}, b: {}", a, b);
}
// called a
// called b
// <-----------wait 1 sec
// wake up b
// <-----------wait 2 sec
// wake up a
// a: 1, b: 2
次はfutures::future::select
pub fn select<A, B>(future1: A, future2: B) -> Select<A, B>ⓘ
where
B: Future + Unpin,
A: Future + Unpin,
future1とfuture2で早く終わったら処理を止めるFuture
を返す。
処理を止めると言っても終わってない方のFuture
も返ってくるので再度処理可能。
複数回走る可能性があるのか調べたくて以下コードで確認してみたけど1度しか走っていなそう。
use std::sync::Arc;
use futures::{future::Either, prelude::*};
use tokio::{sync::Mutex, time::{Duration, sleep}};
#[tokio::main]
async fn main() {
let data1 = Arc::new(Mutex::new(0));
let data2 = Arc::clone(&data1);
let a = async {
let mut lock = data2.lock().await;
*lock += 1;
sleep(Duration::from_secs(2)).await;
1
};
let b = async {
2
};
futures::pin_mut!(a);
futures::pin_mut!(b);
let val = match futures::future::select(a, b).await {
Either::Left((val1, _)) => {
// aが先に終わった場合
val1
}
Either::Right((val2, a)) => {
// bが先に終わった場合
let sum = a
.map(|x| x + val2)
.await;
sum
}
};
println!("val: {}", val);
println!("data: {:?}", data1);
}
// val: 3
// data: Mutex { data: 1 }