Chapter 06

チャネル

magurotuna
magurotuna
2021.02.24に更新

原文: https://tokio.rs/tokio/tutorial/channels

ここまで Tokio を使った並行処理について学んできたいくつかのことを、クライアントサイドに適用してみましょう。2並行で Redis コマンドを実行させたいとします。コマンド1つにつき1つのタスクを spawn して、2つのコマンドを並行に処理していきます。

まず、以下のようなコードを試してみます:

use mini_redis::client;

#[tokio::main]
async fn main() {
    // サーバーへのコネクションを確立する
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();

    // 2つのタスクを spawn する。
    // タスク1 はキーによる "get" を行い、// タスク2 は値を "set" する。
    let t1 = tokio::spawn(async {
        let res = client.get("hello").await;
    });

    let t2 = tokio::spawn(async {
        client.set("foo", "bar".into()).await;
    });

    t1.await.unwrap();
    t2.await.unwrap();
}

これはコンパイルに通りません。どうにかして、両方のタスクが client にアクセスする必要があるためです。Client 型は Copy トレイトを実装していないので、client の共有を行えるようにコードを追加しなければ、コンパイルができないのです。さらに言うと、Client::set&mut self をとっています。つまり、これを呼び出すためには、排他制御が必要だということです。タスクごとにコネクションを開くことも可能ではありますが、理想的ではありません。std::sync::Mutex を使うという考えもありますが、ロックを保持した状態で .await する必要があるので、これもだめです。tokio::sync::Mutex を使ったとしても、同時にさばけるリクエストは1つだけなので、微妙です。もしクライアントが Redis のパイプライン を実装しているとすると、非同期 mutex を使うことは、コネクションを有効に活用できないことにつながります。

メッセージ受け渡し

解決策は、「メッセージ受け渡し」パターンを使うことです。このパターンでは、client リソースを管理するための専用タスクを spawn します。リクエストの発行を希望するタスクは、client タスクへとメッセージを送ります。clientタスクは、リクエストの送信者の代わりにリクエストを発行し、送信者にレスポンスを送り返します。

この方法を用いることで、単一のコネクションが確立されます。client を管理するタスクは getset を呼ぶだけの排他的なアクセス権を得ることができます。さらに、チャネルはバッファとしても機能します。client タスクがビジー状態にある間にメッセージが送られてくることがあるかもしれませんが、client タスクが新しいリクエストを処理する余裕ができたタイミングで、次のメッセージをチャネルから取得すれば良いのです。これにより、スループットの改善につながりますし、コネクションプーリングをするように拡張することも可能です。

Tokio のチャネルプリミティブ

Tokio は、それぞれが異なる目的をもつ 多くのチャネル を提供しています。

  • mpsc: multi-producer, single-consumer 型のチャネル。たくさんの値を送ることができる。
  • oneshot: single-producer, single-consumer 型のチャネル。1つの値を送ることができる。
  • broadcast: multi-producer, multi-consumer 型のチャネル。受信側はすべての値を見ることができる。
  • watch: single-producer, multi-consumer 型のチャネル。たくさんの値を送ることができるが、履歴は残らない。受信側は最新の値のみを見ることができる。

multi-producer, multi-consumer 型のチャネルで、それぞれのメッセージを見ることができる消費者(受信者)は1つだけ、というようなチャネルがほしい場合は、async-channel というクレートを使うと良いでしょう。他にも、非同期 Rust 以外の場面で使うことのできるチャネルもあります。std::sync::mpsccrossbeam::channel などです。これらのチャネルはスレッドをブロックすることによってメッセージを待ち受けます(これは非同期コードの中では許されないことです)。

この章では、mpsconeshot を利用します。メッセージ受け渡しチャネルの他のタイプについては、のちの章で触れます。この章のコードの全体は こちら で確認可能です。

メッセージ型を定義

ほとんどの場合、メッセージ受け渡しを使うときには、メッセージを受け取るタスクは複数種類のコマンドに応答をすることになります。我々のケースでは、タスクは GETSET という2つのコマンドに対して応答します。これをモデル化するために、まず Command という enum を定義して、それぞれのコマンドに応じた値をもたせましょう。

use bytes::Bytes;

#[derive(Debug)]
enum Command {
    Get {
        key: String,
    },
    Set {
        key: String,
        val: Bytes,
    }
}

