😽

[Rust]Try smol ! Task + mpmc channel

に公開

目的

async_stdの開発サポートが終了し,smolへの移行が推奨されている.smolの雰囲気を知るために,非同期タスクとchannel周りを見てみる.
(基本的にTokioしか使ってこなかったためasync_stdをあまり知らず,かつasync_stdはsmolを内部で使用しているため、smol特有のものではないかもしれない..🙇‍♂️)

Main

Tokio

Tokioでは#[tokio::main]アトリビュートでmain関数をasyncにしている.
内部ではblock_onをしている.

#[tokio::main]
async fn main() {
    // async task
}

smol(block_on)

smolクレート単体ではtokio::mainのようなマクロは存在せず、block_onを呼び出す必要がある.

fn main() {
    smol::block_on(async move {
        // async task
    }
}

smol(手続き型)

マクロを使用したい場合はsmol_macrosを使用する必要がある.

use smol_macros::main;
main! {
    async fn main() {
        // async task
    }
}

smol(宣言的)

これでも違和感があり,手続き型ではなく,宣言的にしたい場合は更にmacro_rules_attributeクレートを導入する必要がある.

use macro_rules_attribute::apply;
use smol_macros::main;

#[apply(main!)]
async fn main() {
    // async task
}

Tasks

Task概要

smolで提供されている関数はsmol::spawnsmol::unblock.
スレッドプール周りとpanicした時の挙動がTokioとは異なる.

spawn

1. single / multi

ドキュメントによるとデフォルトでsingle threadになっている.

Tokioはデフォルトではマルチスレッドという違いがあり,
#[tokio::main(worker_threads = 1)]に近い.

2. Task / JoinHandle

smolの定義はこの様になっている.

pub fn spawn<T: Send + 'static>(
    future: impl Future<Output = T> + Send + 'static,
) -> Task<T>

一方、Tokioは以下のようになっている.

pub fn spawn<F>(future: F) -> JoinHandle<F::Output>where
    F: Future + Send + 'static,
    F::Output: Send + 'static,

TokioではタスクがpanicしたらJoinHandleをawaitした際にResult型として伝播することができ,プロセスは停止しない.

smolではタスクがpanicしたらプロセスごと停止する.

そのためTaskの値を取得するのに非同期タスクのエラーハンドリングを必要としない.

例えば戻り値がResult<String>のとき

Tokio

let res: String = tokio_task.await.unwrap().unwrap();
//                               ------    -------
//                                  |         |
//                              JoinHandle    |
//                                         Result<T>

smol

let res: String = smol_task.await.unwrap();
//                                ------
//                                  |
//                              Result<T>

これに関してはなかなか癖があると感じました.

unblock

これはブロッキング処理を行うために専用のスレッドプールを作成してオフロードするための関数.
tokio::task::spawn_blockingに近い.

Examples

use std::time;

fn main() {
    smol::block_on(async move {
        // ===spawn async task====
        // 出力までに3sかかる
        let task1 = smol::spawn(task());
        let task2 = smol::spawn(task());

        let res1 = task1.await;
        let res2 = task2.await;
        println!("{}:{}", res1, res2);

        // ====spawn blocking task====
        // ここでは同期スリープしているが、CPUバウンドな重い計算などが相当
        // 出力に6s程度かかる
     // これはスレッドを掴んだまま、譲らないため
        let task1 = smol::spawn(task_blocking());
        let task2 = smol::spawn(task_blocking());

        let res1 = task1.await;
        let res2 = task2.await;
        println!("{}:{}", res1, res2);

        // ====unblock blocking task====
        // 出力に3s程度
        // 別スレッドプールにオフロードする
        // tokio::task::spawn_blocking()に近い
        // クロージャを渡す
        let task1 = smol::unblock(sync_blocking);
        let task2 = smol::unblock(sync_blocking);

        let res1 = task1.await;
        let res2 = task2.await;
        println!("{}:{}", res1, res2);
    });
}

async fn task() -> String {
    smol::Timer::after(time::Duration::from_secs(3)).await;
    "Hello".to_string()
}

async fn task_blocking() -> String {
    std::thread::sleep(time::Duration::from_secs(3));
    "Hello".to_string()
}

fn sync_blocking() -> String {
    std::thread::sleep(time::Duration::from_secs(3));
    "Hello".to_string()
}

Note

マルチスレッドにすることもでき,その場合はExecuterを使用する.

use async_executor::Executor;

let ex = Executor::new();

let task = ex.spawn(async {
    println!("Hello world");
});

mpmc channel

mpmc概要

提供されているchannelはmulti-producer multi-consumermpmc channel.
そのためSenderReceiverそれぞれにCloneトレイトが実装されている.
これに関してはasync_stdと一緒.

