原文: https://tokio.rs/tokio/tutorial/spawning
ギアを上げて、Redis サーバーに取りかかりはじめましょう。
まず最初に、前の章で書いたクライアント側の SET
/ GET
のコードを、example ファイルに移動しましょう。こうすることで、私たちがこれから作るサーバーに対して、このコードを実行することができるようになります。
$ mkdir -p examples
$ mv src/main.rs examples/hello-redis.rs
それから新しい空っぽのの src/main.rs
ファイルを作って、進めていきましょう。
ソケットを受けつける
私たちの Redis サーバーがまず最初にしなければならないのは、外から内への(インバウンドの)TCP ソケットを受けつけることです。これは、tokio::net::TcpListener
を使うことによって実現できます。
TcpListener
は 6379 番ポートにバインドされ、そしてループの中でソケットが受けつけられます。それぞれのソケットは処理されてからクローズされます。とりあえず、コマンドを読んで、それを標準出力に出力、そしてエラーを返すような実装をしてみましょう。
use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};
#[tokio::main]
async fn main() {
// リスナーをこのアドレスにバインドする
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
// タプルの2つ目の要素は、新しいコネクションのIPとポートの情報を含んでいる
let (socket, _) = listener.accept().await.unwrap();
process(socket).await;
}
}
async fn process(socket: TcpStream) {
// `Connection` を使うことで、バイト列ではなく、Redis の
// 「フレーム」を読み書きできるようになる
// この `Connection` 型は mini-redis で定義されている
let mut connection = Connection::new(socket);
if let Some(frame) = connection.read_frame().await.unwrap() {
println!("GOT: {:?}", frame);
// エラーを返す
let response = Frame::Error("unimplemented".to_string());
connection.write_frame(&response).await.unwrap();
}
}
早速この accept ループを実行してみます。
$ cargo run
別のターミナルウィンドウで、hello-redis
を実行してください(前の章で作った、SET
/ GET
を行うコマンドです)。
$ cargo run --example hello-redis
以下のように出力されるはずです。
Error: "inimplemented"
サーバー側のターミナルの出力はこんな感じです。
GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])
並行性
私たちのサーバーはほんの少し問題を抱えています(エラーで返していることの他に、です)。それは、一度に一つのインバウンドリクエストしかさばけていないということです。コネクションが受けつけられたとき、サーバーはレスポンスが完全にソケットに書き込まれるまで accept ループの中にとどまることになってしまっています。
Redis サーバーには、多くの並行したリクエストを処理してもらいたいものです。そうするためには、いくつか並行性を加える必要があります。
コネクションを並行でさばくため、1つ1つのインバウンドコネクションに対して新しいタスクを作ります。このタスクの中でコネクションは処理されることになります。
accept ループは以下のようになります。
use tokio::net::TcpListener;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
// それぞれのインバウンドソケットに対して新しいタスクを spawn する
// ソケットは新しいタスクに move され、そこで処理される
tokio::spawn(async move {
process(socket).await;
});
}
}
タスク
Tokio のタスクは、非同期のグリーンスレッドで、async
ブロックを tokio::spawn
に渡すことで作らえれます。tokio::handle
関数は JoinHandle
を返し、これを使うことで spawn されたタスクとやりとりをすることができます。async
ブロックは返り値をもつことができて、tokio::handle
の呼び出し元は、JoinHandle
に対して .await
をすることで、この返り値を取得することができます。
以下が例です。
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
// 非同期処理を何かやる
"return value"
});
// 他の処理をやる
let out = handle.await.unwrap();
println!("GOT {}", out);
}
JoinHandle
を待ち受ける(.await
する)と、Result
が返ってきます。タスクが、実行中にエラーに遭遇した場合に、JoinHandle
は Err
を返します。これはタスクが panic したか、あるいはランタイムのシャットダウンによってタスクが強制的にキャンセルされた場合に起きます。
タスクは、スケジューラによって管理される実行単位です。タスクを spawn すると、Tokio のスケジューラにそれが登録され、適切なタイミングでタスクが実行されることが保証されます。spawn されたタスクは、spawn されたスレッドと同一のスレッド上で実行されることもあれば、ランタイム上の異なるスレッドで実行されることもあります。spawn されたあとにスレッド間を行き来することもあります。
Tokio におけるタスクはとても軽量です。内部的には、1回のアロケーションしか必要としませんし、64バイトのメモリしか消費しません。アプリケーションは気軽に、数百万とまではいかないものの、数千のタスクを spawn することができます。
'static
境界
Tokio ランタイム上でタスクを spawn するとき、その型は 'static
でなければなりません。つまり、spawn されるタスクは、タスクの外で所有されているデータへのいかなる参照をも含んではならないということです。
例えば、次の例はコンパイルに失敗します:
use tokio::task;
#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];
task::spawn(async {
println!("Here's a vec: {:?}", v);
});
}
コンパイルを試みると、以下のエラーが出てきます。
error[E0373]: async block may outlive the current function, but
it borrows `v`, which is owned by the current function
--> src/main.rs:7:23
|
7 | task::spawn(async {
| _______________________^
8 | | println!("Here's a vec: {:?}", v);
| | - `v` is borrowed here
9 | | });
| |_____^ may outlive borrowed value `v`
|
note: function requires argument type to outlive `'static`
--> src/main.rs:7:17
|
7 | task::spawn(async {
| _________________^
8 | | println!("Here's a vector: {:?}", v);
9 | | });
| |_____^
help: to force the async block to take ownership of `v` (and any other
referenced variables), use the `move` keyword
|
7 | task::spawn(async move {
8 | println!("Here's a vec: {:?}", v);
9 | });
|
このエラーが発生したのは、変数はデフォルトでは async
ブロックに ムーブ されないからです。v
というベクタは依然として main
関数によって所有されています。println!
の行で v
への借用が要求されます。Rust コンパイラは親切にもこのことを教えてくれていますし、さらに修正方法の提案までしてくれています。7行目を task::spawn(async move {
に変更することで、v
を spawn されるタスクにムーブしますよ、ということをコンパイラに指示することができます。これで、タスクは内部で利用しているすべてのデータを自身で所有することになって、すなわち 'static
になることができます。
もし、データの一部が複数のタスクから非同期にアクセスされる必要があるのであれば、Arc
のような同期プリミティブを使って共有されなければなりません。
ここで、コンパイラが「argument type(ここでは async
ブロック) が 'static
ライフタイムよりも outlive (長生きする)」というエラーメッセージを出していることに注意してください。この言い回しはいくぶん紛らわしいかもしれません。'static
ライフタイムはプログラムの終了まで生存するということなのだから、仮に argument type がそれよりも長生きするというのであれば、それはメモリリークなのではないか?と考えたくなります。これに対しての説明としては、「'static
ライフタイムよりも長く生存するのは、型 の話であって 値 のことではない。型が有効でなくなる前に値は破棄されてもよい」ということになります。
我々が「ある値は 'static
だ」と言うとき、それが意味するのは「その値をずっと保持しつづけたとしても、不正な値になることはない」ということです。コンパイラは、新しく spawn されたタスクがどれだけ長く生存するのかを推論することができないので、タスクが長生きしすぎないということを保証するためには、タスクが永遠に生存しうる、と判断することになります[1]。したがって 'static
であることが重要になってくるのです。[2]
上のボックスの中で Common Rust Lifetime Misconceptions へのリンクを貼りましたが、このリンク先においては、T: 'static
という表記に対して、「その型は 'static
よりも長生きする ("its type outlives 'static
")」とか「その値は 'static
である (the value is 'static
)」という表現ではなく、「'static
による境界付け ("bounded by 'static
")」という用語が使われていますが、これらはすべて同じ意味です。ただし、&'static T
という表記における 「'static
とアノテーションがつけられている (annotated with 'static
)」とは異なります。
Send
境界
tokio::spawn
で spawn されるタスクは 必ず Send
トレイトを実装していなければなりません。.await
によってタスクが一時的に中断されているときに、Tokio ランタイムがそのタスクを別スレッドに移動することができるようにするためです。
.await
をまたいで保持されるデータがすべて Send
である場合に、そのタスクは Send
になります。.await
が呼ばれたとき、タスクの制御はスケジューラに戻され、次にタスクが実行されるとき、中断されたポイントから再開されます。この仕組みが正しく機能するためには、.await
の後に使われるステートは、タスクに保存されていなければなりません。ステートが Send
であるなら、すなわちスレッド間で移動させることができるのであれば、タスク自体もスレッド間で移動させることができるということになります。逆に、ステートが Send
ではなかったら、タスクも Send
になりません。
例えば、以下の例は正しく動きます。
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
// { } で囲っていることにより、`rc` が `.await` の前に drop される
{
let rc = Rc::new("hello");
println!("{}", rc);
}
// `rc` はもはや使用されない。
// タスクがスケジューラに戻るときには破棄されている
yield_now().await;
});
}
一方、以下は動きません。
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
let rc = Rc::new("hello");
// `rc` は `.await` のあとに使用されている。
// つまり、タスクのステートとして保持されなければならない
yield_now().await;
println!("{}", rc);
});
}
コンパイルすると以下のエラーが出てきます。
error: future cannot be sent between threads safely
--> src/main.rs:6:5
|
6 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: [..]spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in
| `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait
| `std::marker::Send` is not implemented for
| `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:10:9
|
7 | let rc = Rc::new("hello");
| -- has type `std::rc::Rc<&str>` which is not `Send`
...
10 | yield_now().await;
| ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
| used later
11 | println!("{}", rc);
12 | });
| - `rc` is later dropped here
このエラーの特殊なケースについて、次の章で詳しく議論します。
値を保存する
さて、受信したコマンドを処理する process
関数を実装していきましょう。値を保存するのに、HashMap
を使います。SET
コマンドは HashMap
に値を挿入し、GET
はそれを読み出します。加えて、1つのコネクションで複数のコマンドを受け付けることができるように、ループを使いましょう。
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};
async fn process(socket: TcpStream) {
use mini_redis::Command::{self, Get, Set};
use std::collections::HashMap;
// データを蓄えるため `HashMap` を使う
let mut db = HashMap::new();
// `mini-redis` が提供するコネクションによって、ソケットから来るフレームをパースする
let mut connection = Connection::new(socket);
// コネクションからコマンドを受け取るため `read_frame` を使う
while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
// `Vec<u8> として保存する
db.insert(cmd.key().to_string(), cmd.value().to_vec());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
if let Some(value) = db.get(cmd.key()) {
// `Frame::Bulk` はデータが Bytes` 型であることを期待する
// この型についてはのちほど解説する
// とりあえずここでは `.into()` を使って `&Vec<u8>` から `Bytes` に変換する
Frame::Bulk(value.clone().into())
} else {
Frame::Null
}
}
cmd => panic!("unimplemented {:?}", cmd),
};
// クライアントへのレスポンスを書き込む
connection.write_frame(&response).await.unwrap();
}
}
では、サーバーを立ち上げて
$ cargo run
別のターミナルで hello-redis
を実行しましょう。
$ cargo run --example hello-redis
以下のように出力されるでしょう。
got value from the server; success=Some(b"world")
これで値を GET
したり SET
したりすることができるようになりましたが、問題点があります。値がコネクション間で共有されていないということです。別のソケットが接続して hello
キーで GET
しようとしても、何も値が見つかりません。
この章の全体のコードは こちら で確認できます。
次の章では、すべてのソケットに対してデータを保持する実装をやっていきます。
-
訳注: コンパイラは、タスクに対するライフタイムの正確な推論をすることはできないから、生存期間についての最悪ケースである「プログラムの実行中はずっと生存しつづける(より正確には 生存しつづけても問題ない、不正にならない)」という厳しい条件を要求し、この条件を満たすプログラムだけをコンパイルに通す、というような解釈ができると思います。 ↩︎
-
訳注:
'static
ライフタイムの話については、Rustの2種類の 'static も参考になるかもしれません。 ↩︎