🌟
tokioの基本的なフロー制御と資源管理方法(Rust)
tokioを用いて非同期処理のコードを書く中で、どの書き方がどの制御パターンだったか確認するのが面倒だったため自分用に纏めました。学習は参考文献のサイトが適しているような気がします。
基本的な非同期処理フロー制御
コード全体
use std::future::Future;
use tokio::{time::{sleep, Duration}, spawn, join, select};
/// Demo entry point: walks through four basic tokio flow-control patterns in order.
#[tokio::main]
async fn main() {
    let (pf1, pf2) = reload_print_func();
    // Pattern 1 - fire and forget: hand both futures to the runtime as separate tasks.
    // The returned JoinHandles are discarded, so nothing here waits for the tasks.
    spawn(pf1);
    spawn(pf2);
    let (sf1, sf2) = reload_string_func();
    // Pattern 2 - concurrent run with synchronization: `join!` awaits both futures
    // and yields both results as a tuple.
    let (s1, s2) = join!(sf1, sf2);
    println!("{}\n{}", s1, s2);
    let (sf1, sf2) = reload_string_func();
    // Pattern 3 - spawned tasks with synchronization: spawn each future as its own task,
    // then `join!` the handles. Awaiting a handle yields a Result, hence the `Ok(..)` patterns.
    let h1 = spawn(sf1);
    let h2 = spawn(sf2);
    if let (Ok(s1), Ok(s2)) = join!(h1, h2) {
        println!("{}\n{}", s1, s2);
    }
    let (sf1, sf2) = reload_string_func();
    // Pattern 4 - race: `select!` waits on both futures and runs only the arm of the one
    // that completes first. The `else` arm is the fallback for when no branch matches.
    select! {
        s1 = sf1 => { println!("{}", s1); },
        s2 = sf2 => { println!("{}", s2); },
        else => { println!("Any async funcs were not successed."); },
    }
    // Sleep for a while so the detached tasks from pattern 1 get time to finish
    // before `main` returns.
    hold_program().await;
    println!("Finish program");
}
/// Waits 1000 ms on the tokio timer, then prints a greeting to stdout.
async fn delay_print_1000() {
    let wait = Duration::from_millis(1000);
    sleep(wait).await;
    println!("Hello after 1000");
}
/// Waits 2000 ms on the tokio timer, then prints a greeting to stdout.
async fn delay_print_2000() {
    let wait = Duration::from_millis(2000);
    sleep(wait).await;
    println!("Hello after 2000");
}
/// Waits 1000 ms on the tokio timer, then returns the greeting as an owned `String`.
async fn delay_string_1000() -> String {
    let wait = Duration::from_millis(1000);
    sleep(wait).await;
    "Hello after 1000".to_owned()
}
/// Waits 2000 ms on the tokio timer, then returns the greeting as an owned `String`.
async fn delay_string_2000() -> String {
    let wait = Duration::from_millis(2000);
    sleep(wait).await;
    "Hello after 2000".to_owned()
}
/// Sleeps for five seconds; `main` awaits this to stay alive while spawned tasks run.
async fn hold_program() {
    let grace_period = Duration::from_secs(5);
    sleep(grace_period).await;
}
/// Builds a fresh pair of printing futures (1000 ms and 2000 ms delay, in that order).
///
/// Futures are lazy, so nothing runs until the caller awaits or spawns them.
fn reload_print_func() -> (impl Future<Output = ()>, impl Future<Output = ()>) {
    let short = delay_print_1000();
    let long = delay_print_2000();
    (short, long)
}
/// Builds a fresh pair of string-returning futures (1000 ms and 2000 ms delay, in that order).
///
/// Futures are lazy, so nothing runs until the caller awaits or spawns them.
fn reload_string_func() -> (impl Future<Output = String>, impl Future<Output = String>) {
    let short = delay_string_1000();
    let long = delay_string_2000();
    (short, long)
}
- 並列処理を実行 (同期なし)
spawn(pf1); spawn(pf2);
- 並行処理を実行して同期
let (s1, s2) = join!(sf1, sf2); println!("{}\n{}", s1, s2);
- 並列処理を実行して同期
let h1 = spawn(sf1); let h2 = spawn(sf2); if let (Ok(s1), Ok(s2)) = join!(h1, h2) { println!("{}\n{}", s1, s2); }
- 並行処理を実行して、最初に完了したもの一つのみ結果取得(select! は同一タスク内で各分岐を並行に待つ)
select! { s1 = sf1 => { println!("{}", s1); }, s2 = sf2 => { println!("{}", s2); }, else => { println!("Any async funcs were not successed."); }, }
- 分岐の一つを時間計測(sleepなど)にすることで、タイムアウト処理にも使える
その他
- #[tokio::main] を変更することでスレッド数を制御できる
  - #[tokio::main(flavor = "current_thread")]
    - メインスレッドのみの指定
    - その処理専用のスレッドを用意する spawn_blocking と併用する
      - 100μsを超えるとわかっている処理は spawn_blocking を使用するのが望ましい(らしい)
  - #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
    - ワーカースレッド数の指定
