RustでCRDTを使ったリアルタイム共同編集機能を作ってみる話

2023/04/11に公開

メモ代わりにZennに記事を書いているせいで、下書きが積まれまくっているmasamikiです。

はじめに

FigmaとかかMiroとかゲームならマインクラフトとか共同編集を機能としてもっているものは世にいろいろありますが、共同で編集することの需要が今後も増えていきそうないかなさそうな、P2Pの世界も広がっていきそうないかなさそうな気がするので、「共同編集」もっと詳しく知っておかないとなと、そのあたりを書いてみました。
(わかりやすく説明するのがとても難しい………)

変なとこがあればコメントください。追記や修正していきます。

リアルタイム共同編集の仕組み

マルチプレイヤーな仕組みを検討する時に考えなければいけない、データのコンフリクトです。

同じものを同じタイミングで編集した場合どうなるの…?
後勝ち(Last-writer-wins)にしちゃえば?
編集したものを、他の人が編集してしまった後のUndoってどうなるの…?
後から編集されたように見えるけど、本当に後だったの?
オフラインで編集された値はどう反映されるの?

考えることはいっぱいあるかと思います。

そんな共同編集を行うためのの、データの扱い方として

  1. OT (Operational Transformation)
  2. CRDT (Conflict-free Replicated Data Type)

の2つがあげられます。

OTは、1つのデータを正とするもので、CRDTは「コンフリクトしない分散型のデータ」という名前の通り、データが分かれていても問題無く合体できますよというデータの構造を用意してあげるものです。

OTは、単一サーバーへ集中する形になるので、パフォーマンスのオーバーヘッドが少ないテキストのドキュメントの共同編集ぐらいならならOTでもいいですが、オフライン編集やP2Pでの整合姓担保ができるようにするならCRDT、みたいに使い分けたりされているのだとか。

某会社の記事によると、リッチテキストにCRDTを採用すると、若干、UXに問題がでてしまうらしいです。

https://youtu.be/42KDvoFqzsI

という話とともに動画が添付されてたのですが、この動画は、他人が編集後にマウスカーソルが移動しちゃう、という程度の問題なので、他の対策でもなんとかできるんじゃないかと思ったり。とりあえず、この動画を作った会社は、CRDTだと、他にも大きな落とし穴がでてきそうだ…、ということでOTを採用したようです。

OTの方が歴史は古いでが、GoogleのDocsやWaveにも利用されていますし、Microsoftも多くのサービスに採用しており、現在でも実用性の高い仕組みです。

とはいえ、元Google WaveエンジニアでShare.JSの作成者、かつ、shareDBの主要コミッターであるジョセフさんは、「未来はCRDT」だと、自身のブログ書いていたりします。

こっちの記事でも、「OTは、もうその研究の限界が見えはじめ、サーバーベースのOTとTransform Property2を備えたOTがなんとか現在も生き残っている。2006年頃に出現し、2011年に正式に定義されたCRDTが、今では最も愛される仕組みとなっている」と書かれていたりします。

Figmaに関しては、「CRDTを参考にしたCRDTではないもの」をRustで実装しているそうです。
詳しくはFigmaのCTOのブログへ。

OTを実感したい方はこちら

Yjsのように、ライブラリもあるので、そんなに原理を意識する必要もない技術なのかもしれないですけが…

参考までにRedisの記事も置いときます。
https://redis.com/blog/diving-into-crdts/

CRDTの種類

CRDTのレプリケーションのモデルは大きく2種類があります。

  1. Commutative Replicated Data Type (Operation-Based)
  2. Convergent Replicated Data Type (State-Based)

訳すと「可換複製データ型」(操作ベース)と「収束複製データ型」(状態ベース)となります。CmRDT、CvRDTとも略すとのこと。

操作ベース(CmRDT)のCRDT

CmRDTには、以下の特徴があります。

  • データのすべてのレプリカに更新情報を送信する必要がある
  • 更新を確実に配信する必要があるため、ネットワークが強くないといけない
  • 現在、誰が参加しているのか把握しておくことが重要

例えば、整数のCRDTの場合、+10と-20の操作がおこなれた際、「10追加した」「20減らした」という操作をすべてのレプリカにブロードキャストします。レプリカ達は更新を受け取り、自身に適用します。

これは「可換」ではあります。+10が先でも-20が先にしても計算ができるので。
ただし、これは「冪等」であるとは限りません。

例えば、どっかのレプリカが+10したとします。またどっかのレプリカが/20したとします。
その場合、(0 + 10) / 20 = 0.5とも(0 / 20) + 10 = 10ともなり得る、つまり結果が同じとは限りません。
そのため、通信インフラストラクチャは、レプリカに対するすべての操作が、重複することなく、任意の順序で他のレプリカに配信されるようにする必要がある、という厳しい条件がCmRDTには課せられます。

また、CmRDTはトランザクションの数が多くなりがちで通信負荷が大きくなるというデメリットも結局あります。

CmRDTは、2006年に出てきたCRDTの最初の考え方で、結局OTの方がいいよね、ということでOTの方が当時は採用されていたとのことです。

状態ベース(CvRDT)のCRDT

CvRDTには、以下の特徴があります。

  • データはローカルで更新(操作)され、レプリカに送信されてマージされる
  • 更新は増加のみする(減算が扱えないという意味ではなく、減算も扱えますが、それは増加を扱っているというかそんな感じで……あとで書きます)
  • 操作ベースよりも理解しやすい
  • 参加者の増減をコントロールしやすい
  • 操作ベースより多くのデータがネットワークを介して送信される

主立ったCRDTには、状態ベースが使われており、今回もこちらの話を進めようと思います。

状態ベースオブジェクト

CRDTの前に、状態ベースについて、状態ベースオブジェクトを例に話します。
内容はほぼ、参考のままなので、詳しく読みたい人はこちらへ。

