原文: https://tokio.rs/tokio/tutorial/spawning

ギアを上げて、Redis サーバーに取りかかりはじめましょう。

まず最初に、前の章で書いたクライアント側の SET / GET のコードを、example ファイルに移動しましょう。こうすることで、私たちがこれから作るサーバーに対して、このコードを実行することができるようになります。

$ mkdir -p examples
$ mv src/main.rs examples/hello-redis.rs

それから新しい空っぽのの src/main.rs ファイルを作って、進めていきましょう。

ソケットを受けつける

私たちの Redis サーバーがまず最初にしなければならないのは、外から内への(インバウンドの)TCP ソケットを受けつけることです。これは、tokio::net::TcpListener を使うことによって実現できます。

Tokio の多くの型は、Rustの標準ライブラリに用意されている「同期」版の API と同じ名前がつけられています。そうするのが合理的な場合、Tokio は std と同じ API を async fn を使って公開しているということです。

TcpListener6379 番ポートにバインドされ、そしてループの中でソケットが受けつけられます。それぞれのソケットは処理されてからクローズされます。とりあえず、コマンドを読んで、それを標準出力に出力、そしてエラーを返すような実装をしてみましょう。

use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};

#[tokio::main]
async fn main() {
    // リスナーをこのアドレスにバインドする
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        // タプルの2つ目の要素は、新しいコネクションのIPとポートの情報を含んでいる
        let (socket, _) = listener.accept().await.unwrap();
        process(socket).await;
    }
}

async fn process(socket: TcpStream) {
    // `Connection` を使うことで、バイト列ではなく、Redis の
    // 「フレーム」を読み書きできるようになる
    // この `Connection` 型は mini-redis で定義されている
    let mut connection = Connection::new(socket);

    if let Some(frame) = connection.read_frame().await.unwrap() {
        println!("GOT: {:?}", frame);

        // エラーを返す
        let response = Frame::Error("unimplemented".to_string());
        connection.write_frame(&response).await.unwrap();
    }
}

早速この accept ループを実行してみます。

$ cargo run

別のターミナルウィンドウで、hello-redis を実行してください(前の章で作った、SET / GET を行うコマンドです)。

$ cargo run --example hello-redis

以下のように出力されるはずです。

Error: "inimplemented"

サーバー側のターミナルの出力はこんな感じです。

GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])

並行性

私たちのサーバーはほんの少し問題を抱えています(エラーで返していることの他に、です)。それは、一度に一つのインバウンドリクエストしかさばけていないということです。コネクションが受けつけられたとき、サーバーはレスポンスが完全にソケットに書き込まれるまで accept ループの中にとどまることになってしまっています。

Redis サーバーには、多くの並行したリクエストを処理してもらいたいものです。そうするためには、いくつか並行性を加える必要があります。

並行(concurrency)と並列(parallelism)は同じものではありません。もしあなたが、2つのタスクを交互に行うのなら、それはそれらのタスクを「並行して」こなしているということになりますが、「並列」ではありません。これを「並列」にしたいのであれば、2人の人間が必要で、それぞれのタスクにそれぞれの人間を専任させることになるでしょう。

Tokio を使うことによる利点の1つは、非同期コードを書くことによって、普通のスレッドを利用して多くのタスクを並列に処理するのではなく、「並行に」処理することができるようになる、という点です。実際、Tokio を使えば、たとえ単一のスレッドであったとしても、多くのタスクを並行して実行することができるのです!

コネクションを並行でさばくため、1つ1つのインバウンドコネクションに対して新しいタスクを作ります。このタスクの中でコネクションは処理されることになります。

accept ループは以下のようになります。

use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // それぞれのインバウンドソケットに対して新しいタスクを spawn する
        // ソケットは新しいタスクに move され、そこで処理される
        tokio::spawn(async move {
            process(socket).await;
        });
    }
}

タスク

Tokio のタスクは、非同期のグリーンスレッドで、async ブロックを tokio::spawn に渡すことで作らえれます。tokio::handle 関数は JoinHandle を返し、これを使うことで spawn されたタスクとやりとりをすることができます。async ブロックは返り値をもつことができて、tokio::handle の呼び出し元は、JoinHandle に対して .await をすることで、この返り値を取得することができます。

以下が例です。

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // 非同期処理を何かやる
        "return value"
    });

    // 他の処理をやる

    let out = handle.await.unwrap();
    println!("GOT {}", out);
}

