🧙

非同期処理の裏側を覗き見 ~Rustのasync/awaitの場合~

2024/07/18に公開

イントロダクション

非同期処理は、タスクの完了を待つことなく多くのタスクをプログラム上で実行することができる強力なツールです。例えば、Webサーバーのリクエスト処理やファイルの読み書きなど、I/Oバウンドタスクの効率的な処理に非常に有用です。
この記事では、Rust公式のAsynchronous Programming in Rustの第1章と第2章を参考にし、並行プログラミングモデルや非同期処理の実装例、非同期ランタイムの内部構造までまとめました。

並行プログラミングモデルの比較

並行プログラミングにはさまざまなモデルがあり、それぞれに特有の利点と欠点があります。Asynchronous Programming in Rust 第1章の内容に基づき、OSスレッド、イベント駆動、コルーチン、アクターモデルの各モデルについて例を交えながら説明します。

OSスレッド

OSスレッドは、オペレーティングシステムによって管理されるスレッドを使用する並行プログラミングモデルです。既存の同期コードのプログラミングモデルを大きく変更せず利用できる点で優れており、少数のタスクに対しては簡単に並行処理を表現できます。しかし、スレッド間の同期の難しさやコンテキストスイッチによるパフォーマンスのオーバーヘッドが大きいため、大量のタスクやIOバウンドのワークロードには向いていません。スレッドプールを使用することで一部のコストを軽減できますが、すべての問題を解決するわけではありません。

public class ThreadExample {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                System.out.println("Hello from the spawned thread: " + i);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        thread.start();

        for (int i = 1; i <= 5; i++) {
            System.out.println("Hello from the main thread: " + i);
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

イベント駆動プログラミング

イベント駆動プログラミングは、イベントが発生したときにそれに対応する処理を実行するモデルです。コールバックやイベントハンドラーを使用して、非同期イベントに応答します。

const fs = require('fs');

fs.readFile('example.txt', 'utf8', (err, data) => {
    if (err) {
        console.error(err);
        return;
    }
    console.log(data);
});

console.log('This will run before the file is read');

高いパフォーマンスを実現できる一方でコードが非線形になりがちであり、データフローやエラーの伝搬が追跡しにくいです。

コルーチン

コルーチンは、一時停止と再開が可能な関数のようなものです。自然な制御フローを保ちながら並行処理を行うことができます。

import asyncio

async def async_function():
    print('Start')
    await asyncio.sleep(2)
    print('End')

asyncio.run(async_function())

スレッドと同じくプログラミングモデルを変える必要がなく使いやすいですが、低レベルの詳細は抽象化されています。

アクターモデル

アクターモデルは、すべての並行計算をアクターと呼ばれる独立した単位に分割するモデルです。アクターは、メッセージを通じて通信します。

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class AkkaExample {

    // Pingアクター
    static class PingActor extends AbstractActor {
        private final ActorRef pongActor;

        public PingActor(ActorRef pongActor) {
            this.pongActor = pongActor;
        }

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .matchEquals("ping", msg -> {
                        System.out.println("Ping received");
                        pongActor.tell("pong", getSelf());
                    })
                    .build();
        }
    }

    // Pongアクター
    static class PongActor extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .matchEquals("pong", msg -> {
                        System.out.println("Pong received");
                        getSender().tell("ping", getSelf());
                    })
                    .build();
        }
    }

    public static void main(String[] args) {
        final ActorSystem system = ActorSystem.create("pingpong-system");
        final ActorRef pongActor = system.actorOf(Props.create(PongActor.class), "pongActor");
        final ActorRef pingActor = system.actorOf(Props.create(PingActor.class, pongActor), "pingActor");

        // 初めのメッセージ送信
        pingActor.tell("ping", ActorRef.noSender());

        // システムを終了させる前に少し待機
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            system.terminate();
        }
    }
}

アクター間の通信はメッセージを通じて行われるためスレッドセーフな並行処理が実現できますが、フロー制御やリトライロジックなどの実用的な問題を解決する必要があります。