状態ベースのオブジェクトは、以下の4つの特徴を持ちます。

  1. 内部に状態を持つ
  2. クエリのメソッドを持つ
  3. 状態更新のメソッドを持つ
  4. マージのメソッドを持つ

pythonで書くとこんな感じ。

class Average(object):
  def __init__(self):
    self.sum = 0
    self.cnt = 0

  def query(self):
    if self.cnt != 0:
      return self.sum / self.cnt
    else:
      return 0

  def update(self, x):
    self.sum += x
    self.cnt += 1

  def merge(self, avg):
    self.sum += avg.sum
    self.cnt += avg.cnt

このAvarageクラスのインスタンス(レプリカ)を使って状態を変化させていくと、

state avarage(query) causal history
a0 sujm:0 cnt:0 0 {}
a1 sujm:1 cnt:1 1 {α1}
a2 sujm:4 cnt:2 2 {α2}

というように変化していきます。

CRDTにするための、状態の操作に関して、大事なキーワードは4つ。

  1. Causal History(因果関係の履歴)
  2. Convergent(収束)
  3. Eventually Consistent(結果整合姓)
  4. Strongly Eventually Consistent(強い結果整合姓)

3と4の違いですが、3、4どちらも結果(query)が同一値になるという意味では一緒です。違いの詳細は後ほど。

マージしない2つのインスタンス(レプリカ)a、bは、こんな感じで進行します。

state avarage(query) causal history
a0 sujm:0 cnt:0 0 {}
a1 sujm:1 cnt:1 1 {α1}
b0 sujm:0 cnt:0 0 {}
b1 sujm:2 cnt:1 2 {β1}
b2 sujm:6 cnt:2 3 {α1, β1}

CRDTにしよう案0 状態をマージしてみる

さて、マージした場合どうなるか見てみましょう。

staet avarage(query) causal history
a0 sujm:0 cnt:0 0 {}
a1 sujm:2 cnt:1 1 {α1}
a2 sujm:6 cnt:2 3 {α1, β1}
b0 sujm:0 cnt:0 0 {}
b1 sujm:4 cnt:1 4 {β1}

Causal Historyとは、すべてのリクエストに対する識別子の集合です。マージされたa2のCausal Historyは、aとbのCausal Historyの和集合となっているのが分かるかと思います。
Queryだけでなく、このCausal Historyが、a、bのレプリカで同値だということも、Convergent(収束)の条件となります。

a2は、b1をマージしたレプリカですが、結果(Query)がb1とEventually(同一)じゃないですよね。
そして、a2をb1にマージしても、Queryは同じ値にはならないでしょう。

つまりこれは、 Causal Historyは同値なのに、Convergent(収束)しないし、Eventually Consistent(結果整合姓)でもない、ということになります。

さて、「3. Eventually Consistent(結果整合姓)」と「4. Strongly Eventually Consistent(強い結果整合姓)」の違いなのですが、4 は、同一のCausal Historyを持つ場合に、状態も同一値になるという条件があるのが、3とは違うところです。

CRDTを目指し、我々はConvergent(収束)且つStrongly Eventually Consistent(強い結果整合姓)になるよう、マージの方法をまた変えてみましょう。

CRDTにしよう案1 状態をマージしない

class NoMergeAverage(Average):
  # __init__, query, and merge
  # inherited from Average.

  def merge(self, avg):
    # Ignore merge requests!
    pass

マージなんかしない。無視。思い切った改良ですね。

staet avarage(query) causal history
a0 sujm:0 cnt:0 0 {}
a1 sujm:2 cnt:1 2 {α1}
a2 sujm:2 cnt:2 1 {α1, β1}
a3 sujm:2 cnt:2 1 {α1, β1}
b0 sujm:0 cnt:0 0 {}
b1 sujm:4 cnt:1 4 {β1}
b2 sujm:4 cnt:1 4 {α1, β1}
b3 sujm:4 cnt:1 4 {α1, β1}

こうしてしまうと、一定の値にConverge(収束)は確かにします。しかし、aとbがことなる場所にConvergeしてしまっています。

つまり、Convergent(収束)だけど、Eventually Consistent(結果整合姓)じゃないということになります。

CRDTにしよう案2 Bの状態はみんな無視する

class BMergeAverage(Average):
  # __init__, query, and merge
  # inherited from Average.

  def merge(self, avg):
    if on_server_b():
      self.sum = avg.sum
      self.cnt = avg.cnt
    else:
      # Server a ignores
      # merge requests!

bのレプリカだけ、マージすると他のレプリカの状態が代入される。b以外はマージしない、そんなマージです。

staet avarage(query) causal history
a0 sujm:0 cnt:0 0 {}
a1 sujm:0 cnt:0 0 {α1}
a2 sujm:0 cnt:0 0 {α1}
b0 sujm:0 cnt:0 0 {}
b1 sujm:4 cnt:1 4 {α1}
b2 sujm:0 cnt:0 0 {α1}

Converge(収束)してます。そしてEventually Consistent(結果整合姓)もありますね。

ただ、何か違う気がしませんか……そう、よく見てください。
a1とb1をよく見てください。
Causal Historyが同値なのに、Queryが違うんですよ。

つまり、これが、Strongly Eventually Consistent(強い結果整合姓)ではない、ということになります。同一のCausal Historyを持つ場合に、状態も同一値になるという条件を満たしていないため、これはCRDTではないと言えます。

CRDTにしよう案3 最大値にする

class MaxAverage(Average):
  # __init__, query, and merge
  # inherited from Average.

  def merge(self, avg):
    self.sum = max(self.sum, avg.sum)
    self.cnt = max(self.cnt, avg.cnt)

レプリカをマージする時に、大きいものを状態として採用するという方法です。

