原文: https://tokio.rs/tokio/tutorial/io
Tokio の I/O は std
とほとんど同じように動作しますが、非同期であるという点が異なります。読み込みのためのトレイト (AsyncRead
) と、書き込みのためのトレイト (AsyncWrite
) があります。具体的な型がこれらのトレイトを必要に応じて実装しています(TcpStream
、 File
、Stdout
など)。AsyncRead
と AsyncWrite
は多くのデータ構造(例えば Vec<u8>
や &[u8]
)に対しても実装されています。これによって、reader あるいは writer が要求されているような場所でバイト配列を使うことができるのです。
このページでは、いくつかの例を見ながら Tokio による基本的な I/O、すなわち読み込みと書き込みを解説していきます。次のページでもっと発展的な I/O の事例に踏み込んでいきます。
AsyncRead
と AsyncWrite
これら2つのトレイトはバイトストリームへの読み書きを非同期に行う機能を提供しています。一般的には、これらのトレイトに備わっているメソッドが直接呼び出されることはありません(これは、Future
トレイトの poll
メソッドを手動で呼び出すことがほとんどないのと同様です)。代わりに、AsyncReadExt
と AsyncWriteExt
が提供するユーティリティメソッドを通して利用することになるでしょう。
ざっとメソッドをいくつか見ていきましょう。これらの関数はすべて async
であり、.await
と一緒に利用する必要があります。
async fn read()
AsyncReadExt::read
は、データをバッファへと読み込んで、何バイト読み込まれたのかを返す非同期メソッドです。
注意: read()
が Ok(0)
を返してきたら、そのストリームが閉じられたということを意味します。それ以上 read()
を実行しても、即座に Ok(0)
が返ってくるだけです。例えば、TcpStream
インスタンスにおいて Ok(0)
が返ってきた場合、それが意味するところは、ソケットの読み込み側が閉じられた、ということになります。
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = [0; 10];
// 最大10バイト読み込む
let n = f.read(&mut buffer[..]).await?;
println!("The bytes: {:?}", &buffer[..n]);
Ok(())
}
async fn read_to_end()
AsyncReadExt::read_to_end
は、EOF[1] に至るまでストリームからすべてのバイトを読み込みます。
use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = Vec::new();
// ファイルをすべて読み込む
f.read_to_end(&mut buffer).await?;
Ok(())
}
async fn write()
AsyncWriteExt::write
は writer にバッファを書き込み、何バイト書き込まれたかを返します。
use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut file = File::create("foo.txt").await?;
// バイト文字列の先頭からいくつかを書き込む。
// 必ずしもすべてを書き込むわけではないことに注意。
let n = file.write(b"some bytes").await?;
println!("Wrote the first {} bytes of 'some bytes'.", n);
Ok(())
}
async fn write_all()
AsyncWriteExt::write_all
は writer にバッファ全体を書き込みます。
use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut buffer = File::create("foo.txt").await?;
buffer.write_all(b"some bytes").await?;
Ok(())
}
これら2つのトレイトは、他にも数多くの便利なメソッドを提供しています。それらについては API ドキュメントを参照してください。
ヘルパー関数
さらに、std
と同様に、tokio::io
モジュールは 標準入力、標準出力、標準エラー出力 を利用するための API や、数々の便利関数を提供しています。例えば、tokio::io::copy
を使うと、reader から writer へとすべてのデータを非同期的にコピーすることができます。
use tokio::fs::File;
use tokio::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut reader: &[u8] = b"hello";
let mut file = File::create("foo.txt").await?;
io::copy(&mut reader, &mut file).await?;
Ok(())
}
この例では、バイト配列が AsyncRead
トレイトを実装しているという事実が活用されていることに注意してください。
Echo サーバー
非同期 I/O を行う練習をしてみましょう。Echo サーバーを作っていこうと思います。
この Echo サーバーでは、TcpListener
をバインドして、ループの中でインバウンドコネクションを受けつけます。1回1回のインバウンドコネクションに対して、ソケットからデータを読み込んで、すぐにソケットに書き込みます。クライアントがサーバーへとデータを送信すると、まったく同じデータが返ってきます。
このような Echo サーバーを、少し異なるやり方を使いながら2つ実装してみようと思います。
io::copy()
を使う
まずは io::copy
を使って echo のロジックを実装していきます。
これは TCP サーバーであって、受け付けループが必要となります。そして、受け付けられたソケット1つ1つに対して、それを処理するための新しいタスクを spawn します。
use tokio::io;
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut listener = TcpListener::bind("127.0.0.1:6142").await.unwrap();
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
// ここでデータをコピーする
});
}
}
以前見たように、io::copy
関数は reader と writer を受け取って、前者から後者へとデータをコピーします。しかし、今我々の手元にあるのは TcpStream
1つだけです。この1つの値が AsyncRead
と AsyncWrite
の 両方 を実装しているのです。io::copy
は reader と writer への &mut
参照を要求しますから、1つの TcpStream
を2つの引数に当てはめることはできません。
// これはコンパイルが通らない
io::copy(&mut socket, &mut socket).await
reader と writer に分割する
この問題に対応するため、ソケットを reader ハンドルと writer ハンドルへと分割しなければなりません。reader / writer の組へと分割を行うためのベストな方法は、具体的な型によって変わります。
任意の reader + writer 型は、io::split
を使うことで分割が可能です。この関数は1つの値を受け取り、分割された reader ハンドルと writer ハンドルを返します。これらの2つのハンドルは、独立して利用することができます。別々のタスクから利用することも問題ありません。
例えば、echo クライアントは並行な読み込みと書き込みを以下のように処理することができるでしょう。
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
#[tokio::main]
async fn main() -> io::Result<()> {
let socket = TcpStream::connect("127.0.0.1:6142").await?;
let (mut rd, mut wr) = io::split(socket);
// バックグラウンドでデータを書き込む
let write_task = tokio::spawn(async move {
wr.write_all(b"hello\r\n").await?;
wr.write_all(b"world\r\n").await?;
// Rust の型推論器がちょっとした補助を必要とすることがたまにある
Ok::<_, io::Error>(())
});
let mut buf = vec![0; 128];
loop {
let n = rd.read(&mut buf).await?;
if n == 0 {
break;
}
println!("GOT {:?}", &buf[..n]);
}
Ok(())
}
io::split
は AsyncRead + AsyncWrite
を実装している 任意の 値に対応しており独立したハンドルを返却することができますが、これを実現するために内部では Arc
と Mutex
が使われています。これに伴うオーバーヘッドを、TcpStream
の場合は回避することができます。TcpStream
は2つの特別な分割関数を提供しているのです。
TcpStream::split
は、ストリームへの 参照 を受け取って reader ハンドルと writer ハンドルを返します。渡すのは参照なので、返ってくる2つのハンドルは split()
が呼ばれたタスクと 同じ タスクに留まらなければなりません。この split
はゼロコストです。Arc
も Mutex
も必要としていません。また、TcpStream
は into_split
という関数も提供しています。これを使うと、タスクをまたいでムーブさせることが可能なハンドルを生成することができます。ただし、Arc
のコストは必要になります。
我々の echo サーバーのケースでは、io::copy()
は TcpStream
を所有しているタスクと同じ場所で呼ばれるので、TcpStream::split
を使うことができます。Echo ロジックを処理するタスクは以下のような実装になります:
tokio::spawn(async move {
let (mut rd, mut wr) = socket.split();
if io::copy(&mut rd, &mut wr).await.is_err() {
eprintln!("failed to copy");
}
});
io::copy
を使うバージョンの全体のコードは こちら で確認できます。
手動でコピーする
さて、データを手動でコピーして echo サーバーを作るとするとどのようになるのかを見てみましょう。AsyncReadExt::read
と AsyncWriteExt::write_all
を使います。
まず Echo サーバーの全体像をお見せします:
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut listener = TcpListener::bind("127.0.0.1:6142").await.unwrap();
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = vec![0; 1024];
loop {
match socket.read(&mut buf).await {
// `Ok(0)` が返ってきたらリモート側が閉じられたことを意味する
Ok(0) => return,
Ok(n) => {
// データをソケットへとコピーする
if socket.write_all(&buf[..n]).await.is_err() {
// 予期しないソケットエラーが発生した場合。
// ここで何かできることはさほどないので、処理を停止する
return;
}
}
Err(_) => {
// 予期しないソケットエラーが発生した場合。
// ここで何かできることはさほどないので、処理を停止する
return;
}
}
}
});
}
}
細かく見ていきましょう。まず、AsyncRead
と AsyncWrite
のユーティリティが利用されているので、拡張トレイトをスコープにもってくる必要があります。
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
バッファをアロケートする
実装の方針としては、ソケットからバッファへとデータを読み込み、バッファの内容をソケットへと書き戻す、という感じになります。
let mut buf = vec![0; 1024];
ここで、バッファをスタック上に確保するのを明示的に避けています。以前の章 で、 .await
をまたいで生存するようなすべてのタスクステートは、タスクが保持しておかなければならない、ということを説明しました。これはちょうど、それぞれのバリアントが「特定の .await
呼び出しのために保持されている必要のあるステート」となっている enum
のようなもの、と考えると良いかもしれません。今回の我々のケースでは、buf
が .await
をまたいで利用されることになります。Vec
を使っているので、タスクステートは1回のアロケーションで保存されます。
もしバッファがスタック上の配列で表現されているとしたら、ソケットごとに spawn されるタスクの内部構造は、以下のような見た目になるでしょう。
struct Task {
// タスクの内部フィールド
task: enum {
AwaitingRead {
socket: TcpStream,
buf: [BufferType],
},
AwaitingWriteAll {
socket: TcpStream,
buf: [BufferType],
}
}
}
スタック配列がバッファとして使われている場合、それはタスク構造にインライン的に保持されることになり、タスク構造が肥大化することに繋がります。また、バッファのサイズはしばしばページサイズとなりますが、このことによって Task
が厄介なサイズ―― ページサイズ + 数バイト
をもつことになってしまいます。
コンパイラは、 async ブロックのレイアウトに関して、原始的な enum
を用いた場合よりもずっと優れた最適化をしてくれます。なので、enum
だとしたら必要となるような、バリアントからバリアントへの変数のムーブが、実際に行われることはありません。しかし、タスク構造体のサイズは、少なくとも、もっとも大きなサイズのバリアントと同じくらいの大きさになってしまいます。[2]
したがって、バッファ専用のアロケーションを行うほうが効率的になることが多いのです。
EOF を処理する
TCP ストリームの読み取り側がシャットダウンされたら、read()
呼び出しが Ok(0)
を返すようになります。この時点で読み取りループを抜け出すことが重要です。EOF に到達したのにループから抜け出すのを忘れるというのは、よくあるバグの原因です。
loop {
match socket.read(&mut buf).await {
// `Ok(0)` が返ってきたらリモート側が閉じられたことを意味する
Ok(0) => return,
// ... その他の場合分けを書く
}
}
ループから抜け出すのを忘れると、CPU が無限ループを起こして使用率 100% となるような状況になるのが一般的です。ソケットが閉じられているので、socket.read()
がすぐに完了し、永遠にループが繰り返されることになるためです。
手動でコピーを行うバージョンの全体のコードは こちら で確認できます。
-
訳注: End Of File ↩︎
-
訳注: enum のメモリ上でのサイズに関しては、小さい整数の「タグ」と最大のバリアントのすべてのフィールドを保持するのに十分なサイズのメモリで構成される (ただし、将来ずっとこのような構成である保証はない) とされています (プログラミングRust より) ↩︎