🎮

RustでWasmのゲーム用シングルスレッド非同期ランタイムを作成する

2022/09/03に公開

はじめに

この記事ではWasmのゲーム用を想定したrequestAnimationFrame駆動/シングルスレッドのRust非同期ランタイムを作ります。

ゲームとasync/await

複数フレームにまたがる処理とasync/await

ゲームを扱う上でよく必要になる複数フレームにまたがる処理というのを考えてみます。
これはasync/awaitがとてもうまく使える場面です。

ゲーム分野のasync/awaitはUniTaskの事例が参考になります。
UniTaskを利用することで、複数フレームにまたがる処理をまっすぐ書くことができるようになりました。

例えば、簡単な例で「メッセージウィンドウをアニメーションで表示し、決定ボタンを押すまで待機し、ボタンが押されたらアニメーションでウィンドウを閉じる」という処理を書くとします。
async/awaitを使わないでRustで書いてみると次のようになります。

enum Phase {
  WindowOpenAnimation,
  WaitKey,
  WindowCloseAnimation,
  End,
}

struct State {
  phase: Phase,
}
impl State {
  fn update(&mut self) {
    match self.phase {
      Phase::WindowOpenAnimation => {
        // アニメーション処理
        update_window_open_animation();

        if is_open_animation_finished() {
          self.phase = Phase::WaitKey;
        }
      }
      Phase::WaitKey => {
        // キー入力待ち
        if is_key_pressed() {
          self.phase = Phase::WindowCloseAnimation;
        }
      }
      Phase::WindowCloseAnimation => {
        // アニメーション処理
        update_close_animation();

        if is_close_animation_finished() {
          self.phase = Phase::End;
        }
      }
    }
  }
}

現在のフェーズを保持してupdateでの処理を分岐しています。

async/awaitを用いて書くイメージは次のとおりです。

async fn update() {
  // アニメーション処理
  window_open_animation().await;

  // キー入力待ち
  wait_key().await;

  // アニメーション処理
  window_close_animation().await;
}

async/awaitを使うことで直列に書くことができ、処理の流れが分かりやすくなりました。

フレームアニメーションベースの非同期処理

先程のasync/awaitを利用した例のwindow_open_animationの中身も想像してみましょう。
async/awaitを利用しない場合で使っているupdate_window_open_animationis_open_animation_finishedを使って書くと次のようになります。

async fn window_open_animation() {
  while !is_open_animation_finished() {
    update_window_open_animation();
    runtime::next_frame().await;
  }
}

ここで注目してほしいのがruntime::next_frame().awaitです。
非同期ランタイムがゲームループと協調して動くのに重要な役割を果たしています。

既存の非同期ランタイムとゲーム用途

Rustにはすでに素晴らしい非同期ランタイムとしてtokio、あるいは次点でasync-stdが存在します。
また、Wasmで利用する場合、wasm-bindgen-futuresが使えるでしょう。

しかし、これらのランタイムはゲームループと協調して動くように設計されていません。
例えばファイルIOなどの処理を非同期で扱うような場合にはtokioなどを使えば良いでしょうが、ゲーム用途となると上記特徴から扱いが難しいです。

Wasm用となるとtokioはそのままでは現在は動きません。
シングルスレッド前提のランタイムが必要になります。

では、早速そのようなランタイムを作ってみましょう。

ランタイムを作る

作成したランタイムのソースコードはこちらにあります。

Future

非同期ランタイムのキーとなるtraitがFutureです。

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

このFutureのpollを適時呼び出すことで非同期処理を進めていきます。

poll()を実行して、その結果Poll::Ready(T)が返ってきたら非同期処理が完了したことになります。
Poll::Pendingが帰ってきた場合は、非同期処理が完了するまで待つ必要があります。

非同期処理が完了するとContextの中のwakerのwakeが呼び出されます。
wakeが呼び出されるまでFutureを保持しておいて、wakeが呼び出されたらpollを実行して先へ進める、というのを繰り返していくことになります。

ランタイムの状態

ランタイムが保持する状態について説明します。

今回作成する非同期ランタイムは、非同期タスクを2つのコンテナで保持します。
一つはrunning_queueで、現在実行中のタスクを保持します。
もう一つはwaiting_queueで、このフレームでは実行しない中断中のタスクを表現します。

running_queueの中身にあるのを順次pollしていき、Poll::Pendingが返ってきたらwaiting_queueに移動します。
この中断中のqueueに移動することをParkすると表現します。