staet avarage(query) causal history
a0 sujm:0 cnt:0 0 {}
a1 sujm:2 cnt:1 2 {α1}
a2 sujm:4 cnt:1 4 {α1, β1}
a3 sujm:4 cnt:1 4 {α1, β1}
b0 sujm:0 cnt:0 0 {}
b1 sujm:4 cnt:1 4 {β1}
b2 sujm:4 cnt:1 4 {α1, β1}
b3 sujm:4 cnt:1 4 {α1, β1}

こうすることで、Causal HistoryもQueryも収束しており、同値になっていますね。Eventually Consistent(結果整合姓)ということです。

さらに、同じCausal Historyのaとbを比較して見てください。
Causal Historyが同じ時に、Queryも同じ、つまり、Strongly Eventually Consistent(強い結果整合姓)!

最大値を採用する、これがCRDTの1番簡単なやり方といって良いかもですね。

まとめると

これらのクラスの性質は、こんな感じになります。

Converge Eventually Consistent Strongly Eventually Consistent
Average × × ×
NoMergeAverage × ×
BMergeAverage ×
MaxAverage

CRDT

ここの内容もほぼ参考のままなので、英語で読みたい方はそちらへ。

CRDTを目指して状態ベースのオブジェクトを変えていきましたが、実はもう少し満たさなければならない条件があります。CvRDTの説明とも内容が被りますが。

  1. マージは結合的であること: xにyをマージしてzをマージしても、yにzをマージしたものをxとマージしても結果が同じになる。(merge(x, y), z) == merge(x, merge(y, z))
  2. マージは交換可能であること: xにyをマージしても、yにxをマージしても結果が同じになる。merge(x, y) == merge(y, x)
  3. マージが冪等であること: xにxをマージしても結果はxであること。 merge(x, x) == x
  4. 更新は増加すること

例えば、このクラスはCRDTの条件を満たすと言えます。

class IntMax(object):
  def __init__(self):
    self.x = 0

  def query(self):
    return self.x

  def update(self, x):
    assert x >= 0
    self.x += x

  def merge(self, other):
    self.x = max(self.x, other.x)
  1. マージは結合的で、
merge(merge(a, b), c)
= max(max(a.x, b.x), c.x)
= max(a.x, max(b.x, c.x))
= merge(a, merge(b, c))
  1. マージは交換可能で、
  merge(a, b)
= max(a.x, b.x)
= max(b.x, a.x)
= merge(b, a)
  1. マージは冪等で、
  merge(a, a)
= max(a.x, a.x)
= a.x
= a
  1. 更新は増加のみしていきます。
  merge(a, update(a, x))
= max(a.x, a.x + x)
= a.x + x
= update(a, x)

ここでは、そんな条件を満たす4つのCRDT、

  1. GCounter
  2. PN-Counter
  3. G-Set
  4. 2P-Set

を紹介してみます。

※他、LWW-Element-Set、OR-Set, ORSWOTも、Rustのライブラリの説明の時に話せたらと思ってます。

G-Counter

G-Counterは、増加するけど減少はしないレプリカカウンターです。
G-Counterは、以下の特徴を持ちます。

  1. n個のマシンにレプリケートされたGCounterの状態は、nの長さの整数の配列。
  2. Queryメソッドは、配列のすべての要素の合計を返す。
  3. Add(x)メソッド(更新メソッド)は、i番目のサーバーで実行された際、配列のi番目をx増やす。
  4. マージメソッドは、2つのレプリケーションの配列の、同じindexの値を比較して、大きい方を残した配列にする。

クラスにすると、こんな感じ。

class GCounter(object):
  def __init__(self, i, n):
    self.i = i # server id
    self.n = n # number of servers
    self.xs = [0] * n

  def query(self):
    return sum(self.xs)

  def add(self, x):
    assert x >= 0
    self.xs[self.i] += x

  def merge(self, c):
    zipped = zip(self.xs, c.xs)
    self.xs = [max(x, y) for (x, y) in zipped]

これを進めていくと、

staet query causal history
a0 i:0, n:2, xs:[0,0] 0 {}
a1 i:0, n:2, xs:[1,0] 1 {α1}
a2 i:0, n:2, xs:[1,2] 3 {α1, β1}
b0 i:1, n:2, xs:[0,0] 0 {}
b1 i:1, n:2, xs:[0,2] 2 {β1}
b2 i:1, n:2, xs:[0,6] 6 {β1, β2}
b3 i:1, n:2, xs:[1,6] 7 {α1, β1, β2}

という形で、データを収束させていきます。

PN-Counter

PN-Counterは、増加も減少もするレプリカカウンターです。
あれ?更新は増加だけでは…?と一瞬思うかもですが、特徴を見てみましょう。

  1. PN-Counterは状態として、pとnという名前の2つのG-Counterを持つ。pは追加された値の合計値を、nは減らされた値の合計値を表す。
  2. Queryメソッドは、pのQuery - nのQuery。
  3. Add(x)メソッド(更新メソッド)は、pのAdd(x)を実行する。
  4. Sub(x)メソッド(更新メソッド)は、nのAdd(x)を実行する。
  5. マージメソッドは、pとnそれぞれのマージを実行する。

という特徴から、pがPositive、nがNegativeだという意味も察していらっしゃるかと。

クラスにすると、こんな感じ。

class PNCounter(object):
  def __init__(self, i, n):
    self.p = GCounter(i, n)
    self.n = GCounter(i, n)

  def query(self):
    return self.p.query() - self.n.query()

  def add(self, x):
    assert x >= 0
    self.p.add(x)

  def sub(self, x):
    assert x >= 0
    self.n.add(x)

  def merge(self, c):
    self.p.merge(c.p)
    self.n.merge(c.n)

これを進めると、

