Chapter 08

フレーミング

magurotuna
magurotuna
2021.03.03に更新

原文: https://tokio.rs/tokio/tutorial/framing

さて、前の章で学んだ I/O の知識を活かして、Mini-Redis のフレーミング層を実装していきましょう。フレーミングとは、バイトストリームを受け取ってそれをフレームのストリームへと変換することです。フレームは2つのピアの間で伝達されるデータの単位を意味します。Redis プロトコルのフレームは以下のようになっています:

use bytes::Bytes;

enum Frame {
    Simple(String),
    Error(String),
    Integer(u64),
    Bulk(Bytes),
    Null,
    Array(Vec<Frame>),
}

ここで、フレームはいかなる意味をも持たないデータだけで構成されているということに注意してください。コマンドの解析および実装はもっと高いレイヤーで行われます。

仮に HTTP に対して フレームを定義してみるとすると、以下のような見た目になるでしょう:

enum HttpFrame {
    RequestHead {
        method: Method,
        uri: Uri,
        version: Version,
        headers: HeaderMap,
    },
    ResponseHead {
        status: StatusCode,
        version: Version,
        headers: HeaderMap,
    },
    BodyChunk {
        chunk: Bytes,
    },
}

Mini-Redis にフレーミングを実装するため、TcpStream をラップした構造体を作り、mini_redis::Frame の値を読み書きすることができるようにしましょう。

use tokio::net::TcpStream;
use mini_redis::{Frame, Result};

struct Connection {
    stream: TcpStream,
    // ... 他のフィールドを書く
}

impl Connection {
    /// コネクションからフレームを読み取る
    ///
    /// EOF に到達したら `None` を返す
    pub async fn read_frame(&mut self)
        -> Result<Option<Frame>>
    {
        // ここに実装を書く
    }

    /// コネクションにフレームを書き込む
    pub async fn write_frame(&mut self, frame: &Frame)
        -> Result<()>
    {
        // ここに実装を書く
    }
}

Redis のワイヤープロトコルの詳細は こちら を参照してください。また、Connection に関してのコードの全体は こちら にあります。

バッファされた read

read_frame メソッドは、フレーム全体を受け取るまで待機してから値を返します。なお、TcpStream::read() の1回の呼び出しは任意の大きさのデータを返してよいことになっています。フレーム全体を含むかもしれないし、一部かもしれないし、複数フレームかもしれません。部分的なフレームだった場合、そのデータはバッファに蓄えられ、ソケットからさらにデータが読み取られます。複数のフレームだった場合、1つ目のフレームが return され、残りのデータは read_frame がもう一度呼び出されるまでバッファに蓄えられることになります。

これを実装するため、Connection 構造体に read 用のバッファフィールドを用意する必要があります。データはソケットから読み取られ、read バッファへ蓄えられます。フレームがパースされたら、対応するデータがバッファから削除されます。

BytesMut 型をバッファとして使うことにしましょう。これは Bytes の可変バージョンです。

use bytes::BytesMut;
use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream,
            // 4KB のキャパシティをもつバッファを確保する
            buffer: BytesMut::with_capacity(4096),
        }
    }
}

続いて、read_frame() メソッドを実装します。

use tokio::io::AsyncReadExt;
use bytes::Buf;
use mini_redis::Result;

pub async fn read_frame(&mut self)
    -> Result<Option<Frame>>
{
    loop {
        // バッファされたデータからフレームをパースすることを試みる
        // 十分な量のデータがバッファに蓄えられていたら、ここでフレームを return する
        if let Some(frame) = self.parse_frame()? {
            return Ok(Some(frame));
        }

        // バッファデータが足りなかった場合
        // ソケットからさらにデータを読み取ることを試みる
        //
        // 成功した場合、バイト数が返ってくる
        // `0` は "ストリームの終わり" を意味する
        if 0 == self.stream.read_buf(&mut self.buffer).await? {
            // 相手側がコネクションを閉じた。
            // きれいにシャットダウンするため、read バッファのデータが空になる
            // ようにしなければならない。もしデータが残っているなら、それは
            // 相手がフレームを送信している途中でソケットを閉じたということを意味する
            if self.buffer.is_empty() {
                return Ok(None);
            } else {
                return Err("connection reset by peer".into());
            }
        }
    }
}

細かく説明していきます。read_frame メソッドはループ内で処理を行います。まず self.parse_frame() が呼び出されます。これは self.buffer から Redis フレームをパースすることを試みます。フレームをパースするのに十分なデータがあるなら、その場で read_frame() の呼び出し側にフレームを返却します。十分なデータがなければ、データをさらにソケットから読み取ってバッファへと入れることを試みます。追加データを読み取ったあと、parse_frame() を再度呼び出します。このときに、バッファに十分なデータがあれば、パースは成功するでしょう。