基本的な資源管理
Mutexを使う
関数を用いる処理
コード全体
use std::sync::{Arc, Mutex};
use tokio::{task::spawn, time::{sleep, Duration}};
/// Demo entry point: two spawned tasks write to one integer shared through `Arc<Mutex<..>>`.
#[tokio::main]
async fn main() {
    // One mutex-protected boxed integer; each task gets its own `Arc` handle to it.
    let share_box1 = Arc::new(Mutex::new(Box::new(0)));
    let share_box2 = share_box1.clone();
    // Task 1: write 2, pause an extra 500 ms, then write 3.
    // (Each `set_int1` call also sleeps one second after writing.)
    spawn(async move {
        set_int1(&share_box1, 2).await;
        sleep(Duration::from_millis(500)).await;
        set_int1(&share_box1, 3).await;
    });
    // Task 2: write 4, then 5, through the second handle.
    spawn(async move {
        set_int1(&share_box2, 4).await;
        set_int1(&share_box2, 5).await;
    });
    // The task handles were discarded, so sleep here to give both tasks time to finish
    // before `main` returns.
    hold_program().await;
    println!("Finish program");
}
/// Stores `int` in the shared integer, prints it, then sleeps for one second.
///
/// Panics if the mutex is poisoned (`lock().unwrap()`).
async fn set_int1(share_int: &Arc<Mutex<Box<i32>>>, int: i32) {
    // The inner block limits the guard's lifetime so it is dropped before the `.await` below.
    // Without the block the guard would still be alive at that await, which the original
    // author notes is a compile error (a std `MutexGuard` is not `Send`, and `spawn` needs
    // a `Send` future).
    {
        let mut lock_int = share_int.lock().unwrap();
        // First deref goes guard -> Box, second goes Box -> i32.
        **lock_int = int;
    }
    println!("Set value: {}", int);
    sleep(Duration::from_secs(1)).await;
}
/// Sleeps for five seconds; `main` awaits this to stay alive while spawned tasks run.
async fn hold_program() {
    let grace_period = Duration::from_secs(5);
    sleep(grace_period).await;
}
- Mutexに値を設定する関数
async fn set_int1(share_int: &Arc<Mutex<Box<i32>>>, int: i32) { { // If no blocking the codes, compile error due to living of Mutex at next await. let mut lock_int = share_int.lock().unwrap(); **lock_int = int; } println!("Set value: {}", int); sleep(Duration::from_secs(1)).await; }
構造体を用いる処理
コード全体
use std::sync::{Arc, Mutex};
use tokio::{task::spawn, time::{sleep, Duration}};
/// An integer meant to be shared between tasks; all access goes through the inner mutex.
struct SharedInt {
    /// The protected value.
    val: Mutex<i32>,
}

impl SharedInt {
    /// Overwrites the stored value with `int`.
    ///
    /// This is deliberately a plain (non-async) method: the guard is created and released
    /// inside this call, so it can never be held across an `.await` in the caller.
    ///
    /// # Panics
    /// Panics if the mutex is poisoned.
    fn set(&self, int: i32) {
        // The temporary guard is released at the end of this statement.
        *self.val.lock().unwrap() = int;
    }
}
/// Demo entry point: two spawned tasks update one `SharedInt` through cloned `Arc` handles.
#[tokio::main]
async fn main() {
    // The mutex lives inside the struct, so only the struct itself is wrapped in `Arc`.
    let share_struct1 = Arc::new(SharedInt{val: Mutex::new(0)});
    let share_struct2 = share_struct1.clone();
    // Task 1: write 6, then 7.
    spawn(async move {
        set_int2(&share_struct1, 6).await;
        set_int2(&share_struct1, 7).await;
    });
    // Task 2: write 8, then 9, through the second handle.
    spawn(async move {
        set_int2(&share_struct2, 8).await;
        set_int2(&share_struct2, 9).await;
    });
    // The task handles were discarded, so sleep here to give both tasks time to finish
    // before `main` returns.
    hold_program().await;
    println!("Finish program");
}
async fn set_int2(share_int: &SharedInt, int: i32) {
share_int.set(int);
println!("Set value: {}", int);
}
/// Sleeps for five seconds; `main` awaits this to stay alive while spawned tasks run.
async fn hold_program() {
    let grace_period = Duration::from_secs(5);
    sleep(grace_period).await;
}
- Mutexに値を設定する構造体
/// An integer meant to be shared between tasks; all access goes through the inner mutex.
struct SharedInt {
    /// The protected value.
    val: Mutex<i32>,
}

impl SharedInt {
    /// Overwrites the stored value with `int`.
    ///
    /// This is deliberately a plain (non-async) method: the guard is created and released
    /// inside this call, so it can never be held across an `.await` in the caller.
    ///
    /// # Panics
    /// Panics if the mutex is poisoned.
    fn set(&self, int: i32) {
        // The temporary guard is released at the end of this statement.
        *self.val.lock().unwrap() = int;
    }
}
メッセージを使う
メッセージのみの例
コード全体
use tokio::{task::spawn, time::{sleep, Duration}, sync::{mpsc, mpsc::{Sender, Receiver}}};
/// Demo entry point: two spawned tasks each send one message over a shared mpsc channel.
#[tokio::main]
async fn main() {
    // Two senders and one receiver on a channel with a buffer of 8.
    let (s1, s2, mut r) = get_mpsc_channel_w2sender(8);
    // Task 1: wait 500 ms, then send. A send error is deliberately ignored.
    spawn(async move {
        sleep(Duration::from_millis(500)).await;
        let _ = s1.send("sender1 text").await;
    });
    // Task 2: send immediately. A send error is deliberately ignored.
    spawn(async move {
        let _ = s2.send("sender2 text").await;
    });
    // Print messages until `recv` yields `None`. Both senders were moved into the tasks
    // above, so they are dropped when those tasks end.
    while let Some(msg) = r.recv().await {
        println!("msg -> {}", msg);
    }
    println!("Finish program");
}
fn get_mpsc_channel_w2sender<T>(buffer: usize) -> (Sender<T>, Sender<T>, Receiver<T>) {
let (sender1, receiver) = mpsc::channel(buffer);
let sender2 = sender1.clone();
(sender1, sender2, receiver)
}
- SenderとReceiverを用いる
spawn(async move { let _ = s1.send("sender1 text").await; }); spawn(async move { let _ = s2.send("sender2 text").await; }); while let Some(msg) = r.recv().await { println!("msg -> {}", msg); }
- mpscは Sender:Receiver = N:1 のパターン
- bufferはキューのバッファ数
- Receiverは全てのSenderを認識している
- Receiverは全てのSenderがdropされ、キューに残ったメッセージを受け取り終えるとNoneを返す
- mpscは Sender:Receiver = N:1 のパターン
メッセージを用いた資源管理
コード全体
use tokio::{join, sync::{mpsc::{self, Receiver, Sender}, oneshot}, task::spawn};
/// Request sent to the server task: the value to store plus a channel for the reply.
struct Pack {
    /// Value the server should store.
    num: i32,
    /// One-shot sender the server uses to report the outcome back to the requester.
    res: oneshot::Sender<bool>,
}
/// Demo entry point: a server task owns the integer and two client tasks update it by message.
#[tokio::main]
async fn main() {
    // The integer is moved into the server task below, so no lock is needed.
    let mut share_int = Box::new(0);
    let (s1, s2, mut r) = get_mpsc_channel_w2sender(16);
    // Server task: takes ownership of the receiver and the integer.
    let server = spawn(async move { get_server(&mut r, &mut share_int).await });
    // Client tasks: each takes one sender and requests a different value.
    let task1 = spawn(async move { send_msg(s1, 1).await });
    let task2 = spawn(async move { send_msg(s2, 2).await });
    // Wait for both clients; their join results are ignored.
    let (_, _) = join!(task1, task2);
    // Wait for the server task; `unwrap` panics if awaiting its handle returns an error.
    let _ = server.await.unwrap();
    println!("Finish program");
}
fn get_mpsc_channel_w2sender<T>(buffer: usize) -> (Sender<T>, Sender<T>, Receiver<T>) {
let (sender1, receiver) = mpsc::channel(buffer);
let sender2 = sender1.clone();
(sender1, sender2, receiver)
}
/// Server loop: applies every received `Pack` to `share_int` and acknowledges it.
///
/// Runs until `recv` returns `None`, then returns.
async fn get_server(r: &mut Receiver<Pack>, share_int: &mut Box<i32>) {
    loop {
        let request = match r.recv().await {
            Some(request) => request,
            None => break,
        };
        let Pack { num, res } = request;
        println!("Set int: {} -> {}", share_int, num);
        **share_int = num;
        // The requester may already be gone; a failed reply is deliberately ignored.
        let _ = res.send(true);
    }
}
/// Asks the server to store `num` and reports the outcome on stdout/stderr.
///
/// Builds a one-shot reply channel, sends it with the value as a `Pack`, then waits for
/// the server's answer. Neither a failed send nor a missing reply panics; both are
/// reported on stderr.
async fn send_msg(s: Sender<Pack>, num: i32) {
    let (res_s, res_r) = oneshot::channel();
    let pack = Pack { num, res: res_s };
    if s.send(pack).await.is_err() {
        eprintln!("Conn shutdown");
        return;
    }
    // Awaiting the reply fails if the server dropped the reply sender without answering;
    // handle that like the send failure above instead of unwrapping.
    match res_r.await {
        Ok(true) => println!("Success set int."),
        Ok(false) => println!("Fail set int"),
        Err(_) => eprintln!("Reply channel closed"),
    }
}
- 結果応答用のチャネルを送信データに含める
struct Pack { num: i32, res: oneshot::Sender<bool>, } async fn get_server(r: &mut Receiver<Pack>, share_int: &mut Box<i32>) { while let Some(Pack{num, res}) = (*r).recv().await { println!("Set int: {} -> {}", share_int, num); **share_int = num; let _ = res.send(true); // Ignore error } } async fn send_msg(s: Sender<Pack>, num: i32) { let (res_s, res_r) = oneshot::channel(); let pack = Pack{num, res: res_s}; if s.send(pack).await.is_err() { eprintln!("Conn shutdown"); return; } if res_r.await.unwrap() { println!("Success set int."); } else { println!("Fail set int"); } }
- oneshotは Sender:Receiver = 1:1 のパターン
- 簡単な結果応答にはoneshotを用いる
Discussion