staet query causal history
a0 p.xs:[0, 0], n.xs:[0, 0] 0 {}
a1 p.xs:[1, 0], n.xs:[0, 0] 1 {α1}
a2 p.xs:[1, 0], n.xs:[0, 2] -1 {α1, β1}
b0 p.xs:[0, 0], n.xs:[0, 0] 0 {}
b1 p.xs:[0, 0], n.xs:[0, 2] -2 {β1}
b2 p.xs:[0, 4], n.xs:[0, 2] 2 {β1, β2}
b3 p.xs:[1, 4], n.xs:[0, 2] 3 {α1, β1, β2}

こんな感じで、増加も減少もできます。

G-Set

G-Setは、カウンターではなく、要素を管理してくれるCRDTです。

  1. 状態は単なるSet(集合)。
  2. QueryメソッドはSetを返す。
  3. Add(x)メソッド(更新メソッド)はxをSetに追加する。
  4. マージメソッドは、レプリカ間のSetの和集合を作る。

要素の追加ができて、削除ができないので、G-CounterのSet版と言えばいいでしょうか。

クラスにすると、こんな感じ。

class GSet(object):
  def __init__(self):
    self.xs = set()

  def query(self):
    return self.xs

  def add(self, x):
    self.xs.add(x)

  def merge(self, s):
    self.xs = self.xs.union(s.xs)

これを進めていくと、

staet query causal history
a0 {} {} {}
a1 {1} {1} {α1}
a2 {1, 2} {1, 2} {α1, β1}
b0 {} {} {}
b1 {2} {2} {β1}
b2 {2, 4} {2, 4} {β1, β2}
b3 {1, 2, 4} {1, 2, 4} {α1, β1, β2}

という感じで、Set(集合)を増加させていきます。

2P-Set

2P-Setは、追加も削除んもできるCRDTです。

  1. aとrという名前のG-Setを状態として持つ。aは追加、rは削除を表す。
  2. Queryメソッドは、aのQuery - r.Query。
  3. Add(x)メソッド(更新メソッド)は、aのAdd(x)を実行する。
  4. Sub(x)メソッド(更新メソッド)は、rのAdd(x)を実行する。
  5. マージメソッドは、aとrそれぞれのマージを実行する。

何か、PN-Counterみたいな感じですね。
ちなみに、削除のSetは「トゥームストーン(墓石)」Setとも呼びます。

クラスはこんな感じ。

class TwoPSet(object):
  def __init__(self):
    self.a = GSet()
    self.r = GSet()

  def query(self):
    return self.a.query() - self.r.query()

  def add(self, x):
    self.a.add(x)

  def sub(self, x):
    self.r.add(x)

  def merge(self, s):
    self.a.merge(s.a)
    self.r.merge(s.r)

これを進めると

staet query causal history
a0 a:{}, r:{} {} {}
a1 a:{1}, r:{} {1} {α1}
a2 a:{1}, r:{2} {1} {α1, β1}
b0 a:{}, r:{} {} {}
b1 a:{}, r:{2} {} {β1}
b2 a:{}, r:{1,2} {} {β1, β2}
b3 a:{1}, r:{1,2} {2} {α1, β1, β2}

というような感じです。

2P-Setは、一度削除した要素を再度追加することはできないというのも特徴です。
これを「Remove-Wins」と言います。

rust-crdt

さて、今回はCRDTのためのクレート「rust-crdt」を試してみたくて、書き始めた記事ですが、ここまで来るのが大分遅くなりました。

RustでCRDTと検索すると

https://docs.rs/crdts/latest/crdts/
https://docs.rs/ron-crdt/latest/ron_crdt/

この2つがでてくるのですが、後者はメンテもされてなさそうですし、ちょっと特殊なので、前者を試した次第です。

dependenciesにクレートを追加

cargo-editを使っているなら

cargo add crdts

でプロジェクトにcrdtを追加。
この記事書いている時点だと7.0.0ですかね、バージョンは。

Exampleをやってみよう

これは、クレートのリポジトリに入っているExampleの1つです。
BoBとAliceがそれぞれ共有のパスワードを変更するその処理を書いたものになります。

use crdts::{CmRDT, CvRDT, VClock};
use std::cmp::Ordering::*;

fn main() {
    #[derive(Debug, Default, Clone, PartialEq)]
    struct VersionedString {
        clock: VClock<String>,
        data: String,
    }
    let shared_password = VersionedString::default();

    // alice and bob take a copy ...
    let mut bobs_copy = shared_password.clone();
    let mut alices_copy = shared_password;

    // bob edits the shared password..
    bobs_copy
        .clock
        .apply(bobs_copy.clock.inc("BOB".to_string()));
    bobs_copy.data = "pa$$w0rd".to_string();

    // ... and shares it with alice.

    // Alice first compares the vclock of her copy with bob's:
    match alices_copy.clock.partial_cmp(&bobs_copy.clock) {
        Some(Less) => { /* bob's clock is ahead */ }
        _ => panic!("Bob's clock should be ahead!!"),
    }
    // Alice sees that bob's clock is ahead of hers.
    // This tells her that Bob has seen every edit she has
    // seen and his string is a more recent version.
    alices_copy = bobs_copy.clone();

    // Now, alice decides to changes the password.
    alices_copy
        .clock
        .apply(alices_copy.clock.inc("ALICE".to_string()));
    alices_copy.data = "letMein32".to_string();

    // But! concurrently, bob edits the password again!
    bobs_copy
        .clock
        .apply(bobs_copy.clock.inc("BOB".to_string()));
    bobs_copy.data = "0sdjf0as9j13k0zc".to_string();

    // Alice shares her edit with bob and bob compares clocks
    match bobs_copy.clock.partial_cmp(&alices_copy.clock) {
        None => { /* these clocks are not ordered! */ }
        _ => panic!("These clocks are not ordered!"),
    }

    // If we take a look at the clocks we see the problem.
    assert_eq!(format!("{}", bobs_copy.clock), "<BOB:2>");
    assert_eq!(format!("{}", alices_copy.clock), "<ALICE:1, BOB:1>");

    // bob's version counter is bigger on his copy but alices
    // version counter is bigger on her copy
    // (version counters default to 0 if not present in a clock)

    // This is how VClocks can be used to detect conflicts.
    // Bob needs to manually look at the two strings and decide
    // how to manage this conflict.

    // Bob decides to keep Alices string, he merges alices clock
    // into his to signify that he has seen her edits.
    bobs_copy.clock.merge(alices_copy.clock.clone());
    bobs_copy.data = "letMein32".to_string();

    // looking once more at bob's clock we see it includes all
    // edits done by both bob and alice
    assert_eq!(format!("{}", bobs_copy.clock), "<ALICE:1, BOB:2>");

    // Once Alice receives bob's updated password she'll see that
    // his clock is ahead of hers and choose to keep his versioned string.
    match alices_copy.clock.partial_cmp(&bobs_copy.clock) {
        Some(Less) => {
            // bob's clock is ahead
            alices_copy = bobs_copy.clone()
        }
        _ => panic!("Alice's clock should be behind!!"),
    }

    assert_eq!(alices_copy, bobs_copy);
}