JoinHandle を待ち受ける(.await する)と、Result が返ってきます。タスクが、実行中にエラーに遭遇した場合に、JoinHandleErr を返します。これはタスクが panic したか、あるいはランタイムのシャットダウンによってタスクが強制的にキャンセルされた場合に起きます。

タスクは、スケジューラによって管理される実行単位です。タスクを spawn すると、Tokio のスケジューラにそれが登録され、適切なタイミングでタスクが実行されることが保証されます。spawn されたタスクは、spawn されたスレッドと同一のスレッド上で実行されることもあれば、ランタイム上の異なるスレッドで実行されることもあります。spawn されたあとにスレッド間を行き来することもあります。

Tokio におけるタスクはとても軽量です。内部的には、1回のアロケーションしか必要としませんし、64バイトのメモリしか消費しません。アプリケーションは気軽に、数百万とまではいかないものの、数千のタスクを spawn することができます。

'static 境界

Tokio ランタイム上でタスクを spawn するとき、その型は 'static でなければなりません。つまり、spawn されるタスクは、タスクの外で所有されているデータへのいかなる参照をも含んではならないということです。

'static が常に「永遠に生存する」ことを意味している、というのはよくある誤解です。ある値が 'static だからといって、メモリがリークするということにはなりません。詳しくは、Common Rust Lifetime Misconceptions を参照してください。

例えば、次の例はコンパイルに失敗します:

use tokio::task;

#[tokio::main]
async fn main() {
    let v = vec![1, 2, 3];

    task::spawn(async {
        println!("Here's a vec: {:?}", v);
    });
}

コンパイルを試みると、以下のエラーが出てきます。

error[E0373]: async block may outlive the current function, but
              it borrows `v`, which is owned by the current function
 --> src/main.rs:7:23
  |
7 |       task::spawn(async {
  |  _______________________^
8 | |         println!("Here's a vec: {:?}", v);
  | |                                        - `v` is borrowed here
9 | |     });
  | |_____^ may outlive borrowed value `v`
  |
note: function requires argument type to outlive `'static`
 --> src/main.rs:7:17
  |
7 |       task::spawn(async {
  |  _________________^
8 | |         println!("Here's a vector: {:?}", v);
9 | |     });
  | |_____^
help: to force the async block to take ownership of `v` (and any other
      referenced variables), use the `move` keyword
  |
7 |     task::spawn(async move {
8 |         println!("Here's a vec: {:?}", v);
9 |     });
  |

このエラーが発生したのは、変数はデフォルトでは async ブロックに ムーブ されないからです。v というベクタは依然として main 関数によって所有されています。println! の行で v への借用が要求されます。Rust コンパイラは親切にもこのことを教えてくれていますし、さらに修正方法の提案までしてくれています。7行目を task::spawn(async move { に変更することで、v を spawn されるタスクにムーブしますよ、ということをコンパイラに指示することができます。これで、タスクは内部で利用しているすべてのデータを自身で所有することになって、すなわち 'static になることができます。

もし、データの一部が複数のタスクから非同期にアクセスされる必要があるのであれば、Arc のような同期プリミティブを使って共有されなければなりません。

ここで、コンパイラが「argument type(ここでは async ブロック) が 'static ライフタイムよりも outlive (長生きする)」というエラーメッセージを出していることに注意してください。この言い回しはいくぶん紛らわしいかもしれません。'static ライフタイムはプログラムの終了まで生存するということなのだから、仮に argument type がそれよりも長生きするというのであれば、それはメモリリークなのではないか?と考えたくなります。これに対しての説明としては、「'static ライフタイムよりも長く生存するのは、 の話であって のことではない。型が有効でなくなる前に値は破棄されてもよい」ということになります。

我々が「ある値は 'static だ」と言うとき、それが意味するのは「その値をずっと保持しつづけたとしても、不正な値になることはない」ということです。コンパイラは、新しく spawn されたタスクがどれだけ長く生存するのかを推論することができないので、タスクが長生きしすぎないということを保証するためには、タスクが永遠に生存しうる、と判断することになります[1]。したがって 'static であることが重要になってくるのです。[2]

上のボックスの中で Common Rust Lifetime Misconceptions へのリンクを貼りましたが、このリンク先においては、T: 'static という表記に対して、「その型は 'static よりも長生きする ("its type outlives 'static")」とか「その値は 'static である (the value is 'static)」という表現ではなく、「'static による境界付け ("bounded by 'static")」という用語が使われていますが、これらはすべて同じ意味です。ただし、&'static T という表記における 「'static とアノテーションがつけられている (annotated with 'static)」とは異なります。

Send 境界

tokio::spawn で spawn されるタスクは 必ず Send トレイトを実装していなければなりません。.await によってタスクが一時的に中断されているときに、Tokio ランタイムがそのタスクを別スレッドに移動することができるようにするためです。

.await をまたいで保持されるデータがすべて Send である場合に、そのタスクは Send になります。.await が呼ばれたとき、タスクの制御はスケジューラに戻され、次にタスクが実行されるとき、中断されたポイントから再開されます。この仕組みが正しく機能するためには、.await後に使われるステートは、タスクに保存されていなければなりません。ステートが Send であるなら、すなわちスレッド間で移動させることができるのであれば、タスク自体もスレッド間で移動させることができるということになります。逆に、ステートが Send ではなかったら、タスクも Send になりません。

例えば、以下の例は正しく動きます。

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        // { } で囲っていることにより、`rc` が `.await` の前に drop される
        {
            let rc = Rc::new("hello");
            println!("{}", rc);
        }

        // `rc` はもはや使用されない。
        // タスクがスケジューラに戻るときには破棄されている
        yield_now().await;
    });
}

