tokioで対話型プログラムを取り扱ってみる
この記事はRust Advent Calendar 2023 9日目の記事です。
はじめに
対話型プログラムを使う機会があり、いろいろ試してみて形になったので共有したいと思います。
単にcat
コマンドを使うのでもいいのですが、せっかくなので、今回は対話型のサイコロアプリを作って、それを取り扱うプログラムを作ってみます。
サイコロアプリのソースコード
[package]
name = "dice"
version = "0.1.0"
edition = "2021"
[dependencies]
rand = { version = "0.8.5", features = [] }
use std::io::stdin;
use std::process::exit;
use rand::Rng;
fn main() {
let mut rng = rand::thread_rng();
let mut stats = [0, 0, 0, 0, 0, 0];
loop {
let mut s = String::new();
stdin().read_line(&mut s).expect("input error");
let cmd = s.trim().split(" ").collect::<Vec<_>>();
match cmd.get(0) {
Some(&"roll") => {
let num = match cmd.get(1) {
Some(&n) => n.parse::<u8>().ok(),
None => Some(1)
};
match num {
Some(n) => {
if n < 1 || n > 20 {
println!("invalid num");
continue;
}
let mut values = vec![];
for _i in 0..n {
let r: usize = rng.gen_range(0..6);
stats[r] += 1;
values.push((r+1).to_string());
}
println!("{}", values.join(" "));
},
None => println!("invalid num")
}
},
Some(&"stat") => {
for i in 0..6 {
println!("{}: {}", i+1, stats[i]);
}
},
Some(&"reset") => {
stats = [0, 0, 0, 0, 0, 0];
println!("ok")
},
Some(&"quit") => {
exit(0);
},
_ => println!("invalid command")
}
}
}
コマンド
-
roll [n]
- 6面サイコロを
n
個振る -
n
を省略した場合は1個振る -
n
は最大20まで指定できる - 結果はスペース区切りで1行で返される
- 6面サイコロを
-
stat
- 1~6の出目数を表示する
- 結果は出目ごとに行分けされ、6行分で返される
-
reset
- 出目情報をリセットする
- 結果は
ok
の1行で返される
-
quit
- 終了する
シンプルに使ってみる
tokio::process
を使うことで、非同期処理を前提とした子プロセスを取り扱うことができます。
[package]
name = "use-dice"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.34.0", features = ["full"] }
use tokio::process::Command;
#[tokio::main]
async fn main() {
let mut cmd = Command::new("./dice").spawn().unwrap();
let res = cmd.wait().await;
println!("{}", res.unwrap());
}
特別なことはしていないので、単にサイコロアプリを起動した場合とほぼ同じ挙動です。
標準出力を取り扱ってみる
Command#stdout()
にStdio::piped()
を渡すことで標準出力をプログラムから扱うことができます。
ただし、メインスレッドはChild#wait()
でブロックしているため、tokio::spawn
を使って別スレッドで標準出力を取り扱うようにしています。
+use std::process::Stdio;
+use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
#[tokio::main]
async fn main() {
- let mut cmd = Command::new("./dice").spawn().unwrap();
+ let mut cmd = Command::new("./dice")
+ .stdout(Stdio::piped())
+ .spawn()
+ .unwrap();
+
+ let mut stdout = BufReader::new(cmd.stdout.take().unwrap()).lines();
+ tokio::spawn(async move {
+ while let Ok(Some(text)) = stdout.next_line().await {
+ println!("{}", text);
+ }
+ });
let res = cmd.wait().await;
println!("{}", res.unwrap());
}
これも単にサイコロアプリを起動した場合とほぼ同じ挙動をしますが、表示はprintln!
を経由して行われます。
標準入力を取り扱ってみる
Command#stdin()
にStdio::piped()
を渡すことで標準入力をプログラムから扱うことができます。
use std::process::Stdio;
-use tokio::io::{AsyncBufReadExt, BufReader};
+use std::time::Duration;
+use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command;
+use tokio::time::sleep;
#[tokio::main]
async fn main() {
let mut cmd = Command::new("./dice")
+ .stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.unwrap();
let mut stdout = BufReader::new(cmd.stdout.take().unwrap()).lines();
tokio::spawn(async move {
while let Ok(Some(text)) = stdout.next_line().await {
println!("{}", text);
}
});
+
+ let mut stdin = cmd.stdin.take().unwrap();
+ tokio::spawn(async move {
+ let _ = stdin.write_all("roll\n".as_bytes()).await;
+ sleep(Duration::from_secs(1)).await;
+ let _ = stdin.write_all("roll 2\n".as_bytes()).await;
+ sleep(Duration::from_secs(1)).await;
+ let _ = stdin.write_all("stat\n".as_bytes()).await;
+ sleep(Duration::from_secs(1)).await;
+ let _ = stdin.write_all("quit\n".as_bytes()).await;
+ sleep(Duration::from_secs(1)).await;
+ });
let res = cmd.wait().await;
println!("{}", res.unwrap());
}
これを実行すると、
- サイコロを1つ振った結果を表示する
- サイコロを2つ振った結果を表示する
- サイコロの出目を表示する
が1秒ごとに行われて終了します。
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.78s
Running `target/debug/use-dice`
1
6 5
1: 1
2: 0
3: 0
4: 0
5: 1
6: 1
exit status: 0
ようやく対話型プログラムを取り扱っているような雰囲気になってきました。
構造体にする
ロジックが少々見づらくなってきたので、サイコロアプリの取り扱いを構造体に切り出しておきます。
use std::process::{ExitStatus, Stdio};
use std::time::Duration;
use tokio::io;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, Command};
use tokio::time::sleep;
#[tokio::main]
async fn main() {
let mut dice = Dice::new();
let res = dice.wait().await;
println!("{}", res.unwrap());
}
struct Dice {
cmd: Child
}
impl Dice {
pub fn new() -> Dice {
let mut cmd = Command::new("./dice")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.unwrap();
let mut stdout = BufReader::new(cmd.stdout.take().unwrap()).lines();
tokio::spawn(async move {
while let Ok(Some(text)) = stdout.next_line().await {
println!("{}", text);
}
});
let mut stdin = cmd.stdin.take().unwrap();
tokio::spawn(async move {
let _ = stdin.write_all("roll\n".as_bytes()).await;
sleep(Duration::from_secs(1)).await;
let _ = stdin.write_all("roll 2\n".as_bytes()).await;
sleep(Duration::from_secs(1)).await;
let _ = stdin.write_all("stat\n".as_bytes()).await;
sleep(Duration::from_secs(1)).await;
let _ = stdin.write_all("quit\n".as_bytes()).await;
sleep(Duration::from_secs(1)).await;
});
Dice { cmd }
}
pub async fn wait(&mut self) -> io::Result<ExitStatus> {
self.cmd.wait().await
}
}
コマンドを実装してみる
現状では構造体の実装にビジネスロジックが入りすぎており、これではあまりにも再利用性が低いので、サイコロアプリの持つコマンドを取り扱えるようにしてみます。
use std::process::{ExitStatus, Stdio};
+use std::sync::Arc;
use std::time::Duration;
use tokio::io;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
-use tokio::process::{Child, Command};
+use tokio::process::{Child, ChildStdin, Command};
+use tokio::sync::Mutex;
use tokio::time::sleep;
#[tokio::main]
async fn main() {
let mut dice = Dice::new();
+
+ let _ = dice.roll(None).await;
+ sleep(Duration::from_secs(1)).await;
+ let _ = dice.roll(Some(2)).await;
+ sleep(Duration::from_secs(1)).await;
+ let _ = dice.stat().await;
+ sleep(Duration::from_secs(1)).await;
+ let _ = dice.quit().await;
+ sleep(Duration::from_secs(1)).await;
let res = dice.wait().await;
println!("{}", res.unwrap());
}
struct Dice {
- cmd: Child
+ cmd: Child,
+ stdin: Arc<Mutex<ChildStdin>>
}
impl Dice {
pub fn new() -> Dice {
let mut cmd = Command::new("./dice")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.unwrap();
let mut stdout = BufReader::new(cmd.stdout.take().unwrap()).lines();
tokio::spawn(async move {
while let Ok(Some(text)) = stdout.next_line().await {
println!("{}", text);
}
});
- let mut stdin = cmd.stdin.take().unwrap();
- tokio::spawn(async move {
- let _ = stdin.write_all("roll\n".as_bytes()).await;
- sleep(Duration::from_secs(1)).await;
- let _ = stdin.write_all("roll 2\n".as_bytes()).await;
- sleep(Duration::from_secs(1)).await;
- let _ = stdin.write_all("stat\n".as_bytes()).await;
- sleep(Duration::from_secs(1)).await;
- let _ = stdin.write_all("quit\n".as_bytes()).await;
- sleep(Duration::from_secs(1)).await;
- });
+ let stdin = cmd.stdin.take().unwrap();
+ let stdin = Arc::new(Mutex::new(stdin));
- Dice { cmd }
+ Dice { cmd, stdin }
}
+
+ pub async fn roll(&mut self, num: Option<u8>) {
+ match num {
+ None => self.send("roll".to_string()).await,
+ Some(n) => self.send(format!("roll {n}")).await
+ }
+ }
+
+ pub async fn stat(&mut self) {
+ self.send("stat".to_string()).await;
+ }
+
+ pub async fn reset(&mut self) {
+ self.send("reset".to_string()).await;
+ }
+
+ async fn quit(&mut self) {
+ self.send("quit".to_string()).await;
+ }
+
+ async fn send(&mut self, text: String) {
+ let stdin = self.stdin.clone();
+ tokio::spawn(async move {
+ let mut locked = stdin.lock().await;
+ let _ = locked.write_all(format!("{text}\n").as_bytes()).await;
+ });
+ }
pub async fn wait(&mut self) -> io::Result<ExitStatus> {
self.cmd.wait().await
}
}
対話型プログラムが持つコマンドを関数として定義したことで扱いやすくなっています。
なお、標準入力(ChildStdin
)は、使用の都度、別スレッドにmoveして可変参照として使用するため、Arc<Mutex<T>>
でラップしています。
コマンド実行の結果を受け取るようにする
ところで、本来であれば、こうしたいところです。
let result = dice.roll(None).await;
println!("{}", result);
しかし、今のままではコマンドの結果は常に標準出力に出力されてしまいます。そこで、標準出力に出てきたテキストをコマンド実行の結果として受け取れるようにします。
use std::process::{ExitStatus, Stdio};
use std::sync::Arc;
use std::time::Duration;
use tokio::io;
-use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
-use tokio::process::{Child, ChildStdin, Command};
+use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines};
+use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use tokio::sync::Mutex;
use tokio::time::sleep;
#[tokio::main]
async fn main() {
let mut dice = Dice::new();
- let _ = dice.roll(None).await;
+ let result = dice.roll(None).await;
+ println!("{}", result.iter().map(|s| s.to_string()).collect::<Vec<_>>().join(" "));
sleep(Duration::from_secs(1)).await;
- let _ = dice.roll(Some(2)).await;
+ let result = dice.roll(Some(2)).await;
+ println!("{}", result.iter().map(|s| s.to_string()).collect::<Vec<_>>().join(" "));
sleep(Duration::from_secs(1)).await;
- let _ = dice.stat().await;
+ let result = dice.stat().await;
+ println!("{}", result);
sleep(Duration::from_secs(1)).await;
let _ = dice.quit().await;
sleep(Duration::from_secs(1)).await;
let res = dice.wait().await;
println!("{}", res.unwrap());
}
struct Dice {
cmd: Child,
- stdin: Arc<Mutex<ChildStdin>>
+ stdin: Arc<Mutex<ChildStdin>>,
+ stdout: Arc<Mutex<Lines<BufReader<ChildStdout>>>>,
}
impl Dice {
pub fn new() -> Dice {
let mut cmd = Command::new("./dice")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.unwrap();
let mut stdout = BufReader::new(cmd.stdout.take().unwrap()).lines();
- tokio::spawn(async move {
- while let Ok(Some(text)) = stdout.next_line().await {
- println!("{}", text);
- }
- });
+ let stdout = Arc::new(Mutex::new(stdout));
let stdin = cmd.stdin.take().unwrap();
let stdin = Arc::new(Mutex::new(stdin));
- Dice { cmd, stdin }
+ Dice { cmd, stdin, stdout }
}
- pub async fn roll(&mut self, num: Option<u8>) {
- match num {
- None => self.send("roll".to_string()).await,
- Some(n) => self.send(format!("roll {n}")).await
- }
+ pub async fn roll(&mut self, num: Option<u8>) -> Vec<u8> {
+ let result = match num {
+ None => self.send("roll".to_string(), 1).await,
+ Some(n) => self.send(format!("roll {n}"), 1).await
+ };
+ result[0].split(" ").map(|s| s.parse::<u8>().unwrap()).collect()
}
- pub async fn stat(&mut self) {
- self.send("stat".to_string()).await;
+ pub async fn stat(&mut self) -> String {
+ let result = self.send("stat".to_string(), 6).await;
+ result.join("\n")
}
pub async fn reset(&mut self) {
- self.send("reset".to_string()).await;
+ self.send("reset".to_string(), 1).await;
}
async fn quit(&mut self) {
- self.send("quit".to_string()).await;
+ self.send("quit".to_string(), 1).await;
}
- async fn send(&mut self, text: String) {
+ async fn send(&mut self, text: String, line_count: usize) -> Vec<String> {
let stdin = self.stdin.clone();
+ let stdout = self.stdout.clone();
+
tokio::spawn(async move {
let mut locked = stdin.lock().await;
let _ = locked.write_all(format!("{text}\n").as_bytes()).await;
});
+
+ tokio::spawn(async move {
+ let mut r = vec![];
+
+ for _i in 0..line_count {
+ let mut locked = stdout.lock().await;
+ match locked.next_line().await {
+ Ok(Some(s)) => r.push(s),
+ _ => {}
+ };
+ }
+
+ r
+ }).await.unwrap()
}
pub async fn wait(&mut self) -> io::Result<ExitStatus> {
self.cmd.wait().await
}
}
要領は「コマンドを実装してみる」のセクションでやったことと同じです。
ただし、標準出力への出力ルールは対話型プログラムに委ねられているため、コマンド実行後にどこまで標準出力を取得したら打ち切るかのロジックが難しく、ここをきちんと作る難易度は高めです。
今回のサイコロアプリはコマンドごとに行数が決まっていたので、コマンド実行後に一定の行数のデータを取得したら打ち切るという方法にしています。
他にも
- 特徴的な文字(例えば
END
など)を検知したら打ち切る - 特徴的な終端行(例えば空行など)を検知したら打ち切る
- 一定時間出力されなかったら打ち切る
- 一定の文字数を取得したら打ち切る
などの方法が考えられます。
また、本来はエラー出力についても考慮すべきですが、要領は同じなので、今回はそこまではやっていません。
おわりに
今回はtokioを使って対話型プログラムを取り扱ってみました。
子プロセスを使うようなプログラムを実装する機会は多くないかも知れませんが、一例として参考にしていただければ幸いです。
Discussion