原文: https://tokio.rs/tokio/tutorial/io

Tokio の I/O は std とほとんど同じように動作しますが、非同期であるという点が異なります。読み込みのためのトレイト (AsyncRead) と、書き込みのためのトレイト (AsyncWrite) があります。具体的な型がこれらのトレイトを必要に応じて実装しています(TcpStreamFileStdout など)。AsyncReadAsyncWrite は多くのデータ構造(例えば Vec<u8>&[u8] )に対しても実装されています。これによって、reader あるいは writer が要求されているような場所でバイト配列を使うことができるのです。

このページでは、いくつかの例を見ながら Tokio による基本的な I/O、すなわち読み込みと書き込みを解説していきます。次のページでもっと発展的な I/O の事例に踏み込んでいきます。

AsyncReadAsyncWrite

これら2つのトレイトはバイトストリームへの読み書きを非同期に行う機能を提供しています。一般的には、これらのトレイトに備わっているメソッドが直接呼び出されることはありません(これは、Future トレイトの poll メソッドを手動で呼び出すことがほとんどないのと同様です)。代わりに、AsyncReadExtAsyncWriteExt が提供するユーティリティメソッドを通して利用することになるでしょう。

ざっとメソッドをいくつか見ていきましょう。これらの関数はすべて async であり、.await と一緒に利用する必要があります。

async fn read()

AsyncReadExt::read は、データをバッファへと読み込んで、何バイト読み込まれたのかを返す非同期メソッドです。

注意: read()Ok(0) を返してきたら、そのストリームが閉じられたということを意味します。それ以上 read() を実行しても、即座に Ok(0) が返ってくるだけです。例えば、TcpStream インスタンスにおいて Ok(0) が返ってきた場合、それが意味するところは、ソケットの読み込み側が閉じられた、ということになります。

use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("foo.txt").await?;
    let mut buffer = [0; 10];

    // 最大10バイト読み込む
    let n = f.read(&mut buffer[..]).await?;

    println!("The bytes: {:?}", &buffer[..n]);
    Ok(())
}

async fn read_to_end()

AsyncReadExt::read_to_end は、EOF[1] に至るまでストリームからすべてのバイトを読み込みます。

use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("foo.txt").await?;
    let mut buffer = Vec::new();

    // ファイルをすべて読み込む
    f.read_to_end(&mut buffer).await?;
    Ok(())
}

async fn write()

AsyncWriteExt::write は writer にバッファを書き込み、何バイト書き込まれたかを返します。

use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut file = File::create("foo.txt").await?;

    // バイト文字列の先頭からいくつかを書き込む。
    // 必ずしもすべてを書き込むわけではないことに注意。
    let n = file.write(b"some bytes").await?;

    println!("Wrote the first {} bytes of 'some bytes'.", n);
    Ok(())
}

async fn write_all()

AsyncWriteExt::write_all は writer にバッファ全体を書き込みます。

use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut buffer = File::create("foo.txt").await?;

    buffer.write_all(b"some bytes").await?;
    Ok(())
}

これら2つのトレイトは、他にも数多くの便利なメソッドを提供しています。それらについては API ドキュメントを参照してください。

ヘルパー関数

さらに、std と同様に、tokio::io モジュールは 標準入力標準出力標準エラー出力 を利用するための API や、数々の便利関数を提供しています。例えば、tokio::io::copy を使うと、reader から writer へとすべてのデータを非同期的にコピーすることができます。

use tokio::fs::File;
use tokio::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut reader: &[u8] = b"hello";
    let mut file = File::create("foo.txt").await?;

    io::copy(&mut reader, &mut file).await?;
    Ok(())
}

この例では、バイト配列が AsyncRead トレイトを実装しているという事実が活用されていることに注意してください。

Echo サーバー

非同期 I/O を行う練習をしてみましょう。Echo サーバーを作っていこうと思います。

この Echo サーバーでは、TcpListener をバインドして、ループの中でインバウンドコネクションを受けつけます。1回1回のインバウンドコネクションに対して、ソケットからデータを読み込んで、すぐにソケットに書き込みます。クライアントがサーバーへとデータを送信すると、まったく同じデータが返ってきます。

