💽

RustでLSMツリーを実装してみた

に公開

概要

CassandraやScyllaDBといったKVSで使用されているLSMツリーを簡単に実装してみたので、簡単にお話ししていきたいと思います

こちらがリポジトリです
https://github.com/sraku2159/lsmtree

実装のお話をする前に簡単にLSMツリーについてお話ししていきたいと思います

LSMツリーとは

LSMツリーとはLog Structre Merge ツリーの略で以下の四つのコンポーネントから構成されます。

  • Memtable
  • CommitLog
  • SSTable
  • コンパクション

以下、ScyllaDBのちょー親切なドキュメントから拝借した画像です

操作に関しては、一般に

  • PUT
  • GET

のみです。挿入と削除は同じように追記するという形をとります。
ここからは簡単に各コンポーネントについて説明します。

Memtable

  • データがある閾値に達するまで、格納され続ける平衡木のこと
  • 全てのデータの書き込みはここに行われる
  • データの読み取り時最初に読み込まれる
  • もしデータが存在しなければ、後述のSSTableを参照しにいく

CommitLog

Memtableはあくまでメモリ上に存在するだけなので、クラッシュした際にデータがすべて消えてしまう。その対策ためのLSMツリー上で行ったアクションのログのこと
このログは、以下で説明するSSTableにフラッシュされたのちに消される

SSTable

Sorted String Tableの略で、キーとなる文字列をソートされた状態で格納されたファイルのこと

  • キーはある一つのSSTable内で一意である
  • Memtableにデータが存在しなかった際に、こちらから読み込まれる
  • キーがソート済みであるため、探索が効率的に行える

コンパクション

一般的にSSTableはイミュータブルであるため、異なる2つのSSTableに同じキーで異なる値が存在すると言うことがある。このうちどちらか一方の古いデータは不要であるため、無駄なデータが存在し続けることになる。
そのため、LSMツリーではどこかのタイミングでいくつかのSSTableを一つにまとめて、上書きされたデータや削除されたデータなどを新しく作成されたSSTableから削除し、新しく作成されたデータの元となったSSTableを削除するという方法を取ることで、ストレージの無駄使いを無くしている。
このいくつかの古いSSTableから新しいSSTableを作成することをコンパクションという。

LSMツリーの概要の説明が終わったので、次に実装した時のお話をしていきたいと思います。

実装

今回実装したのは、

  • Memtableへの書き込み/読み込み
  • CommitLogへの書き込み
  • SSTableへの書き込み/読み込み
  • Size-tiered コンパクション

です。

Memtable

今回は、簡単にRustの標準で実装されているBTreeMapを使用し、それをラップする形にしました
格納するデータですが、キーに関しては単なるStringで実装し、値に関しては以下のようなEnumで実装しました

type Key = String;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Value {
    Data(String, u64), // (value, timestamp)
    /*
    Tombstone: 削除されたデータを表す
        1, 0, 0, 0, 0, 0, 0, 0, // key_len: 1
        0,
    empty string:
        0, 0, 0, 0, 0, 0, 0, 0, // key_len: 0
     */
    Tombstone(u64),
}

ここで記載されているコメントは、SSTableへ格納する際のファイルフォーマットです

CommitLog

こちらは以下のようなファイルフォーマットにしました

------------------------------------------------------------------------------------------
| コマンド(1バイト) | キーの長さ(8バイト) | キー | 値の長さ(8バイト) | 値 | タイムスタンプ(8バイト) |
------------------------------------------------------------------------------------------

基本的にmemtableに対する操作が行われたタイミングで同じようにcommitlogにも追記するような形をとっています。

            Some(value) => {
                commitlog.write_put(key, value, timestamp);
                memtable.put(key, value, timestamp)
            },
            None => {
                commitlog.write_delete(key, timestamp);
                memtable.delete(key, timestamp)
            }

SSTable

レコードに関しては以下のようなフォーマットを取りました。

------------------------------------------------------------------------
| キーの長さ(8バイト) | キー |  値の長さ(8バイト) | 値 | タイムスタンプ(8バイト) |
------------------------------------------------------------------------

SSTableにおける探索の処理ですが、別ファイルにキーとファイルオフセットのみを記述したインデックスを作成しそれを二部探索する形で行っています。

コンパクション

