Open4

『Distributed Services with Go』をRustでやっていく

koji_matskoji_mats

https://pragprog.com/titles/tjgo/distributed-services-with-go/
『Distributed Services with Go』は分散システム実装の入門書です。JSON/HTTP APIエンドポイントの作成から始まり、APIエンドポイントのprotobuf/gRPC化やTLS対応、そして非集中型サービスディスカバリ・オーケストレーションの実装であるSerfや分散合意アルゴリズムRaftなどを扱っています。『データ指向アプリケーションデザイン』で原理はなんとなくわかったけど分散システムを具体的にどう実装したら良いかわからないという人を対象にしているらしいです。(自分はこの書籍はまだほとんど読んでないです)
この書籍では全てGoで実装していますが、理解度を確認しながらやっていきたいのでRustで実装していきます。

koji_matskoji_mats

1. Let's Go

Kubernetesなどで実行する分散システムの実装を学ぶ。その最初の一歩としてcommit log JSON over HTTP serviceというシンプルなシステムを作りながら分散システムの実装を体験します。

APIのエンドポイントとしてはprotobuf/gRPCの方が効率が良いが、実装が簡単なJSON/HTTPサービスから始めていきます。書籍で使われているコードは以下リポジトリにあります。

http://github.com/travisjeffery/proglog

RustのHTTPサーバを選択

書籍ではGo標準のnet/httpを使用しています。これに合わせてRustで軽量のHTTPサーバを探すと以下クレートが候補に挙がりました。

HTTPSとHTTP/2サポートしていてGitHub star数の多いactix-webを使うことにしました。

この章のコード

https://github.com/koji-m/distributed_service_with_rust/tree/master/lets_rust

commit logのデータ構造

commit logは追記のみの時間順に整列されたデータ構造で、HTTPサーバのメモリ上でグローバルに共有されます。そのため、ログデータと一緒にMutexオブジェクトも持っています。

type Record struct {
	Value  []byte `json:"value"`
	Offset uint64 `json:"offset"`
}

type Log struct {
	mu      sync.Mutex
	records []Record
}

actix-webではサーバのメモリ上でグローバルなデータを共有したい場合、App::app_data()メソッドでサーバにセットします。上記のLog構造体と同等のデータ構造をRustでは以下のように実装しました。

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Record {
    value: String,
    offset: usize,
}

pub type Log = Mutex<Vec<Record>>;

RustのMutexは排他制御対象のデータを内包するのでMutexLogとして定義しました。そして、Loggerというトレイトで拡張します。(MutexLoggerトレイトを実装する)

pub trait Logger {
    fn create_empty() -> Log;
    fn append(&self, record: Record) -> usize;
    fn read(&self, offset: usize) -> Result<Record, String>;
}

actix-webのJSONの扱い

Go標準のHTTPサーバを使った実装では、明示的にJSONをデコードして扱います。