作成にはunboudboudがあり、キューに書き込める数を制限するかどうかで使い分ける.

use async_channel::{bounded, TryRecvError, TrySendError};

let (s, r) = bounded(1);

assert_eq!(s.send(10).await, Ok(()));
assert_eq!(s.try_send(20), Err(TrySendError::Full(20)));

assert_eq!(r.recv().await, Ok(10));
assert_eq!(r.try_recv(), Err(TryRecvError::Empty));

値自体は、異なる非同期タスクのすべてが受信できるわけではなく,どれかのタスク1つだけが受信(消費)できる.
crossbeam_channelに近い.
リバースプロキシの実装のような感じでしょうか...

Example

データを送信する非同期タスクとデータを受信して、キューにpushする受信タスクがそれぞれ2つ存在する.
送信タスクは3 s周期で送る. また、受信タスクはid1番は1 ms delayし,わざと複数のタスクが値を消費できるようにしている.


use std::{
    sync::{Arc, Mutex},
    time,
};

use smol::channel::{self, Receiver, Sender};

type StoreType = Arc<Mutex<Vec<String>>>;
fn main() {
    let (tx, rx) = channel::unbounded();

    smol::block_on(async move {
        // メッセージの作成
        let messages: Vec<String> = ["Hello", "smol", "😽"]
            .iter()
            .map(|v| v.to_string())
            .collect();

        let messages2: Vec<String> = ["Goodby", "async_std", "😢"]
            .iter()
            .map(|v| v.to_string())
            .collect();

        // メッセージを保存するためのスタック
        // ここではstd::sync::Mutexを使っている
        let msg_store: StoreType = Arc::default();

        // Sender Tasks
        let send_task1 = smol::spawn(send_task(1, messages, tx.clone()));
        let send_task2 = smol::spawn(send_task(2, messages2, tx));
        // Receiver Tasks
        let receive_task1 = smol::spawn(receive_task(1, rx.clone(), msg_store.clone()));
        let receive_task2 = smol::spawn(receive_task(2, rx.clone(), msg_store.clone()));

        // タスク完了の待機
        // PanicするとmainもPanicする
        send_task1.await;
        send_task2.await;
        receive_task1.await;
        receive_task2.await;

        // 保存されたメッセージの出力
        println!("----Result----");
        msg_store.lock().unwrap().iter().for_each(|msg| {
            println!("{}", msg);
        });
    });
}

// 渡されたメッセージを3s周期で送信する
async fn send_task(id: u32, messages: Vec<String>, tx: Sender<String>) {
    for msg in messages.iter() {
        smol::Timer::after(time::Duration::from_secs(3)).await;
        // メッセージの送信
        if let Err(e) = tx.send(msg.to_string()).await {
            println!("[Sender-{}] Send error {}", id, e);
        }
        println!("[Sender-{}] Sended message '{}'", id, msg);
    }
    println!("[Sender-{}] Sended all messages", id);
}

// 受け取ったメッセージを保存する
async fn receive_task(id: u32, rx: Receiver<String>, store: StoreType) {
    while let Ok(v) = rx.recv().await {
        // idによってdelayさせる
        if id % 2 == 1 {
            smol::Timer::after(time::Duration::from_micros(1)).await;
        }
        println!("[Receiver-{}] Got '{}'", id, v);
        // 保存
        store.lock().unwrap().push(v);
    }
}

出力.
Receiver1, Receiver2が同じ値を取得することはない.

[Sender-1] Sended message 'Hello'
[Sender-2] Sended message 'Goodby'
[Receiver-2] Got 'Goodby'
[Receiver-1] Got 'Hello'
[Sender-1] Sended message 'smol'
[Sender-2] Sended message 'async_std'
[Receiver-2] Got 'smol'
[Receiver-2] Got 'async_std'
[Sender-1] Sended message '😽'
[Sender-1] Sended all messages
[Sender-2] Sended message '😢'
[Sender-2] Sended all messages
[Receiver-2] Got '😽'
[Receiver-2] Got '😢'
----Result----
Goodby
Hello
smol
async_std
😽
😢

感想

今までにない、新しい技術の紹介という記事ではなく,既存の内容の調査になってしまいましたが、Tokio以外の世界も見ることができました. (非同期ランタイムの世界は広い...🌎)
Tokioだと最初適当にfeatures = [ "full"] にして大っきなコンパイルが走りますが、(※実際にはfeaturesフラグを適切に選択する必要があります)smolではそんなことはなく、サクサク進む印象でした. そのかわり、色々クレートを追加する面倒さはありますが...
それでも軽量さは魅力的で自身の研究などでマイコンで非同期を扱いたいときは、使っていこうかなとも考えています.

Discussion