コンパクションに関しては、今回はsize-tieredコンパクションという手法を実装しました。
これは、ざっくりいうと同じくらいのサイズのデータが閾値を超える程度に存在したら、コンパクションを実行するという戦略です。
詳しくは、ScyllaDBの以下のリンクを参照してください。
https://opensource.docs.scylladb.com/stable/architecture/compaction/compaction-strategies.html
実際のコンパクションの処理ですが、ほとんどマージソートと同じです。
あまり、実装が綺麗でなくて恐縮なのですが一応記載します。

    fn merge(&self, sstables: Vec<SSTableData>) -> SSTableData {
        let mut target = sstables;
        while target.len() != 1 {
            target = target.chunks(2).map(|pair| {
                if pair.len() == 1 {
                    return pair[0].clone();
                }
                let ret = self.merge_impl(&pair[0], &pair[1]);
                ret
            }).collect::<Vec<SSTableData>>();
        }
        target.pop().unwrap()
    }

    fn merge_impl(&self, left: &SSTableData, right: &SSTableData) -> SSTableData {
        let mut merged = SSTableData::new();
        let mut left_iter = left.iter();
        let mut right_iter = right.iter();

        let mut left = left_iter.next();
        let mut right = right_iter.next();
        while left.is_some() && right.is_some() {
            let left_v = left.unwrap();
            let right_v = right.unwrap();
            if left_v.key() < right_v.key() {
                let _ = merged.push(left_v.clone());
                left = left_iter.next();
            } else if left_v.key() > right_v.key() {
                let _ = merged.push(right_v.clone());
                right = right_iter.next();
            } else {
                // keyが重複している場合、timestampが大きい方を選ぶ
                if left_v.timestamp() > right_v.timestamp() {
                    let _ = merged.push(left_v.clone());
                } else {
                    let _ = merged.push(right_v.clone());
                }
                left = left_iter.next();
                right = right_iter.next();
            }
        }

        while left.is_some() {
            let left_v = left.unwrap();
            let _ = merged.push(left_v.clone());
            left = left_iter.next();
        }

        while right.is_some() {
            let right_v = right.unwrap();
            let _ = merged.push(right_v.clone());
            right = right_iter.next();
        }
        merged
    }

これらの処理を1秒ごとに行うようにしています。

その他

並行処理

一番難しかったのは、実はここです。今回の実装方針として、Readerという構造体には対象となるファイルパスを保存しておき、reader.read()というような形でメソッドが呼び出されたら実際にファイルをオープンするという形をとっています。この方針がかなり面倒なことを起こしました。
例えば以下のような状況を考えます。

  1. データを取得するためReaderのイテレータを返す
  2. コンパクションが走り、ファイルが削除される
  3. Readerのreadメソッドが走る

このような時、もちろんファイルが存在しないので、エラーが出ます。
そこで、以下のような方法を取りました。(以下抜粋です)

#[derive(Debug)]
pub struct SSTableReaderManager {
    reader: SSTableReader,
    delete: AtomicBool,
}
impl SSTableReaderManager {
    pub fn delete(&self) {
        self.delete.store(true, std::sync::atomic::Ordering::Release);
    }
}
impl Drop for SSTableReaderManager {
    fn drop(&mut self) {
        let deleted = self.delete.load(std::sync::atomic::Ordering::Acquire);
        if deleted {
            std::fs::remove_file(&self.reader.file).ok();
            std::fs::remove_file(&self.reader.index_file).ok();
        }
    }
}

deleteの処理はとりあえず、フラグを立てるということをするだけにし、ReaderManagerが実際にドロップされるタイミングで削除するようにしました。
加えて、Readerを実際に必要とする関数等は以下の構造体から共有参照を得るような形をとりました。

#[derive(Debug)]
pub struct SharedSSTableReader {
    inner: Mutex<HashMap<String, Arc<SSTableReaderManager>>>,
    pub sst_dir: String,
    pub index_file_suffix: String,
}

このようにすることで、ファイルがまだ参照されている間はdropされることがないのでファイルは削除されず、参照がなくなったタイミングでファイルが削除されるようになったため、どうにかうまいこと行くようになりました。

このような方針をとるようになって、かなりコードが汚れました。。。。

まとめ

以下、実際に実装してみての個人的なよかった点、反省点です。

反省点

テスト

テストファイルをフィクスチャとして用意すればよかった

今回、ファイルの読み込み/書き込み含めどちらもテストをしなければならず、あまり他のテストへの影響を及ぼしたくなかったため、テスト中でディレクトリの作成・削除/ファイルの作成・削除を行なっていました。加えて、データを取得するメソッドであるgetは、データを保存するためのputに依存し
てしまっています。こちらをフィクスチャで用意すれば万事解決だったなと今になって思いました。

Rust

モジュールの使い方

同じレイアにあるモジュールに対して、直接依存させてしまっているのが良くなかったなと思いました。
利用する側でトレイトをかくべきだった。。。

関連関数・トレイト

最初の方針ではコンポジションパターンに倣って、Writer/Readerを実装していたのですが、encode/decodeに関してはメソッドと関連関数にしてしまっていました。

その他

欲張りすぎた

最初はLinuxでのメモリ管理単位であるページ(最近、フォリオというのが出てきてるみたいですね。)サイズ程度で固めて管理したら速いのでは??などと欲張って、ごちゃごちゃしちゃいました。

よかった点

公開する関数をラッパーにしたこと

Rustはプライベートなリソースも同じモジュールかその下のモジュールからであれば、アクセスが可能という特性を活かし、公開するメソッドでメインロジック以外の部分の処理を行い、メインロジックをプライベートにすることで、テストがとてもしやすくなりました。

簡易的なスレッドプールを実装したこと

あるテストで、

failed to spawn thread: Os { code: 35, kind: WouldBlock, message: "Resource temporarily unavailable" }

というエラーに引っかかり、putで大量のスレッドが生成されてしまっていることが原因だということに気がつきました。そこでスレッドプールが必要になりました。
しかし、特にそこまで便利なクレートが必要なかったので、超簡単でいいから作ってみよーと思い、作ってみたものの結構やりがいがあって思わぬところで楽しみが見つかりました。

感想

自分でファイルフォーマットから決めて実装するということが初めての経験で、ゼロから手作りで作成するのは本当に楽しかったです!!
今度暇ができたらブルームフィルターなんかも実装したい!

最後まで読んでいただきありがとうございました!!!

参照

Discussion