func (s *httpServer) handleProduce(w http.ResponseWriter, r *http.Request) {
	var req ProduceRequest
	err := json.NewDecoder(r.Body).Decode(&req)
  ...

Rustのactix-webでは、引数のリクエストデータの型をweb::Json<Record>に指定すれば関数内でreq.0を参照するだけでRecord構造体のデータを簡単に取り出せます。

#[post("/")]
async fn produce(log: web::Data<Log>, req: web::Json<Record>) -> HttpResponse {
    ...

1章終わり。

koji_matskoji_mats

2. Structure Data with Protocol Buffers

この章から、JSON over HTTPからprotobuf over gRPCへ変更していきます。

JSONに対するProtocol Buffersの利点は...

  • 型安全の保証
  • データのスキーマ違反防止
  • 高速なシリアライズ
  • データの後方互換性の提供

protobufをシリアライズ/デシリアライズするコードを生成するには、protobufの定義ファイル.protoをprotocコマンドでコンパイルします。この章ではRecord構造体に対応するデータをシリアライズ/デシリアライズするprotobufのコードを生成します。

syntax = "proto3";

package log.v1;

message Record {
  bytes value = 1;
  uint64 offset = 2;
}

現在protocコマンドで生成できるコードの言語としてGoやJava、Pythonなどをサポートしていますが、Rustはサポートしていません。Rustではprotoc-rustクレートを使ってprotobufのコードを生成することにします。

extern crate protoc_rust;

fn main() {
    protoc_rust::Codegen::new()
        .out_dir("src/api/v1")
        .inputs(&["protos/log.proto"])
        .include("protos")
        .run()
        .expect("protoc");
}

Cargoでのビルド時に自動でprotobufのコードを生成するために、上記コードをパッケージルートにbuild.rsとして保存します。そして、Cargo.tomlにビルド時の依存クレートとしてprotoc-rustを追加しておきます。

[build-dependencies]
protoc-rust = "2.0"

コード生成にはprotocコマンドが必要なのでインストールしておきます。

以上でprotobufコードの自動生成ができるようになりました。cargo buildを実行するとsrc/api/v1配下にlog.rsというprotobufのコードが生成されます。

この章のコード

https://github.com/koji-m/distributed_service_with_rust/tree/master/structure_data_with_protobuf

2章終わり。

koji_matskoji_mats

3. Write a Log Package

ログの構造体とメソッドを定義していきます。ログを構成するデータ構造全体を図に纏めます。

ログデータは、2.で定義したRecord構造体をprotobufでシリアライズしたものをビッグエンディアンで書き込みます。
Storeは実際のログデータが記録されるファイルを管理する構造体です。効率を考慮してファイルへのログデータの書き込みはバッファを介して行います。一方でファイルの任意の場所のデータの読み出しも必要になるので、Storeには読み込み用のファイルと、同じファイルを参照する書き込み用バッファを持たせます。

pub struct StoreInner {
    file: File,
    buf: BufWriter<File>,
    path: Box<Path>,
    pub size: u64,
}

pub type Store = Mutex<StoreInner>;

ログデータの読み込み時に書き込み用バッファをflushします。

fn append(&mut self, p: Vec<u8>) -> std::io::Result<(u64, u64)> {
    let mut st = self.lock().unwrap();
    let pos = st.size;

    st.buf.write(&p.len().to_be_bytes())?;
    let w = st.buf.write(&p)?;
    ...

fn read_at(&self, buf: &mut Vec<u8>, offset: u64) {
    let mut st = self.lock().unwrap();
    st.buf.flush().unwrap();

    st.file.seek(SeekFrom::Start(offset)).unwrap();
    st.file.read_exact(buf).unwrap();
}

Indexは、ログのインデックスとログが書き込まれている場所(ファイル先頭からのバイト数)のマッピングを管理する構造体です。N番目のログは、IndexのN番目のエントリに記録されている場所情報を参照することで実際のログデータに効率良くアクセスできます。Indexの1エントリは4バイトのオフセット情報と8バイトの場所情報で構成される小さなデータなのでメモリ上に展開されます。ただし、Indexも永続化する必要があるのでmmapを使ってファイルへ同期します。Rustでmmapを実現するクレートにmemmapがありますが3年程更新が止まっているので、これをForkしたmemmap2を使うことにしました。
https://crates.io/crates/memmap2

pub struct Index {
    file: File,
    path: Box<Path>,
    mmap: MmapMut,
    pub size: usize,
}

StoreとIndexを一纏めにするのがSegmentです。ディスクスペースの都合上古いログは削除したりアーカイブしたりする必要があり、その際にログの追記とコンフリクトしないようにログデータをセグメントで分けて保存します。各Segmentは管理しているログのインデックスレンジも持たせる必要があります。

pub struct Segment<'a> {
    store: Store,
    index: Index,
    pub base_offset: u64,
    pub next_offset: u64,
    config: &'a Config,
}

最後にSegmentを管理するLog構造体を定義します。
ログを構成する全てのSegmentだけでなく、現在データを追記する対象(active segment)のSegmentを持たせます。そのため、SegmentはSegmentのリストとactive segmentの両方から参照され かつ変更もされるのでRefCellによる内部可変性パターンとスマートポインタRcを使います。

pub struct LogInner<'a> {
    dir: String,
    config: &'a Config,
    pub active_segment: Rc<RefCell<Segment<'a>>>,
    pub segments: Vec<Rc<RefCell<Segment<'a>>>>,
}

type Log<'a> = Mutex<LogInner<'a>>;

この章のコード

https://github.com/koji-m/distributed_service_with_rust/tree/master/write_a_log_package

3章終わり。