OSスレッドと非同期(Async)の比較

OSスレッドは少数のタスクであれば既存の同期コードをほぼそのまま使える利便性がありますが、CPUとメモリのオーバーヘッドが大きく、大規模なIOバウンドタスクには向きません。
非同期(Async)は軽量で効率的に大量のIOバウンドタスクを処理できますが、非同期用のランタイムがバンドルされるため、バイナリサイズが大きくなる傾向があります。
それぞれ向き不向きがあるため、シナリオに応じて適切に選択することが大切です。

Rustでのasync/.await

非同期処理の例として、Rustの async/.awaitを説明します。以下のポイントに基づいて動作します:

async

asyncキーワードは、コードブロックをFutureというトレイトを実装した状態機械に変換します。
async fnは非同期関数を定義し、戻り値はFutureです。

await

async fn内で、.awaitを使用して他のFutureの完了を待機します。
.awaitはスレッドをブロックせずに非同期に待機し、他のタスクが実行されるのを許可します。

Future

Futureは、非同期計算を表現するためのトレイトであり、最終的に値を生成します(値が空の場合もあります)。例として、簡易版のFutureトレイトは以下のようになります。

trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}

Futureはpoll関数を呼び出すことで進行します。pollが呼ばれると、Futureは完了に向けて可能な限り進行し、完了するとPoll::Ready(result)を返します。まだ完了していない場合はPoll::Pendingを返し、Futureが再び進行可能になったときにwake()関数が呼ばれるように設定します。wake()が呼ばれると、Futureを駆動するExecutorがpollを呼び出し、Futureの処理が再度進行します。

Executor

Executorは非同期タスクをスケジュールし、管理します。Futureは Executorにより実行されます。
ブロッキング関数の場合、呼び出すとスレッド全体がブロックされます。一方で、 asyncにより生成されたFutureはブロックされるとスレッドの制御を他のFutureに譲渡します。これにより、スレッドが効率的に利用され、複数のタスクを同時に実行することが可能になります。
以下の例を見てみます。

use futures::executor::block_on;
use futures::join;
use std::time::Duration;
use futures_timer::Delay;

async fn learn_song() -> String {
    println!("Learning the song...");
    Delay::new(Duration::from_secs(2)).await;
    println!("Learned the song!");
    "La La La".to_string()
}

async fn sing_song(song: String) {
    println!("Singing the song: {}", song);
    Delay::new(Duration::from_secs(2)).await;
    println!("Finished singing the song!");
}

async fn learn_and_sing() {
    let song = learn_song().await;
    sing_song(song).await;
}

async fn dance() {
    for _ in 0..3 {
        println!("Dancing...");
        Delay::new(Duration::from_secs(1)).await;
    }
    println!("Finished dancing!");
}

async fn async_main() {
    let f1 = learn_and_sing();
    let f2 = dance();

    join!(f1, f2);
}

fn main() {
    block_on(async_main());
}

async fn learn_and_sing()は、歌を学んでから歌う非同期関数です。learn_song().awaitで歌を学び、その後sing_song(song).awaitで歌を歌います。
async fn async_main()は、learn_and_singとdanceを並行して実行する非同期関数です。futures::join!を使用して、両方のタスクが完了するまで待ちます。
fn main()は、block_on(async_main())を使用して非同期関数を実行します。block_onは、非同期タスクが完了するまで現在のスレッドをブロックします。
このコードを実行すると、以下のような出力が得られます。

Learning the song...
Dancing...
Dancing...
Learned the song!
Singing the song: La La La
Dancing...
Finished dancing!
Finished singing the song!

この結果から、learn_and_singとdanceが並行して実行され、異なるタイミングで完了することが確認できます。learn_and_sing 内では、learn_song().awaitにより learn_songが完了するまでsing_songは待機しますが、その間danceは並行して実行されます。これにより、スレッドが効率的に利用され、パフォーマンスが向上します。

非同期処理の裏側