チャネルを作成

main 関数内で mpsc チャネルを作成します。

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // 最大 32 のキャパシティをもったチャネルを作成
    let (tx, mut rx) = mpsc::channel(32);

    // ... 残りのコードをここに書く
}

Redis コネクションを管理しているタスクへコマンドを送信するために mpsc チャネルを使います。"multi-producer" のおかげで、多くのタスクからメッセージを送信することができます。チャネルを作成すると2つの値が返ってきます―― sender (送信側)と receiver (受信側) です。これら2つを別々に使います。別々のタスクへとムーブすることが可能です。

チャネルをキャパシティ 32 で作成しました。もしメッセージが受信されるよりも早く次のメッセージが送信されたら、チャネルはそれを蓄えておきます。メッセージが32個蓄えられると、受信側によってメッセージが除去されるまでの間、send(...).await の呼び出しはスリープすることになります。

複数のタスクからの送信は、Senderクローン することによって実現されます。例えば:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
    let tx2 = tx.clone();

    tokio::spawn(async move {
        tx.send("sending from first handle").await;
    });

    tokio::spawn(async move {
        tx2.send("sending from second handle").await;
    });

    while let Some(message) = rx.recv().await {
        println!("GOT = {}", message);
    }
}

メッセージは両方とも1つの Receiver (上記の例では rx)へと送られます。mpsc チャネルにおいては、受信側をクローンすることはできません。

すべての Sender がスコープを抜ける、あるいは別の方法で drop された場合、チャネルにそれ以上のメッセージを送ることはできなくなります。この時点で、Receiver に対する recv の呼び出しは None を返すようになり、すべての送信者がいなくなってチャネルが閉じられた、ということを意味します。

我々の、Redis コネクションを管理するタスクというケースにおいては、チャネルが閉じられたならば Redis コネクションも閉じてもよいということが分かります。コネクションがそれ以降使われることはないからです。

"マネージャー" タスクを spawn する

続いてチャネルからのメッセージを処理するタスクを spawn しましょう。まず、クライアントコネクションが Redis に対して確立されます。それから、Redis コネクションを経由して受信したコマンドが発行します。

use mini_redis::client;
// `rx` の所有権をタスクへとムーブするために `move` キーワードを付ける
let manager = tokio::spawn(async move {
    // サーバーへのコネクションを確立する
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();

    // メッセージの受信を開始
    while let Some(cmd) = rx.recv().await {
        use Command::*;

        match cmd {
            Get { key } => {
                client.get(&key).await;
            }
            Set { key, val } => {
                client.set(&key, val).await;
            }
        }
    }
});

以前作った2つのタスクを書き換えます。コマンドを直接 Redis コネクションに対して発行するのではなく、チャネルを使って送信するようにしてみましょう。

// `Sender` ハンドルはタスクにムーブされる。
// タスクは2つあるので、2つ目の `Sender` を作る必要がある。
let tx2 = tx.clone();

// タスク1は "get" を、タスク2は "set" を担当する
let t1 = tokio::spawn(async move {
    let cmd = Command::Get {
        key: "hello".to_string(),
    };

    tx.send(cmd).await.unwrap();
});

let t2 = tokio::spawn(async move {
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: "bar".into(),
    };

    tx2.send(cmd).await.unwrap();
});

プロセスが終了してしまう前にコマンドがしっかりと完了したことを保証するため、main 関数の一番下で join ハンドルを .await します。

t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();

レスポンスを受け取る

最後のステップは、"マネージャー" タスクから返ってくるレスポンスを受け取ることです。GET コマンドは値を取得する必要があり、SET コマンドは処理が正常に終了したかどうかを知る必要があります。

レスポンスを渡すために oneshot チャネルを使います。oneshot チャネルは signle-producer, single-consumer 型のチャネルで、単一の値を送信するのに最適です。我々のケースでは、「単一の値」とはまさにレスポンスのことです。

mpsc と同様に、 oneshot::channel() は送信側のハンドルと受信側のハンドルを返します。

use tokio::sync::oneshot;

let (tx, rx) = oneshot::channel();

mpsc とは違いキャパシティの指定はありません。oneshot ではキャパシティは常に1だからです。また、ハンドルはどちらもクローンすることができません。

