📈

思いつきで作る時系列データベース (in Rust) (タイムスタンプ圧縮編)

2022/05/29に公開約8,600字

前回の記事で自作時系列データベースのファイル構造の設計について書きました。

https://github.com/tacogips/zikeiretsu-rs

本記事ではタイムスタンプの圧縮方法について書きたいと思います。

Simple-8b-RLE

zikeiretsuのblockファイルblock listファイルは共にソート済のタイムスタンプを含んでいます。これらを効率的に保存するためのエンコーディング方法として、InfluxDBでも採用されているSimple-8bとRLE(Run Length Encoding)を組み合わせた方式を採用しました。

Simple-8b

Simple-8bの初出はこの論文と思われます。もともとサーチエンジンのポスティングリストのインデックス圧縮を念頭に考えられたアルゴリズムです。もともと同じ著者が考えていた32ビット版のsimple-4bを64ビットに拡張したもののようです。

ざっくり言えば、 64ビット中の60ビットにデータをなるべく多く詰め込んで、先頭4ビットのセレクターでデータが何個詰め込まれているか表そうぜという物です。

┌──────────────────────┬────────────────────────────────────────────────────────┐
│ セレクター (4 bits)    │   データ (60 bits)                                     │
└──────────────────────┴────────────────────────────────────────────────────────┘

セレクターは0-15の値をとり、それぞれの値に対応するデータ数が割り当てられます。セレクターとデータ数の組み合わせを元の論文から抜粋します。

selector value 0 1 2 3 ... 8 ... 10 ... 15
1データのbit 0 0 1 2 7 10 60
含まれるデータ数 240 120 60 30 8 6 1
無駄になるbit数 60 60 0 0 4 0 0

例えば、2ビットで表す事ができる0-3の値を30個格納する場合はセレクターは3になります。1つのデータを表すのに7ビット必要な場合、格納できる数は8個でありセレクターは8になります。この場合(7ビット * 8個) = 56ビットとなり60ビット - 56ビット = 4ビットは使用されず無駄になります。
セレクター15は60ビットのデータを1個だけ格納する場合に使用されます。この例では61ビット以上のデータには対応していません。セレクター0,1は特別なセレクターであり、それぞれ、1が240個、1が120個連続する場合に使用されます。

このセレクターのテーブルは、圧縮したいデータの分布などによりカスタム可能です。

(論文ではSimpleファミリーを拡張したCarryファミリーやSlideファミリーも提案されています。
また他のデータ圧縮方式のPForやPForDeltaについての言及もあるのでそれぞれの違いをいつか調べてみたいと思います)

Simple-8b + RLE

Simple-8bの論文では、"1"が繰り返されるケースのみ考慮されていましたが、その他の値が繰り返される場合効率的に圧縮するにはどうしたら良いでしょうか?
思いつくのは繰り返される値と繰り返す回数だけをそれぞれ記録する方法でRLEと呼ばれています。

Simple-8bのセレクターに、RLEのための値を割り当ててデータ部の60ビットにRLEで圧縮された値を格納することで2つの方法を組み合わせる事ができます。

┌──────────────────────────┬────────────────────────────┬───────────────────────────┐
│ セレクター (4 bits)(15)    │   データ (32 bits)         │  繰り返し回数 (28 bits)     │
└──────────────────────────┴────────────────────────────┴───────────────────────────┘

33ビット以上の値が大量に繰り返されるようなデータの場合はデータ部のサイズを調整した方が良さそうですがzikeiretsuでは秒単位のタイムスタンプを圧縮しているため気にしなくても良さそうです。

ZikeiretsuのセレクターはThe FastPFOR C++ libraryの実装を真似させてもらい下記のようにしました。


┌──────────────────┬───────────────────────────────────────────┬────────────────────────┐
│ Selector(4bits)  │  1  2  3  4  5  6  7  8  9 10 11 12 13 14 │ 15(RLE)                │
├──────────────────┼───────────────────────────────────────────┼────────────────────────┤
│ Bitsize of value │  1  2  3  4  5  6  7  8 10 12 15 20 30 60 │ 32bits (value)         │
├──────────────────┼───────────────────────────────────────────┼────────────────────────┤
│ Number of val    │ 60 30 20 15 12 10  8  7  6  5  4  3  2  1 │ up to 2^28 (repeat num)│
├──────────────────┼───────────────────────────────────────────┼────────────────────────┤
│ Wasted Bits      │             12     4  4                   │                        │
└──────────────────┴───────────────────────────────────────────┴────────────────────────┘

タイムスタンプの圧縮

上記の圧縮方法でタイムスタンプを保存します。
時系列データのタイムスタンプはソートされているのでそれぞれ1つ前のデータポイントの時間とのDeltaを取ると正数になります。
このDeltaを圧縮して保存していきます。

## タイムスタンプのDeltaの例
1653778662 : T
1653778692 : T+1
1653811138 : T+2

(T+1) - T     = 30
(T+2) - (T+1) = 32446
[30, 32446] を保存

圧縮の手順は下記です。

  1. 圧縮したいdelta:u64の配列が同じ値の繰り返から始まっているか確認。RLEでの圧縮がより効率的であればRLEで圧縮

  2. deltaの部分配列を最も効率よく圧縮できるセレクターを探し書き込み。

  3. データの末尾に到達するまで1,2を繰り返す。

復元の手順は簡単で64ビットごとにデータを読み込んでセレクターとデータ部分をbitwise演算していけばいいだけです。読み込みが高速に行えるという特性は論文中でも言及されています。