Rust言語において、 asyncと await の裏側でどのような処理が動いているのか確認するために、カスタム非同期ランタイムを実装してみます。Asynchronous Programming in Rust 第2章のカスタムタイマーの例を見ていきます。この例では、ExecutorとSpawnerという2つの主要なコンポーネントを持つ非同期タスク実行環境を構築し、TimerFutureを使用してタイマーの非同期タスクを実行します。 lib.rs と main.rs の二つのファイルに実装します。
まずは、lib.rsファイルです。

lib.rs
use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

struct SharedState {
    completed: bool,
    waker: Option<Waker>,
}

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

impl TimerFuture {
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}

lib.rs ファイルでは、TimerFutureというカスタム非同期タイマーを実装しています。このタイマーは指定された期間が経過するまで非同期に待機します。
各要素に関して説明します。

TimerFuture構造体

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

struct SharedState {
    completed: bool,
    waker: Option<Waker>,
}

TimerFutureは、 Futureトレイトの実装クラスで、一定の期間後に完了する非同期タスクを表します。SharedStateは、タイマーの時間が経過したかどうかの状態(completed)と、時間経過後にタスクが再びポーリングされるためのwakerを保持します。

Futureトレイトの実装

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

Futureトレイトを実装することで、TimerFutureは非同期タスクとして扱えるようになります。
pollメソッドはエグゼキューターによって実行されるメソッドで、タイマーが完了しているかをチェックします。完了していればPoll::Readyを返します。完了していない場合、Contextに設定されたwakerをshared_stateに保存し、Poll::Pendingを返します。Contextへのwakerの設定はエグゼキュータ側で行います。

TimerFutureの新規作成

impl TimerFuture {
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}

newメソッドは、TimerFutureのインスタンスを作成します。shared_stateを初期化し、タイマーの完了状態をfalseに設定します。
また、時間カウント用のスレッドを生成します。このスレッドは、指定された期間が経過した後にcompletedフラグをtrueに設定し、保存されたwakerを呼び出すことでタスクを再スケジュールします。

次に、main.rsです。こちらでは、非同期タスクを管理・実行するカスタム非同期ランタイムを構築します。これには、タスクの生成と実行を管理するエグゼキュータとスポーナーが含まれています。

main.rs
use futures::{
    future::{BoxFuture, FutureExt},
    task::{waker_ref, ArcWake},
};
use std::{
    future::Future,
    sync::mpsc::{sync_channel, Receiver, SyncSender},
    sync::{Arc, Mutex},
    task::Context,
    time::Duration,
};

use timer::TimerFuture;

struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&waker);
                if future.as_mut().poll(context).is_pending() {
                    *future_slot = Some(future);
                }
            }
        };
    }
}

#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

impl Spawner {
    fn spawn(&self, future: impl Future<Output=()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("too many tasks queued");
    }
}

struct Task {
    future: Mutex<Option<BoxFuture<'static, ()>>>,
    task_sender: SyncSender<Arc<Task>>,
}

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        let cloned = arc_self.clone();
        arc_self
            .task_sender
            .send(cloned)
            .expect("too many tasks queued");
    }
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender })
}

fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    spawner.spawn(async {
        println!("howdy!");
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done!")
    });

    drop(spawner);

    executor.run();
}

各要素に関して説明します。

Executor

struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&waker);
                if future.as_mut().poll(context).is_pending() {
                    *future_slot = Some(future);
                }
            }
        };
    }
}

Executorは、タスクを実行する責任を持つ構造体です。タスクはReceiver<Arc<Task>>というキューに保存され、順次実行されます。
runメソッドは、キューからタスクを取り出し、タスクのFutureのpollメソッドを実行します。タスクが完了していない場合(Poll::Pendingが返ってきた場合)は、再びfutureをタスクに設定します。

Spawner

#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

impl Spawner {
    fn spawn(&self, future: impl Future<Output=()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("too many tasks queued");
    }
}

Spawnerは、新しい非同期タスクを生成し、エグゼキュータのキューに送信します。