ここで、VClockという言葉がでてきました。

VClockとは、分散システムでのイベントの半順序を決定し、因果関係のバリデーションを行うことのできるデータ構造Vector Clockの構造体でです。

半順序

1978年、レスリー・ランポート(Leslie Lamport)が、happened beforeという半順序(partial order)を定義しました。
複数のオブザーバーがいたとき、「e1 が e2 に因果的に影響を与えるときに限り、e1 は e2 に先立って起きる」半順序のみがあるという理論です。
参考

この順序というのは、集合の二項関係により示される性質です。

二項関係とは、とある集合Sから、集合Sの要素を2つ並べたものを何個か集めた集合のことです。

例えば、

S = \{1, 2, 3\}

とした時、直積集合であるS × Sは、

S = \{(1,1), (1,2), (1,3), (2,1), (2,2), (2,3), (3,1), (3,2), (3,3)\}

となり。 S × Sの部分集合がSの二項関係なので、そこから任意のものを取り出して作った集合

\{(1,1), (3,2)\}, \{(1,3), (2,1), (3,2)\}, \{(1,1), (2,2)\}

のような集合が二項関係の集合となります。

いや……分かりづらいですね。
こうしましょう。

\begin{aligned} A &= \{ 佐藤, 田中, 鈴木 \}\\\\ B &= \{ ラーメン, 蕎麦, うどん \} \end{aligned}

そして、この集合は、

\{(佐藤, ラーメン), (田中, うどん)\}

AからBへの嫌いな食べ物という二項の関係を表しています。

そして、これはABの直接集合

\{(佐藤, ラーメン), (田中, ラーメン), (鈴木, ラーメン),(佐藤, 蕎麦), (田中, 蕎麦), (鈴木, 蕎麦), (佐藤, うどん), (田中, うどん), (鈴木, うどん)\}

に含まれてますよね。

これらの二項の関係において、

  1. 任意のx ∈ Xに対してx ≤ x(反射律)
  2. x, y ∈ Xに対してx ≤ y, y ≤ xならばx = y(反対称律)
  3. x, y, z ∈ Xに対して,x ≤ y, y ≤ zならばx ≤ z(推移律)
  4. 任意のx, y ∈ Xに対して,x ≤ yまたはy ≤ x(完全律)

という関係性のうち、反射律・反対称律・推移律が満たされる関係を半順序関係といい、完全律も満たすものを全順序と言います。(詳しくはこれとかこれを読む方がいい)

レスリー曰く、特殊相対性理論が、時空におけるイベントの不変の全順序付けがないことを教えてくるそうです。
よくわからんです。

ただ、確かに以下のような例で考えると

(a1)AliceがBBQやりたいと言ったから→BobがChrisに伝えて→(c3) Chrisが行きたがった、という例から、イベントによる順序というものが匂えるのではないでしょうか?

a1とc2(Chirsが暇)については、どっちが先かなんてわからないイベントであり、4. は満たされないことも、なんとなくふんわりと分りそうじゃないでしょうか?

Causal History

BBQの例においても、a, b, cの各イベントの因果を最もシンプルに追えるのが、Causal Historyです。
各イベントに、ユニークな名前(例えば、a1のようなノード名 + カウンター)もとい識別子を付け、メッセージ送信時に、それ(Causal History)を一緒に送信し、因果を管理します。

CRDTの例で書いていたので、それほど説明はいらないですね。
因果から順序を判定し、マージを行えるようにします。

Vector Clock

Causal Historyは言い仕組みですが、コンパクトではないという問題があります。どこまでも増えていくためです。
なので、各ノードの最新の数字だけ保存すれば良い、つまり、{a1, a2, b1, c1}ではなく、{2, 1, 1}またば[2, 3, 3]のようにすれば良い、即ちベクトルとして管理すればいいじゃないかという考えがVector Clockです。

詳しくはこちら

このExampleは、このVClockを使ってパスワードが変更のコンフリクトをチェックするExampleです。

VClockはの構造はこんな感じ、dotsというfieldを持っています。
識別子とカウンターのKVですね。

pub struct VClock<A: Ord> {
    /// dots is the mapping from actors to their associated counters
    pub dots: BTreeMap<A, u64>,
}

コンフリクトしない場合

やっとExample。

  1. Bobのクロックを進める

bobs_copy.clock.inc("BOB".to_string())でBOBのclockを1進めています。
正確にはBOBという文字列がactor(識別子、dotsのキー)として、dots内のBOBのバリューが増える感じです。
も少し正確にいうと、バリューが増えたという操作(Op)が返ってきている処理です。

clock.applyすることで、それをbob_copyのclockに反映させます。

applyの内容はシンプルで、操作に付いているカウンターと、VClockが持っているカウンターを比較して、取込かどうかを評価して取り込んでいるというような形になっています。

    fn apply(&mut self, dot: Self::Op) {
        if self.get(&dot.actor) < dot.counter {
            self.dots.insert(dot.actor, dot.counter);
        }
    }
  1. BobのクロックとAliceのクロックの順序を比較する