"マネージャー" タスクからレスポンスを受け取るために、コマンドを送信する前に oneshot チャネルを作成します。"マネージャー" タスクに送信するコマンドに、oneshot チャネルの Sender 側を含めるようにします。Receiver 側はレスポンスを受け取るために使います。

まず、CommandSender を持つように書き換えましょう。便宜のため、Sender を指すための型エイリアスを宣言しておきます。

use tokio::sync::oneshot;
use bytes::Bytes;

// 複数の異なるコマンドは1つのチャネルを通して「多重化 (multiplexed)」される
#[derive(Debug)]
enum Command {
    Get {
        key: String,
        resp: Responder<Option<Bytes>>,
    },
    Set {
        key: String,
        val: Vec<u8>,
        resp: Responder<()>,
    },
}

/// リクエストを送る側が生成する。
/// "マネージャー" タスクがレスポンスをリクエスト側に送り返すために使われる
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;

タスクを書き換えて、oneshot::Sender を含むようなコマンドを発行するようにしましょう。

let t1 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Get {
        key: "hello".to_string(),
        resp: resp_tx,
    };

    // GET リクエストを送信
    tx.send(cmd).await.unwrap();

    // レスポンスが来るのを待つ
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});

let t2 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: b"bar".to_vec(),
        resp: resp_tx,
    };

    // SET リクエストを送信
    tx2.send(cmd).await.unwrap();

    // レスポンスが来るのを待つ
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});

そして最後に、"マネージャー" タスクを修正します。oneshot チャネルを通してレスポンスを送るようにしましょう。

while let Some(cmd) = rx.recv().await {
    match cmd {
        Command::Get { key, resp } => {
            let res = client.get(&key).await;
            // エラーは無視する
            let _ = resp.send(res);
        }
        Command::Set { key, val, resp } => {
            let res = client.set(&key, val.into()).await;
            // エラーは無視する
            let _ = resp.send(res);
        }
    }
}

oneshot::Sendersend は即座に完了するため、.await をする必要はありません。oneshot チャネルの send は、どんな待機もすることなく即座に「失敗」または「成功」をするからです。

oneshot チャネルに値を送る際、受信側がすでに drop されていたら、結果は Err になります。これは受信側はもはやレスポンスに一切興味がない、ということを暗に示しています。我々のケースにおいても、受信側がそれ以上レスポンスを必要としなくなるということは発生し得ます。したがって、resp.send(...)Err を返してきたとしても、それを特別に処理する必要はありません。

全体のコードは こちら で確認できます。

バックプレッシャーと有界なチャネル

並行性とキューイングを導入するときは常に、キューが有界であって、システムが優雅に負荷を処理することができるということを確認するのが重要です。有界でないキューは最終的に利用可能なメモリをすべて埋め尽くし、システムが予測のつかない落ち方をすることに繋がります。

Tokio は暗黙的なキューイングを避けるように注意しています。これの大部分は、async 処理が "lazy" であるという事実に基づきます。以下の例を考えてみましょう:

loop {
    async_op();
}

もしこの非同期処理が正格[1]に実行されるのであれば、このループは、前回の処理が終わったかどうかを確認せず、次々と新しい async_op をキューに入れることになります。結果として、暗黙的な無制限のキューイングをしていることになります。コールバックベースのシステムと、正格な future ベースのシステムは特にこの問題の影響を受けやすいです。

しかし、Tokio と非同期 Rust においては、上記のコードによって async_op が実行されることはありません。.await が呼ばれていないからです。.await を使うようにコードを書き換えたら、async_op が完了してはじめて次のループに進むようになります。

loop {
    // `async_op` が完了するまではループが繰り返されない
    async_op().await;
}

並行性とキューイングは明示的に導入されなければなりません。これを実現するためには、以下のような方法があります:

  • tokio::spawn
  • select!
  • join!
  • mpsc::channel

これらを行う場合、並行度の総量が確実に有界となるように注意してください。例えば、TCP を受け付けるループを書く場合、開かれているソケットの合計数が有界となるようにしてください。mpsc::channel を使うときは、適切なチャネルキャパシティを選択してください。具体的な限界値はアプリケーションごとに異なってくるでしょう。

良い限界値を注意深く選ぶことは、信頼性のある Tokio アプリケーションを書く上で非常に重要です。

脚注
  1. 訳注: "eagerly" の訳として「正格」を採用しました。"lazy" (=遅延) の対義語です。 ↩︎