🌟

tokioの基本的なフロー制御と資源管理方法(Rust)

2024/04/03に公開


tokioを用いて非同期処理のコードを書く中で、どの書き方がどの制御パターンだったか確認するのが面倒だったため自分用に纏めました。学習は参考文献のサイトが適しているような気がします。

基本的な非同期処理フロー制御

コード全体
use std::future::Future;
use tokio::{time::{sleep, Duration}, spawn, join, select};

#[tokio::main]
async fn main() {
    let (pf1, pf2) = reload_print_func();
    // Parallel run async functions as carefree.
    spawn(pf1);
    spawn(pf2);

    let (sf1, sf2) = reload_string_func();
    // Concurrent run, and sync results of all async funcs.
    let (s1, s2) = join!(sf1, sf2);
    println!("{}\n{}", s1, s2);

    let (sf1, sf2) = reload_string_func();
    // Parallel run, and sync results of all async funcs.
    let h1 = spawn(sf1);
    let h2 = spawn(sf2);
    if let (Ok(s1), Ok(s2)) = join!(h1, h2) {
        println!("{}\n{}", s1, s2);
    }

    let (sf1, sf2) = reload_string_func();
    // Parallel run, and get one result of some async funcs.
    select! {
        s1 = sf1 => { println!("{}", s1); },
        s2 = sf2 => { println!("{}", s2); },
        else => { println!("Any async funcs were not successed."); },
    }

    // Hold until end of async func
    hold_program().await;
    println!("Finish program");
}

async fn delay_print_1000() {
    sleep(Duration::from_millis(1000)).await;
    println!("Hello after 1000");
}
async fn delay_print_2000() {
    sleep(Duration::from_millis(2000)).await;
    println!("Hello after 2000");
}
async fn delay_string_1000() -> String {
    sleep(Duration::from_millis(1000)).await;
    String::from("Hello after 1000")
}
async fn delay_string_2000() -> String {
    sleep(Duration::from_millis(2000)).await;
    String::from("Hello after 2000")
}
async fn hold_program() {
    sleep(Duration::from_millis(5000)).await;
}
fn reload_print_func() -> (impl Future<Output = ()>, impl Future<Output = ()>) {
    (delay_print_1000(), delay_print_2000())
}
fn reload_string_func() -> (impl Future<Output = String>, impl Future<Output = String>) {
    (delay_string_1000(), delay_string_2000())
}
  • 並列処理を実行 (同期なし)
    spawn(pf1);
    spawn(pf2);
    
  • 並行処理を実行して同期
    let (s1, s2) = join!(sf1, sf2);
    println!("{}\n{}", s1, s2);
    
  • 並列処理を実行して同期
    let h1 = spawn(sf1);
    let h2 = spawn(sf2);
    if let (Ok(s1), Ok(s2)) = join!(h1, h2) {
        println!("{}\n{}", s1, s2);
    }
    
  • 並列処理を実行してどれか一つのみ結果取得
    select! {
        s1 = sf1 => { println!("{}", s1); },
        s2 = sf2 => { println!("{}", s2); },
        else => { println!("Any async funcs were not successed."); },
    }
    
    • 並列処理の一つを時間計測にしてタイムアウト処理にも使える

その他

  • #[tokio::main]を変更することでスレッド数を制御できる
    • #[tokio::main(flavor = "current_thread")]
      • メインスレッドのみの指定
      • その処理専用のスレッドを用意するspawn_blockingと併用する
      • 100μsを超えるとわかっている処理はspawn_blockingを使用するのが望ましい(らしい)
    • #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
      • ワーカースレッド数の指定

基本的な資源管理

Mutexを使う

関数を用いる処理

コード全体
use std::sync::{Arc, Mutex};
use tokio::{task::spawn, time::{sleep, Duration}};

#[tokio::main]
async fn main() {
    let share_box1 = Arc::new(Mutex::new(Box::new(0)));
    let share_box2 = share_box1.clone();

    spawn(async move {
        set_int1(&share_box1, 2).await;
        sleep(Duration::from_millis(500)).await;
        set_int1(&share_box1, 3).await;
    });
    spawn(async move {
        set_int1(&share_box2, 4).await;
        set_int1(&share_box2, 5).await;
    });

    // Hold until end of async func
    hold_program().await;
    println!("Finish program");
}

async fn set_int1(share_int: &Arc<Mutex<Box<i32>>>, int: i32) {
    { // If no blocking the codes, compile error due to living of Mutex at next await.
        let mut lock_int = share_int.lock().unwrap();
        **lock_int = int;
    }
    println!("Set value: {}", int);
    sleep(Duration::from_secs(1)).await;
}
async fn hold_program() {
    sleep(Duration::from_millis(5000)).await;
}
  • Mutexに値を設定する関数
    async fn set_int1(share_int: &Arc<Mutex<Box<i32>>>, int: i32) {
        { // If no blocking the codes, compile error due to living of Mutex at next await.
            let mut lock_int = share_int.lock().unwrap();
            **lock_int = int;
        }
        println!("Set value: {}", int);
        sleep(Duration::from_secs(1)).await;
    }
    

構造体を用いる処理

コード全体
use std::sync::{Arc, Mutex};
use tokio::{task::spawn, time::{sleep, Duration}};