partial_cmpというメソッドで、レプリカ間のclockを比較して、順序をチェックできます。

match alices_copy.clock.partial_cmp(&bobs_copy.clock) {
    Some(Less) => { /* bob's clock is ahead */ }
    _ => panic!("Bob's clock should be ahead!!"),
}

Bobの方がclockを進めているので、std::cmp::OrderingのLessのが返ってくるはずです。

  1. Bobの方が先にあるので、Aliceの値を上書きする

alices_copy = bobs_copy.clone();

コンフリクトする場合

  1. Aliceのクロックを進める

alices_copy.clock.inc("ALICE".to_string())でAliceのclockを1進め、clock.applyします。

  1. Bobのクロックも進める

bobs_copy.clock.inc("BOB".to_string())でBobのclockを1進め、clock.applyします。

  1. BobのクロックとAliceのクロックの順序を比較する

互いに影響しないイベントを起こしており、順序にすることができないため、Noneが返ります。

match bobs_copy.clock.partial_cmp(&alices_copy.clock) {
    None => { /* these clocks are not ordered! */ }
    _ => panic!("These clocks are not ordered!"),
}

現実の状況としては、Aliceが行った変更をBobに共有しようとするも、Bobもパスワードを変更してしまっているよという状態です。

  1. そこで、Bobに対してAliceをマージしてしまいます。

bobs_copy.clock.merge(alices_copy.clock.clone());

    fn merge(&mut self, other: Self) {
        for dot in other.into_iter() {
            self.apply(dot);
        }
    }

merge、すべてのactorに対してapplyが走るような処理です。
これで、BobのクロックにAliceの履歴が加わった状態になります。

  1. BobのクロックとAliceのクロックの順序を比較して、順序が後の方に先の方を上書きする。

マージしたことで、Bobのクロックが進んでいるという結果が返ってきます。

状況としては、既にBobがパスワードをアップデートしてしまってるのですが、自身のパスワードを保持するか破棄するか選んでください、というような画面が現実では表示されてる感じですかね。

match alices_copy.clock.partial_cmp(&bobs_copy.clock) {
    Some(Less) => {
    // bob's clock is ahead
    alices_copy = bobs_copy.clone()
    }
    _ => panic!("Alice's clock should be behind!!"),
}

Yes/Noの判定はないですが、このExampleだとYesをした感じですね。
Aliceの値にBobの値が反映されて、このExampleは終わりです。

さて、

別のExampleもやってみよう

今度は、複数のデバイスからデータが操作されるExampleです。

use crdts::{CmRDT, CvRDT, Map, Orswot};

fn main() {
    let mut friend_map: Map<&str, Orswot<&str, u8>, u8> = Map::new();

    let read_ctx = friend_map.len(); // we read anything from the map to get a add context
    friend_map.apply(
        friend_map.update("bob", read_ctx.derive_add_ctx(1), |set, ctx| {
            set.add("janet", ctx)
        }),
    );

    let mut friend_map_on_2nd_device = friend_map.clone();

    // the map on the 2nd devices adds 'erik' to `bob`'s friends
    friend_map_on_2nd_device.apply(friend_map_on_2nd_device.update(
        "bob",
        friend_map_on_2nd_device.len().derive_add_ctx(2),
        |set, c| set.add("erik", c),
    ));

    // Meanwhile, on the first device we remove
    // the entire 'bob' entry from the friend map.
    friend_map.apply(friend_map.rm("bob", friend_map.get(&"bob").derive_rm_ctx()));

    assert!(friend_map.get(&"bob").val.is_none());

    // once these two devices synchronize...
    let friend_map_snapshot = friend_map.clone();
    let friend_map_on_2nd_device_snapshot = friend_map_on_2nd_device.clone();

    friend_map.merge(friend_map_on_2nd_device_snapshot);
    friend_map_on_2nd_device.merge(friend_map_snapshot);
    assert_eq!(friend_map, friend_map_on_2nd_device);

    // ... we see that "bob" is present but only
    // contains `erik`.
    //
    // This is because the `erik` entry was not
    // seen by the first device when it deleted
    // the entry.
    let bobs_friends = friend_map
        .get(&"bob")
        .val
        .map(|set| set.read().val)
        .map(|hashmap| hashmap.into_iter().collect::<Vec<_>>());

    assert_eq!(bobs_friends, Some(vec!["erik"]));
}

パスワードのExampleと異なり、strのキー、OrswotをValueに持つMapにデータをいれて、コンフリクトを解決するようにしているExampleですね。

ORSWOT

CRDT、上でいくつか紹介しましたが、まだまだあります。
そのの一つがORSWOTです。

ORSWOTを説明するには、いくつかのCRDTをまた説明する必要があります。

LWW-Element-Set (Last-Write-Wins-Element-Set)とは

LWW-Element-Setは追加のSetと削除のSetがあり、2P-Setとよく似ています。

ただ、何が違うかというと、更新した際に一緒に保存されるタイムスタンプを使って、追加の方にある要素がトゥームストーンの要素より後のタイムスタンプだった場合は、削除扱いしないという、後勝ち(Last-Write-Wins)になっているのが、2P-Setとは異なる点です。

タイムスタンプが同じだったらどうなるかというと、LWW-Element-Setでは、AddかRemoveのどちらかにバイアスをかけることができ、どちらかが勝つようになっています。

OR-Set (Observed-Remove Set)とは

OR-Setは、追加および削除ができるORDTで、LWW-Element-Setとそっくりなのですが、タイムスタンプではなく、タグを用いている点が異なっています。

OR-Setでは、Setの要素毎に、Addタグのリスト(集合)とRemoveタグのリスト(集合)が保存されます。
適当に書いてみるなら、{{ value: 1 add: {α1, β2}, remove: {}}, { value: 2 add: {β1}, remove: {α1}}}的な感じですかね。

