🦀

RustのMongoDBドライバーとRayon

2022/06/22に公開

この記事では、RustMongoDBドライバーの使い方の紹介と、データのパラレル処理Rayonの説明をします。

インストール

mongodbcrateをインストールします。

https://docs.rs/mongodb/latest/mongodb/

インサート

複数ドキュメントをインサートします。
serdeを使用してMongoDBのドキュメントをRustのstruct(Book)にマッピングします。

ドキュメントのインサート
use anyhow::Result;
use mongodb::bson::doc;
use mongodb::bson::oid::ObjectId;
use mongodb::{options::ClientOptions, Client};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Book {
    #[serde(rename = "_id", skip_serializing)]
    id: Option<ObjectId>,
    title: String,
    author: String,
}

#[tokio::main]
async fn main() -> Result<()> {
    let client_options = ClientOptions::parse("mongodb://localhost:27017").await?;
    let client = Client::with_options(client_options)?;

    let db = client.database("test");
    let collection = db.collection::<Book>("books");

    let books = vec![
        Book {
            id: None,
            title: "1984".to_string(),
            author: "George Orwell".to_string(),
        },
        Book {
            id: None,
            title: "Animal Farm".to_string(),
            author: "George Orwell".to_string(),
        },
        Book {
            id: None,
            title: "The Great Gatsby".to_string(),
            author: "F. Scott Fitzgerald".to_string(),
        },
    ];

    collection.insert_many(books, None).await?;
    Ok(())
}

idOptionにして自動採番させます。serderename_idにマッピングします。

検索

authorフィールドがGeorge Orwellのドキュメントを取得します。

col.find(クエリー、ソート順)

tokioを使用して非同期に呼び出します。
検索結果はカーソルで返されます。

ソートなどは、1、-1と数値で使用します。

    let filter = doc! {"author": "George Orwell"};
    let find_options = FindOptions::builder().sort(doc! {"title": -1}).build();
    
    let mut cursor = col.find(filter, find_options).await?;

    while let Some(book) = cursor.try_next().await? {
        println!("title: {}", book.title);
    }

    Ok(())
}

実行結果です。

実行結果
title: Animal Farm
title: 1984

全件取得するには、let filter = doc!{}と記述します。削除するには、col.delete_many()と記述します。

Document型

structを定義するのが面倒な人には、Document型が用意されています。
値の取得には、型に応じてget_xxx()を使用します。ex.) get_str()

Aggregation Pipeline

SQLで言えばGroup Byになります。特定の条件でドキュメントをグルーピングし、平均値や合計などの集約計算をします。クエリーはMongoDB Compassからコードを自動生成して使用すると便利です。

    let query = vec![
        doc! {
            "$project": doc! {
                "_id": 0,
                "item": 1,
                "size": 1,
                "tags": 1
            }
        },
        doc! {
            "$unwind": doc! {
                "path": "$tags"
            }
        },
        doc! {
            "$group": doc! {
                "_id": "$tags",
                "avgTagQty": doc! {
                    "$avg": "$size.h"
                }
            }
        },
        doc! {
            "$sort": doc! {
                "avgTagQty": -1
            }
        },
    ];

    let mut cursor = col.aggregate(query, None).await?;

実行結果です。

doc: { "_id": "cotton", "avgTagQty": 28 }
doc: { "_id": "gray", "avgTagQty": 27.9 }
doc: { "_id": "gel", "avgTagQty": 19 }
doc: { "_id": "blue", "avgTagQty": 19 }
doc: { "_id": "blank", "avgTagQty": 14 }
doc: { "_id": "red", "avgTagQty": 14 }

BSON

バージョン2.2.0からBSONを使用することでパフォーマンスが向上しています。

https://patrickfreed.github.io/rust/2022/04/27/unlocking-greater-performance-in-the-mongodb-rust-driver-via-raw-bson-and-zero-copy-deserialization.html#validating-bson

パススルー的にReactJSなどのフロントにJSONデータをそのまま返却するのであれば、わざわざstructにデシリアライズする必要はありません。そのためにRawDocumentBuf型が利用できます。

10万ドキュメントを全件取得するパフォーマンスを測定してみます。

    let collection = "employees";
    // Document or RawDocumentBuf
    let coll = client
        .database("test")
        .collection::<RawDocumentBuf>(collection);

    let now = Instant::now();

    //
    let docs = coll.find(None, None).await?.try_collect::<Vec<_>>().await?;
    println!("{}ms", now.elapsed().as_millis());

   let json = serde_json::to_string_pretty(&docs)?;

実行結果

RawDocumentBuf 880ms
Document 4714ms

約5倍速くなりました。

Rayon

実際には取得したデータに何かしらの加工処理をする必要があるかと思います。そこでデータパラレル処理ライブラリRayonを使用してDocument型に変換してみます。

https://docs.rs/rayon/latest/rayon/

rayonはiter()をpar_iter()に変更するだけでデータの競合なくパラレルに処理してくれます。

    pub use rayon::prelude::*;

    let docs: Vec<RawDocumentBuf> = coll.find(None, None).await?.try_collect::<Vec<_>>().await?;

    let _y: Vec<Document> = docs.par_iter().map(|x| x.to_document().unwrap()).collect();

私の4コアのmacでの処理は2569msとなりました。ちなみにRayonを使用しないケースは3658msでした。

ダイレクトにstructに変換する方法が分からなかったのでStackOverflowで聞いたら教えてくれました。

https://stackoverflow.com/questions/72696316/how-to-convert-a-mongodb-rawdocumentbuf-to-a-rust-struct/72720814#72720814

sliceから生成することができます。そしてDocumentに変換するよりも2倍速くなりました。

    let employees: Vec<Employee> = docs
        .par_iter()
        .map(|raw| bson::from_slice(raw.as_bytes()).unwrap())
        .collect();

タイムアウト処理

Golangのgoroutineのようにコンカレントなロジックを組む場合にメモリーリークしないようにタイムアウトさせる必要があります。Rustではスレッドを生成し、そのスレッドをタイムアウト付きで待つようにします。

    let handle = tokio::task::spawn(async move { collection.insert_many(books, None).await });

    if let Err(_) = tokio::time::timeout(Duration::from_secs(3), handle).await {
        println!("did not receive value within 3 ms");
    }

Discussion