ストリームから読み取る際に 0 が返ってきたら、それは相手側からそれ以上のデータが送られてくることはない、ということを示します。このときもし read バッファがまだデータを保持しているならば、それはフレームが部分的に送られてきて、途中で突然コネクションが断ち切られてしまったということを意味します。これはエラーなので、Err を返します。

Buf トレイト

ストリームを読み込むとき、read_buf が呼び出されます。この read 関数は、引数として bytes クレートの BufMut トレイトを実装している値をとります。

まず read() を使って同じような read ループを実装してみたらどうなるか、というのを考えてみましょう。BytesMut の代わりに Vec<u8> を利用することができます。

use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: Vec<u8>,
    cursor: usize,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream,
            // 4KB のキャパシティをもつバッファを確保する
            buffer: vec![0; 4096],
            cursor: 0,
        }
    }
}

そして、Connectionread_frame メソッドは以下のようになります:

use mini_redis::{Frame, Result};

impl Connection {
    pub async fn read_frame(&mut self)
        -> Result<Option<Frame>>
    {
        loop {
            if let Some(frame) = self.parse_frame()? {
                return Ok(Some(frame));
            }

            // バッファが十分なキャパシティを必ずもつように調整する
            if self.buffer.len() == self.cursor {
                // Grow the buffer
                self.buffer.resize(self.cursor * 2, 0);
            }

            // 何バイト読んだかを記録しながらバッファに読み取っていく
            let n = self.stream.read(
                &mut self.buffer[self.cursor..]).await?;

            if 0 == n {
                if self.cursor == 0 {
                    return Ok(None);
                } else {
                    return Err("connection reset by peer".into());
                }
            } else {
                // カーソルを更新する
                self.cursor += n;
            }
        }
    }

    // ... 他のメソッド
}

バイト配列と read を使って実装する場合、どれだけのデータがバッファされたのかを追跡するためのカーソルを用意する必要があります。read() 関数に対して、バッファのまだ値が埋まっていない部分を渡さなければなりません。さもなくば、バッファに蓄えられたデータを上書きしてしまうことになります。バッファが満杯になったら、読み取りを続けるためにバッファを伸長させなければなりません。上の例には含まれていませんが、parse_frame() メソッドを作る際は、self.buffer[..self.cursor] に含まれるデータをパースすることになるでしょう。

バイト配列とカーソルをいっしょに取り扱うことはとても一般的なので、bytes クレートはそれらを抽象化した機能を提供しています。Buf トレイトは、データの読み取り元となるような型に対して実装されます。一方、BufMut トレイトは、データの書き込み先となるような型に実装されます。BufMut であるような T 型を read_buf() 関数に渡すと、バッファの内部的なカーソルが read_buf によって自動的に更新されます。このため、我々が書く read_frame メソッドでは、自分たちでカーソルを管理する必要はありません。

さらに、Vec<u8> を使う場合、バッファは必ず 初期化 されていなければなりません。vec![0; 4096] は4096バイトの配列を確保し、それらを0で埋めます。バッファの大きさを拡張するときには、拡張された分の領域も0で初期化されます。この初期化のコストはタダではありません。しかし、BytesMut および BufMut を使うと、バッファのキャパシティは 未初期化 になります。BytesMut がもたらす抽象化によって、未初期化のメモリを不注意で読み取ってしまうということが防止されるのです。したがって、初期化のステップを経る必要がなくなります。

パースする

さて、parse_frame() 関数を見ていきましょう。パースは以下の2ステップで行われます。

  1. フレームの全体がバッファにあることを確かめて、フレームの終了インデックスを見つける
  2. フレームをパースする

mini-redis クレートはこれらのステップのための関数を提供しています:

  1. Frame::check
  2. Frame::parse

また、Buf がもたらす抽象化も再利用しましょう。BufFrame::check に渡します。check 関数は渡されたバッファをイテレートするので、内部カーソルが進んでいきます。check が return するとき、内部カーソルはフレームの終了地点を指すことになります。

Buf 型として、std::io::Cursor<&[u8]> を使います。

use mini_redis::{Frame, Result};
use mini_redis::frame::Error::Incomplete;
use bytes::Buf;
use std::io::Cursor;

fn parse_frame(&mut self)
    -> Result<Option<Frame>>
{
    // `Buf` 型を作る
    let mut buf = Cursor::new(&self.buffer[..]);

    // フレーム全体が取得可能かどうかチェックする
    match Frame::check(&mut buf) {
        Ok(_) => {
            // フレームのバイト長を取得
            let len = buf.position() as usize;

            // `parse` を呼び出すため、内部カーソルをリセットする
            buf.set_position(0);

            // フレームをパースする
            let frame = Frame::parse(&mut buf)?;

            // バッファからフレーム分を読み捨てる
            self.buffer.advance(len);

            // 呼び出し側にフレームを返す
            Ok(Some(frame))
        }
        // 十分な量のデータがバッファされていなかった場合
        Err(Incomplete) => Ok(None),
        // エラーが発生した場合
        Err(e) => Err(e.into()),
    }
}

