Open4
rustでAws Timestreamのwrite/query処理
Amazon(aws) Timestreamとは?
時系列データベースに特化した、高速でスケーラブル・従量課金で比較的低コストな、サーバーレスストレージです。
公式サイトより
- 平滑化、近似、補間などの時系列関数が組み込まれており、SQL を使用して時系列データを迅速に分析できます。
- サーバーレスデータベースは、毎日数百万のクエリを処理し、必要に応じて自動的にスケーリングします。
- 最近のデータ用のメモリストアや履歴データ用のマグネティックストアなどのストレージ層を使用して、データライフサイクル管理を簡素化します。
- データからより迅速な分析情報を導き出し、既存の時系列ソリューションの数分の一のコストでビジネス上の意思決定を行います。
rustの使用モジュール
write用とquery用のモジュールが別れてます。
aws-sdk-timestreamquery
aws-sdk-timestreamwrite
書き込み(write)サンプル
公式サイトにもexampleが見当たらなかったので、コード見ながら手探りで動作確認まで。
下準備
-
databaseとtable(metricsという名前で作成)を、amazon timestream上に作成
-
crateを追加しておく
aws-config = "1.1.2"
aws-types = "1.1.2"
aws-sdk-timestreamwrite = "1.10.0"
aws-sdk-timestreamquery = "1.10.0"
書き込み
sampleとして、measure_name: "cpu"、measure_value: f64のランダム値 の値を書き込み
use chrono::Utc;
use aws_types::region::Region;
use tracing::{error, info};
use rand::Rng;
use anyhow::Context;
pub async fn wirte() {
let timestream_database = std::env::var("AWS_TIMESTREAM_DB").unwrap();
let timestream_region = std::env::var("AWS_TIMESTREAM_REGION").unwrap();
let config = aws_config::defaults(aws_config::BehaviorVersion::latest())
.region(Region::new(timestream_region.clone())).load().await;
let (client, reload_endpoint) = aws_sdk_timestreamwrite::Client::new(&config).with_endpoint_discovery_enabled().await.unwrap();
let dim: Dimension = Dimension::builder()
.dimension_value_type(DimensionValueType::Varchar)
.name("region")
.value(timestream_region.clone()).build()?;
let mut rng = rand::thread_rng();
let time = Utc::now().timestamp_nanos_opt().context("value can not be represented in a timestamp with nanosecond precision.")?;
let time = time.to_string();
let record = Record::builder()
.dimensions(dim)
.measure_name("cpu")
.measure_value(rng.gen::<f64>().to_string())
.measure_value_type(MeasureValueType::Double)
.time(time)
.time_unit(TimeUnit::Nanoseconds)
.build();
let result = client.write_records()
.database_name(timestream_database.clone())
.table_name("metrics")
.set_records(Some(vec![record]))
.send().await;
match result {
Ok(output) => info!("timestream metrics write success: {:?}", output),
Err(e) => error!("timestream metrics write error: {:?}", e),
}
});
}
読み込み(query)サンプル
pub async query() {
let timestream_database = std::env::var("AWS_TIMESTREAM_DB").unwrap();
let timestream_region = std::env::var("AWS_TIMESTREAM_REGION").unwrap();
let config = aws_config::defaults(aws_config::BehaviorVersion::latest()).region(Region::new(timestream_region.clone())).load().await;
let (client, reload_endpoint) = aws_sdk_timestreamquery::Client::new(&config).with_endpoint_discovery_enabled().await.unwrap();
let query_output = client.query().query_string(format!("select * from {timestream_database}.metrics")).send().await?;
println!("query status: {:?}", query_output.query_status());
query_output.rows().iter().for_each(|x| {
println!("x. type: {:?}, data:{:?}", x.type_id(), x.data);
});
}
メモ
メトリクスデータ登録は処理が比較的多いので、
どの程度捌けるのかを検証した上で利用を検討する。
もし一定以上の挿入処理に耐えられない場合、アプリケーション側である程度まとめて挿入するなどコントロールする事で制御できるか否かも検討する
読み込みデータの形式は独特なので注意
先に、column_infoを取得しておく。
後で取得するデータは、基本的にこのカラムの並び順で取得することになる。
let mut columns_index_map: HashMap<String, usize> = HashMap::new();
for (index, column_info) in query_output.column_info.clone().iter().enumerate() {
if let Some(name) = column_info.name.clone() {
println!("name:{}, index:{}", name.as_str(), index);
columns_index_map.insert(name, index);
}
}
const TIMESTREAM_FIELD_MEASURE_NAME: &str = "measure_name"; //登録時に指定したデータ種別対象の名前
const TIMESTREAM_FIELD_MACHINE_ID: &str = "m_id"; //具体的なデータ種別名
const TIMESTREAM_FIELD_TIME: &str = "time"; //時刻カラム名
const TIMESTREAM_FIELD_VALUE_DOUBLE: &str = "measure_value::double"; //double値のカラム名
const TIMESTREAM_FIELD_VALUE_INT: &str = "measure_value::bigint"; //bigint値のカラム名
for row in query_output.rows().iter() {
let data = row.data();
let measure_name = data.get(*columns_index_map.get(TIMESTREAM_FIELD_MEASURE_NAME).unwrap()).unwrap().scalar_value().unwrap();
let timestamp = data.get(*columns_index_map.get(TIMESTREAM_FIELD_TIME).unwrap()).unwrap().scalar_value().unwrap();
let value_double_opt = data.get(*columns_index_map.get(TIMESTREAM_FIELD_VALUE_DOUBLE).unwrap()).unwrap().scalar_value();
let value_int_opt = data.get(*columns_index_map.get(TIMESTREAM_FIELD_VALUE_INT).unwrap()).unwrap().scalar_value();
println!("timestamp:{:?}", timestamp);
let format = "%Y-%m-%d %H:%M:%S%.f";
let naive_datetime = NaiveDateTime::parse_from_str(timestamp, format).unwrap();
let datetime_utc: DateTime<Utc> = DateTime::from_naive_utc_and_offset(naive_datetime, Utc);
......