一方、以下は動きません。

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        let rc = Rc::new("hello");

        // `rc` は `.await` のあとに使用されている。
        // つまり、タスクのステートとして保持されなければならない
        yield_now().await;

        println!("{}", rc);
    });
}

コンパイルすると以下のエラーが出てきます。

error: future cannot be sent between threads safely
   --> src/main.rs:6:5
    |
6   |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    | 
   ::: [..]spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in
    |                          `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait
    |       `std::marker::Send` is not  implemented for
    |       `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:10:9
    |
7   |         let rc = Rc::new("hello");
    |             -- has type `std::rc::Rc<&str>` which is not `Send`
...
10  |         yield_now().await;
    |         ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
    |                           used later
11  |         println!("{}", rc);
12  |     });
    |     - `rc` is later dropped here

このエラーの特殊なケースについて、次の章で詳しく議論します

値を保存する

さて、受信したコマンドを処理する process 関数を実装していきましょう。値を保存するのに、HashMap を使います。SET コマンドは HashMap に値を挿入し、GET はそれを読み出します。加えて、1つのコネクションで複数のコマンドを受け付けることができるように、ループを使いましょう。

use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream) {
    use mini_redis::Command::{self, Get, Set};
    use std::collections::HashMap;

    // データを蓄えるため `HashMap` を使う
    let mut db = HashMap::new();

    // `mini-redis` が提供するコネクションによって、ソケットから来るフレームをパースする
    let mut connection = Connection::new(socket);

    // コネクションからコマンドを受け取るため `read_frame` を使う
    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                // `Vec<u8> として保存する
                db.insert(cmd.key().to_string(), cmd.value().to_vec());
                Frame::Simple("OK".to_string())
            }
            Get(cmd) => {
                if let Some(value) = db.get(cmd.key()) {
                    // `Frame::Bulk` はデータが Bytes` 型であることを期待する
                    // この型についてはのちほど解説する
                    // とりあえずここでは `.into()` を使って `&Vec<u8>` から `Bytes` に変換する
                    Frame::Bulk(value.clone().into())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };

        // クライアントへのレスポンスを書き込む
        connection.write_frame(&response).await.unwrap();
    }
}

では、サーバーを立ち上げて

$ cargo run

別のターミナルで hello-redis を実行しましょう。

$ cargo run --example hello-redis

以下のように出力されるでしょう。

got value from the server; success=Some(b"world")

これで値を GET したり SET したりすることができるようになりましたが、問題点があります。値がコネクション間で共有されていないということです。別のソケットが接続して hello キーで GET しようとしても、何も値が見つかりません。

この章の全体のコードは こちら で確認できます。

次の章では、すべてのソケットに対してデータを保持する実装をやっていきます。

脚注
  1. 訳注: コンパイラは、タスクに対するライフタイムの正確な推論をすることはできないから、生存期間についての最悪ケースである「プログラムの実行中はずっと生存しつづける(より正確には 生存しつづけても問題ない、不正にならない)」という厳しい条件を要求し、この条件を満たすプログラムだけをコンパイルに通す、というような解釈ができると思います。 ↩︎

  2. 訳注: 'static ライフタイムの話については、Rustの2種類の 'static も参考になるかもしれません。 ↩︎