Chapter 05

状態を共有する

magurotuna
magurotuna
2022.01.09に更新

原文: https://tokio.rs/tokio/tutorial/shared-state

ここまで、キー・バリュー方式のサーバーが動くところまでやってきました。しかし、大きな問題点がありました――ステートがコネクション間で共有されていないことです。この章でその修正をしていきます。

戦略

Tokio でステートを共有するための方法はいくつかあります。

  1. Mutex を利用して共有ステートを「ガード」する
  2. ステートを管理するためのタスクを spawn し、メッセージの受け渡しによりステートを操作する

一般的には、シンプルなデータに対しては1つ目のやり方、I/O プリミティブのような非同期処理を必要とするものに対しては2つ目のやり方を採用したくなります。この章では、共有されるステートは HashMap で、行う操作は insertget です。これらの操作はどちらも非同期ではありませんから、Mutex を使うことにしましょう。

2つ目のやり方については、次の章で説明をします。

bytes クレートを依存に追加する

Mini-Redis クレートは、Vec<u8> を使う代わりに bytes クレートが提供している Bytes という型を使っています。Bytes の目的は、ネットワークプログラミングのための強固な配列構造を提供することで、最大の特徴としては Vec<u8> に対して浅いクローン("shallow cloning")を追加していることです。つまり、Bytes のインスタンスに対して clone() メソッドを呼んでも、内部のデータはコピーされません。Bytes は内部データに対する参照カウントのハンドルであるということです。Bytes 型はいくつかの追加機能をもった Arc<Vec<u8>> のようなものである、と考えることができます。

bytes クレートを依存に追加するために、以下の行を Cargo.toml[dependencies] セクションに追加してください。

bytes = "1"

HashMap を初期化する

HashMap は多くのタスクの間で、そしておそらく多くのスレッドの間で共有されることになります。これを可能とするために、Arc<Mutex<_>> で包みましょう。

まず、利便性のため、use ステートメントのあとに以下の型エイリアスを追加します。

use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Db = Arc<Mutex<HashMap<String, Bytes>>>;

そして HashMap を初期化して process 関数に Arcハンドルを渡すように main 関数を書き換えます。Arc を使うことで、おそらく多くのスレッドで実行されているであろう複数のタスクから並行的に HashMap を参照することが可能になります。なお、Tokio では、何らかの共有ステートへのアクセスを提供する値を指すためにハンドルという用語を用います。

use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    println!("Listening");

    let db = Arc::new(Mutex::new(HashMap::new()));

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // ハッシュマップへのハンドルを複製する
        let db = db.clone();

        println!("Accepted");
        tokio::spawn(async move {
            process(socket, db).await;
        });
    }
}

std::sync::Mutex の使用にあたって

ここで、HashMap をガードするために、tokio::sync::Mutex ではなく std::sync::Mutex を使用していることに注意してください。非同期のコードの中で、いかなる場合にも tokio::sync::Mutex を利用するというのは、よくある誤りです。非同期の mutex は、.await の呼び出しをまたいでロックされるような mutex のことです。

同期的な mutex はロックを取得するために待っている間、今のスレッドをブロックします。これにより、他のタスクが処理されることも妨げることになります。しかし、普通は tokio::sync::Mutex に切り替えたからといってこのような状況を改善できるわけではありません。非同期の mutex も、内部では同期的な mutex を利用しているからです。

経験則としては、非同期のコードの中で同期的な mutex を使ってもよいケースは、競合[1]がそれほど多くなく、かつロックが .await をまたいで保持されないような場合に限られます。また、std::sync::Mutex よりもパフォーマンスが良い代替品として、parking_lot::Mutex を利用することも検討してください。

process() を書き換える

process 関数はもう HashMap を初期化する必要はありません。代わりに、HashMap への共有ハンドルを引数として受け取るようにしましょう。そして、HashMap を利用する前にロックをとるようにする必要もあります。

use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream, db: Db) {
    use mini_redis::Command::{self, Get, Set};

    // `mini-redis` が提供するコネクションによって、ソケットから来るフレームをパースする
    let mut connection = Connection::new(socket);

    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                let mut db = db.lock().unwrap();
                db.insert(cmd.key().to_string(), cmd.value().clone());
                Frame::Simple("OK".to_string())
            }
            Get(cmd) => {
                let db = db.lock().unwrap();
                if let Some(value) = db.get(cmd.key()) {
                    Frame::Bulk(value.clone())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };

        // クライアントへのレスポンスを書き込む
        connection.write_frame(&response).await.unwrap();
    }
}

タスク、スレッド、競合

競合がごく少ない状況であれば、クリティカルな短区間のガードをとるためにブロッキング mutex を使うことは妥当な戦略です。ロックの競合が激しい場合、タスクを実行しているスレッドはブロックしながら mutex のことを待たなければなりません。これは、現在のタスクの実行を妨げるだけではなく、そのスレッドで実行予定されている他のタスクをもブロックするということに繋がります。

デフォルトでは Tokio ランタイムはマルチスレッドスケジューラを使います。ランタイムが管理している任意の数のスレッド上で、タスクが実行予約されます。もし、多くのタスクが予約されていて、それらすべてが mutex へのアクセスを必要とするのであれば、それは競合状態にあると言えるでしょう。一方で、current_thread ランタイムが使用されている場合は、mutex が競合状態になることはありません。

current_thread ランタイム は軽量なシングルスレッドランタイムです。spawn するタスクが少なく、わずかなソケットを開くだけといった場合には、このランタイムを使うのが良い選択です。例えば、非同期クライアントライブラリの上に同期的な API への橋渡しを提供するような場合に、このランタイムはうまく機能するでしょう。

