📜

Rustでユーザー入力を受けつつログも流す

に公開

イベント駆動で色々するものを書いていたが、金の掛かるAPI(OpenAI API)だけ確認を入れたかったので。

やりたいこと

  • マルチスレッドのプログラムで一部にステップ実行を入れたい
  • 普通に標準入出力だとプロンプト(入力を促すcontinue? [y/n] みたいなの)が流されてしまって嫌
  • プロンプトは固定表示してログは別で流したい

コード例

tui-loggerが丁度良いので、大袈裟な気もするがratatuiでTUIを作る。

[dependencies]
tokio = { version = "1.45.0", features = ["full"] }
color-eyre = "0.6.3"
crossterm = "0.29.0"
ratatui = "0.29.0"
tracing = "0.1.41"
tracing-subscriber = "0.3.19"
tui-logger = { version = "0.17.1", features = ["tracing-support"] }

標準入出力の場合

簡単な例として定期的にイベントを発行するtickerと、それを受け取って処理するprocessorを考える。

use std::{
    error::Error,
    io,
    time::Duration,
};

use tokio::{select, sync::mpsc::{self, Receiver, Sender}, task, time};
use tracing::{error, info};

const CHANNEL_BUFFER_SIZE: usize = 16;

struct Event {
    n: u32,
}

async fn ticker(sender: Sender<Event>) -> Result<(), Box<dyn Error + Send + Sync>> {
    let mut interval = time::interval(Duration::from_secs(10));

    for n in 1.. {
        interval.tick().await;
        sender.send(Event { n }).await?;
        info!(n = n, "event sent");
    }

    Ok(())
}

async fn processor(mut receiver: Receiver<Event>) -> Result<(), Box<dyn Error + Send + Sync>> {
    while let Some(event) = receiver.recv().await {
        info!(n = event.n, "event received");

        let mut dummy = String::new();
        println!("press enter to continue...");
        io::stdin().read_line(&mut dummy)?;

        let mut result = 1;
        for i in 2..event.n {
            result *= i;
        }
        info!(result = result, "task finished");
    }
    Ok(())
}

async fn app() -> Result<(), Box<dyn Error + Send + Sync>> {
    let (sender, receiver) = mpsc::channel(CHANNEL_BUFFER_SIZE);

    let ticker = task::spawn(async move { ticker(sender).await });
    let processor = task::spawn(async move { processor(receiver).await });

    Ok(select! {
        result = ticker => {
            result??
        }
        result = processor => {
            result??
        }
    })
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();
    if let Err(e) = app().await {
        error!(e)
    }
}

このコードの場合はどんどん新しいイベントが溜まるからtickerがprocessorの処理完了イベントか何かを待つべきだが、実際には更に別のスレッドが色々いて出力が流れていくような状況を考えている。

ratatui + tui-logger

これにratatuiの仕組みを入れると以下のようになる。

use std::{error::Error, time::Duration};

use crossterm::event::{self, KeyCode};
use ratatui::{
    DefaultTerminal, Frame,
    layout::{Constraint, Layout},
};
use tokio::{
    select,
    sync::mpsc::{self, Receiver, Sender},
    task::{self, yield_now},
    time,
};
use tracing::{error, info};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

const CHANNEL_BUFFER_SIZE: usize = 16;
const RENDER_INTERVAL: u64 = 20;

struct Event {
    n: u32,
}

enum Signal {
    Continue,
}

async fn ticker(sender: Sender<Event>) -> Result<(), Box<dyn Error + Send + Sync>> {
    let mut interval = time::interval(Duration::from_secs(10));

    for n in 1.. {
        interval.tick().await;
        sender.send(Event { n }).await?;
        info!(n = n, "event sent");
    }

    Ok(())
}

async fn processor(
    mut receiver: Receiver<Event>,
    mut signal_receiver: Receiver<Signal>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
    while let Some(event) = receiver.recv().await {
        info!(n = event.n, "event received");

        match signal_receiver.recv().await {
            Some(_) => (),
            None => break,
        }

        let mut result = 1;
        for i in 2..event.n {
            result *= i;
        }
        info!(result = result, "task finished");
    }
    Ok(())
}

struct InteractiveApp {
    signal_sender: mpsc::Sender<Signal>,
}

impl InteractiveApp {
    pub async fn run(
        &mut self,
        terminal: &mut DefaultTerminal,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        loop {
            terminal.draw(|frame| self.render(frame))?;
            if event::poll(Duration::from_millis(RENDER_INTERVAL))? {
                match event::read()? {
                    crossterm::event::Event::Key(key) => match key.code {
                        KeyCode::Esc => {
                            break Ok(())
                        }
                        KeyCode::Enter => {
                            self.signal_sender.send(Signal::Continue).await?;
                        }
                        _ => {}
                    },
                    _ => {}
                }
            }
            // Required to be cancelled by select!
            yield_now().await;
        }
    }

    fn render(&self, frame: &mut Frame) {
        let [logs, prompt] =
            Layout::vertical([Constraint::Fill(1), Constraint::Length(1)]).areas(frame.area());

        frame.render_widget(tui_logger::TuiLoggerWidget::default(), logs);
        frame.render_widget("[esc] exit [enter] continue", prompt);
    }
}

async fn app() -> Result<(), Box<dyn Error + Send + Sync>> {
    tracing_subscriber::registry()
        .with(tui_logger::TuiTracingSubscriberLayer {})
        .init();
    tui_logger::init_logger(tui_logger::LevelFilter::Debug)?;

    let (sender, receiver) = mpsc::channel(CHANNEL_BUFFER_SIZE);
    let (signal_sender, signal_receiver) = mpsc::channel(CHANNEL_BUFFER_SIZE);

    let ticker = task::spawn(async move { ticker(sender).await });
    let processor = task::spawn(async move { processor(receiver, signal_receiver).await });

    color_eyre::install()?;
    let mut terminal = ratatui::init();
    let mut interactive = InteractiveApp { signal_sender };

    let result = select! {
        result = interactive.run(&mut terminal) => {
            result?
        }
        result = ticker => {
            result??
        }
        result = processor => {
            result??
        }
    };

    ratatui::restore();
    Ok(result)
}

#[tokio::main]
async fn main() {
    if let Err(e) = app().await {
        error!(e)
    }
}

これでTuiLoggerWidget部分にログを流しつつ、ターミナル下部に[esc] exit [enter] continueのプロンプトを固定することができる。(Ctrl+Cが効かなくなるので終了手段が必要になる。)

ターミナル表示

processorが待ち状態の場合のみ[enter] continueの部分を表示したりすると更に親切だろう。

Discussion