思いつきで作る時系列データベース (in Rust) (ファイル設計編)
概要
個人プロジェクトで使う時系列データベースを自作してみた話です。
時系列データをオリジナルのクエリを使用して取得できます。
いろいろ足りていないけれどもとりあえず最低限動いており、個人的にはまあまあ使えそうかな、という段階です。
最初データベースを作るにあたって、まずDatabase internalを読んで基礎知識を貯めてから取りかかろうかと思いましたが、多分読み終わるとそれで満足してしまって実装する気にならなくなりそうだったので結局大した事前知識なしで設計に着手することにしました。
ですので記事中の名称や用語などは慣用的ではないと思われます。
モチベーション
もう去年の話ですが、個人でgo言語で作っていた仮想通貨分析システムをRustでリプレイスすることにしました。
分析にあたっては何はともあれデータのETLを行わなければいけません。分析対象の仮想通貨取引所データはほとんどが時系列データです。
以前のシステムではAWSのKinesis firehose, glue,S3,athenaを使用してデータを変換しており、データフローは下記のような感じでした。
この構成の良いところは ECSでfirefhoseにデータをpushしてS3にデータを貯めておけば後からathenaを使って好きなデータを作れること、悪いところは、adhocな分析を行う際にathenaがややもっさりするところと、AWSの料金がバカにならないということでした。特に後者ですが、このデータ分析基盤だけで月に2万円程度の費用がかかってしまっており、個人の趣味の範囲でやるには辛い状態になっていました。
特にfirehose -> Glueの部分にコストがかかっていたので、parquetへの変換とS3への書き込みをECS側で定期的にやってしまえば良いのでは?と考え、さらに、なんなら時系列に特化したデータ圧縮方式を採用しよう、ついでにathenaを使うのではなく専用のDBを作成してしまおうという流れでDBの実装に着手することにしました。
データストリームのデータ圧縮と書き込み、クエリの実行を自作DBに任せることで次のようなシンプルなフローになるはずです。
Rustを採用したのは、当時、今後はもう全部Rustでいいんじゃないかという気持ちが高まっていたからです(今でもそう)
機能設計
DBの主なユースケースは、streamでのデータ書き込みとadhocなデータ分析やbatch処理からの読み込みとしました。
また、1クエリで返るデータは大きくても数百MB程度を想定しています。
目指すもの
-
ワンバイナリでさっくりで動く
-
なるべくもっさり感をなくす
- 1日分のデータが2,3秒以内に取得できると良い
- キャッシュとしてクラウドからローカルにデータをダウンロードする
- ファイルサイズをなるべく小さくする
-
データポイントが複数の値を持つ
-
データ取得はクエリで行う
- タイムゾーンや期間の指定を楽にしたい
目指さないもの
- リアルタイム性能、低レイテンシー。
- メモリ効率
- 並行性。同時書き込み、読み込み性能。
時系列DB "zikeiretsu" のファイル設計
metrics
あるデータに対するデータポイントのグループをmetricsを呼びます。ある metricsに含まれるデータポイントは同じフィールドを持ちます。
block
zikeiretsuが扱うデータポイントはタイムスタンプ(ナノ秒)フィールドを1つ持ち、boolean,floatいずれかの型の値フィールドを複数含む事ができる事とします。
あるmetricsの一定期間ごとにタイムスタンプでソートされたデータポイントのグループをblockと呼びます。block ファイルに複数のmetricsが含まれる事はありません。
このようにしたのは実装をシンプルにするためと、複数のmetricsを並行にロードしやすくする目的があります。
blockに含まれるデータはフィールドの型ごとにそれぞれ違った方式でエンコーディングされファイルに書き込まれます。
各フィールド型のエンコーディング方式は下記のとおりです。
- タイムスタンプ : block先頭のタイムスタンプは64bitsを使用して格納。それ以降はそれぞれ1つ前のタイムスタンプとのDeltaを取り、秒単位と秒未満単位の数値に分割して64bits単位でsimple8b-rleでエンコーディング
- float : gorilla encoding(XOR encoding)
- bool : 1 value = 1bit で8 bitsごとに書き込み
一度書き込まれたblockファイルは変更されません。つまり一度作成されたblockファイルに対するデータポイントのinsert/update/deleteは不可能とします。
blockのファイル構造とエンコーディング
┌────────────────────────────────────────────┬────────────────────────┬────────────────────────────┬─────┬───────────────────────────┐
│ (1) block中のデータポイントの数 (v bytes) │ (2) field数 M (1 byte) │ (3-1) field 1の型(1 byte) │ ... │ (3-M) field Mの型(1 byte) │
├──────────────────────────────────┬─────────┴────────────────────────┴────────────────────────────┴─────┴───────────────────────────┤
│ (4) 先頭のタイムスタンプ(8 bytes) │ (5) 秒単位のTimestampのdelta (v bytes) │
├──────────────────────────────────┴──────────────────────────────────┬──────────────────────────────────────────────────────────────┤
│ (6) タイムスタンプの秒未満の数値の共通のtrailing zero の数 (8 bits) │ (7) Timestampの秒未満の数値 ( v bytes) │
├──────────────────────────────────────────────┬───────┬──────────────┴──────────────────────────────────────────────────────────────┤
│ (8-1) Field 1の データ (v bytes) │ ... │ (8-N) field Nのデータ (v bytes) │
└──────────────────────────────────────────────┴───────┴─────────────────────────────────────────────────────────────────────────────┘
(1) blockに含まれるデータポイントの数は Base 128 Variants でエンコーディングされています。
これは数値を2進数にした後7 bitごとのグループに分割して最下位グループが先頭に来るように並び替えた後、最上位ビット(MSB)を付けて8 bitごとのグループにしますが、その際に次のグループがある場合はMSBを1に、最後のグループである場合は0にセットします。
数値を可変長のエリアに効率的に格納する方法としてProtocol bufferでも使用されているようです。 難しくなさそうなので自作しました
(2) フィールド数は最大255としているので1 byteに格納します。
(3) フィールドの型情報は1 byte で表現します。現在はfloatとboolのみ対応しています
- integer ... 1 (not supported yet)
- float ... 2
- string ... 3 (not supported yet)
- timestamp nano ... 4 (not supported yet)
- bool ... 5
- unsigned int ... 6 (not supported yet)
- timestamp sec ... 7 (not supported yet)
(5) 各データポイントのタイムスタンプの秒以下を切り捨ててdeltaを取りそれをエンコーディングした値を格納します。
例えばタイムスタンプが下記の左のように並んでいた場合、秒で切り捨てると右の数値になります。
- 1627467076_985012000 -> 1627467076 (先頭)
- 1627467250_785267000 -> 1627467257
- 1627468063_600895000 -> 1627468063
- 1627468158_257010000 -> 1627468158
さらに1つ前のタイムスタンプとのDeltaを取っていくと下記の右の数値が出てきます。この数値をsimple8b-rleでエンコーディングして格納します。複合する場合は先頭のタイムスタンプからDeltaを積み上げていき、データポイント数Nになるまで繰り返します。 simple8b-rleに関しては別の記事で書きます。
- 1627467257 - 1627467076 -> 181
- 1627468063 - 1627467257 -> 806
- 1627468158 - 1627468063 -> 95
(6) タイムスタンプの秒未満の数値の共通のTrailing zeroの数を格納します
先ほど同じタイムスタンプを例にすると、秒未満の数値は右側のようになります。
- 1627467076_985012000 -> 985012000
- 1627467250_785267000 -> 785267000
- 1627468063_600895000 -> 600895000
- 1627468158_257010000 -> 257010000
それらをそれぞれ2進数に直したものが下記です。
- 111010101101100001011100100000
- 101110110011100011100100111000
- 100011110100001110111000011000
- 1111010100011010100101010000
どの数値も最下位3ビットが0である事が分かります。"3"をこのエリアに格納します
- 111010101101100001011100100[000]
- 101110110011100011100100111[000]
- 100011110100001110111000011[000]
- 1111010100011010100101010[000]
ちなみに秒未満の数値の最大数"999999999"を二進数にすると"111011100110101100100111111111"と30桁の数値になるのでこの(6)のエリアに入る数値はmax 30であり、本来5bitあれば十分なのですがロジックをシンプルにするために8bits確保しています
(7) (6)で計算したタイムスタンプの秒未満の数値をTrailing zero分だけ右シフトした値を格納します。
デコードする際は同じ分だけ左シフトすると元の値に戻るわけです。
- 111010101101100001011100100[000] -> 111010101101100001011100100 = 123126500
- 101110110011100011100100111[000] -> 101110110011100011100100111 = 98158375
- 100011110100001110111000011[000] -> 100011110100001110111000011 = 75111875
- 1111010100011010100101010[000] -> 1111010100011010100101010 = 32126250
これらの数値は昇順に並んでいるわけでは無いのでDeltaを取ると負数が出ますし、Deltaをとっても値がそれほど小さくはならないのでは?と思い、そのままの値をsimple8b-rleで格納することにしました。
Deltaを取ってZigzag encodingで正数にしてみるとどのていど圧縮されるのか興味はありますが、そもそもこの(5),(6),(7)の格納方式は、そのままDeltaをとると結構大きな数値になってしまうナノ秒を無理やり詰め込めないかを思いつきで考えたものなので、根本的に別のより良い方法があるのかもしれません。
(8)各フィールドをエンコーディングして格納しています。
float型のフィールドはGorilla encoding(Xor Encoding)を使用しています。Xor Encodingに関しては https://zenn.dev/nakabonne/articles/d300838a1500c7#xor-encoding で解説されています。オレオレ実装はコチラです https://github.com/tacogips/zikeiretsu-rs/blob/main/xor-encoding/src/lib.rs
bool方は値を1,0に直して1 byteごとに上位bitから詰め込めるだけ詰め込む方式にしています。
block list
上記のblockファイルは、blockに含まれるタイムスタンプの開始と終了を名称とする{開始タイムスタンプ}_{開始タイムスタンプ}
という形式のディレクトリに格納されます。このタイムスタンプは秒単位です。
例:
162688734_162688740
あるメトリクスに対するすべてのblockの開始終了タイムスタンプを"block list"として1つのファイルに登録しておき、
ある期間のデータポイントを検索する際はこのblock fileを参照して必要なblockファイルを特定します。
┌─────────────────────────────────────┬────────────────────────────────────────────────────┬─────────────────────────────────────────┐
│ (1)block list更新Timestamp(8 byte) │ (2) block listに含まれるタイムスタンプの数 (v byte) │ (3) 先頭の開始timestamp (v bytes) │
├─────────────────────────────────────┴───────────┬────────────────────────────────────────┴┬────────────────────────────────────────┤
│ (4) 開始タイムスタンプのDelta(8 byte) │ (5) 先頭の終了 タイムスタンプ (v bytes) │ (6) 終了タイムスタンプのDelta(v byte) │
└─────────────────────────────────────────────────┴─────────────────────────────────────────┴────────────────────────────────────────┘
それぞれのエリアのエンコーディング方式はblockと同様のBase 128 VariantやDeltaのsimple8b-rleですので説明は割愛します。
WAL (未実装)
多くのデータベースでは対障害性を上げる目的やtransactionのrollback journalとしてWrite-Ahead-Logging を採用しています。
zikeiretsuでは取引所のデータをストリームで保存している事を想定しており、耐障害性は欲しいところです、、が結局まだ未実装です。
zikeiretsuではredisのAOFに近い形で実装する事を検討していましたが、 力尽きてしまいました、
データベースディレクトリ構造
block,block listはローカルマシン上に下記のようなディレクトリ構造で保管されます。トップディレクトリの下に"データベース"という単位でディレクトリが作成され、その下にデータの種類ごとに"メトリクス"ディレクトリが作成されます。
例としては"取引所"がデータベースにあたり、"約定データ","板情報"などがメトリクスに対応します。
ファイル書き込み時にはメトリクス単位に排他処理のためのlock fileを作成しています。
データベースに紐づく形でGoogle Cloud Storageのバケットとパスを指定することで、ローカルファイル書き込み後にクラウドにアップロードすることもできます。
[zdb_top_dir]
│
├─ [database_dir]
│ │
│ ├─ (lockfile for a metrics)
│ │
│ ├─ wal (not implemented yet )
│ │ │
│ │ └─ ...
│ │
│ ├─ error
│ │ │
│ │ └── ...
│ │
│ │
│ ├─ blocklist
│ │ │
│ │ ├── {metrics_1}.list
│ │ └── {metrics_2}.list
│ │
│ └── block
│ │
│ │
│ ├── [metrics_1]
│ │ ├── 1626
│ │ │ ├─ 162688734_162688740
│ │ │ │ └─ block
│ │ │ │
│ │ │ └─ 162688736_162688750
│ │ │ └─ block
│ │ ├── 1627
│ │ │ └─ ...
│ │ │
│ │ ├── 1628
│ │ │ ├─ 162788734_162889735
│ │ │ │ └─ block
│ │ │ │
│ │ │ └─ 162898735_162899730
│ │ │ └─ block
│ │ ├─ ...
│ │
│ ├── [metrics_2]
│ └── ....
│
├─ [database2_dir]
検索時のシーケンス
zikeiretsu は受け取ったクエリをパースした後、検索対象のメトリクスのblock_listファイルを探しにいきます。ローカルに存在しなければCloud storageからダウンロードしてきます。
このblock_listに含まれるタイムスタンプを二分探索で検索し検索対象のデータを含んでいるblockファイルを特定、loadまたはクラウドからダウンロードしてデコードします。
各blockファイルは、一部タイムスタンプの範囲が重なっている可能性があります。取引所から取得するデータが遅延して届いたりするとこのような事がおきます。複数のblockファイルをマージする際にその点も考慮しています。
データの書き込みとクエリ実行の例
データの書き込みはクエリでなくRustのコード上から行います。読み込み時はクエリをziekiretsuに投げます。
query中にformatを指定することで、table,json,parquetの何れかの形式で出力できます。
クエリについては別記事で書こうと思います。
書き込みのサンプルコードは下記にあります。
気になっていたクラウド料金について
GKSのautopilotのnode1台でデータを書き込んでいますが、この料金が無料分を除いて月間1500円ほど、
cloud storageの使用量が月間700円ほどになっています。
高いと見るか安いと見るかはなんとも言えないところですが数万円かかっていた頃に比べれば遥かにマシではあります。まぁどこかの時点でクラウドにおいてあるファイルの一部を自前のストレージに移すこともできますし、保存先としてより安いgoogle drive対応を考えても良いですし対策のしようはあるかなと。
保存されるデータファイルのサイズはどれくらいでしょうか。ある取引所のある日の注文データをzikeiretsuを通して、json,parquet,parquet + snappy圧縮で出力して、zikeiretsuのデータファィルのサイズと比較してみます。
ファイル種類 | ファイルサイズ(bytes) |
---|---|
json | 49,338,802 |
parquet | 37,842,029 |
parquet + snappy | 10,133,639 |
zikeiretsu | 11,693,715 |
、、あれ、parquet + snappyよりもzikeiretsuの方が圧縮効率が悪いですね、、15%くらい大きいです。 というかsnappyがすごいです。
試しにzikeiretsuのデータファイルをsnappyで圧縮してみます。
ファイル種類 | ファイルサイズ(bytes) |
---|---|
zikeiretsu + snappy | 10,089,261 |
一応parquet + snappyと同程度にはなりました、、データファイルのsnappy圧縮は後々考えても良いかもしれません。流石にデータ分析で広く使われているフォーマットはすごいですね。
最初からparquet + snappyで保存すれば良かったのでは、という話になりますが、まぁ半分お遊びですし今後も改良の余地があるということで。とりあえずDatabase internal読んで勉強しなおします。
つづく
長くなってしまったので、エンコーディングやクエリについてはそのうちまた別の記事で書く事にします。
今回はデータファイルの設計を中心としたお話でした。
参考文献
gorilla encoding : http://www.vldb.org/pvldb/vol8/p1816-teller.pdf
Influxdb data compression: https://www.programmersought.com/article/65292941965/
ゼロから作る時系列データベースエンジン: https://zenn.dev/nakabonne/articles/d300838a1500c7
Index compression using 64-bit words: https://www.academia.edu/51061650/Index_compression_using_64_bit_words
simple 8b rleの実装例: https://github.com/lemire/FastPFor/blob/master/headers/simple8b_rle.h
Time-series compression algorithms, explained: https://www.timescale.com/blog/time-series-compression-algorithms-explained/
Protcol buffer encodinig: https://developers.google.com/protocol-buffers/docs/encoding
Discussion