Addリストの中のタグが全て、Removeリスト(トゥームストーン)に入ることで、要素が削除されるという仕組みになっています。
つまり、Addの方が優先されるので、Add-Winsといいう位置付けになります。

ちなみにマージは、各要素の各リストの和集合が作られるような動きをします。

ORSWOT(Observe Remove Set Without Tombstones)とは

OR-Setでは、トゥームストーンがどこまでも増える可能性があります。まさしく、Vevtor Clockの説明で書いた内容です。
なので、タグを集合ではなく、ベクトルにすることでそれを解決したのがORSWOTです。

そして、今回のExampleで使われているのはその、ORSWORTです。

このクレートの中では、ORSWORTが、こんな感じで定義されています。

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Orswot<M: Hash + Eq, A: Ord + Hash> {
    pub(crate) clock: VClock<A>,
    pub(crate) entries: HashMap<M, VClock<A>>,
    pub(crate) deferred: HashMap<VClock<A>, HashSet<M>>,
}

clockは、いわば各デバイスの変更を管理するclockと言えばいいでしょうか。
既に反映させたかさせてないかを判断してるフィールドですね。

entriesが、実際の状態情報を持っているフィールドで、VClockを使うことで、キーになっている要素のAddとRemoveを管理しています。

実際にExampleやりながら、中身見ていきましょう。

  1. デバイスを用意
    デバイス1とデバイス2のそれぞれのCRDTを用意します。
// device1
let mut friend_map: Map<&str, Orswot<&str, u8>, u8> = Map::new();

// device2
let mut friend_map_on_2nd_device = friend_map.clone();

ここで、crdtクレートのMapというのがでてきてますが、中身はどうなっているのでしょうか?

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Map<K: Ord, V: Val<A>, A: Ord + Hash> {
    // This clock stores the current version of the Map, it should
    // be greator or equal to all Entry.clock's in the Map.
    clock: VClock<A>,
    entries: BTreeMap<K, Entry<V, A>>,
    deferred: HashMap<VClock<A>, BTreeSet<K>>,
}

fieldはとてもOrswotと似ていますが、Orswotが状態をSetで持ち、こっちが状態をMapで持つという感じですかね。
Orswotはentiryとして、KeyとVClockのMapが使われていたのですが、

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct Entry<V: Val<A>, A: Ord> {
    // The entry clock tells us which actors edited this entry.
    clock: VClock<A>,

    // The nested CRDT
    val: V,
}

MapはKeyとValue(CRDT) + VClockのMapが使われている構造です。

つまり、このExampleではKVの共同編集をするため、KeyとOrswotをValueとしたMapのCRDTをつかっていくというExampleですね。

Nestしないようなシンプルなデータであれば、だいたいがこのExampleを元にして実装できるのではないでしょうか。

  1. デバイス1でBobに友人Janetを追加

let read_ctx = friend_map.len(); でcontextを取得しています。

で、このコンテキストが何者かというと、

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReadCtx<V, A: Ord> {
    /// clock used to derive an AddCtx
    pub add_clock: VClock<A>,

    /// clock used to derive an RmCtx
    pub rm_clock: VClock<A>,

    /// the data read from the CRDT
    pub val: V,
}

こんな感じで、Addのclock、Removeのclock、valueを持つ構造となっています。
len()では、CRDT(ここだとMap)のclockが、Add/Removeのclockに代入され、entriesの長さがvalueに代入されます。

    friend_map.apply(
        friend_map.update("bob", read_ctx.derive_add_ctx(1), |set, ctx| {
            set.add("janet", ctx)
        }),
    );

derive_add_ctx(1)の引数はactor、つまりデバイスの識別子を入れています。
そうすると、clockを進めたAddのためのcontext(AddCtx)が返ります。

デバイス1のMapのupdateメソッドは、Keyと、AddCtxと、第3引数としてValue(Orswot)とAddCtxを引数とし、Op(操作)を返すクロージャとなっており、上の例だと、"bob"をKeyとして、"janet"をAddする操作の更新を行うような記述になっています。(Op::Upが返ります。)

    fn apply(&mut self, op: Self::Op) {
        match op {
            Op::Rm { clock, keyset } => self.apply_keyset_rm(keyset, clock),
            Op::Up { dot, key, op } => {
                if self.clock.get(&dot.actor) >= dot.counter {
                    // we've seen this op already
                    return;
                }

                let entry = self.entries.entry(key).or_default();

                entry.clock.apply(dot.clone());
                entry.val.apply(op);

                self.clock.apply(dot);
                self.apply_deferred();
            }
        }
    }

applyは、OpがUpなのかRemoveなのかでその処理を変えてます。
Upの場合は、actor(この場合はデバイス1)の最新の変更か判定し、value(Orswot)へのapplyを実施する。

    fn apply(&mut self, op: Self::Op) {
        match op {
            Op::Add { dot, members } => {
                if self.clock.get(&dot.actor) >= dot.counter {
                    // we've already seen this op
                    return;
                }

                for member in members {
                    let member_vclock = self.entries.entry(member).or_default();
                    member_vclock.apply(dot.clone());
                }

                self.clock.apply(dot);
                self.apply_deferred();
            }
            Op::Rm { clock, members } => {
                self.apply_rm(members.into_iter().collect(), clock);
            }
        }
    }

Orswot側のapplyでは、各member(この場合は"janet"の文字列のみ)に対して、操作のclockがapplyされる。こうすることで、MapのKey"bob"の、Valueに"janet"がAddされたことになる。

  1. デバイス2で友人Erikの追加
    friend_map_on_2nd_device.apply(friend_map_on_2nd_device.update(
        "bob",
        friend_map_on_2nd_device.len().derive_add_ctx(2),
        |set, c| set.add("erik", c),
    ));
  1. デバイス1でBobを消す
    friend_map.apply(friend_map.rm("bob", friend_map.get(&"bob").derive_rm_ctx()));

