Closed11

Hands-on Data Structures and Algorithms with Rust の4章

singly-linked list と doubly-linked list はちょっと飛ばす。

内容としてはトランザクションログ(時系列データ)を効率よく検索などするためにはどうすればいいかというもので、LinkedList 系だとインデックス探索や検索そのものに時間がかかるので SkipList というデータ構造を使ってもう少し速くできるようにしよう、というもの。

SkipList は、各要素がレベル単位で紐付けされていて、次のレベルに要素がなければ下のレベルに下がって…という感じで行うもの。各駅停車、急行、特急みたいな感じで要素が駅のように配置されていて、特急側から探していってなければ急行の駅を探し、最後になければ各駅停車で探すというイメージ。

検索のアベレージが O(log n) で済む&LinkedList にはなりそうだけど、検索がちょっと LinkedList だと遅そう、というケースに使用できる感じがする。

一方で SkipList に入れられる要素は比較可能である必要があるという制約がある。

実装は下記のような感じになる。

type Link = Option<Rc<RefCell<Node>>>;

#[derive(Clone)]
struct Node {
    next: Vec<Link>,
    pub offset: u64,
    pub command: String,
}

struct SkipList {
    head: Link,
    tails: Vec<Link>,
    max_level: usize,
    pub length: u64,
}

append 関数を追加しようとしたんだけど、いきなり get_level というまだ実装していない関数が何の説明もなく出てきて、ちょっと説明の要領がよくないのが本書の弱点かも。

append 関数を実装してみる。

  1. まず head の最大レベルを取る or head があればレベルを取得する(後ほど実装するみたい)。
  2. レベル 0 から tails を各々アップデートしていく。所定のレベルに古い tails があれば next として新しい要素を追加する。ない場合は、そのレベルの tails に素で新しい要素を追加する。
  3. リスト内の最初のノードに新しい要素を追加する。
  4. リストのサイズを +1 する。

get_level の実装は、コラムにも書いてあったけど確率的なアプローチを取って決定されるらしい。こうするとより高レベルのノードが均等に割り振られたりして便利なのだそう。

impl BestTransactionLog {
    pub fn append(&mut self, offset: u64, value: String) {
        let level = 1 + if self.head.is_none() {
            self.max_level
        } else {
            self.get_level()
        };

        let new = Node::new(vec![None; level], offset, value);

        for i in 0..level {
            if let Some(old) = self.tails[i].take() {
                let next = &mut old.borrow_mut().next;
                next[i] = Some(new.clone())
            }
            self.tails[i] = Some(new.clone());
        }

        if self.head.is_none() {
            self.head = Some(new.clone());
        }

        self.length += 1;
    }

    fn get_level(&self) -> usize {
        todo!()
    }
}

get_level の実装はこんな感じ。

    fn get_level(&self) -> usize {
        let mut n = 0;
        while rand::random::<bool>() && n < self.max_level {
            n += 1;
        }
        n
    }

この手法については論文が元ネタみたいで、ちなみに論文の結論には「SkipListはいらない。Skip List でできることは Balanced Trees でできる。あと、ワーストケースの時間的な制約においてもいい結果をもたらす」と書いてある。ただ、Balanced Trees にも欠点があって、実装がとにかくめんどくさい。Skip List はその点ではデータ構造がシンプルという点でよいかもしれない。だそう。

https://15721.courses.cs.cmu.edu/spring2018/papers/08-oltpindexes1/pugh-skiplists-cacm1990.pdf

find の実装。さすがに LinkedList や Vector よりかは少し複雑になる。

今回はログのオフセットというものを検索のキーとして探す。

最初のレベルでもっとも高いレベルの箇所から順にまず要素が存在しているかを確認していく。なければ start_level を下げて次の検索を行う。このとき発見された start_level は記録されて後続のループで再び使用される。

次のループでも同様にレベルの高い箇所から順に探していくことになる。先ほどの探索結果の続きの start_level から行う。

さらに次の要素のレベルを探索し、offset よりも小さいかどうかを判定し小さければ要素をとっておく。

最後の if 文で offset 同士が要求されたものと等しいかどうかを見る。

ポイントは各要素のトップレベルに紐付いている next を垂直にたどっていく探索をしているというところかな?これを自分で空で実装してくださいと言われるとちょっとむずかしいかも…

    pub fn find(&self, offset: u64) -> Option<String> {
        match self.head {
            Some(ref head) => {
                let mut start_level = self.max_level;
                let node = head.clone();
                let mut result = None;

                loop {
                    if node.borrow().next[start_level].is_some() {
                        break;
                    }
                    start_level -= 1;
                }

                let mut n = node;
                for level in (0..=start_level).rev() {
                    loop {
                        let next = n.clone();
                        match next.borrow().next[level] {
                            Some(ref next) if next.borrow().offset <= offset => n = next.clone(),
                            _ => break,
                        };
                    }

                    if n.borrow().offset == offset {
                        let tmp = n.borrow();
                        result = Some(tmp.command.clone());
                        break;
                    }
                }

                result
            }
            None => None,
        }
    }