また、今回の非同期ランタイムではnext_frameの対応のために、新しいフレームが始まるときに起動するwakerを登録する口を用意しています。

最後にタスクの識別のためのtask_counterを保持します。

pub(crate) struct Shared {
    pub(crate) frame_change_wakers: Vec<Waker>,
    pub(crate) running_queue: VecDeque<Rc<RefCell<Task>>>,
    pub(crate) waiting_queue: BTreeMap<usize, Rc<RefCell<Task>>>,
    task_counter: usize,
}

#[derive(Clone)]
pub struct Runtime {
    pub(crate) shared: Rc<RefCell<Shared>>,
}

ランタイムのruntime_updateの実装

ランタイムは毎フレームrequestAnimationFrameでruntime_updateを呼び出すことで勧めていきます。
そのruntime_updateの処理を下に全部貼り付けます。

impl Runtime {
    pub fn runtime_update(&self) {
        let wakers = self
            .shared
            .borrow_mut()
            .frame_change_wakers
            .drain(..)
            .collect::<Vec<_>>();
        for w in wakers {
            w.wake_by_ref();
        }

        'current_frame: loop {
            let task = self.shared.borrow_mut().running_queue.pop_front();

            match task {
                None => break 'current_frame,
                Some(task) => {
                    let id = {
                        let id = self.shared.borrow().task_counter;
                        self.shared.borrow_mut().task_counter += 1;
                        id
                    };

                    let waker = TaskWaker::waker(self.clone(), id);
                    let mut cx = Context::from_waker(&waker);

                    let result = task.borrow_mut().poll(&mut cx);
                    match result {
                      Poll::Ready(()) => {
                          // taskの完了をJoinHandleに通知する
                            task.borrow_mut()
                                .callback_waker
                                .iter()
                                .for_each(|w| w.wake_by_ref());
                        }
                        Poll::Pending => {
                            self.shared.borrow_mut().waiting_queue.insert(id, task);
                        }
                    }
                }
            }
        }
    }
}

最初にframe_change_wakersに登録してあるWakerを全部wakeします。

その次に現在のフレームで処理すべきタスクを'current_frameループで回します。

ループの中ではrunning_queueからタスクを取り出して、タスクが存在しない場合はループを打ち切っています。

実行中のタスクが存在する場合、このタスクのidを取得しています。
そしてタスクの実行が完了したかどうかのフラグも用意してTaskWakerを作成し、そこからContextを作成してpoll(&mut cx)を呼び出しています。

poll(&mut cx)した結果がReadyならば、taskに登録されているwakerを呼び出します。
このtaskに登録されているwakerと言うのは、spawnが返すJoinHandleをwakeするためのものです。

poll(&mut cx)した結果がPendingならばTaskをwaiting_queueidをキーとして入れています。

あとはrunning_queueにあるタスクがなくなるまでループを繰り返してこのフレームの処理は終わりです。

spawnの実装

非同期処理の起点となるRuntime::spawnの処理を貼ります。

impl Runtime {
    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        let (task, handle) = joinable(future);
        self.shared.borrow_mut().running_queue.push_back(task);
        handle
    }
}

futureをjoinableという関数でtaskとhandleに分割し、running_queueにtaskを登録しています。

JoinHandleの実装

次にJoinHandleの実装を貼ります。

handle.rs
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};

use super::task::Task;

pub struct JoinHandle<T> {
    pub(super) value: Rc<RefCell<Option<T>>>,
    pub(super) task: Rc<RefCell<Task>>,
}
impl<T> Future for JoinHandle<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if let Some(val) = self.value.borrow_mut().take() {
            Poll::Ready(val)
        } else {
            let waker = cx.waker().clone();
            self.task.borrow_mut().register_callback(waker);
            Poll::Pending
        }
    }
}

JoinHandleはFutureを実装しています。

JoinHandleは返り値であるvalueをOptionで保持しています。
pollした時点でvalueが保持されていたらPoll::Ready(T)のその値を入れます。
valueが保持されていなかったら、taskWakerを登録してPoll::Pendingを返します。

このtaskに登録されたwakerが、先のランタイムでpollしてtaskがreadyだった場合に呼び出していたものです。

Taskの実装

次にTaskの実装を貼ります。

task.rs
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};

use crate::handle::JoinHandle;