zikeirestsではblockファイル、block_listファイルには先頭のタイムスタンプの値とタイムスタンプの数を記録していますので、それらと合わせてタイムスタンプの配列を復元しています。

Simple-8bの圧縮部分のソースコードを一部抜粋しておきます。
(書き込みをもう少し効率的に行う実装方法があるかもしれません)


pub fn compress<W>(src: &[u64], dst: &mut W) -> Result<()>
where
    W: Write,
{
    if src.is_empty() {
        return Ok(());
    }

    let mut current_idx: Index = 0;
    let src_len = src.len();
    loop {
        if current_idx >= src_len {
            return Ok(());
        }

        if let Some((rle_compression, rle_bound_idx)) = should_rle_compression(src, current_idx)? {
            let compressed_bytes = rle_compression.to_u64().to_be_bytes();
            dst.write_all(compressed_bytes.as_ref())?;
            current_idx = rle_bound_idx;
        } else if let Some((compression_set, bound_idx)) =
            search_simple_8b_compress_set(src, current_idx)?
        {
            debug_assert!(current_idx < bound_idx);
            debug_assert!(bound_idx <= src.len());
            compress_simple_8b(&src[current_idx..bound_idx], dst, compression_set)?;
            current_idx = bound_idx;
        } else {
            unreachable!("current idx out of bounds. (it should be a bug)")
        }
    }
}


// compression by simple-8b

const DATA_AREA_BITS: usize = 60;
const MAX_MEANINGFUL_BIT_SIZE: usize = DATA_AREA_BITS;

macro_rules! compression_set {
    ($({$selector:expr, $bits:expr, $num_of_val:expr}),*,) => {
            static COMPRESSION_SETS: once_cell::sync::Lazy<Vec<CompressionSet>> = once_cell::sync::Lazy::new(||vec![
                $(CompressionSet {
                selector:  Simple8bSelector {val: $selector},
                meaningful_bitsize: $bits,
                contain_num: $num_of_val
            }) , *]);
    };
}

// should be sorted ascending by bitszize(2nd value in the col)
compression_set! {
    {1,  1,  60},
    {2,  2,  30},
    {3,  3,  20},
    {4,  4,  15},
    {5,  5,  12},
    {6,  6,  10},
    {7,  7,  8},
    {8,  8,  7},
    {9,  10, 6},
    {10, 12, 5},
    {11, 15, 4},
    {12, 20, 3},
    {13, 30, 2},
    {14, 60, 1},
}

fn meaningful_bitsize(n: u64) -> usize {
    let mbs = (64u32 - n.leading_zeros()) as usize;
    if mbs == 0 {
        // `0` can represented by 1 bit
        1
    } else {
        mbs
    }
}


pub(crate) fn search_simple_8b_compress_set(
    src: &[u64],
    start_idx: Index,
) -> Result<Option<(&CompressionSet, Index)>> {
    if start_idx >= src.len() {
        return Ok(None);
    }
    'compression_set_loop: for each_compression_set in COMPRESSION_SETS.iter() {
        let mut bound_idx = start_idx + each_compression_set.contain_num;
        if bound_idx > src.len() {
            bound_idx = src.len()
        }

        for val in src[start_idx..bound_idx].iter() {
            let bitsize_of_val = meaningful_bitsize(*val);

            if bitsize_of_val > MAX_MEANINGFUL_BIT_SIZE {
                return Err(Error::ValueOutOfBound(*val));
            } else if bitsize_of_val > each_compression_set.meaningful_bitsize {
                continue 'compression_set_loop;
            }
        }

        return Ok(Some((each_compression_set, bound_idx)));
    }

    Err(Error::Simple8bCompressionFailed(start_idx))
}


pub(crate) fn compress_simple_8b<W>(
    src: &[u64],
    dst: &mut W,
    compression_set: &CompressionSet,
) -> Result<()>
where
    W: Write,
{
    debug_assert!(!src.is_empty());
    let mut result: u64 = src[0];
    for each_val in src[1..].iter() {
        result <<= compression_set.meaningful_bitsize;
        result |= each_val;
    }

    let meaningful_data_bits = src.len() * compression_set.meaningful_bitsize;
    let right_packing_bits_size = DATA_AREA_BITS - meaningful_data_bits;

    if right_packing_bits_size > 0 {
        result <<= right_packing_bits_size;
    }
    result |= compression_set.selector.val << DATA_AREA_BITS;

    dst.write_all(result.to_be_bytes().as_ref())?;
    Ok(())
}


その他のコード

所感

Simple-8bのアルゴリズムについて書かれている記事はあまり多くないようですが、時系列データベースではよく使われている方法のように思われます。実装もそんなに難しくなかったので(実行効率は良くないかもしれませんが)、データ圧縮の実装に触れるには良い教材なのかと思いました。

ただ、zikeiretsuはナノ秒の精度のタイムスタンプを扱っており、そのままではDeltaをとっても大きな数値になってしまいSimple-8bでは圧縮効率が悪くなってしまうのが悩みどころです。
苦肉の策としてタイムスタンプを秒単位と秒未満単位の数値に分けて、秒単位のタイムスタンプをSimple-8b-RLEで保存する事にしたのですが、他に巨大なソート済の正数を効率的に圧縮する方法が見つかればここらへんの実装を変更する予定でいます(GorillaのDelta of Deltaの方法も試しても良いかも)

Discussion

ログインするとコメントできます