😎
デスクトップアプリとして動かしたRustサーバーのログをCloudWatchに送りたい
Rust Advent Calendar 2022 カレンダー 2 の 21 日目の記事です。
Electron の裏側として起動した Rust のサーバーのログを CloudWatch にどうにかして送ろうと思ったときに、ログをカスタマイズしないといけなくなってしまったのでメモとして残しておきます。
前提条件
- デスクトップアプリ側から、 AWS の credential を読み込んでCloudWatchのログを送る
なので、Lambda などのように自動的に CloudWatch にログを送る仕組みを作らなくてはならない。
使用クレート
- log
- env_logger log をラップしたもの
- pretty-env-logger (env_logger をラップし、いい感じに着色してくれる)
- rusoto AWS SDK
- again 成功するまで実行
コード
以下は実際に動かしてるものではなく、今回のためのサンプルのコードになります
ログ
use again::RetryPolicy;
use chrono::Utc;
use log::Level;
use pretty_env_logger::env_logger::fmt::{Color, Style, StyledValue};
use rusoto_logs::{
CloudWatchLogs, CloudWatchLogsClient, DescribeLogStreamsRequest, InputLogEvent,
PutLogEventsRequest,
};
use std::fmt;
use std::io::Write;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
static MAX_MODULE_WIDTH: AtomicUsize = AtomicUsize::new(0);
const LOG_GROUP_NAME: &str = "test-group";
const LOG_STREAM_NAME: &str = "test-stream";
fn max_target_width(target: &str) -> usize {
let max_width = MAX_MODULE_WIDTH.load(Ordering::Relaxed);
if max_width < target.len() {
MAX_MODULE_WIDTH.store(target.len(), Ordering::Relaxed);
target.len()
} else {
max_width
}
}
fn colored_level<'a>(style: &'a mut Style, level: Level) -> StyledValue<'a, &'static str> {
match level {
Level::Trace => style.set_color(Color::Magenta).value("TRACE"),
Level::Debug => style.set_color(Color::Blue).value("DEBUG"),
Level::Info => style.set_color(Color::Green).value("INFO "),
Level::Warn => style.set_color(Color::Yellow).value("WARN "),
Level::Error => style.set_color(Color::Red).value("ERROR"),
}
}
struct Padded<T> {
value: T,
width: usize,
}
impl<T: fmt::Display> fmt::Display for Padded<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{: <width$}", self.value, width = self.width)
}
}
pub struct CloudWatchLogger {
client: CloudWatchLogsClient,
}
async fn send(client: CloudWatchLogsClient, message: String) {
let timestamp = Utc::now().timestamp_millis();
let policy = RetryPolicy::exponential(Duration::from_secs(1))
.with_jitter(false)
.with_max_delay(Duration::from_secs(60))
.with_max_retries(4);
policy
.retry(|| async {
let mut desc_streams_req: DescribeLogStreamsRequest = Default::default();
desc_streams_req.log_group_name = LOG_GROUP_NAME.to_string();
let p = RetryPolicy::exponential(Duration::from_secs(1))
.with_jitter(false)
.with_max_delay(Duration::from_secs(60))
.with_max_retries(10);
let log_streams = p
.retry(|| async {
let streams_resp = client.describe_log_streams(desc_streams_req.clone()).await;
return if streams_resp.is_err() {
Err("throttle error")
} else {
Ok(streams_resp.unwrap())
};
})
.await;
let log_streams = log_streams.unwrap().log_streams.unwrap();
let stream = &log_streams
.iter()
.find(|s| s.log_stream_name == Some(LOG_STREAM_NAME.to_string()))
.unwrap();
let sequence_token = stream.upload_sequence_token.clone();
let input_log_event = InputLogEvent {
message: message.clone(),
timestamp,
};
let put_log_events_request = PutLogEventsRequest {
log_events: vec![input_log_event],
log_group_name: LOG_GROUP_NAME.to_string(),
log_stream_name: LOG_STREAM_NAME.to_string(),
sequence_token: sequence_token.clone(),
};
client.put_log_events(put_log_events_request).await
})
.await
.expect("should send log");
}
impl CloudWatchLogger {
pub fn new(client: CloudWatchLogsClient) -> Self {
Self { client }
}
pub fn build(&self) -> pretty_env_logger::env_logger::Logger {
let client = self.client.clone();
let mut logger = pretty_env_logger::formatted_builder();
let builder = logger.format(move |f, record| {
let target = record.target();
let max_width = max_target_width(target);
let mut style = f.style();
let level = colored_level(&mut style, record.level());
let mut style = f.style();
let target = style.set_bold(true).value(Padded {
value: target,
width: max_width,
});
let message = format!(" {} {} > {}", level, target, record.args());
tokio::spawn(send(client.clone(), message));
writeln!(f, " {} {} > {}", level, target, record.args(),)
});
if let Ok(s) = std::env::var("RUST_LOG") {
builder.parse_filters(&s);
}
builder.build()
}
}
サーバー
mod cloud_watch_logger;
use axum::{response::Html, routing::get, Router};
use std::net::SocketAddr;
use rusoto_core::Region;
use rusoto_iam::{GetUserRequest, Iam, IamClient};
use rusoto_logs::CloudWatchLogsClient;
use crate::cloud_watch_logger::CloudWatchLogger;
#[macro_use]
extern crate log;
#[tokio::main]
async fn main() {
std::env::set_var("RUST_LOG", "info");
let iam_client = IamClient::new(Region::UsEast1);
let client = CloudWatchLogsClient::new(rusoto_signature::region::Region::ApNortheast1);
let logger = CloudWatchLogger::new(client);
log::set_boxed_logger(Box::new(logger.build())).unwrap();
let filter = log::LevelFilter::Info;
log::set_max_level(filter);
let user = iam_client
.get_user(GetUserRequest { user_name: None })
.await
.expect("should get user");
info!("{:?}", user.user.user_name);
let app = Router::new().route("/", get(handler));
let addr = SocketAddr::from(([127, 0, 0, 1], 3001));
println!("listening on {}", addr);
trace!("Commencing yak shaving");
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}
async fn handler() -> Html<&'static str> {
debug!("aaa");
trace!("trace");
info!("Info");
warn!("Warn");
error!("error");
println!("printIn");
Html("<h1>Hello, World!</h1>")
}
コードの解説
ログ側のコードは、 pretty-env-logger のコードを参考に、必要なものはインポートし、エクスポートされていないものはコピペしています。(colored_level
や Padded
など)
ログを書き込む処理のところに tokio::spawn で CloudWatch に非同期で処理を待たずに送る処理を呼んでいます。
CloudWatch はたまに上限エラーになるため、 againで成功するまで事項するようにしています。
サーバー側は今回は axum
で簡単にサーバーを建てて、カスタマイズされたログを log::set_boxed_logger
で呼び出すようにしています。
以下はサンプルのリポジトリになります。
ログが動いている様子
サーバーにアクセス
アクセスがあると指定していたレベルでログがコンソールに吐き出されます。
無事CloudWatchにログが送れました!
今回のようなケースはあまりないとは思いますが、誰かの役に立てば幸いです。
tokio::spawn
と send
の箇所がなんだかキレイじゃない気がするので、他にいいやり方を知っている方は教えてもらえると嬉しいです!
Discussion