次は動的配列通称ArrayList。

pub struct TimestampSaver {
    buf: Box<[Option<Node>]>,
    cap: usize,
    pub length: usize,
}

Java の ArrayList の配列の領域拡張をそのまま真似して実装している。Rust の標準ライブラリの Vector の成長のさせ方はちょっと違っていたはず。(足りなくなったら今保有しているキャパシティの2倍の配列を用意するみたいな感じだった気がする。で、clone ではなく unsafe ブロックで囲みつつポインタをつけかえるみたいなことをしていた記憶が)

impl TimestampSaver {
    fn grow(&mut self, min_cap: usize) {
        let old_cap = self.buf.len();
        let mut new_cap = old_cap + (old_cap >> 1);

        new_cap = std::cmp::max(new_cap, min_cap);
        new_cap = std::cmp::min(new_cap, usize::max_value());

        let current = self.buf.clone();
        self.cap = new_cap;

        self.buf = vec![None; new_cap].into_boxed_slice();
        self.buf[..current.len()].clone_from_slice(&current);
    }

インデックスによる要素の取得も実装する。

impl TimestampSaver {
    fn grow(&mut self, min_cap: usize) {
        let old_cap = self.buf.len();
        let mut new_cap = old_cap + (old_cap >> 1);

        new_cap = std::cmp::max(new_cap, min_cap);
        new_cap = std::cmp::min(new_cap, usize::max_value());

        let current = self.buf.clone();
        self.cap = new_cap;

        self.buf = vec![None; new_cap].into_boxed_slice();
        self.buf[..current.len()].clone_from_slice(&current);
    }

    pub fn at(&mut self, index: usize) -> Option<u64> {
        if self.length > index {
            self.buf[index].flatten()
        } else {
            None
        }
    }
}

なお本の中で at 関数は

    pub fn at(&mut self, index: usize) -> Option<u64> {
        if self.length > index {
            self.buf[index]
        } else {
            None
        }
    }

として実装されているが、たぶんこれは誤植だと思う。self.buf[index] で取得してしまうと、Option<Option<u64>> になってしまい型が合わない。flatten が必要になるんじゃないかな。ということで、先ほどの実装には付け足してある。

イテレータを実装して終了。ベクタに対するイテレータなので、とくに変わったところはない。

pub struct ListIterator {
    current: usize,
    data: Box<[Node]>,
}

impl Iterator for ListIterator {
    type Item = u64;

    fn next(&mut self) -> Option<u64> {
        if self.current < self.data.len() {
            let item = self.data[self.current];
            self.current += 1;
            item
        } else {
            None
        }
    }
}

impl DoubleEndedIterator for ListIterator {
    fn next_back(&mut self) -> Option<u64> {
        if self.current < self.data.len() {
            let item = self.data[self.current];
            if self.current == 0 {
                self.current = self.data.len() - 1;
            } else {
                self.current -= 1;
            }
            item
        } else {
            None
        }
    }
}

リストは既知で、SkipList もなんとなく聞いたことがあるデータ構造だったので、とくに新しい発見はなかった。

そこで、『みんなのデータ構造』を読み返してみたところ、ちょっとおもしろいトピックスを発見した。

動的配列は CPU キャッシュと相性がいい

1つ1つジャンプせずに入れ込むので。

XORList

LinkedList だと prev と next の2つのポインタをもつ必要があるが、この XORList は nextprev というポインタをひとつ持つだけで実現できるらしい。

nextprev は next XOR prev (要するに next と prev の排他的論理和)を保持する。この実装は Java や Python などのポインタをもたない言語では実装できないらしい。

C によるサンプル実装があるので、あとで読んでみる。

https://github.com/kylelaker/xorlist

XORList の基本的なアイディアとしては、

[Start = NULL] <-> [A] <-> [B] <-> [C] <-> [D]

という Doubly Linked List があったとき、

NodeA
prev = NULL, next = address(B)
NodeB
prev = address(A), next = address(C)
NodeC
prev = address(B), next = address(D)
NodeD
prev = address(C), next = NULL

となる。これに対する nextprev をそれぞれ取ると、

NodeA
nextprev = NULL XOR address(B)
NodeB
nextprev = address(A) XOR address(C)
NodeC
nextprev = address(B) XOR address(D)
NodeD
nextprev = address(C) XOR NULL

となる。

https://www.geeksforgeeks.org/xor-linked-list-a-memory-efficient-doubly-linked-list-set-1/

C++ による実装例がある

https://www.geeksforgeeks.org/xor-linked-list-a-memory-efficient-doubly-linked-list-set-2/

Geek for Geeks というサイト、この手の CS に関連する勉強をしていると絶対検索結果に出てくるな。

このスクラップは2021/01/01にクローズされました
ログインするとコメントできます