rust + mongo メモ
mongo公式driverライブラリを使って開発していく上でハマったポイントをメモします。
Time Series Collectionsのtimestampの入れ方
結論は最後に書いてます
試行錯誤の記録
mongoには、時系列データを効率的に処理してくれる Time Series Collections というのがある。
その時、時系列の時刻を表すフィールドをtimeField、
時系列のキーとなるフィールドをmetaFieldとして事前に設定する必要があるので、
例として都市の気温を時系列で保持するような、コレクション設定を考えてみる。
db.createCollection(
"weather",
{
timeseries: {
timeField: "timestamp",
metaField: "city",
granularity: "hours"
}
}
)
これをrustで実装したいので、とりあえずシンプルなstructを作る。
chrono = {version="0.4.24", features=["serde"] }
use chrono::{DateTime, Utc};
#[derive(Debug, Serialize, Deserialize, Validate)]
pub struct Temperature {
pub city: String,
pub temperature: f64
pub created_at: DateTime<Utc>, // timeField on time series
}
chronoのSerizlizerが自動的に採用されるのだが、
これはmongoに挿入する形式とは異なるので、
例えば以下のように処理をすると、created_atの形式がBSON UTC形式じゃない、と怒られる。
let mut document = bson::to_document(self)?;
Error {
kind: Write(
WriteError(
WriteError {
code: 2,
code_name: None,
message: \"'created_at' must be present and contain a valid BSON UTC datetime value\",
details: None,
},
),
),
labels: {},
wire_version: None,
source: None,
}
では、Serializerをmongoの形式にあったものにすれば良いのでは?
mongo-rust-driverでは、色んなformatに適したSerizlizer用意されてるので、その中からこちらを使ってみる。
use mongodb::bson::serde_helpers::chrono_datetime_as_bson_datetime;
use chrono::{DateTime, Utc};
#[derive(Debug, Serialize, Deserialize, Validate)]
pub struct Temperature {
pub city: String,
pub temperature: f64
#[serde(with = "chrono_datetime_as_bson_datetime")]
pub created_at: DateTime<Utc>, // timeField on time series
}
これで、mongoへのinsertは問題なく動くようになる。
ただ、ここで問題が。
データの流れとして、以下のように①と②のserialize/desirializeタイミングがあるのだが、
①と②で求めるフォーマットが異なる。
front <- ① -> rust <- ② -> mongo
①温度計から取得した生データをrustのapiに送ったり、
mongoに保存されている時系列データを取得してwebにグラフ表示したりする。
このときのデータとしてはrfc3339のような形式が扱いやすい。
②rustからmongoにinsertしたり、mongoからreadしてrustのstructに変換したりする。
上記の記述だと、②は対応できるが、①に対応できない。
rfc3339で扱うためのSerializerも用意されているのだが、以下のようにしてみても、
結局①か②どちらかには対応できても、両方には対応できない。
#[serde(with = "rfc3339_string_as_bson_datetime")]
pub created_at: String,
#[serde(with = "bson_datetime_as_rfc3339_string")]
pub created_at: mongodb::bson::DateTime,
ひとまず、以下のような方法で回避を検討中。
・structはrfc3339で扱えるよう設計しつつ、
mongoへのinsert/select 時は、個別にcreated_atだけbson::DateTime形式に変換するよう、専用の変換メソッドを用意する。
mongoからデータ取得した際、そのままdeserializeするとcreated_atフィールドの型が合わずErrorになるので、
created_atフィールドのdeserializeは、エラー時にNoneが入るよう、
serde_with crateの、#[serde_as(deserialize_as = "DefaultOnError")]
を使って実装する。
この時点の完成形
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DefaultOnError};
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Validate)]
pub struct Temperature {
pub city: String,
pub temperature: f64
#[serde_as(deserialize_as = "DefaultOnError")]
pub created_at: Option<DateTime<Utc>>, // timeField on time series
}
impl Temperature {
pub fn to_document(&self) -> anyhow::Result<mongodb::bson::Document> {
let mut document = mongodb::bson::to_document(self)?;
document.insert("created_at", bson::DateTime::from(self.created_at));
Ok(document)
}
pub fn from_document(document: mongodb::bson::Document) -> anyhow::Result<Self> {
let mut metric = mongodb::bson::from_document::<Temperature>(document.clone())?;
let created_date = document
.get("created_at")
.and_then(|x| x.as_datetime())
.map(|x| x.to_chrono());
metric.created_at = created_date;
Ok(metric)
}
}
でも、専用の変換メソッドto_document, from_documentを書くのが面倒、、
ということで、
Deserialize処理を自分で記述する方法を検証した
結論(2023/11/13)
mongoで、timeseriesを設定したフィールドに関しては、
上記のようにbsonのUTC形式で厳密にformatしないとエラーになるのだが、
通常のフィールドについては、なぜかエラーにならない。
ということで、timeseries用のフィールド(indexが貼られる)と、人が確認するためのフィールドを分けて書くのが良さそうだ。
人が確認するためのフィールド(特にtimeseries等の設定を行わないフィールド)については、
serialize時の処理と、
deserialize時に、文字列からのDeserializeも
mongo(bson)のDocumentからのDeserializeにも対応しつつ、
エラーの場合はDefault値(None)をセットすることができた。
Deserialize関数は他のstructにも流用可能なので、一度書いて使い回せばOKです。
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DefaultOnError};
use serde::{Deserializer, Serializer};
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Validate)]
pub struct Temperature {
pub city: String,
pub temperature: f64
#[serde(default, serialize_with = "to_mongo_datetime", deserialize_with = "from_mongo_datetime")]
pub created_at: Option<DateTime<Utc>>, // timeField on time series
#[serde(default, deserialize_with = "from_mongo_datetime")]
pub created_at_not_time_series: Option<DateTime<Utc>>, // not time series
}
fn to_mongo_datetime<S>(dt: &Option<DateTime<Utc>>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match dt {
Some(dt) => {
info!("to_mongo_datetime: {:?}", dt);
let dt = bson::DateTime::from(dt.clone());
dt.serialize(serializer)
// let dt_str = dt.to_rfc3339();
// serializer.serialize_str(dt_str.as_str())
},
None => serializer.serialize_none()
}
}
fn from_mongo_datetime<'de, D>(deserializer: D) -> Result<Option<DateTime<Utc>>, D::Error>
where
D: Deserializer<'de>,
{
let opt: Option<bson::Bson> = Option::deserialize(deserializer)?;
match opt {
Some(bson::Bson::DateTime(dt)) => Ok(Some(dt.to_chrono())),
Some(bson::Bson::String(dt)) => {
let ret_data = chrono::DateTime::parse_from_rfc3339(dt.as_str()).ok().map(|x| x.with_timezone(&Utc));
Ok(ret_data)
},
None => Ok(None),
// _ => Err(serde::de::Error::custom("DateTime format is invalid"))
_ => Ok(None) //default on Error.
}
}
もう一つハマッたこと
structを書く時、#[serde_as]の記述場所を間違えると、正しく動かない
NGパターン(serde_asが後ろにある)
#[derive(Debug, Serialize, Deserialize, Default)]
#[serde_as]
pub struct PostSchedule {
#[serde(rename = "_id")]
pub oid: Option<ObjectId>,
#[serde_as(deserialize_as = "DefaultOnError")]
pub post_at: Option<DateTime<Utc>>, //投稿予定日時
pub posted: Option<bool>, //投稿済みか否か
}
OKパターン(#[serde_as]が最初にある)
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct PostSchedule {
#[serde(rename = "_id")]
pub oid: Option<ObjectId>,
#[serde_as(deserialize_as = "DefaultOnError")]
pub post_at: Option<DateTime<Utc>>, //投稿予定日時
pub posted: Option<bool>, //投稿済みか否か
}
mongoのidや、created_atなどの共通フィールドは、毎回structに書かず、
以下のように設計しています。
#[serde(flatten)]をつけることで、serialize, deserialize時に"mongo_field"というフィールドは省略されて、直接oid,created_at,updated_atに項目が展開されます。
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Validate, Default, Clone)]
pub struct BaseMongoField {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub oid: Option<ObjectId>,
#[serde(default, deserialize_with = "from_mongo_datetime")]
pub created_at: Option<DateTime<Utc>>,
// #[serde_as(deserialize_as = "DefaultOnError")]
#[serde(default, deserialize_with = "from_mongo_datetime")]
pub updated_at: Option<DateTime<Utc>>,
}
//
#[derive(Debug, Serialize, Deserialize, Validate, Default)]
pub struct Article {
#[serde(flatten)]
pub mongo_field: BaseMongoField,
pub url: Option<String>,
pub text: Option<String>,
pub memo: Option<String>,
}
fn from_mongo_datetime<'de, D>(deserializer: D) -> Result<Option<DateTime<Utc>>, D::Error>
where
D: Deserializer<'de>,
{
let opt: Option<bson::Bson> = Option::deserialize(deserializer)?;
match opt {
Some(bson::Bson::DateTime(dt)) => Ok(Some(dt.to_chrono())),
Some(bson::Bson::String(dt)) => {
let ret_data = chrono::DateTime::parse_from_rfc3339(dt.as_str()).ok().map(|x| x.with_timezone(&Utc));
Ok(ret_data)
},
None => Ok(None),
// _ => Err(serde::de::Error::custom("DateTime format is invalid"))
_ => Ok(None) //default on Error.
}
}