[Rust]Try smol ! Task + mpmc channel
目的
async_stdの開発サポートが終了し,smolへの移行が推奨されている.smolの雰囲気を知るために,非同期タスクとchannel周りを見てみる.
(基本的にTokioしか使ってこなかったためasync_stdをあまり知らず,かつasync_stdはsmolを内部で使用しているため、smol特有のものではないかもしれない..🙇♂️)
Main
Tokio
Tokio
では#[tokio::main]
アトリビュートでmain関数をasyncにしている.
内部ではblock_onをしている.
#[tokio::main]
async fn main() {
// async task
}
smol(block_on)
smolクレート単体ではtokio::main
のようなマクロは存在せず、block_on
を呼び出す必要がある.
fn main() {
smol::block_on(async move {
// async task
}
}
smol(手続き型)
マクロを使用したい場合はsmol_macros
を使用する必要がある.
use smol_macros::main;
main! {
async fn main() {
// async task
}
}
smol(宣言的)
これでも違和感があり,手続き型ではなく,宣言的にしたい場合は更にmacro_rules_attribute
クレートを導入する必要がある.
use macro_rules_attribute::apply;
use smol_macros::main;
#[apply(main!)]
async fn main() {
// async task
}
Tasks
Task概要
smolで提供されている関数はsmol::spawn
とsmol::unblock
.
スレッドプール周りとpanicした時の挙動がTokio
とは異なる.
spawn
1. single / multi
ドキュメントによるとデフォルトでsingle threadになっている.
Tokioはデフォルトではマルチスレッドという違いがあり,
#[tokio::main(worker_threads = 1)]
に近い.
2. Task / JoinHandle
smolの定義はこの様になっている.
pub fn spawn<T: Send + 'static>(
future: impl Future<Output = T> + Send + 'static,
) -> Task<T>
一方、Tokioは以下のようになっている.
pub fn spawn<F>(future: F) -> JoinHandle<F::Output> ⓘ
where
F: Future + Send + 'static,
F::Output: Send + 'static,
TokioではタスクがpanicしたらJoinHandleをawaitした際にResult型として伝播することができ,プロセスは停止しない.
smolではタスクがpanicしたらプロセスごと停止する.
そのためTask
の値を取得するのに非同期タスクのエラーハンドリングを必要としない.
例えば戻り値がResult<String>
のとき
Tokio
let res: String = tokio_task.await.unwrap().unwrap();
// ------ -------
// | |
// JoinHandle |
// Result<T>
smol
let res: String = smol_task.await.unwrap();
// ------
// |
// Result<T>
これに関してはなかなか癖があると感じました.
unblock
これはブロッキング処理を行うために専用のスレッドプールを作成してオフロードするための関数.
tokio::task::spawn_blocking
に近い.
Examples
use std::time;
fn main() {
smol::block_on(async move {
// ===spawn async task====
// 出力までに3sかかる
let task1 = smol::spawn(task());
let task2 = smol::spawn(task());
let res1 = task1.await;
let res2 = task2.await;
println!("{}:{}", res1, res2);
// ====spawn blocking task====
// ここでは同期スリープしているが、CPUバウンドな重い計算などが相当
// 出力に6s程度かかる
// これはスレッドを掴んだまま、譲らないため
let task1 = smol::spawn(task_blocking());
let task2 = smol::spawn(task_blocking());
let res1 = task1.await;
let res2 = task2.await;
println!("{}:{}", res1, res2);
// ====unblock blocking task====
// 出力に3s程度
// 別スレッドプールにオフロードする
// tokio::task::spawn_blocking()に近い
// クロージャを渡す
let task1 = smol::unblock(sync_blocking);
let task2 = smol::unblock(sync_blocking);
let res1 = task1.await;
let res2 = task2.await;
println!("{}:{}", res1, res2);
});
}
async fn task() -> String {
smol::Timer::after(time::Duration::from_secs(3)).await;
"Hello".to_string()
}
async fn task_blocking() -> String {
std::thread::sleep(time::Duration::from_secs(3));
"Hello".to_string()
}
fn sync_blocking() -> String {
std::thread::sleep(time::Duration::from_secs(3));
"Hello".to_string()
}
Note
マルチスレッドにすることもでき,その場合はExecuter
を使用する.
use async_executor::Executor;
let ex = Executor::new();
let task = ex.spawn(async {
println!("Hello world");
});
mpmc channel
mpmc概要
提供されているchannelはmulti-producer multi-consumer
なmpmc channel
.
そのためSender
とReceiver
それぞれにClone
トレイトが実装されている.
これに関してはasync_std
と一緒.
作成にはunboud
とboud
があり、キューに書き込める数を制限するかどうかで使い分ける.
use async_channel::{bounded, TryRecvError, TrySendError};
let (s, r) = bounded(1);
assert_eq!(s.send(10).await, Ok(()));
assert_eq!(s.try_send(20), Err(TrySendError::Full(20)));
assert_eq!(r.recv().await, Ok(10));
assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
値自体は、異なる非同期タスクのすべてが受信できるわけではなく,どれかのタスク1つだけが受信(消費)できる.
crossbeam_channel
に近い.
リバースプロキシの実装のような感じでしょうか...
Example
データを送信する非同期タスクとデータを受信して、キューにpushする受信タスクがそれぞれ2つ存在する.
送信タスクは3 s周期で送る. また、受信タスクはid1番は1 ms delayし,わざと複数のタスクが値を消費できるようにしている.
use std::{
sync::{Arc, Mutex},
time,
};
use smol::channel::{self, Receiver, Sender};
type StoreType = Arc<Mutex<Vec<String>>>;
fn main() {
let (tx, rx) = channel::unbounded();
smol::block_on(async move {
// メッセージの作成
let messages: Vec<String> = ["Hello", "smol", "😽"]
.iter()
.map(|v| v.to_string())
.collect();
let messages2: Vec<String> = ["Goodby", "async_std", "😢"]
.iter()
.map(|v| v.to_string())
.collect();
// メッセージを保存するためのスタック
// ここではstd::sync::Mutexを使っている
let msg_store: StoreType = Arc::default();
// Sender Tasks
let send_task1 = smol::spawn(send_task(1, messages, tx.clone()));
let send_task2 = smol::spawn(send_task(2, messages2, tx));
// Receiver Tasks
let receive_task1 = smol::spawn(receive_task(1, rx.clone(), msg_store.clone()));
let receive_task2 = smol::spawn(receive_task(2, rx.clone(), msg_store.clone()));
// タスク完了の待機
// PanicするとmainもPanicする
send_task1.await;
send_task2.await;
receive_task1.await;
receive_task2.await;
// 保存されたメッセージの出力
println!("----Result----");
msg_store.lock().unwrap().iter().for_each(|msg| {
println!("{}", msg);
});
});
}
// 渡されたメッセージを3s周期で送信する
async fn send_task(id: u32, messages: Vec<String>, tx: Sender<String>) {
for msg in messages.iter() {
smol::Timer::after(time::Duration::from_secs(3)).await;
// メッセージの送信
if let Err(e) = tx.send(msg.to_string()).await {
println!("[Sender-{}] Send error {}", id, e);
}
println!("[Sender-{}] Sended message '{}'", id, msg);
}
println!("[Sender-{}] Sended all messages", id);
}
// 受け取ったメッセージを保存する
async fn receive_task(id: u32, rx: Receiver<String>, store: StoreType) {
while let Ok(v) = rx.recv().await {
// idによってdelayさせる
if id % 2 == 1 {
smol::Timer::after(time::Duration::from_micros(1)).await;
}
println!("[Receiver-{}] Got '{}'", id, v);
// 保存
store.lock().unwrap().push(v);
}
}
出力.
Receiver1, Receiver2が同じ値を取得することはない.
[Sender-1] Sended message 'Hello'
[Sender-2] Sended message 'Goodby'
[Receiver-2] Got 'Goodby'
[Receiver-1] Got 'Hello'
[Sender-1] Sended message 'smol'
[Sender-2] Sended message 'async_std'
[Receiver-2] Got 'smol'
[Receiver-2] Got 'async_std'
[Sender-1] Sended message '😽'
[Sender-1] Sended all messages
[Sender-2] Sended message '😢'
[Sender-2] Sended all messages
[Receiver-2] Got '😽'
[Receiver-2] Got '😢'
----Result----
Goodby
Hello
smol
async_std
😽
😢
感想
今までにない、新しい技術の紹介という記事ではなく,既存の内容の調査になってしまいましたが、Tokio以外の世界も見ることができました. (非同期ランタイムの世界は広い...🌎)
Tokioだと最初適当にfeatures = [ "full"]
にして大っきなコンパイルが走りますが、(※実際にはfeaturesフラグを適切に選択する必要があります)smolではそんなことはなく、サクサク進む印象でした. そのかわり、色々クレートを追加する面倒さはありますが...
それでも軽量さは魅力的で自身の研究などでマイコンで非同期を扱いたいときは、使っていこうかなとも考えています.
Discussion