Task

struct Task {
    future: Mutex<Option<BoxFuture<'static, ()>>>,
    task_sender: SyncSender<Arc<Task>>,
}

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        let cloned = arc_self.clone();
        arc_self
            .task_sender
            .send(cloned)
            .expect("too many tasks queued");
    }
}

Taskは、個々の非同期タスクを表す構造体です。ArcWakeトレイトのwake_by_refによって、ウェイクアップ時の処理を設定しています。TimerFuture::newのメソッドで起動した時間経過カウント用の別スレッドは、指定時間経過後にWaker.wake()を呼び出してwake_by_refの処理を実行することで、エグゼキュータのキューへタスクを再度送信しています。

main メソッドを実行すると、 howdy! が表示され、2秒後に done! が表示されます。

全体処理の流れ

全体の処理の流れをまとめると、以下のようになります。

  1. ExecutorとSpawnerの作成
    new_executor_and_spawner関数で、非同期タスクを管理・実行するためのエグゼキュータとタスクを生成するスポーナーを作成します。
  2. 非同期タスクの生成と送信
    Spawnerが非同期タスクを生成し、エグゼキュータのキューに送信します。このタスクはTimerFutureを使用して2秒間待機する非同期タスクであり、ウェイクアップ用の処理(今回の場合は、エグゼキュータのキューへタスクを送信する処理)もArcWakeのwake_by_refにより設定されています。
  3. エグゼキュータの実行
    エグゼキュータはキューからタスクを取り出します。そのタスクのTimerFuture::pollを実行します。その際、タスクに設定してあったWakerを引数のContextに含めて渡します。
  4. TimerFutureのポーリング
    エグゼキュータによってTimerFuture::pollメソッドが実行され、タスクが完了していない場合(shared_state.completed = false)はPoll::Pendingを返します。このとき、引数で受け取ったWakerをshared_stateに設定します。
  5. カウント用スレッドの実行
    TimerFuture::newのタイミングで別スレッドでカウントが開始され、指定された時間が経過すると、shared_state.completedをtrueに設定し、shared_stateに設定されたWakerを呼び出します。
  6. Waker の呼び出し
    Wakerにより、タスクが再度エグゼキュータに送信されます。
  7. エグゼキュータの再ポーリング
    エグゼキュータがキューからタスクを再度取り出し、TimerFuture::pollを実行します。今度はshared_state.completed = trueとなっているため、Poll::Readyを返します。
  8. タスクの完了
    エグゼキュータはタスクが完了したと判定し、続きの処理を実行します。

このように、async/.awaitのコルーチン的な書き方の背後では、Futureの状態機械とExecutorのカスタムランタイムによるイベントループの処理が動いていることが分かります。

まとめ

非同期プログラミングは、現代のソフトウェア開発において不可欠な技術です。非同期処理を利用することで、効率的なリソース管理と高いパフォーマンスを実現できます。

非同期処理の基礎

非同期処理は、タスクの完了を待たずに他のタスクを実行できるため、I/Oバウンドな作業や大規模な並行処理において特に有用です。

並行プログラミングモデルの比較

  • OSスレッド: シンプルで使いやすいが、リソースのオーバーヘッドが大きく、スケーラビリティに課題がある。
  • イベント駆動: 高性能だが、コードが非線形になりがち。
  • コルーチン: 自然な制御フローを保ちながら並行処理を実現可能。
  • アクターモデル: スレッドセーフな並行処理が可能だが、制御フローが複雑。

Rustでのasync/.await

Rustのasync/.awaitは、非同期プログラミングをシンプルにし、高性能な実装を可能にします。

非同期処理の裏側

自作の非同期ランタイムを実装することで、非同期処理のメカニズムの理解が深まりました。Rustのasync/.awaitにおいては、裏側でFuture、Waker、Executorが協力することでイベントループによる非同期処理を実現していました。

非同期プログラミングは複雑ですが、基本的な概念とモデルを理解することで、効果的に活用できるようになると思います。

Discussion