Open4

rust + mongo メモ

yunayuna

Time Series Collectionsのtimestampの入れ方

結論は最後に書いてます

試行錯誤の記録

mongoには、時系列データを効率的に処理してくれる Time Series Collections というのがある。
https://www.mongodb.com/docs/v5.0/core/timeseries-collections/

その時、時系列の時刻を表すフィールドをtimeField、
時系列のキーとなるフィールドをmetaFieldとして事前に設定する必要があるので、
例として都市の気温を時系列で保持するような、コレクション設定を考えてみる。

db.createCollection(
    "weather",
    {
       timeseries: {
          timeField: "timestamp",
          metaField: "city",
          granularity: "hours"
       }
    }
)

これをrustで実装したいので、とりあえずシンプルなstructを作る。

cargo.toml
chrono = {version="0.4.24", features=["serde"] }
use chrono::{DateTime, Utc};

#[derive(Debug, Serialize, Deserialize, Validate)]
pub struct  Temperature {
    pub city: String,
    pub temperature: f64
    pub created_at: DateTime<Utc>, //   timeField on time series
}

chronoのSerizlizerが自動的に採用されるのだが、
これはmongoに挿入する形式とは異なるので、
例えば以下のように処理をすると、created_atの形式がBSON UTC形式じゃない、と怒られる。

let mut document = bson::to_document(self)?;
Error {
    kind: Write(
        WriteError(
            WriteError {
                code: 2,
                code_name: None,
                message: \"'created_at' must be present and contain a valid BSON UTC datetime value\",
                details: None,
            },
        ),
    ),
    labels: {},
    wire_version: None,
    source: None,
}

では、Serializerをmongoの形式にあったものにすれば良いのでは?
mongo-rust-driverでは、色んなformatに適したSerizlizer用意されてるので、その中からこちらを使ってみる。

use mongodb::bson::serde_helpers::chrono_datetime_as_bson_datetime;
use chrono::{DateTime, Utc};

#[derive(Debug, Serialize, Deserialize, Validate)]
pub struct  Temperature {
    pub city: String,
    pub temperature: f64
  #[serde(with = "chrono_datetime_as_bson_datetime")]
    pub created_at: DateTime<Utc>, //   timeField on time series
}

これで、mongoへのinsertは問題なく動くようになる。

ただ、ここで問題が。

データの流れとして、以下のように①と②のserialize/desirializeタイミングがあるのだが、
①と②で求めるフォーマットが異なる。
front <- ① -> rust <- ② -> mongo

①温度計から取得した生データをrustのapiに送ったり、
 mongoに保存されている時系列データを取得してwebにグラフ表示したりする。
 このときのデータとしてはrfc3339のような形式が扱いやすい。

②rustからmongoにinsertしたり、mongoからreadしてrustのstructに変換したりする。

上記の記述だと、②は対応できるが、①に対応できない。
rfc3339で扱うためのSerializerも用意されているのだが、以下のようにしてみても、
結局①か②どちらかには対応できても、両方には対応できない。

#[serde(with = "rfc3339_string_as_bson_datetime")]
pub created_at: String,
    #[serde(with = "bson_datetime_as_rfc3339_string")]
    pub created_at: mongodb::bson::DateTime,

ひとまず、以下のような方法で回避を検討中。

・structはrfc3339で扱えるよう設計しつつ、
mongoへのinsert/select 時は、個別にcreated_atだけbson::DateTime形式に変換するよう、専用の変換メソッドを用意する。
mongoからデータ取得した際、そのままdeserializeするとcreated_atフィールドの型が合わずErrorになるので、
created_atフィールドのdeserializeは、エラー時にNoneが入るよう、
serde_with crateの、#[serde_as(deserialize_as = "DefaultOnError")]を使って実装する。

この時点の完成形

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DefaultOnError};

#[serde_as]
#[derive(Debug, Serialize, Deserialize, Validate)]
pub struct  Temperature {
    pub city: String,
    pub temperature: f64
   #[serde_as(deserialize_as = "DefaultOnError")]
    pub created_at: Option<DateTime<Utc>>, //   timeField on time series
}

impl Temperature {
    pub fn to_document(&self) -> anyhow::Result<mongodb::bson::Document> {
        let mut document = mongodb::bson::to_document(self)?;
        document.insert("created_at", bson::DateTime::from(self.created_at));
        Ok(document)
    }
    pub fn from_document(document: mongodb::bson::Document) -> anyhow::Result<Self> {
        let mut metric = mongodb::bson::from_document::<Temperature>(document.clone())?;
        let created_date = document
            .get("created_at")
            .and_then(|x| x.as_datetime())
            .map(|x| x.to_chrono());
        metric.created_at = created_date;
        Ok(metric)
    }
}

でも、専用の変換メソッドto_document, from_documentを書くのが面倒、、
ということで、
Deserialize処理を自分で記述する方法を検証した

結論(2023/11/13)

mongoで、timeseriesを設定したフィールドに関しては、
上記のようにbsonのUTC形式で厳密にformatしないとエラーになるのだが、
通常のフィールドについては、なぜかエラーにならない。

