Open4

rustでAws Timestreamのwrite/query処理

yunayuna

Amazon(aws) Timestreamとは?

時系列データベースに特化した、高速でスケーラブル・従量課金で比較的低コストな、サーバーレスストレージです。

公式サイトより

  • 平滑化、近似、補間などの時系列関数が組み込まれており、SQL を使用して時系列データを迅速に分析できます。
  • サーバーレスデータベースは、毎日数百万のクエリを処理し、必要に応じて自動的にスケーリングします。
  • 最近のデータ用のメモリストアや履歴データ用のマグネティックストアなどのストレージ層を使用して、データライフサイクル管理を簡素化します。
  • データからより迅速な分析情報を導き出し、既存の時系列ソリューションの数分の一のコストでビジネス上の意思決定を行います。

https://aws.amazon.com/jp/timestream/

rustの使用モジュール

write用とquery用のモジュールが別れてます。

aws-sdk-timestreamquery

https://docs.rs/aws-sdk-timestreamquery/latest/aws_sdk_timestreamquery/

aws-sdk-timestreamwrite

https://docs.rs/aws-sdk-timestreamwrite/latest/aws_sdk_timestreamwrite/

yunayuna

書き込み(write)サンプル

公式サイトにもexampleが見当たらなかったので、コード見ながら手探りで動作確認まで。

下準備

  • databaseとtable(metricsという名前で作成)を、amazon timestream上に作成

  • crateを追加しておく

aws-config = "1.1.2"
aws-types = "1.1.2"
aws-sdk-timestreamwrite = "1.10.0"
aws-sdk-timestreamquery = "1.10.0"

書き込み

sampleとして、measure_name: "cpu"、measure_value: f64のランダム値 の値を書き込み

use chrono::Utc;
use aws_types::region::Region;
use tracing::{error, info};
use rand::Rng;
use anyhow::Context;

pub async fn wirte() {
    let timestream_database = std::env::var("AWS_TIMESTREAM_DB").unwrap();
    let timestream_region = std::env::var("AWS_TIMESTREAM_REGION").unwrap();

    let config = aws_config::defaults(aws_config::BehaviorVersion::latest())
        .region(Region::new(timestream_region.clone())).load().await;

    let (client, reload_endpoint) = aws_sdk_timestreamwrite::Client::new(&config).with_endpoint_discovery_enabled().await.unwrap();

    let dim: Dimension = Dimension::builder()
        .dimension_value_type(DimensionValueType::Varchar)
        .name("region")
        .value(timestream_region.clone()).build()?;

    let mut rng = rand::thread_rng();
    let time = Utc::now().timestamp_nanos_opt().context("value can not be represented in a timestamp with nanosecond precision.")?;
    let time = time.to_string();

    let record = Record::builder()
        .dimensions(dim)
        .measure_name("cpu")
        .measure_value(rng.gen::<f64>().to_string())
        .measure_value_type(MeasureValueType::Double)
        .time(time)
        .time_unit(TimeUnit::Nanoseconds)
        .build();

        let result = client.write_records()
                .database_name(timestream_database.clone())
                .table_name("metrics")
                .set_records(Some(vec![record]))
                .send().await;

        match result {
            Ok(output) => info!("timestream metrics write success: {:?}", output),
            Err(e) => error!("timestream metrics write error: {:?}", e),
        }
    });
}

読み込み(query)サンプル

pub async query() {
    let timestream_database = std::env::var("AWS_TIMESTREAM_DB").unwrap();
    let timestream_region = std::env::var("AWS_TIMESTREAM_REGION").unwrap();

    let config = aws_config::defaults(aws_config::BehaviorVersion::latest()).region(Region::new(timestream_region.clone())).load().await;
    
    let (client, reload_endpoint) = aws_sdk_timestreamquery::Client::new(&config).with_endpoint_discovery_enabled().await.unwrap();
    

    let query_output = client.query().query_string(format!("select * from {timestream_database}.metrics")).send().await?;
    println!("query status: {:?}", query_output.query_status());

    query_output.rows().iter().for_each(|x| {
        println!("x. type: {:?}, data:{:?}", x.type_id(), x.data);
    });
}
yunayuna

メモ
メトリクスデータ登録は処理が比較的多いので、
どの程度捌けるのかを検証した上で利用を検討する。

もし一定以上の挿入処理に耐えられない場合、アプリケーション側である程度まとめて挿入するなどコントロールする事で制御できるか否かも検討する

yunayuna

読み込みデータの形式は独特なので注意

先に、column_infoを取得しておく。
後で取得するデータは、基本的にこのカラムの並び順で取得することになる。


let mut columns_index_map: HashMap<String, usize> = HashMap::new();

for (index, column_info) in query_output.column_info.clone().iter().enumerate() {
        if let Some(name) = column_info.name.clone() {
            println!("name:{}, index:{}", name.as_str(), index);
            columns_index_map.insert(name, index);
        }
    }

const TIMESTREAM_FIELD_MEASURE_NAME: &str = "measure_name"; //登録時に指定したデータ種別対象の名前

const TIMESTREAM_FIELD_MACHINE_ID: &str = "m_id"; //具体的なデータ種別名
const TIMESTREAM_FIELD_TIME: &str = "time"; //時刻カラム名
const TIMESTREAM_FIELD_VALUE_DOUBLE: &str = "measure_value::double"; //double値のカラム名
const TIMESTREAM_FIELD_VALUE_INT: &str = "measure_value::bigint"; //bigint値のカラム名

for row in query_output.rows().iter() {
        let data = row.data();
        let measure_name = data.get(*columns_index_map.get(TIMESTREAM_FIELD_MEASURE_NAME).unwrap()).unwrap().scalar_value().unwrap();
        let timestamp = data.get(*columns_index_map.get(TIMESTREAM_FIELD_TIME).unwrap()).unwrap().scalar_value().unwrap();
        let value_double_opt = data.get(*columns_index_map.get(TIMESTREAM_FIELD_VALUE_DOUBLE).unwrap()).unwrap().scalar_value();
        let value_int_opt = data.get(*columns_index_map.get(TIMESTREAM_FIELD_VALUE_INT).unwrap()).unwrap().scalar_value();

        println!("timestamp:{:?}", timestamp);
        let format = "%Y-%m-%d %H:%M:%S%.f";
        let naive_datetime = NaiveDateTime::parse_from_str(timestamp, format).unwrap();
        let datetime_utc: DateTime<Utc> = DateTime::from_naive_utc_and_offset(naive_datetime, Utc);

        ......