friend_map.get(&"bob")で"bob"のまたcontextが得られるのですが、このgetの場合は、Removeのclockには、value(Orswot)のclockが入ったcontextが返ります。

derive_rm_ctx()で、Removeのclockが入った削除のcontext(RmCtx)が返ります。

それが先ほどのapplyのmatchの中で、

            Op::Rm { clock, keyset } => self.apply_keyset_rm(keyset, clock),

BobのOrswortのclockの各デバイスのカウンタと、Opのclockを比較してOpが先であれば、そのデバイスをremoveし、最後clockになんのデバイスも残ってなさそうなら、キーを削除する、というロジックが動いています。

fn apply_keyset_rm(&mut self, mut keyset: BTreeSet<K>, clock: VClock<A>) {
        for key in keyset.iter() {
            if let Some(entry) = self.entries.get_mut(&key) {
                entry.clock.reset_remove(&clock);
                if entry.clock.is_empty() {
                    // The entry clock says we have no info on this entry.
                    // So remove the entry
                    self.entries.remove(&key);
                } else {
                    // The entry clock is not empty so this means we still
                    // have some information on this entry, keep it.
                    entry.val.reset_remove(&clock);
                }
            }
        }

        // now we need to decide wether we should be keeping this
        // remove Op around to remove entries we haven't seen yet.
        match self.clock.partial_cmp(&clock) {
            None | Some(Ordering::Less) => {
                // this remove clock has information we don't have,
                // we need to log this in our deferred remove map, so
                // that we can delete keys that we haven't seen yet but
                // have been seen by this clock
                let deferred_set = self.deferred.entry(clock).or_default();
                deferred_set.append(&mut keyset);
            }
            _ => { /* we've seen all keys this clock has seen */ }
        }
    }

ただ、このMapのclockよりOpのclockが後になっている、つまり、Opが遅れて届いてしまったケースなんですが、その場合は、まだ見ぬOpがあることを懸念して遅延用のsetに値を保留させ、mergeの時に使われます。
今回のExampleでは通りませんが。

  1. 2つのデバイスのMapをマージ
    friend_map.merge(friend_map_on_2nd_device_snapshot);

Mapのマージはちなみにこんな感じになっている。

    fn merge(&mut self, other: Self) {
        self.entries = mem::take(&mut self.entries)
            .into_iter()
            .filter_map(|(key, mut entry)| {
                if !other.entries.contains_key(&key) {
                    // other doesn't contain this entry because it:
                    //  1. has seen it and dropped it
                    //  2. hasn't seen it
                    if other.clock >= entry.clock {
                        // other has seen this entry and dropped it
                        None
                    } else {
                        // the other map has not seen this version of this
                        // entry, so add it. But first, we have to remove any
                        // information that may have been known at some point
                        // by the other map about this key and was removed.
                        entry.clock.reset_remove(&other.clock);
                        let mut removed_information = other.clock.clone();
                        removed_information.reset_remove(&entry.clock);
                        entry.val.reset_remove(&removed_information);
                        Some((key, entry))
                    }
                } else {
                    Some((key, entry))
                }
            })
            .collect();

        for (key, mut entry) in other.entries {
            if let Some(our_entry) = self.entries.get_mut(&key) {
                // SUBTLE: this entry is present in both maps, BUT that doesn't mean we
                // shouldn't drop it!
                // Perfectly possible that an item in both sets should be dropped
                let mut common = VClock::intersection(&entry.clock, &our_entry.clock);
                common.merge(entry.clock.clone_without(&self.clock));
                common.merge(our_entry.clock.clone_without(&other.clock));
                if common.is_empty() {
                    // both maps had seen each others entry and removed them
                    self.entries.remove(&key).unwrap();
                } else {
                    // we should not drop, as there is information still tracked in
                    // the common clock.
                    our_entry.val.merge(entry.val);

                    let mut information_that_was_deleted = entry.clock.clone();
                    information_that_was_deleted.merge(our_entry.clock.clone());
                    information_that_was_deleted.reset_remove(&common);
                    our_entry.val.reset_remove(&information_that_was_deleted);
                    our_entry.clock = common;
                }
            } else {
                // we don't have this entry, is it because we:
                //  1. have seen it and dropped it
                //  2. have not seen it
                if self.clock >= entry.clock {
                    // We've seen this entry and dropped it, we won't add it back
                } else {
                    // We have not seen this version of this entry, so we add it.
                    // but first, we have to remove the information on this entry
                    // that we have seen and deleted
                    entry.clock.reset_remove(&self.clock);

                    let mut information_we_deleted = self.clock.clone();
                    information_we_deleted.reset_remove(&entry.clock);
                    entry.val.reset_remove(&information_we_deleted);
                    self.entries.insert(key, entry);
                }
            }
        }

        // merge deferred removals
        for (rm_clock, keys) in other.deferred {
            self.apply_keyset_rm(keys, rm_clock);
        }

        self.clock.merge(other.clock);

        self.apply_deferred();
    }
}
  1. Bobは消えず、Bobの友達としてErikのみ残る
    let bobs_friends = friend_map
        .get(&"bob")
        .val
        .map(|set| set.read().val)
        .map(|hashmap| hashmap.into_iter().collect::<Vec<_>>());

    assert_eq!(bobs_friends, Some(vec!["erik"]));

というような感じみたいです。

何か作ってみる。

正直、これをWebのサービスと提供するにはどういう構成にすればいいのか…そのスタンダートはわからないのですが、今回、Figmaを参考にこんな感じでつくってみようかと思います。

clientとserverのやりとりはCBORで行ってまみようかと。

と思ったのですが、記事が大分長くなってきたので、作った話はまた後日。

長文失礼しました。

Discussion