ということで、timeseries用のフィールド(indexが貼られる)と、人が確認するためのフィールドを分けて書くのが良さそうだ。

人が確認するためのフィールド(特にtimeseries等の設定を行わないフィールド)については、
serialize時の処理と、
deserialize時に、文字列からのDeserializeも
mongo(bson)のDocumentからのDeserializeにも対応しつつ、
エラーの場合はDefault値(None)をセットすることができた。

Deserialize関数は他のstructにも流用可能なので、一度書いて使い回せばOKです。

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DefaultOnError};
use serde::{Deserializer, Serializer};

#[serde_as]
#[derive(Debug, Serialize, Deserialize, Validate)]
pub struct  Temperature {
    pub city: String,
    pub temperature: f64
    #[serde(default, serialize_with = "to_mongo_datetime", deserialize_with = "from_mongo_datetime")]
    pub created_at: Option<DateTime<Utc>>, //   timeField on time series
    #[serde(default, deserialize_with = "from_mongo_datetime")]
    pub created_at_not_time_series: Option<DateTime<Utc>>, //  not  time series
}

fn to_mongo_datetime<S>(dt: &Option<DateTime<Utc>>, serializer: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    match dt {
        Some(dt) => {
            info!("to_mongo_datetime: {:?}", dt);
            let dt = bson::DateTime::from(dt.clone());
            dt.serialize(serializer)
            // let dt_str = dt.to_rfc3339();
            // serializer.serialize_str(dt_str.as_str())
        },
        None => serializer.serialize_none()
    }
}

fn from_mongo_datetime<'de, D>(deserializer: D) -> Result<Option<DateTime<Utc>>, D::Error>
where
    D: Deserializer<'de>,
{
    let opt: Option<bson::Bson> = Option::deserialize(deserializer)?;
    match opt {
        Some(bson::Bson::DateTime(dt)) => Ok(Some(dt.to_chrono())),
        Some(bson::Bson::String(dt)) => {
            let ret_data = chrono::DateTime::parse_from_rfc3339(dt.as_str()).ok().map(|x| x.with_timezone(&Utc));
            Ok(ret_data)
        },
        None => Ok(None),
        // _ => Err(serde::de::Error::custom("DateTime format is invalid"))
        _ => Ok(None) //default on Error.
    }
}
yunayuna

もう一つハマッたこと

structを書く時、#[serde_as]の記述場所を間違えると、正しく動かない

NGパターン(serde_asが後ろにある)

#[derive(Debug, Serialize, Deserialize,  Default)]
#[serde_as]
pub struct PostSchedule {
    #[serde(rename = "_id")]
    pub oid: Option<ObjectId>,

    #[serde_as(deserialize_as = "DefaultOnError")]
    pub post_at: Option<DateTime<Utc>>, //投稿予定日時
    pub posted: Option<bool>, //投稿済みか否か
}

OKパターン(#[serde_as]が最初にある)

#[serde_as]
#[derive(Debug, Serialize, Deserialize,  Default)]
pub struct PostSchedule {
    #[serde(rename = "_id")]
    pub oid: Option<ObjectId>,

    #[serde_as(deserialize_as = "DefaultOnError")]
    pub post_at: Option<DateTime<Utc>>, //投稿予定日時
    pub posted: Option<bool>, //投稿済みか否か
}
yunayuna

mongoのidや、created_atなどの共通フィールドは、毎回structに書かず、
以下のように設計しています。
#[serde(flatten)]をつけることで、serialize, deserialize時に"mongo_field"というフィールドは省略されて、直接oid,created_at,updated_atに項目が展開されます。

#[serde_as]
#[derive(Debug, Serialize, Deserialize, Validate, Default, Clone)]
pub struct BaseMongoField {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub oid: Option<ObjectId>,
    #[serde(default, deserialize_with = "from_mongo_datetime")]
    pub created_at: Option<DateTime<Utc>>,
    // #[serde_as(deserialize_as = "DefaultOnError")]
    #[serde(default, deserialize_with = "from_mongo_datetime")]
    pub updated_at: Option<DateTime<Utc>>,
}

//
#[derive(Debug, Serialize, Deserialize, Validate, Default)]
pub struct Article {
    #[serde(flatten)]
    pub mongo_field: BaseMongoField,
    pub url: Option<String>,
    pub text: Option<String>,
    pub memo: Option<String>,
}

fn from_mongo_datetime<'de, D>(deserializer: D) -> Result<Option<DateTime<Utc>>, D::Error>
where
    D: Deserializer<'de>,
{
    let opt: Option<bson::Bson> = Option::deserialize(deserializer)?;
    match opt {
        Some(bson::Bson::DateTime(dt)) => Ok(Some(dt.to_chrono())),
        Some(bson::Bson::String(dt)) => {
            let ret_data = chrono::DateTime::parse_from_rfc3339(dt.as_str()).ok().map(|x| x.with_timezone(&Utc));
            Ok(ret_data)
        },
        None => Ok(None),
        // _ => Err(serde::de::Error::custom("DateTime format is invalid"))
        _ => Ok(None) //default on Error.
    }
}