このような Echo サーバーを、少し異なるやり方を使いながら2つ実装してみようと思います。

io::copy() を使う

まずは io::copy を使って echo のロジックを実装していきます。

これは TCP サーバーであって、受け付けループが必要となります。そして、受け付けられたソケット1つ1つに対して、それを処理するための新しいタスクを spawn します。

use tokio::io;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut listener = TcpListener::bind("127.0.0.1:6142").await.unwrap();

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            // ここでデータをコピーする
        });
    }
}

以前見たように、io::copy 関数は reader と writer を受け取って、前者から後者へとデータをコピーします。しかし、今我々の手元にあるのは TcpStream 1つだけです。この1つの値が AsyncReadAsyncWrite両方 を実装しているのです。io::copy は reader と writer への &mut 参照を要求しますから、1つの TcpStream を2つの引数に当てはめることはできません。

// これはコンパイルが通らない
io::copy(&mut socket, &mut socket).await

reader と writer に分割する

この問題に対応するため、ソケットを reader ハンドルと writer ハンドルへと分割しなければなりません。reader / writer の組へと分割を行うためのベストな方法は、具体的な型によって変わります。

任意の reader + writer 型は、io::split を使うことで分割が可能です。この関数は1つの値を受け取り、分割された reader ハンドルと writer ハンドルを返します。これらの2つのハンドルは、独立して利用することができます。別々のタスクから利用することも問題ありません。

例えば、echo クライアントは並行な読み込みと書き込みを以下のように処理することができるでしょう。

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> io::Result<()> {
    let socket = TcpStream::connect("127.0.0.1:6142").await?;
    let (mut rd, mut wr) = io::split(socket);

    // バックグラウンドでデータを書き込む
    let write_task = tokio::spawn(async move {
        wr.write_all(b"hello\r\n").await?;
        wr.write_all(b"world\r\n").await?;

        // Rust の型推論器がちょっとした補助を必要とすることがたまにある
        Ok::<_, io::Error>(())
    });

    let mut buf = vec![0; 128];

    loop {
        let n = rd.read(&mut buf).await?;

        if n == 0 {
            break;
        }

        println!("GOT {:?}", &buf[..n]);
    }

    Ok(())
}

io::splitAsyncRead + AsyncWrite を実装している 任意の 値に対応しており独立したハンドルを返却することができますが、これを実現するために内部では ArcMutex が使われています。これに伴うオーバーヘッドを、TcpStream の場合は回避することができます。TcpStream は2つの特別な分割関数を提供しているのです。

TcpStream::split は、ストリームへの 参照 を受け取って reader ハンドルと writer ハンドルを返します。渡すのは参照なので、返ってくる2つのハンドルは split() が呼ばれたタスクと 同じ タスクに留まらなければなりません。この split はゼロコストです。ArcMutex も必要としていません。また、TcpStreaminto_split という関数も提供しています。これを使うと、タスクをまたいでムーブさせることが可能なハンドルを生成することができます。ただし、Arc のコストは必要になります。

我々の echo サーバーのケースでは、io::copy()TcpStream を所有しているタスクと同じ場所で呼ばれるので、TcpStream::split を使うことができます。Echo ロジックを処理するタスクは以下のような実装になります:

tokio::spawn(async move {
    let (mut rd, mut wr) = socket.split();
    
    if io::copy(&mut rd, &mut wr).await.is_err() {
        eprintln!("failed to copy");
    }
});

io::copy を使うバージョンの全体のコードは こちら で確認できます。

手動でコピーする

さて、データを手動でコピーして echo サーバーを作るとするとどのようになるのかを見てみましょう。AsyncReadExt::readAsyncWriteExt::write_all を使います。

まず Echo サーバーの全体像をお見せします:

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut listener = TcpListener::bind("127.0.0.1:6142").await.unwrap();

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = vec![0; 1024];

            loop {
                match socket.read(&mut buf).await {
                    // `Ok(0)` が返ってきたらリモート側が閉じられたことを意味する
                    Ok(0) => return,
                    Ok(n) => {
                        // データをソケットへとコピーする
                        if socket.write_all(&buf[..n]).await.is_err() {
                            // 予期しないソケットエラーが発生した場合。
                            // ここで何かできることはさほどないので、処理を停止する
                            return;
                        }
                    }
                    Err(_) => {
                        // 予期しないソケットエラーが発生した場合。
                        // ここで何かできることはさほどないので、処理を停止する
                        return;
                    }
                }
            }
        });
    }
}