もし同期 mutex における競合が問題となったとしても、Tokio の mutex に切り替えることがベストな解決策になることはほとんどありません。そうではなく、以下の選択肢を検討してみるべきでしょう:

  • ステートを管理するための専任タスクを作り、メッセージの受け渡しを行う
  • mutex をシャーディングする
  • mutex を使わずに済むようにコードを再構築する

我々のケースでは、それぞれのキーは独立しているので、mutex をシャーディングするという案がうまくいきます。これを行うため、Mutex<HashMap<_, _>> を1つ作るのではなく、N 個導入します。

type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;

そうすると、あるキーに対してそれが格納されている場所を見つけるという工程が2ステップになります。まず、ステップ1で、キーを使ってどの "shard" に属しているのかを特定します。そしてステップ2で、HashMap の中からキーを検索します。

// ステップ1: "shard" を特定し、ロックをとる
let shard = db[hash(key) % db.len()].lock().unwrap();

// ステップ2: 特定した "shard" に対して、get や insert などの操作を行う
shard.insert(key, value);

dashmap クレートが、シャード化されたハッシュマップの実装を提供しています。

.await をまたいで MutexGuard を保持する

以下のようなコードを書くことがあるかもしれません。

use std::sync::Mutex;

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock = mutex.lock().unwrap();
    *lock += 1;

    do_something_async().await;
} // `lock` がここでスコープを抜ける

上記の関数 increment_and_do_stuff を呼び出すようなタスクを spawn しようとすると、以下のようなエラーメッセージが出てくるでしょう。

error: future cannot be sent between threads safely
   --> src/lib.rs:13:5
    |
13  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
   --> src/lib.rs:7:5
    |
4   |     let mut lock = mutex.lock().unwrap();
    |         -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7   |     do_something_async().await;
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8   | }
    | - `mut lock` is later dropped here

これは、std::sync::MutexGuard 型が Send ではない ために起こっているエラーです。つまり、mutex のロックを別のスレッドに送ることはできないのですが、Tokio のランタイムは .await のところでタスクを別スレッドに動かす可能性があるため、エラーになっている、ということです。このエラーを回避するためには、.await の前に mutex のロックのデストラクタが実行されるよう、コードを書き換える必要があります。

// こう書き換えると、ちゃんと動きます!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    {
        let mut lock = mutex.lock().unwrap();
        *lock += 1;
    } // `lock` がここでスコープを抜ける(つまりデストラクタが実行される)

    do_something_async().await;
}

ここで注意が必要なのは、以下の例は動かないということです:

use std::sync::Mutex;

// 動かない例
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock = mutex.lock().unwrap();
    *lock += 1;
    drop(lock);

    do_something_async().await;
}

これがコンパイルできないのは、コンパイラは現在のところ、future が Send なのか否かをスコープ情報だけに基づいて算出しているからです。もしかすると、将来、明示的な drop に対応するようにコンパイラが改良されるかもしれませんが、とりあえず今のところは、スコープを使わなければならないということです。

なお、ここで説明したエラーについては、前章の Send 境界 でも登場しました。

この問題を巧みに回避するために、タスクが Send であることを要求されないようなやり方で spawn しようと考えつくかもしれませんが、これはやめるべきです[2]。あるタスクが mutex のロックをもったまま .await の時点で Tokio によって中断させられたとしましょう(タスクA)。そして、同じスレッドで実行予約されうる別のタスクが、mutex のロックを取得しようとするとしましょう(タスクB)。このような状況では、デッドロックが発生し得ます―― mutex のロック取得を待機している タスクB によって、今 mutex のロックを保持している タスクA がロックを解放することが妨げられる、といったことになりうるからです。

このエラーを直すためのやり方をいくつか紹介します。

.await をまたいでロックを保持することがないようにコードを書き換える

すでに上でこの例については紹介しましたが、もっと強固なやり方を紹介します。例えば、mutex を struct で包んで、その struct に実装した async ではないメソッドの中でのみ mutex のロックをとる、というようなやり方です。

use std::sync::Mutex;

struct CanIncrement {
    mutex: Mutex<i32>,
}
impl CanIncrement {
    // この関数は `async` ではない
    fn increment(&self) {
        let mut lock = self.mutex.lock().unwrap();
        *lock += 1;
    }
}

async fn increment_and_do_stuff(can_incr: &CanIncrement) {
    can_incr.increment();
    do_something_async().await;
}

このデザインパターンを用いることで、Send に関するエラーに遭遇することがないと保証されます。async 関数の中では、mutex のガードについての記述が一切現れないからです。

ステートを管理するためのタスクを spawn し、操作のためにメッセージの受け渡しを利用する

これはこの章のはじめに言及した2つ目のやり方ですが、共有されるリソースが I/O リソースである場合によく使われます。詳細については次の章で解説します。

Tokio の非同期 mutex を利用する

Tokio が提供している tokio::sync::Mutex 型を利用することもできます。Tokio の mutex の主な特徴は、.await をまたいでも問題なくロックを保持することができるということです。とはいえ、非同期 mutex は通常の mutex に比べてかなり重たいので、一般的には他の2つのアプローチのうちのどちらかを選択するほうが良いです。

use tokio::sync::Mutex; // 注意!Tokio の mutex を使っている

// これはコンパイルできる!
// (ただしこの場合はコードを書き換えたほうが良い)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock = mutex.lock().await;
    *lock += 1;

    do_something_async().await;
} // `lock` はここでスコープを抜ける
脚注
  1. 訳注: 原文では "contention" という表記。「競合」という訳よりもそのまま「コンテンション」のほうが意味が伝わりやすいかもしれません。 ↩︎

  2. 訳注: ここの訳、あまり自身がないので、原文もご参照ください。Send でなくても spawn するための関数として tokio::task::spawn_local というのがあるので、これを使って回避しようとするな、という意味だととらえました。 ↩︎