Frame::check 関数の全体は こちら で確認できます。ここではその全体については解説しません。

注目すべき関連事項としては、Buf トレイトの「バイトイテレータ」スタイルの API が利用されている、ということです。これらの API はデータを取得し、内部カーソルを進めます。例えば、フレームをパースする際、フレームのタイプを決定するために最初のバイトがチェックされます。ここで使用される関数は Buf::get_u8 です。これは現在のカーソル位置のバイトを取得し、1つカーソルを進めます。

Buf トレイトには他にも便利なメソッドが数多くあります。詳細は、API ドキュメント を参照してください。

バッファされた write

フレーミング API のもう半分は、write_frame(frame) 関数です。この関数はフレーム全体をソケットへと書き込みます。write システムコールの呼び出しを少なくするために、書き込みをバッファ化します。write バッファを用意して、フレームはソケットに書き込まれるより前にエンコードされた状態でバッファへと書き込まれます。しかし、read_frame() とは違い、ソケットへの書き込み前にすべてのフレームがバイト列としてバッファ化されるとは限りません。

"bulk" 型のストリームフレームを考えてみましょう。書き込まれる値は Frame::Bulk(Bytes) です。"bulk" フレームのワイヤーフォーマットはフレームヘッドとなっています。フレームヘッドには、$ という文字と、それに続いてバイト数の情報が含まれています。フレームの大半はBytes の値からなるコンテンツです。データが大きい場合、それを中間バッファへとコピーすることにはコストがかかるでしょう。

バッファされた write を実装するため、BufWrite 構造体 を利用します。この構造体は AsyncWrite トレイトを実装する型 T によって初期化され、BufWriter 自身も ASsyncWrite を実装しています。BufWriter に対して write が呼び出されると、内部の writer へと直接書き込むのではなく、バッファへと書き込みを行います。バッファがいっぱいになったら、コンテンツは内部の writer へと「流され」[1]、内部バッファのデータは消去されます。特定のケースにおいて、バッファをバイパスすることを可能にする最適化も存在しています。

このチュートリアルでは write_frame() の完全な実装は行いません。実装の全体は こちら を確認してください。

まず、Connection 構造体を以下のように更新します:

use tokio::io::BufWriter;
use tokio::net::TcpStream;
use bytes::BytesMut;

pub struct Connection {
    stream: BufWriter<TcpStream>,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream: BufWriter::new(stream),
            buffer: BytesMut::with_capacity(4096),
        }
    }
}

次に write_frame() を実装します。

use tokio::io::{self, AsyncWriteExt};
use mini_redis::Frame;

async fn write_frame(&mut self, frame: &Frame)
    -> io::Result<()>
{
    match frame {
        Frame::Simple(val) => {
            self.stream.write_u8(b'+').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Error(val) => {
            self.stream.write_u8(b'-').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Integer(val) => {
            self.stream.write_u8(b':').await?;
            self.write_decimal(*val).await?;
        }
        Frame::Null => {
            self.stream.write_all(b"$-1\r\n").await?;
        }
        Frame::Bulk(val) => {
            let len = val.len();

            self.stream.write_u8(b'$').await?;
            self.write_decimal(len as u64).await?;
            self.stream.write_all(val).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Array(_val) => unimplemented!(),
    }

    self.stream.flush().await;

    Ok(())
}

ここで利用している関数は AsyncWriteExt トレイトが提供しているものです。これらは TcpStream でも利用可能ですが、中間バッファなしに単一のバイト書き込みを行うのは賢明ではありません。

  • write_u8 writer へと単一のバイトを書き込む
  • write_all writer へとスライスの全体を書き込む
  • write_decimal は mini-redis により実装されている

write_frame() メソッドは、最後に self.stream.flush().await を呼び出しています。BufWriter は中間バッファに書き込みを蓄えるので、write を呼び出してもデータがソケットへと書き込まれることは保証されていません。return する前にフレームがソケットへと書き込まれてほしいので、flush() を呼んでいます。flush() を呼ぶことで、バッファの中で保留状態となっているデータがすべてソケットへと書き込まれます。

write_frame() 内で flush()呼び出さない という代替案があります。代わりに、flush() 関数を Connection のメソッドとして提供するのです。こうすることで、呼び出し側が write バッファへと複数の小さいフレームを書き込んで、それからまとめてソケットへと書き込む、といったことができるようになります。こうすると write システムコールが1回で済みます。しかし、このような実装は Connection API を複雑化させてしまいます。 Mini-Redis の目標の1つに「シンプルさ」というのがありますから、ここでは fn write_frame() の中で flush().await を呼び出すという実装にすることにしました。

脚注
  1. 訳注: 原文では "flush" という単語が使われています。バッファの文脈で "flush" という単語にはよく出会いますが、適切な日本語訳が思いつかなかったため、カギカッコ付きの「流す」という表現を用いました。 ↩︎