RustでWasmのゲーム用シングルスレッド非同期ランタイムを作成する
はじめに
この記事ではWasmのゲーム用を想定したrequestAnimationFrame駆動/シングルスレッドのRust非同期ランタイムを作ります。
ゲームとasync/await
複数フレームにまたがる処理とasync/await
ゲームを扱う上でよく必要になる複数フレームにまたがる処理というのを考えてみます。
これはasync/awaitがとてもうまく使える場面です。
ゲーム分野のasync/awaitはUniTaskの事例が参考になります。
UniTaskを利用することで、複数フレームにまたがる処理をまっすぐ書くことができるようになりました。
例えば、簡単な例で「メッセージウィンドウをアニメーションで表示し、決定ボタンを押すまで待機し、ボタンが押されたらアニメーションでウィンドウを閉じる」という処理を書くとします。
async/awaitを使わないでRustで書いてみると次のようになります。
enum Phase {
WindowOpenAnimation,
WaitKey,
WindowCloseAnimation,
End,
}
struct State {
phase: Phase,
}
impl State {
fn update(&mut self) {
match self.phase {
Phase::WindowOpenAnimation => {
// アニメーション処理
update_window_open_animation();
if is_open_animation_finished() {
self.phase = Phase::WaitKey;
}
}
Phase::WaitKey => {
// キー入力待ち
if is_key_pressed() {
self.phase = Phase::WindowCloseAnimation;
}
}
Phase::WindowCloseAnimation => {
// アニメーション処理
update_close_animation();
if is_close_animation_finished() {
self.phase = Phase::End;
}
}
}
}
}
現在のフェーズを保持してupdateでの処理を分岐しています。
async/awaitを用いて書くイメージは次のとおりです。
async fn update() {
// アニメーション処理
window_open_animation().await;
// キー入力待ち
wait_key().await;
// アニメーション処理
window_close_animation().await;
}
async/awaitを使うことで直列に書くことができ、処理の流れが分かりやすくなりました。
フレームアニメーションベースの非同期処理
先程のasync/awaitを利用した例のwindow_open_animation
の中身も想像してみましょう。
async/awaitを利用しない場合で使っているupdate_window_open_animation
とis_open_animation_finished
を使って書くと次のようになります。
async fn window_open_animation() {
while !is_open_animation_finished() {
update_window_open_animation();
runtime::next_frame().await;
}
}
ここで注目してほしいのがruntime::next_frame().await
です。
非同期ランタイムがゲームループと協調して動くのに重要な役割を果たしています。
既存の非同期ランタイムとゲーム用途
Rustにはすでに素晴らしい非同期ランタイムとしてtokio、あるいは次点でasync-stdが存在します。
また、Wasmで利用する場合、wasm-bindgen-futuresが使えるでしょう。
しかし、これらのランタイムはゲームループと協調して動くように設計されていません。
例えばファイルIOなどの処理を非同期で扱うような場合にはtokioなどを使えば良いでしょうが、ゲーム用途となると上記特徴から扱いが難しいです。
Wasm用となるとtokioはそのままでは現在は動きません。
シングルスレッド前提のランタイムが必要になります。
では、早速そのようなランタイムを作ってみましょう。
ランタイムを作る
作成したランタイムのソースコードはこちらにあります。
Future
非同期ランタイムのキーとなるtraitがFutureです。
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
このFutureのpollを適時呼び出すことで非同期処理を進めていきます。
poll()
を実行して、その結果Poll::Ready(T)
が返ってきたら非同期処理が完了したことになります。
Poll::Pending
が帰ってきた場合は、非同期処理が完了するまで待つ必要があります。
非同期処理が完了するとContext
の中のwakerのwakeが呼び出されます。
wakeが呼び出されるまでFutureを保持しておいて、wakeが呼び出されたらpollを実行して先へ進める、というのを繰り返していくことになります。
ランタイムの状態
ランタイムが保持する状態について説明します。
今回作成する非同期ランタイムは、非同期タスクを2つのコンテナで保持します。
一つはrunning_queue
で、現在実行中のタスクを保持します。
もう一つはwaiting_queue
で、このフレームでは実行しない中断中のタスクを表現します。
running_queue
の中身にあるのを順次poll
していき、Poll::Pending
が返ってきたらwaiting_queue
に移動します。
この中断中のqueueに移動することをParkすると表現します。
また、今回の非同期ランタイムではnext_frame
の対応のために、新しいフレームが始まるときに起動するwakerを登録する口を用意しています。
最後にタスクの識別のためのtask_counter
を保持します。
pub(crate) struct Shared {
pub(crate) frame_change_wakers: Vec<Waker>,
pub(crate) running_queue: VecDeque<Rc<RefCell<Task>>>,
pub(crate) waiting_queue: BTreeMap<usize, Rc<RefCell<Task>>>,
task_counter: usize,
}
#[derive(Clone)]
pub struct Runtime {
pub(crate) shared: Rc<RefCell<Shared>>,
}
runtime_update
の実装
ランタイムのランタイムは毎フレームrequestAnimationFrameでruntime_update
を呼び出すことで勧めていきます。
そのruntime_update
の処理を下に全部貼り付けます。
impl Runtime {
pub fn runtime_update(&self) {
let wakers = self
.shared
.borrow_mut()
.frame_change_wakers
.drain(..)
.collect::<Vec<_>>();
for w in wakers {
w.wake_by_ref();
}
'current_frame: loop {
let task = self.shared.borrow_mut().running_queue.pop_front();
match task {
None => break 'current_frame,
Some(task) => {
let id = {
let id = self.shared.borrow().task_counter;
self.shared.borrow_mut().task_counter += 1;
id
};
let waker = TaskWaker::waker(self.clone(), id);
let mut cx = Context::from_waker(&waker);
let result = task.borrow_mut().poll(&mut cx);
match result {
Poll::Ready(()) => {
// taskの完了をJoinHandleに通知する
task.borrow_mut()
.callback_waker
.iter()
.for_each(|w| w.wake_by_ref());
}
Poll::Pending => {
self.shared.borrow_mut().waiting_queue.insert(id, task);
}
}
}
}
}
}
}
最初にframe_change_wakers
に登録してあるWakerを全部wakeします。
その次に現在のフレームで処理すべきタスクを'current_frame
ループで回します。
ループの中ではrunning_queue
からタスクを取り出して、タスクが存在しない場合はループを打ち切っています。
実行中のタスクが存在する場合、このタスクのidを取得しています。
そしてタスクの実行が完了したかどうかのフラグも用意してTaskWakerを作成し、そこからContextを作成してpoll(&mut cx)
を呼び出しています。
poll(&mut cx)
した結果がReadyならば、taskに登録されているwakerを呼び出します。
このtaskに登録されているwakerと言うのは、spawn
が返すJoinHandle
をwakeするためのものです。
poll(&mut cx)
した結果がPendingならばTaskをwaiting_queue
にid
をキーとして入れています。
あとはrunning_queue
にあるタスクがなくなるまでループを繰り返してこのフレームの処理は終わりです。
spawnの実装
非同期処理の起点となるRuntime::spawn
の処理を貼ります。
impl Runtime {
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
let (task, handle) = joinable(future);
self.shared.borrow_mut().running_queue.push_back(task);
handle
}
}
futureをjoinableという関数でtaskとhandleに分割し、running_queueにtaskを登録しています。
JoinHandleの実装
次にJoinHandleの実装を貼ります。
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use super::task::Task;
pub struct JoinHandle<T> {
pub(super) value: Rc<RefCell<Option<T>>>,
pub(super) task: Rc<RefCell<Task>>,
}
impl<T> Future for JoinHandle<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(val) = self.value.borrow_mut().take() {
Poll::Ready(val)
} else {
let waker = cx.waker().clone();
self.task.borrow_mut().register_callback(waker);
Poll::Pending
}
}
}
JoinHandleはFuture
を実装しています。
JoinHandleは返り値であるvalue
をOptionで保持しています。
poll
した時点でvalue
が保持されていたらPoll::Ready(T)
のその値を入れます。
value
が保持されていなかったら、task
にWaker
を登録してPoll::Pending
を返します。
このtaskに登録されたwakerが、先のランタイムでpollしてtaskがreadyだった場合に呼び出していたものです。
Taskの実装
次にTaskの実装を貼ります。
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};
use crate::handle::JoinHandle;
pub struct Task {
future: Pin<Box<dyn Future<Output = ()>>>,
pub(super) callback_waker: Option<Waker>,
}
impl Task {
pub(super) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
Future::poll(self.future.as_mut(), cx)
}
pub(super) fn register_callback(&mut self, waker: Waker) {
self.callback_waker = Some(waker);
}
}
pub(crate) fn joinable<F>(future: F) -> (Rc<RefCell<Task>>, JoinHandle<F::Output>)
where
F: Future + 'static,
F::Output: 'static,
{
let value = Rc::new(RefCell::new(None));
let task = {
let value = Rc::clone(&value);
Rc::new(RefCell::new(Task {
future: Box::pin(async move {
let output = future.await;
let mut value = value.borrow_mut();
*value = Some(output);
}),
callback_waker: None,
}))
};
let handle = JoinHandle {
value,
task: Rc::clone(&task),
};
(task, handle)
}
Task
はFuture
ではなく、Future
をくるんだものとなっています。
Future
がOutputを持つのに対して、Task
はOutput=()
なFutureを保持しています。
Task
のpoll
はそのまま内部のfuture
のpoll
に移譲しています。
Task
の内部のfuture
は、joinable
に渡されたfuture
をawaitしてその値をhandle
のvalue
にセットするfutureとなっています。
先に述べた通り、Taskはspawnが完了したことを通知するためのwakerを保持しています。
TaskWakerの実装
ここが今回一番トリッキーな実装です。
use std::rc::Rc;
use std::task::{RawWaker, RawWakerVTable, Waker};
use crate::runtime::Runtime;
pub(crate) struct TaskWaker {
runtime: Runtime,
task_id: usize,
}
impl TaskWaker {
pub(crate) fn waker(runtime: Runtime, task_id: usize) -> Waker {
let waker = Rc::new(TaskWaker { runtime, task_id });
unsafe { Waker::from_raw(Self::into_raw_waker(waker)) }
}
pub(crate) fn wake_by_ref(this: &Rc<Self>) {
// // waiting queueから取り出してrunning queueに入れる
let task = this
.runtime
.shared
.borrow_mut()
.waiting_queue
.remove(&this.task_id);
if let Some(task) = task {
this.runtime
.shared
.borrow_mut()
.running_queue
.push_back(task);
}
}
// RawWakerを作る。
// WakerはSync + Sendを要求するが、wasmではシングルスレッドなので、
// Rcのポインタだけで良いということにする。
unsafe fn into_raw_waker(this: Rc<Self>) -> RawWaker {
unsafe fn raw_clone(ptr: *const ()) -> RawWaker {
let ptr = ManuallyDrop::new(Rc::from_raw(ptr as *const TaskWaker));
TaskWaker::into_raw_waker((*ptr).clone())
}
unsafe fn raw_wake(ptr: *const ()) {
let ptr = Rc::from_raw(ptr as *const TaskWaker);
TaskWaker::wake_by_ref(&ptr);
}
unsafe fn raw_wake_by_ref(ptr: *const ()) {
let ptr = ManuallyDrop::new(Rc::from_raw(ptr as *const TaskWaker));
TaskWaker::wake_by_ref(&ptr);
}
unsafe fn raw_drop(ptr: *const ()) {
drop(Rc::from_raw(ptr as *const TaskWaker));
}
const VTABLE: RawWakerVTable =
RawWakerVTable::new(raw_clone, raw_wake, raw_wake_by_ref, raw_drop);
RawWaker::new(Rc::into_raw(this) as *const (), &VTABLE)
}
}
Wakerは常に別のスレッドから取得され呼び出される可能性があるので、その実装にはSync + Sendを要求します。
このようなSync + Sendを実装するのにArcでくるむ形のWakerを実装するには簡単にはArcWaker
というものが用意されていて使えます。
しかし、WasmではArcのようなマルチスレッド用の構造体は使えません。
その場合はunsafe
を利用して注意深く実装する必要があります。
unsafe
で実装しているのはinto_raw_waker
というメソッドです。
Rcからポインタを取り出して、VTABLEとセットにしてRawWakerを作っています。
wakeされたときの処理はwake_by_ref
というメソッドに実装しています。
この処理ではランタイムのwaiting_queue
からタスクを取り出してrunning_queue
に入れることを行っています。
next_frameの実装
Runtime
のnext_frame
の実装は次のとおりです。
impl Runtime {
pub fn next_frame(&self) -> impl Future<Output = ()> {
crate::next_frame::NextFrameFuture::new(self.clone())
}
}
実態はNextFrameFuture
です。
NextFrameFuture
の実装は次のとおりです。
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::runtime::Runtime;
pub struct NextFrameFuture {
called: bool,
runtime: Runtime,
}
impl NextFrameFuture {
pub(crate) fn new(runtime: Runtime) -> Self {
Self {
called: false,
runtime,
}
}
}
impl Future for NextFrameFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.called {
Poll::Ready(())
} else {
self.called = true;
let waker = cx.waker().clone();
self.runtime
.shared
.borrow_mut()
.frame_change_wakers
.push(waker);
Poll::Pending
}
}
}
まだ一度もpoll
を呼ばれていない場合は、frame_change_wakers
にwakerを登録してPoll::Pending
を返します。
frame_change_wakers
は次のフレームになったときにwakerを呼び出すためのものでした。
そして、二度目のpoll
の呼び出し、つまり次のフレームの頭でのpoll
の呼び出してReadyを返すようにしています。
requestAnimationFrameの登録
requestAnimationFrameへのruntime_updateの登録は次のようになります。
impl Runtime {
pub(crate) fn new() -> Self {
let runtime = Self {
shared: Rc::new(RefCell::new(Shared {
frame_change_wakers: vec![],
running_queue: VecDeque::new(),
waiting_queue: BTreeMap::new(),
task_counter: 0,
})),
};
runtime.run();
runtime
}
fn run(&self) {
let runtime = self.clone();
fn window() -> web_sys::Window {
web_sys::window().expect("no global `window` exists")
}
fn request_animation_frame(f: &Closure<dyn FnMut()>) {
window()
.request_animation_frame(f.as_ref().unchecked_ref())
.expect("should register `requestAnimationFrame` OK");
}
let f = Rc::new(RefCell::new(None));
let g = f.clone();
*g.borrow_mut() = Some(Closure::new(move || {
runtime.runtime_update();
request_animation_frame(f.borrow().as_ref().unwrap());
}));
request_animation_frame(g.borrow().as_ref().unwrap());
}
}
web-sys: requestAnimationFrame - The wasm-bindgen
Guideを参考にしました。
lib.rsの実装
デフォルトのRuntimeをthread_localで保持して、操作するための関数を用意しておきます。
use std::future::Future;
mod handle;
mod next_frame;
mod runtime;
mod task;
mod task_waker;
pub use handle::JoinHandle;
pub use runtime::Runtime;
thread_local! {
pub static RUNTIME: Runtime = Runtime::new();
}
pub fn runtime() -> Runtime {
RUNTIME.with(|r| r.clone())
}
pub fn runtime_update() {
runtime().runtime_update();
}
pub fn next_frame() -> impl Future<Output = ()> {
runtime().next_frame()
}
pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
runtime().spawn(future)
}
作った非同期ランタイムを動かしてみる
作った非同期欄ライムを動かしてみます。
async fn delay(duration: std::time::Duration) {
let start = instant::Instant::now();
loop {
let now = instant::Instant::now();
let duration_from_start = now.duration_since(start);
if duration_from_start > duration {
break;
}
runtime::next_frame().await;
}
}
async fn loop_log(n: usize) {
loop {
for _ in 0..n {
runtime::next_frame().await;
}
log::info!("loop_log: {n}");
}
}
fn main() {
wasm_logger::init(wasm_logger::Config::default());
log::info!("Start app!");
runtime::spawn({
async move {
log::info!("Hello, world!");
delay(std::time::Duration::from_secs(1)).await;
log::info!("Hello, Wasm!");
futures::join!(loop_log(60), loop_log(240));
}
});
}
指定秒数待ったり、あるいは数フレーム毎にログを出力するような非同期処理を実行しています。
実行結果は次のようになります。
問題なく動いていました。
まとめ
今回はゲームループのupdateを想定したrequestAnimationFrameで駆動する非同期ランタイムを作りました。
今回作ったランタイムはrequestAnimationFrameでruntime_update
を呼び出していますが、別の方法で毎フレームruntime_update
を呼んでやれば、他のゲームのシステムにこのランタイムを組み込むこともできなくはないかもしれません。
Discussion