Rust非同期処理の仕組み:Future実行器とタスク管理
Future の定義
Future は Rust の非同期プログラミングの中核です。Future トレイトの定義は次の通りです:
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[lang = "future_trait"]
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
#[must_use = "this `Poll` may be a `Pending` variant, which should be handled"]
pub enum Poll<T> {
Ready(T),
Pending,
}
Future には関連型 Output があり、poll() メソッドを持っています。これは PollSelf::Output を返します。Poll は列挙型で、Ready と Pending の 2 つの状態を持ちます。poll() メソッドを呼び出すことで Future の実行を進め、タスクが完了するか中断されるまで処理を進めることができます。
現在の poll において Future が完了していれば、Poll::Ready(result) を返します。これは Future の値を得て返すことを意味します。Future がまだ完了していない場合、Poll::Pending() を返します。この時点で Future は中断され、あるイベントによって再び起動(wake)されるのを待つ必要があります(wake は起動関数です)。
実行スケジューラ executor
executor(エグゼキュータ)は Future のスケジューラです。オペレーティングシステムはスレッドのスケジューリングを担当しますが、ユーザー空間のコルーチン(例えば Future)まではスケジューリングしません。そのため、コルーチンを使って並行処理を行うプログラムでは、必ず executor が必要となり、それがコルーチンのスケジューリングを担当します。
Rust の Future は「遅延評価型」です:poll によってポーリングされない限り実行されません。この poll を進めるひとつの方法は、async 関数の中で .await
を使って他の async 関数を呼び出すことです。しかし、これは async 関数内部でのみ効果があり、最外層の async 関数は executor によって推進される必要があります。
executor ランタイム
Rust は Future というコルーチンの仕組みを提供していますが、言語レベルでは executor(実行ランタイム)を提供していません。つまり、コルーチンを使わない場合は、何のランタイムも導入する必要がありません。一方で、コルーチンを使う場合は、エコシステムの中から最適な executor を選ぶことができます。
Rust には代表的な executor が次の 4 種類あります:
- futures:このライブラリにはシンプルな executor が内蔵されています
-
tokio:executor を提供しており、
#[tokio::main]
を使うことで暗黙的に tokio の executor を導入できます - async-std:executor を提供しており、tokio に類似しています
-
smol:
async-executor
を提供し、主にblock_on
関数をサポートしています
wake 通知メカニズム
executor は複数の Future(最外層の async 関数)を管理し、それらを継続的に poll(ポーリング)することで処理を完了させます。最初に executor は Future を一度 poll しますが、その後は自発的には poll を行いません。poll メソッドが Poll::Pending
を返した場合、その Future は中断され、何らかのイベントが発生して wake()
関数を通じて再び起動(wake)されるまで待機します。Future は自分自身で executor に通知することで、再度 poll が行われ、処理が進みます。
この wake → poll のサイクルは Future が完了するまで繰り返されます。
Waker
は wake()
メソッドを提供します。これは、executor に対して関連するタスクを起動できる状態であることを通知する役割を担います。これにより、executor は対応する Future を再び poll できます。
Context
は Waker
のラッパーです。まず poll メソッド内の Context
を見てみましょう:
pub struct Context<'a> {
waker: &'a Waker,
_marker: PhantomData<fn(&'a ()) -> &'a ()>,
}
Waker
の定義および関連コードは非常に抽象的であり、内部では vtable(仮想関数テーブル)を使用して様々な waker
の動作を実現しています:
pub struct RawWakerVTable {
clone: unsafe fn(*const ()) -> RawWaker,
wake: unsafe fn(*const ()),
wake_by_ref: unsafe fn(*const ()),
drop: unsafe fn(*const ()),
}
Rust 自体は非同期ランタイムを提供しておらず、標準ライブラリでは基本的なインターフェースのみを定義しています。各ランタイムはそれらのインターフェースを使って独自の実装を提供します。したがって、標準ライブラリ内ではこれらのインターフェースの定義と、高レベルのインターフェース実装しか確認できません。
たとえば、Waker
の wake
メソッドは、内部で vtable の wake()
を呼び出しているだけです:
impl Waker {
/// この `Waker` に関連づけられたタスクを起動する
#[inline]
pub fn wake(self) {
// 実際の起動処理は仮想関数呼び出しに委譲される
let wake = self.waker.vtable.wake;
let data = self.waker.data;
// `drop` を呼ばない — `wake` によって `waker` は消費される
crate::mem::forget(self);
// SAFETY: `Waker::from_raw` によってのみ `wake` と `data` が初期化されるため安全
unsafe { (wake)(data) };
}
...
}
vtable の具体的な実装は標準ライブラリには含まれておらず、例えば futures
ライブラリなどのサードパーティの非同期ランタイムにて定義されています。
タイマーの構築
タイマーの例を使って、Future のスケジューリングメカニズムを理解しましょう。目標は次の通りです:
タイマーを作成するときに新しいスレッドを生成し、一定時間スリープさせ、その時間が経過したらタイマー Future に通知(signal)する。
補足:この例では futures
クレートの ArcWake
トレイトを使用します。これは Waker
を構築するための便利な手段を提供します。Cargo.toml
に以下の依存を追加してください:
[dependencies]
futures = "0.3"
タイマー Future の完全なコードは以下の通りです:
// future_timer.rs
use futures;
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
thread,
time::Duration,
};
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
/// Future と待機スレッド間で状態を共有する構造体
struct SharedState {
/// タイマー(スリープ)が完了したかどうか
completed: bool,
/// スリープ完了時、スレッドは `waker` を使って `TimerFuture` を起こすことができる
waker: Option<Waker>,
}
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// 共有状態を確認して、タイマーが完了しているかどうかを判定
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
println!("future ready. execute poll to return.");
Poll::Ready(())
} else {
println!("future not ready, tell the future task how to wakeup to executor");
// `waker` を設定する。これにより、スリープ完了後に現在のタスクを再度 poll するよう通知可能になる。
// 下の `clone` は poll のたびに実行される。実際には 1 回だけ clone すれば理想的だが、
// Future は異なる executor のタスク間を移動する可能性があるため、
// 毎回 clone することで安全性を確保している。
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
impl TimerFuture {
/// 指定時間後に完了する新しい `TimerFuture` を作成する
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// 新しいスレッドを作成
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
// 指定された時間だけスリープ
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
// タイマーが完了したことを executor に通知
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
println!("detect future is ready, wakeup the future task to executor.");
waker.wake()
}
});
TimerFuture { shared_state }
}
}
fn main() {
// まだ独自のスケジューラを実装していないので、futures クレートの executor を使う
futures::executor::block_on(TimerFuture::new(Duration::new(10, 0)));
}
実行結果は以下のようになります:
future not ready, tell the future task how to wakeup to executor
detect future is ready, wakeup the future task to executor.
future ready. execute poll to return.
ご覧の通り、最初の時点では 10 秒タイマーが未完了で Pending
状態です。この時点でタスクが後でどうやって wake されるかを指定します。10 秒後、タイマーが完了し、事前に設定しておいた Waker
により、Future タスクが wake されて executor によって実行されます。
実行スケジューラの構築
前述のコードでは、自前のスケジューラは実装せず、futures
クレートが提供するスケジューラを使って実行していました。ここでは、スケジューラを自分で実装し、その仕組みを見てみましょう。
実際に Rust を使って非同期処理を行う場合は、tokio
クレートを学ぶのが一般的です。しかし、ここではあくまで実装原理を説明し、非同期の仕組みを理解することを目的とします。以下が主要なコードです:
// future_executor.rs
use {
futures::{
future::{BoxFuture, FutureExt},
task::{waker_ref, ArcWake},
},
std::{
future::Future,
sync::mpsc::{sync_channel, Receiver, SyncSender},
sync::{Arc, Mutex},
task::Context,
time::Duration,
},
};
mod future_timer;
// 前で実装したタイマーのモジュールをインポート
use future_timer::TimerFuture;
/// タスク実行器:チャンネルからタスクを受け取り、poll により実行する
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}
/// `Spawner`:新しい Future を作成し、それをタスクチャンネルに送る役割
#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
/// Future 本体:自分自身をスケジューリング(タスクチャンネルへ投入)し、executor によって poll されるのを待つ
struct Task {
/// 実行中の Future。ある時点で完了する
///
/// 本来 `Mutex` は不要ですが、Rust のコンパイラは Future がスレッドを跨がないことを理解できないため、
/// スレッドセーフのために `Mutex` を使っています。
///
/// 実際のプロダクションレベルの executor では、性能上の理由から `Mutex` の代わりに `UnsafeCell` を使います。
future: Mutex<Option<BoxFuture<'static, ()>>>,
/// このタスク自身を再度タスクチャンネルへ投入し、poll の対象とする
task_sender: SyncSender<Arc<Task>>,
}
fn new_executor_and_spawner() -> (Executor, Spawner) {
// タスクキューの最大長(単純な実装のため制限あり)
const MAX_QUEUED_TASKS: usize = 10_000;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
(Executor { ready_queue }, Spawner { task_sender })
}
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
println!("first dispatch the future task to executor.");
self.task_sender.send(task).expect("too many tasks queued.");
}
}
/// `ArcWake` を実装し、タスクがどのように wake(起動)されて再スケジュールされるかを定義
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
// タスクをチャンネルに再送信することで wake を実現
let cloned = arc_self.clone();
arc_self
.task_sender
.send(cloned)
.expect("too many tasks queued");
}
}
impl Executor {
/// 実行ループ:チャンネルからタスクを受け取り、poll を通じて Future を進める
fn run(&self) {
let mut count = 0;
while let Ok(task) = self.ready_queue.recv() {
count += 1;
println!("received task. {}", count);
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
if future.as_mut().poll(context).is_pending() {
println!("executor run the future task, but is not ready, create a future again.");
*future_slot = Some(future);
} else {
println!("executor run the future task, is ready. the future task is done.");
}
}
}
}
}
fn main() {
let (executor, spawner) = new_executor_and_spawner();
// TimerFuture をタスクとしてスケジューラに登録
spawner.spawn(async {
println!("TimerFuture await");
TimerFuture::new(Duration::new(10, 0)).await;
println!("TimerFuture Done");
});
// spawner を drop:これにより新たなタスクが来ないことを executor に知らせる
drop(spawner);
// タスクキューが空になるまで executor を実行
executor.run();
}
実行結果は以下の通り:
first dispatch the future task to executor.
received task. 1
TimerFuture await
future not ready, tell the future task how to wakeup to executor
executor run the future task, but is not ready, create a future again.
detect future is ready, wakeup the future task to executor.
received task. 2
future ready. execute poll to return.
TimerFuture Done
executor run the future task, is ready. the future task is done.
最初のスケジューリング時点では、タスクはまだ完了しておらず Pending
状態です。このとき、wake の方法を指定しておく必要があります。その後、イベントが発生すると、事前に設定しておいた Waker
によってタスクが wake され、executor によって再 poll されます。
非同期処理の流れ
Reactor パターンは、高性能なイベント駆動システムを構築する際によく使われる典型的なパターンです。executor と reactor は、この Reactor パターンの構成要素にあたります。
Reactor パターンは以下の 3 つの構成要素からなります:
- task(タスク):処理すべき処理単位。中断されることがあり、制御を executor に返して再スケジューリングを待つ
- executor(実行器):スケジューラ。実行可能なタスク(ready queue)とブロック中のタスク(wait queue)を管理
- reactor(リアクター):イベントキューを管理し、イベントが到着したときに executor に通知してタスクを再実行させる
executor は task をスケジューリングして実行します。もしタスクが継続できず未完了であれば、そのタスクを一旦中断し、適切な wake 条件をセットします。その後、reactor が条件を満たすイベントを検出すると、中断されていたタスクを起動(wake)し、executor が再び poll を行ってタスクを実行します。これを繰り返すことで、タスクが最終的に完了します。
Rust における Future を使った非同期処理は、この Reactor パターンの典型例です。
例:tokio の場合
async/await
は構文的なサポートを提供し、Future
は非同期タスクのデータ構造を表します。.await
によって executor がそれをスケジューリングして実行します。
tokio のスケジューラは複数のスレッド上で動作し、それぞれのスレッドは自分の ready queue 上のタスク(Future)を実行します。キューが空なら、他のスレッドのスケジューラからタスクを奪ってくる(work-stealing)仕組みもあります。
タスクが進行できなくなると、Future はPoll::Pending
を返します。スケジューラはこの時点でタスクを中断し、Waker
を使って wake 条件を登録します。
一方、reactor は OS の非同期 I/O(例えば epoll / kqueue / IOCP)を用いて OS の I/O イベントを監視します。条件に合致するイベントが届いた場合、reactor はWaker.wake()
を呼び出して Future を起こします。この Future は ready queue に戻され、実行対象として再度 poll されます。
まとめ
Future は Rust の非同期プログラミングの中核であり、将来完了する処理を表します。Rust の Future
は遅延評価型で、executor(実行器) によってスケジューリングされて初めて実行されます。
このスケジューリングは poll による実装で構成されており、現在の poll において Future が完了していれば Poll::Ready(result)
を返します。これは Future の値を取得して返すことを意味します。一方、Future がまだ完了していない場合、Poll::Pending()
を返し、その時点で Future は中断されます。そして、何らかのイベントが発生したときに Waker によって再び起動される必要があります。
Waker は wake()
メソッドを提供し、executor に対して「どのタスクを起こすか」を知らせます。wake()
が呼び出されると、executor は対応するタスクが再度実行可能であると判断し、Future を再び poll します。
この wake → poll のサイクルが繰り返され、Future が最終的に完了するまで処理が進みます。
各非同期タスクは、次の 3 つの段階に分かれます:
-
Poll(ポーリング)段階:executor が Future を poll し始めて実行開始。処理が進まず
Pending
状態になると、中断されて待機フェーズに入る。 -
待機段階:reactor(イベント発生源)が
Waker
を登録し、特定のイベントが発生するのを待つ。条件が満たされるとwake()
によって Future を起こす。 -
起動段階:イベントが発生すると、対応する Future が
Waker
によって起こされる。executor が Future を再 poll して、処理を一歩進める。この処理は Future が完了するか、再びPending
を返すまで繰り返される。
私たちはLeapcell、Rustプロジェクトのホスティングの最適解です。
Leapcellは、Webホスティング、非同期タスク、Redis向けの次世代サーバーレスプラットフォームです:
複数言語サポート
- Node.js、Python、Go、Rustで開発できます。
無制限のプロジェクトデプロイ
- 使用量に応じて料金を支払い、リクエストがなければ料金は発生しません。
比類のないコスト効率
- 使用量に応じた支払い、アイドル時間は課金されません。
- 例: $25で6.94Mリクエスト、平均応答時間60ms。
洗練された開発者体験
- 直感的なUIで簡単に設定できます。
- 完全自動化されたCI/CDパイプラインとGitOps統合。
- 実行可能なインサイトのためのリアルタイムのメトリクスとログ。
簡単なスケーラビリティと高パフォーマンス
- 高い同時実行性を容易に処理するためのオートスケーリング。
- ゼロ運用オーバーヘッド — 構築に集中できます。
Xでフォローする:@LeapcellHQ
Discussion