細かく見ていきましょう。まず、AsyncReadAsyncWrite のユーティリティが利用されているので、拡張トレイトをスコープにもってくる必要があります。

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};

バッファをアロケートする

実装の方針としては、ソケットからバッファへとデータを読み込み、バッファの内容をソケットへと書き戻す、という感じになります。

let mut buf = vec![0; 1024];

ここで、バッファをスタック上に確保するのを明示的に避けています。以前の章 で、 .await をまたいで生存するようなすべてのタスクステートは、タスクが保持しておかなければならない、ということを説明しました。これはちょうど、それぞれのバリアントが「特定の .await 呼び出しのために保持されている必要のあるステート」となっている enum のようなもの、と考えると良いかもしれません。今回の我々のケースでは、buf.await をまたいで利用されることになります。Vec を使っているので、タスクステートは1回のアロケーションで保存されます。

もしバッファがスタック上の配列で表現されているとしたら、ソケットごとに spawn されるタスクの内部構造は、以下のような見た目になるでしょう。

struct Task {
    // タスクの内部フィールド
    task: enum {
        AwaitingRead {
            socket: TcpStream,
            buf: [BufferType],
        },
        AwaitingWriteAll {
            socket: TcpStream,
            buf: [BufferType],
        }

    }
}

スタック配列がバッファとして使われている場合、それはタスク構造にインライン的に保持されることになり、タスク構造が肥大化することに繋がります。また、バッファのサイズはしばしばページサイズとなりますが、このことによって Task が厄介なサイズ―― ページサイズ + 数バイト をもつことになってしまいます。

コンパイラは、 async ブロックのレイアウトに関して、原始的な enum を用いた場合よりもずっと優れた最適化をしてくれます。なので、enum だとしたら必要となるような、バリアントからバリアントへの変数のムーブが、実際に行われることはありません。しかし、タスク構造体のサイズは、少なくとも、もっとも大きなサイズのバリアントと同じくらいの大きさになってしまいます。[2]

したがって、バッファ専用のアロケーションを行うほうが効率的になることが多いのです。

訳注

分かりにくい文章になってしまったと思うので補足させてください。
ここでは、バッファの確保を [u8; 1024] (スタック上に確保される配列型)で行うよりも Vec<u8> (ヒープ上に確保されるベクタ)で行ったほうが効率が良い、ということを説明しています。その理由として、以下の2つが挙げられています。

a) スタック配列だと .await をまたぐときに全体のムーブが必要になること(ベクタであればヒープへのポインタ + 補助的な僅かなデータをムーブするだけで済む)
b) スタック配列の場合、タスクを表す構造体がバッファデータをすべて含むことになるため、構造体サイズが大きくなってしまうこと(ベクタであればヒープへのポインタ + 補助的な僅かなデータ を含むだけでよく、軽量に保てる)

EOF を処理する

TCP ストリームの読み取り側がシャットダウンされたら、read() 呼び出しが Ok(0) を返すようになります。この時点で読み取りループを抜け出すことが重要です。EOF に到達したのにループから抜け出すのを忘れるというのは、よくあるバグの原因です。

loop {
    match socket.read(&mut buf).await {
        // `Ok(0)` が返ってきたらリモート側が閉じられたことを意味する
        Ok(0) => return,
        // ... その他の場合分けを書く
    }
}

ループから抜け出すのを忘れると、CPU が無限ループを起こして使用率 100% となるような状況になるのが一般的です。ソケットが閉じられているので、socket.read() がすぐに完了し、永遠にループが繰り返されることになるためです。

手動でコピーを行うバージョンの全体のコードは こちら で確認できます。

脚注
  1. 訳注: End Of File ↩︎

  2. 訳注: enum のメモリ上でのサイズに関しては、小さい整数の「タグ」と最大のバリアントのすべてのフィールドを保持するのに十分なサイズのメモリで構成される (ただし、将来ずっとこのような構成である保証はない) とされています (プログラミングRust より) ↩︎