pub struct Task {
    future: Pin<Box<dyn Future<Output = ()>>>,
    pub(super) callback_waker: Option<Waker>,
}
impl Task {
    pub(super) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        Future::poll(self.future.as_mut(), cx)
    }

    pub(super) fn register_callback(&mut self, waker: Waker) {
        self.callback_waker = Some(waker);
    }
}

pub(crate) fn joinable<F>(future: F) -> (Rc<RefCell<Task>>, JoinHandle<F::Output>)
where
    F: Future + 'static,
    F::Output: 'static,
{
    let value = Rc::new(RefCell::new(None));

    let task = {
        let value = Rc::clone(&value);
        Rc::new(RefCell::new(Task {
            future: Box::pin(async move {
                let output = future.await;
                let mut value = value.borrow_mut();
                *value = Some(output);
            }),
            callback_waker: None,
        }))
    };

    let handle = JoinHandle {
        value,
        task: Rc::clone(&task),
    };

    (task, handle)
}

TaskFutureではなく、Futureをくるんだものとなっています。
FutureがOutputを持つのに対して、TaskOutput=()なFutureを保持しています。

Taskpollはそのまま内部のfuturepollに移譲しています。
Taskの内部のfutureは、joinableに渡されたfutureをawaitしてその値をhandlevalueにセットするfutureとなっています。

先に述べた通り、Taskはspawnが完了したことを通知するためのwakerを保持しています。

TaskWakerの実装

ここが今回一番トリッキーな実装です。

task_waker.rsuse std
use std::rc::Rc;
use std::task::{RawWaker, RawWakerVTable, Waker};

use crate::runtime::Runtime;

pub(crate) struct TaskWaker {
    runtime: Runtime,
    task_id: usize,
}
impl TaskWaker {
    pub(crate) fn waker(runtime: Runtime, task_id: usize) -> Waker {
        let waker = Rc::new(TaskWaker { runtime, task_id });
        unsafe { Waker::from_raw(Self::into_raw_waker(waker)) }
    }

    pub(crate) fn wake_by_ref(this: &Rc<Self>) {
        // // waiting queueから取り出してrunning queueに入れる
        let task = this
            .runtime
            .shared
            .borrow_mut()
            .waiting_queue
            .remove(&this.task_id);
        if let Some(task) = task {
            this.runtime
                .shared
                .borrow_mut()
                .running_queue
                .push_back(task);
        }
    }

    // RawWakerを作る。
    // WakerはSync + Sendを要求するが、wasmではシングルスレッドなので、
    // Rcのポインタだけで良いということにする。
    unsafe fn into_raw_waker(this: Rc<Self>) -> RawWaker {
        unsafe fn raw_clone(ptr: *const ()) -> RawWaker {
            let ptr = ManuallyDrop::new(Rc::from_raw(ptr as *const TaskWaker));
            TaskWaker::into_raw_waker((*ptr).clone())
        }

        unsafe fn raw_wake(ptr: *const ()) {
            let ptr = Rc::from_raw(ptr as *const TaskWaker);
            TaskWaker::wake_by_ref(&ptr);
        }

        unsafe fn raw_wake_by_ref(ptr: *const ()) {
            let ptr = ManuallyDrop::new(Rc::from_raw(ptr as *const TaskWaker));
            TaskWaker::wake_by_ref(&ptr);
        }

        unsafe fn raw_drop(ptr: *const ()) {
            drop(Rc::from_raw(ptr as *const TaskWaker));
        }

        const VTABLE: RawWakerVTable =
            RawWakerVTable::new(raw_clone, raw_wake, raw_wake_by_ref, raw_drop);

        RawWaker::new(Rc::into_raw(this) as *const (), &VTABLE)
    }
}

Wakerは常に別のスレッドから取得され呼び出される可能性があるので、その実装にはSync + Sendを要求します。

このようなSync + Sendを実装するのにArcでくるむ形のWakerを実装するには簡単にはArcWakerというものが用意されていて使えます。
しかし、WasmではArcのようなマルチスレッド用の構造体は使えません。

その場合はunsafeを利用して注意深く実装する必要があります。
unsafeで実装しているのはinto_raw_wakerというメソッドです。
Rcからポインタを取り出して、VTABLEとセットにしてRawWakerを作っています。

wakeされたときの処理はwake_by_refというメソッドに実装しています。
この処理ではランタイムのwaiting_queueからタスクを取り出してrunning_queueに入れることを行っています。

next_frameの実装

Runtimenext_frameの実装は次のとおりです。

impl Runtime {
    pub fn next_frame(&self) -> impl Future<Output = ()> {
        crate::next_frame::NextFrameFuture::new(self.clone())
    }
}