struct SharedInt {
    val: Mutex<i32>,
}
impl SharedInt {
    fn set(&self, int: i32) { // "sync" function
        let mut lock = self.val.lock().unwrap();
        *lock = int;
    }
}
#[tokio::main]
async fn main() {
    let share_struct1 = Arc::new(SharedInt{val: Mutex::new(0)});
    let share_struct2 = share_struct1.clone();

    spawn(async move {
        set_int2(&share_struct1, 6).await;
        set_int2(&share_struct1, 7).await;
    });
    spawn(async move {
        set_int2(&share_struct2, 8).await;
        set_int2(&share_struct2, 9).await;
    });

    // Hold until end of async func
    hold_program().await;
    println!("Finish program");
}

async fn set_int2(share_int: &SharedInt, int: i32) {
    share_int.set(int);
    println!("Set value: {}", int);
}
async fn hold_program() {
    sleep(Duration::from_millis(5000)).await;
}
  • Mutexに値を設定する構造体
    struct SharedInt {
        val: Mutex<i32>,
    }
    impl SharedInt {
        fn set(&self, int: i32) { // "sync" function
            let mut lock = self.val.lock().unwrap();
            *lock = int;
        }
    }
    

メッセージを使う

メッセージのみの例

コード全体
use tokio::{task::spawn, time::{sleep, Duration}, sync::{mpsc, mpsc::{Sender, Receiver}}};

#[tokio::main]
async fn main() {
    let (s1, s2, mut r) = get_mpsc_channel_w2sender(8);
    spawn(async move {
        sleep(Duration::from_millis(500)).await;
        let _ = s1.send("sender1 text").await;
    });
    spawn(async move {
        let _ = s2.send("sender2 text").await;
    });

    while let Some(msg) = r.recv().await {
        println!("msg -> {}", msg);
    }
    println!("Finish program");
}

fn get_mpsc_channel_w2sender<T>(buffer: usize) -> (Sender<T>, Sender<T>, Receiver<T>) {
    let (sender1, receiver) = mpsc::channel(buffer);
    let sender2 = sender1.clone();
    (sender1, sender2, receiver)
}
  • SenderとReceiverを用いる
    spawn(async move { let _ = s1.send("sender1 text").await; });
    spawn(async move { let _ = s2.send("sender2 text").await; });
    while let Some(msg) = r.recv().await {
        println!("msg -> {}", msg);
    }
    
    • mpscは Sender:Receiver = N:1 のパターン
      • bufferはキューのバッファ数
    • Receiverは全てのSenderを認識している
    • Receiverは全てのSenderからメッセージを受け取るとNoneを返す

メッセージを用いた資源管理

コード全体
use tokio::{join, sync::{mpsc::{self, Receiver, Sender}, oneshot}, task::spawn};

struct Pack {
    num: i32,
    res: oneshot::Sender<bool>,
}

#[tokio::main]
async fn main() {
    let mut share_int = Box::new(0);
    let (s1, s2, mut r) = get_mpsc_channel_w2sender(16);
    let server = spawn(async move { get_server(&mut r, &mut share_int).await });
    let task1 = spawn(async move { send_msg(s1, 1).await });
    let task2 = spawn(async move { send_msg(s2, 2).await });

    let (_, _) = join!(task1, task2);
    let _ = server.await.unwrap();

    println!("Finish program");
}

fn get_mpsc_channel_w2sender<T>(buffer: usize) -> (Sender<T>, Sender<T>, Receiver<T>) {
    let (sender1, receiver) = mpsc::channel(buffer);
    let sender2 = sender1.clone();
    (sender1, sender2, receiver)
}

async fn get_server(r: &mut Receiver<Pack>, share_int: &mut Box<i32>) {
    while let Some(Pack{num, res}) = (*r).recv().await {
        println!("Set int: {} -> {}", share_int, num);
        **share_int = num;
        let _ = res.send(true); // Ignore error
    }
}

async fn send_msg(s: Sender<Pack>, num: i32) {
    let (res_s, res_r) = oneshot::channel();
    let pack = Pack{num, res: res_s};
    if s.send(pack).await.is_err() {
        eprintln!("Conn shutdown");
        return;
    }
    if res_r.await.unwrap() {
        println!("Success set int.");
    } else {
        println!("Fail set int");
    }
}
  • 結果応答用のチャネルを送信データに含める
    struct Pack {
        num: i32,
        res: oneshot::Sender<bool>,
    }
    async fn get_server(r: &mut Receiver<Pack>, share_int: &mut Box<i32>) {
        while let Some(Pack{num, res}) = (*r).recv().await {
            println!("Set int: {} -> {}", share_int, num);
            **share_int = num;
            let _ = res.send(true); // Ignore error
        }
    }
    async fn send_msg(s: Sender<Pack>, num: i32) {
        let (res_s, res_r) = oneshot::channel();
        let pack = Pack{num, res: res_s};
        if s.send(pack).await.is_err() {
            eprintln!("Conn shutdown");
            return;
        }
        if res_r.await.unwrap() {
            println!("Success set int.");
        } else {
            println!("Fail set int");
        }
    }
    
    • oneshotは Sender:Receiver = 1:1 のパターン
    • 簡単な結果応答にはoneshotを用いる

参考文献

Discussion