RustのMongoDBドライバーとRayon
この記事では、Rust
のMongoDB
ドライバーの使い方の紹介と、データのパラレル処理Rayon
の説明をします。
インストール
mongodb
のcrate
をインストールします。
インサート
複数ドキュメントをインサートします。
serdeを使用してMongoDBのドキュメントをRustのstruct(Book)にマッピングします。
use anyhow::Result;
use mongodb::bson::doc;
use mongodb::bson::oid::ObjectId;
use mongodb::{options::ClientOptions, Client};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct Book {
#[serde(rename = "_id", skip_serializing)]
id: Option<ObjectId>,
title: String,
author: String,
}
#[tokio::main]
async fn main() -> Result<()> {
let client_options = ClientOptions::parse("mongodb://localhost:27017").await?;
let client = Client::with_options(client_options)?;
let db = client.database("test");
let collection = db.collection::<Book>("books");
let books = vec![
Book {
id: None,
title: "1984".to_string(),
author: "George Orwell".to_string(),
},
Book {
id: None,
title: "Animal Farm".to_string(),
author: "George Orwell".to_string(),
},
Book {
id: None,
title: "The Great Gatsby".to_string(),
author: "F. Scott Fitzgerald".to_string(),
},
];
collection.insert_many(books, None).await?;
Ok(())
}
id
をOption
にして自動採番させます。serde
のrename
で_id
にマッピングします。
検索
author
フィールドがGeorge Orwell
のドキュメントを取得します。
col.find(クエリー、ソート順)
tokio
を使用して非同期に呼び出します。
検索結果はカーソルで返されます。
ソートなどは、1、-1
と数値で使用します。
let filter = doc! {"author": "George Orwell"};
let find_options = FindOptions::builder().sort(doc! {"title": -1}).build();
let mut cursor = col.find(filter, find_options).await?;
while let Some(book) = cursor.try_next().await? {
println!("title: {}", book.title);
}
Ok(())
}
実行結果です。
title: Animal Farm
title: 1984
全件取得するには、let filter = doc!{}
と記述します。削除するには、col.delete_many()
と記述します。
Document型
structを定義するのが面倒な人には、Document
型が用意されています。
値の取得には、型に応じてget_xxx()
を使用します。ex.) get_str()
Aggregation Pipeline
SQLで言えばGroup By
になります。特定の条件でドキュメントをグルーピングし、平均値や合計などの集約計算をします。クエリーはMongoDB Compassからコードを自動生成して使用すると便利です。
let query = vec![
doc! {
"$project": doc! {
"_id": 0,
"item": 1,
"size": 1,
"tags": 1
}
},
doc! {
"$unwind": doc! {
"path": "$tags"
}
},
doc! {
"$group": doc! {
"_id": "$tags",
"avgTagQty": doc! {
"$avg": "$size.h"
}
}
},
doc! {
"$sort": doc! {
"avgTagQty": -1
}
},
];
let mut cursor = col.aggregate(query, None).await?;
実行結果です。
doc: { "_id": "cotton", "avgTagQty": 28 }
doc: { "_id": "gray", "avgTagQty": 27.9 }
doc: { "_id": "gel", "avgTagQty": 19 }
doc: { "_id": "blue", "avgTagQty": 19 }
doc: { "_id": "blank", "avgTagQty": 14 }
doc: { "_id": "red", "avgTagQty": 14 }
BSON
バージョン2.2.0からBSONを使用することでパフォーマンスが向上しています。
パススルー的にReactJSなどのフロントにJSONデータをそのまま返却するのであれば、わざわざstructにデシリアライズする必要はありません。そのためにRawDocumentBuf
型が利用できます。
10万ドキュメントを全件取得するパフォーマンスを測定してみます。
let collection = "employees";
// Document or RawDocumentBuf
let coll = client
.database("test")
.collection::<RawDocumentBuf>(collection);
let now = Instant::now();
//
let docs = coll.find(None, None).await?.try_collect::<Vec<_>>().await?;
println!("{}ms", now.elapsed().as_millis());
let json = serde_json::to_string_pretty(&docs)?;
実行結果
RawDocumentBuf | 880ms |
Document | 4714ms |
約5倍速くなりました。
Rayon
実際には取得したデータに何かしらの加工処理をする必要があるかと思います。そこでデータパラレル処理ライブラリRayon
を使用してDocument型に変換してみます。
rayonはiter()をpar_iter()
に変更するだけでデータの競合なくパラレルに処理してくれます。
pub use rayon::prelude::*;
let docs: Vec<RawDocumentBuf> = coll.find(None, None).await?.try_collect::<Vec<_>>().await?;
let _y: Vec<Document> = docs.par_iter().map(|x| x.to_document().unwrap()).collect();
私の4コアのmacでの処理は2569msとなりました。ちなみにRayonを使用しないケースは3658msでした。
ダイレクトにstructに変換する方法が分からなかったのでStackOverflow
で聞いたら教えてくれました。
sliceから生成することができます。そしてDocumentに変換するよりも2倍速くなりました。
let employees: Vec<Employee> = docs
.par_iter()
.map(|raw| bson::from_slice(raw.as_bytes()).unwrap())
.collect();
タイムアウト処理
Golangのgoroutine
のようにコンカレントなロジックを組む場合にメモリーリークしないようにタイムアウトさせる必要があります。Rustではスレッドを生成し、そのスレッドをタイムアウト付きで待つようにします。
let handle = tokio::task::spawn(async move { collection.insert_many(books, None).await });
if let Err(_) = tokio::time::timeout(Duration::from_secs(3), handle).await {
println!("did not receive value within 3 ms");
}
Discussion