😎

デスクトップアプリとして動かしたRustサーバーのログをCloudWatchに送りたい

2022/12/21に公開

Rust Advent Calendar 2022 カレンダー 2 の 21 日目の記事です。

Electron の裏側として起動した Rust のサーバーのログを CloudWatch にどうにかして送ろうと思ったときに、ログをカスタマイズしないといけなくなってしまったのでメモとして残しておきます。

前提条件

  • デスクトップアプリ側から、 AWS の credential を読み込んでCloudWatchのログを送る

なので、Lambda などのように自動的に CloudWatch にログを送る仕組みを作らなくてはならない。

使用クレート

コード

以下は実際に動かしてるものではなく、今回のためのサンプルのコードになります
ログ

use again::RetryPolicy;
use chrono::Utc;

use log::Level;
use pretty_env_logger::env_logger::fmt::{Color, Style, StyledValue};
use rusoto_logs::{
    CloudWatchLogs, CloudWatchLogsClient, DescribeLogStreamsRequest, InputLogEvent,
    PutLogEventsRequest,
};
use std::fmt;
use std::io::Write;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

static MAX_MODULE_WIDTH: AtomicUsize = AtomicUsize::new(0);

const LOG_GROUP_NAME: &str = "test-group";
const LOG_STREAM_NAME: &str = "test-stream";

fn max_target_width(target: &str) -> usize {
    let max_width = MAX_MODULE_WIDTH.load(Ordering::Relaxed);
    if max_width < target.len() {
        MAX_MODULE_WIDTH.store(target.len(), Ordering::Relaxed);
        target.len()
    } else {
        max_width
    }
}

fn colored_level<'a>(style: &'a mut Style, level: Level) -> StyledValue<'a, &'static str> {
    match level {
        Level::Trace => style.set_color(Color::Magenta).value("TRACE"),
        Level::Debug => style.set_color(Color::Blue).value("DEBUG"),
        Level::Info => style.set_color(Color::Green).value("INFO "),
        Level::Warn => style.set_color(Color::Yellow).value("WARN "),
        Level::Error => style.set_color(Color::Red).value("ERROR"),
    }
}

struct Padded<T> {
    value: T,
    width: usize,
}

impl<T: fmt::Display> fmt::Display for Padded<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{: <width$}", self.value, width = self.width)
    }
}

pub struct CloudWatchLogger {
    client: CloudWatchLogsClient,
}

async fn send(client: CloudWatchLogsClient, message: String) {
    let timestamp = Utc::now().timestamp_millis();

    let policy = RetryPolicy::exponential(Duration::from_secs(1))
        .with_jitter(false)
        .with_max_delay(Duration::from_secs(60))
        .with_max_retries(4);

    policy
        .retry(|| async {
            let mut desc_streams_req: DescribeLogStreamsRequest = Default::default();
            desc_streams_req.log_group_name = LOG_GROUP_NAME.to_string();

            let p = RetryPolicy::exponential(Duration::from_secs(1))
                .with_jitter(false)
                .with_max_delay(Duration::from_secs(60))
                .with_max_retries(10);

            let log_streams = p
                .retry(|| async {
                    let streams_resp = client.describe_log_streams(desc_streams_req.clone()).await;
                    return if streams_resp.is_err() {
                        Err("throttle error")
                    } else {
                        Ok(streams_resp.unwrap())
                    };
                })
                .await;

            let log_streams = log_streams.unwrap().log_streams.unwrap();

            let stream = &log_streams
                .iter()
                .find(|s| s.log_stream_name == Some(LOG_STREAM_NAME.to_string()))
                .unwrap();
            let sequence_token = stream.upload_sequence_token.clone();
            let input_log_event = InputLogEvent {
                message: message.clone(),
                timestamp,
            };
            let put_log_events_request = PutLogEventsRequest {
                log_events: vec![input_log_event],
                log_group_name: LOG_GROUP_NAME.to_string(),
                log_stream_name: LOG_STREAM_NAME.to_string(),
                sequence_token: sequence_token.clone(),
            };
            client.put_log_events(put_log_events_request).await
        })
        .await
        .expect("should send log");
}

impl CloudWatchLogger {
    pub fn new(client: CloudWatchLogsClient) -> Self {
        Self { client }
    }
    pub fn build(&self) -> pretty_env_logger::env_logger::Logger {
        let client = self.client.clone();
        let mut logger = pretty_env_logger::formatted_builder();

        let builder = logger.format(move |f, record| {
            let target = record.target();
            let max_width = max_target_width(target);
            let mut style = f.style();
            let level = colored_level(&mut style, record.level());
            let mut style = f.style();
            let target = style.set_bold(true).value(Padded {
                value: target,
                width: max_width,
            });
            let message = format!(" {} {} > {}", level, target, record.args());
            tokio::spawn(send(client.clone(), message));

            writeln!(f, " {} {} > {}", level, target, record.args(),)
        });

        if let Ok(s) = std::env::var("RUST_LOG") {
            builder.parse_filters(&s);
        }

        builder.build()
    }
}

サーバー

mod cloud_watch_logger;

use axum::{response::Html, routing::get, Router};

use std::net::SocketAddr;

use rusoto_core::Region;
use rusoto_iam::{GetUserRequest, Iam, IamClient};
use rusoto_logs::CloudWatchLogsClient;

use crate::cloud_watch_logger::CloudWatchLogger;

#[macro_use]
extern crate log;

#[tokio::main]
async fn main() {
    std::env::set_var("RUST_LOG", "info");
    let iam_client = IamClient::new(Region::UsEast1);

    let client = CloudWatchLogsClient::new(rusoto_signature::region::Region::ApNortheast1);
    let logger = CloudWatchLogger::new(client);

    log::set_boxed_logger(Box::new(logger.build())).unwrap();
    let filter = log::LevelFilter::Info;
    log::set_max_level(filter);

    let user = iam_client
        .get_user(GetUserRequest { user_name: None })
        .await
        .expect("should get user");

    info!("{:?}", user.user.user_name);

    let app = Router::new().route("/", get(handler));

    let addr = SocketAddr::from(([127, 0, 0, 1], 3001));
    println!("listening on {}", addr);
    trace!("Commencing yak shaving");

    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}

async fn handler() -> Html<&'static str> {
    debug!("aaa");
    trace!("trace");
    info!("Info");
    warn!("Warn");
    error!("error");
    println!("printIn");
    Html("<h1>Hello, World!</h1>")
}

コードの解説

ログ側のコードは、 pretty-env-logger のコードを参考に、必要なものはインポートし、エクスポートされていないものはコピペしています。(colored_levelPadded など)
ログを書き込む処理のところに tokio::spawn で CloudWatch に非同期で処理を待たずに送る処理を呼んでいます。
CloudWatch はたまに上限エラーになるため、 againで成功するまで事項するようにしています。

サーバー側は今回は axumで簡単にサーバーを建てて、カスタマイズされたログを log::set_boxed_logger で呼び出すようにしています。

以下はサンプルのリポジトリになります。
https://github.com/tanshio/rust-cloudwatch-example

ログが動いている様子

axumで動かしたサーバー
サーバーにアクセス

ターミナルのログ
アクセスがあると指定していたレベルでログがコンソールに吐き出されます。

CloudWatchのログ

無事CloudWatchにログが送れました!


今回のようなケースはあまりないとは思いますが、誰かの役に立てば幸いです。
tokio::spawnsend の箇所がなんだかキレイじゃない気がするので、他にいいやり方を知っている方は教えてもらえると嬉しいです!

Discussion