実態はNextFrameFutureです。
NextFrameFutureの実装は次のとおりです。

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::runtime::Runtime;

pub struct NextFrameFuture {
    called: bool,
    runtime: Runtime,
}
impl NextFrameFuture {
    pub(crate) fn new(runtime: Runtime) -> Self {
        Self {
            called: false,
            runtime,
        }
    }
}
impl Future for NextFrameFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.called {
            Poll::Ready(())
        } else {
            self.called = true;
            let waker = cx.waker().clone();
            self.runtime
                .shared
                .borrow_mut()
                .frame_change_wakers
                .push(waker);
            Poll::Pending
        }
    }
}

まだ一度もpollを呼ばれていない場合は、frame_change_wakersにwakerを登録してPoll::Pendingを返します。
frame_change_wakersは次のフレームになったときにwakerを呼び出すためのものでした。
そして、二度目のpollの呼び出し、つまり次のフレームの頭でのpollの呼び出してReadyを返すようにしています。

requestAnimationFrameの登録

requestAnimationFrameへのruntime_updateの登録は次のようになります。

impl Runtime {
    pub(crate) fn new() -> Self {
        let runtime = Self {
            shared: Rc::new(RefCell::new(Shared {
                frame_change_wakers: vec![],
                running_queue: VecDeque::new(),
                waiting_queue: BTreeMap::new(),
                task_counter: 0,
            })),
        };
        runtime.run();
        runtime
    }

    fn run(&self) {
        let runtime = self.clone();

        fn window() -> web_sys::Window {
            web_sys::window().expect("no global `window` exists")
        }

        fn request_animation_frame(f: &Closure<dyn FnMut()>) {
            window()
                .request_animation_frame(f.as_ref().unchecked_ref())
                .expect("should register `requestAnimationFrame` OK");
        }

        let f = Rc::new(RefCell::new(None));
        let g = f.clone();

        *g.borrow_mut() = Some(Closure::new(move || {
            runtime.runtime_update();
            request_animation_frame(f.borrow().as_ref().unwrap());
        }));

        request_animation_frame(g.borrow().as_ref().unwrap());
    }
}

web-sys: requestAnimationFrame - The wasm-bindgen Guideを参考にしました。

lib.rsの実装

デフォルトのRuntimeをthread_localで保持して、操作するための関数を用意しておきます。

lib.rs
use std::future::Future;

mod handle;
mod next_frame;
mod runtime;
mod task;
mod task_waker;

pub use handle::JoinHandle;
pub use runtime::Runtime;

thread_local! {
    pub static RUNTIME: Runtime = Runtime::new();
}

pub fn runtime() -> Runtime {
    RUNTIME.with(|r| r.clone())
}

pub fn runtime_update() {
    runtime().runtime_update();
}

pub fn next_frame() -> impl Future<Output = ()> {
    runtime().next_frame()
}

pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
where
    F: Future + 'static,
    F::Output: 'static,
{
    runtime().spawn(future)
}

作った非同期ランタイムを動かしてみる

作った非同期欄ライムを動かしてみます。

main.rs
async fn delay(duration: std::time::Duration) {
    let start = instant::Instant::now();
    loop {
        let now = instant::Instant::now();
        let duration_from_start = now.duration_since(start);
        if duration_from_start > duration {
            break;
        }
        runtime::next_frame().await;
    }
}

async fn loop_log(n: usize) {
    loop {
        for _ in 0..n {
            runtime::next_frame().await;
        }
        log::info!("loop_log: {n}");
    }
}

fn main() {
    wasm_logger::init(wasm_logger::Config::default());

    log::info!("Start app!");

    runtime::spawn({
        async move {
            log::info!("Hello, world!");
            delay(std::time::Duration::from_secs(1)).await;
            log::info!("Hello, Wasm!");
            futures::join!(loop_log(60), loop_log(240));
        }
    });
}

指定秒数待ったり、あるいは数フレーム毎にログを出力するような非同期処理を実行しています。

実行結果は次のようになります。

screenshot

問題なく動いていました。

まとめ

今回はゲームループのupdateを想定したrequestAnimationFrameで駆動する非同期ランタイムを作りました。

今回作ったランタイムはrequestAnimationFrameでruntime_updateを呼び出していますが、別の方法で毎フレームruntime_updateを呼んでやれば、他のゲームのシステムにこのランタイムを組み込むこともできなくはないかもしれません。

Discussion