Open14

Rust 非同期処理

おしぼりおしぼり

一通り読み終えたので、分からなかった部分を中心に復習していく
復習を終えたらロックの実装章を何も見ずに一回やってみる

おしぼりおしぼり

impl って結局なんだっけ?

ざっくりと

  1. メソッドを定義する
  2. トレイトを実装する

場合に分けられると思う。

以下のような impl <構造体> はメソッドを定義する場合に使う。

struct User {
    name: String,
}

impl User {
    fn change_name(&mut self, name: &str) {
        self.name = name.to_string();
    }
}

以下のような impl <トレイト> for <構造体> はとレイトを実装する場合に使う。

impl Drop for User {
    fn drop(&mut self) {
        println!(self.name);
    }
}

上記に加えてジェネリクスを使う場合のバリエーションもあるが、特段説明は不要だと思う。

トレイトを実装する場合に {} だけの場合がある?

これは特別なトレイトで、マーカトレイトと呼ばれる。
書籍中に頻出するマーカトレイトは SendSync である。

マーカトレイトの役割はあくまでコンパイラに構造体に関する情報を伝えるためのものであり、例えば Send ならその構造体 T の所有権を安全に別スレッドに転送できることを伝えている。Sync はその構造体 T の参照を複数スレッドで安全に共有できることを伝えている(参考)。

単に SendSync であることを伝えるなら以下のように書ける。

impl Send for User {};
impl Sync for User {};

もしジェネリクスを使う構造体がある場合、以下のように書く場合がある。
以下の記述はジェネリクスを使う構造体 Wrap<T>TSend である場合にのみ Send であることをコンパイラに伝えている(Sync も同様)。

impl<T> Send for Wrap<T> where T: Send {};
impl<T> Sync for Wrap<T> where T: Sync {};

これは、ある構造体 U が Send であるためには U を構成するデータ型も Send である必要があり、ジェネリクスを使い U<T> を定義する場合、その T が Send であるかどうかによって最終的に U<T>Send かどうかを決める必要があるからである(Sync も同様)。

また、SendSync に関しては unsafe impl も書籍中に頻出する。
マーカトレイトは実装する型が Send あるいは Sync である条件を満たしているならそれをコンパイラに伝えるだけだが、並行処理の機構を自ら実装する場合には複数スレッドからの参照の安全性などを自分のロジックにより担保する必要があり、そのロジックが正当かどうかはコンパイラには判断できない。そのため、unsafe impl と書くことにより unsafe なコードがあるけど、頑張って Send あるいは Sync であることを保証したから許してね、ということを伝える必要がある(参考)。

unsafe impl Send for User {};
おしぼりおしぼり

ライフタイム

ライフタイムは参照の生存期間を明確にするためのものであり、それ自体の説明は不要だと思う。
書籍中に出てくるライフタイム関連の記述としては、関数の引数と戻り値が共に一つの参照である場合に、そのライフタイムは同じ期間になる というライフタイム省略の話くらいだと思う(参考)。

以下は参考元のサンプルコードをそのまま持ってきたものである。

// これは
fn func(x &i32) -> &i32
// これと同じ
fn func<'a>(x: &'a i32) -> &'a i32
おしぼりおしぼり

クロージャとキャプチャ

参考)の記述をそのまま引用すると、クロージャとは 変数に束縛できたり,関数の引数として渡すことのできる名前のない関数(無名関数)のこと である。キャプチャはクロージャの呼び出し元の変数をクロージャのスコープに持ってくることを意味する(超絶ざっくり)。詳細の解説は参考元がわかりやすいのでそちらに任せる。書籍の文脈では std::thread::spawn がクロージャを渡す実装になっていて、ここが理解できたらとりあえずはいいのかなと思う。

例えば、以下のコードは numbers という変数の所有権をクロージャに移動させている。

let numbers = vec![1, 2, 3];
thread::spawn(move || {
    for n in numbers {
        println!("{n}");
    }
}).join().unwrap();

もし上記の例のように所有権を移動してしまうと困る場合はコピーする。参考元にも書いてあるが、この時キャプチャする変数は当然 Copy である必要がある。

let num = 42;
thread::spawn(|num| {
    println!(num);
}).join().unwrap();
おしぼりおしぼり

借用

関数の引数に参照を取ることを借用と呼ぶ(参考)。
借用に関しては普通に可変借用と不変借用がわかっていれば問題なく書籍は読める。メソッドやトレイトの実装をする時に &mut self&self かを考えながら実装できれば良い。

pub

そのまんま(参考

おしぼりおしぼり

以降ではロックの実装を念頭にいくつか知見的なものを復習していく

おしぼりおしぼり

Guard の実装

SpinLock の実装で出てきた、安全なロックのインターフェースを提供するテックニックである。ロック系の機構を実装する場合、当たり前だが lockunlock という操作を考える必要がある。例えば排他的なロックなら lock はロックしたいデータ T に対する排他参照 &mut T を返すのが最も直感的な実装である。

fn lock(&self) -> &mut T {
    ...
}

この場合 unlock は以下のようなインターフェースになりそうである。

fn unlock(&self) {
    ....
}

ただ、上記の実装だと unlock によって確実に lock の時に取得した参照がなくなっていることをコンパイル時に保証できない。実装的には内部で locked みたいな変数を使ってロック状態を表現することはできるが、あくまでロジックにより担保しているだけだ。

Guard のアイデアとしては、&mut T を返す代わりに不変参照を表す構造体を返して、その構造体を drop すれば良い。こうすれば不変参照を取り上げるという操作を drop というコンパイラが検証可能な振る舞いとして表現できる。

これによって明示的な unlock というインターフェースを作らずに drop するだけで unlock と同じことができるようになる。

Deref, DerefMut

上記のような Guard は何かしらの型 T の参照として振る舞うので、できたら使う側がその T と同じようなインターフェースで使えるようにしたい。これをもう少し正確に表現すると、Guard を通して T への透過的な参照をしたいということになる。

Deref, DerefMut はそれぞれ透過的な不変参照と可変参照を提供するためのインターフェースとなる。

おしぼりおしぼり

内部可変性周りの話

基本的に Rust は可変参照を複数の場所から行うことを許容しないが、厳密にこのルールに従うとマルチスレッド間で書き込み可能なデータを共有できないということになる。逃げ道として用意されているのが内部可変性であり、内部可変性は可変参照を通してデータを変更できるようにする。

Cell

  • 参照は値のコピーを取り出すだけ
  • 変更は全体を置き換えるだけ

RefCell

  • 借用できる
  • 可変と不変どちらもできる
  • 不変のときに可変を得ようとするとダメ

パーキング・条件変数

パーキング

  • park は他のスレッドに unpwark されるまでスレッドを待機させる
  • unpark はスレッドを指定して再開させる

parking の例は少し理解に手間取った。

  • パーキングを消しても理論的に正しいプログラム?
    • 実装例は確かに排他ロックを取得しているため完全に正しい
  • park する前に他のスレッドから unpark されても問題ない
    • 内部的には unpark されたことは記録されていて、park しようとしたらこの unpark の記録が消えるだけで、動作的には park せずに処理を続ける
    • これをやらないと、書籍の例だと既にアイテムは存在しているのに park し続けることになる

条件変数

待機するスレッドが増えるとパーキングは破綻するので、条件変数を使う。
条件変数は Mutex と一緒に使う。通知側は待機側を意識しなくて良い(というよりは実装側でなんとかしてて使う側は意識しないという話)ので、好きなだけ待機スレッドを増やせる。

実装的には wait する側が mutex を引数に取る必要がある。

おしぼりおしぼり

メモリオーダリング

この本はもうメモリオーダリングの本です。本当に。
以下は完全に自分用のメモ。

Relaxed

基本的にアトミック変数を操作する順番をいい感じに制御したいというのがメモリオーダリングの話で、アトミック変数自体が保証するのは atomicity だけなので、順序は何も保証しないよねという話。

でも、Relaxed の場合でも全変更順序だけは保証していて、他のすべてのスレッドから見たときの更新順序は全く同じになる。観測する側のスレッドによってその順序が変わるようなことはない。

Release/Aquire

Relaxedだとある一つのアトミック変数に対する読み込みと書き込みが、それぞれの操作自体はアトミックであるものの、順序が逆転する可能性がある。もちろん同じスレッド間なら順序は逆転しない。つまり、複数のスレッドからアトミックに読み書きを行う場合、その操作は同じスレッド間では先行発生関係があるが、複数スレッド間では何も先行発生関係がない。

この複数スレッド間の先行発生関係を保証するのが Release/Aquire オーダリング。

Release は書き込みが他のスレッドの Aqurire よりも前に他のスレッドから見えることを保証する。同じ意味だが、Aquire は読み込みが他のスレッドの Release な書き込みのあとに行われることを保証する。例えば、アトミックなフラグを読み書きするケースを考える。スレッド A がそのフラグに対して値を書き込み、スレッド B がそのフラグの値を読み込むとする。アトミック変数なので、その書き込みや読み込み操作自体のアトミック性は保証されている(つまり読み書きしてる途中で他のスレッドから読み書きされない)。しかし順序に関しては保証していないので、スレッド A の書き込みとスレッド B の読み込みの順序は全く保証されていない。これは単にアトミック性だけが必要なら問題ないが、ロックの取得や解放のためのフラグ( false だと獲得可能で、true だと解放まで待機するスピンロックなど)だと順序が大事になる。なぜなら、ロックの解放であるフラグへの false 書き込みとロックの取得のために必要な読み込みの順序が逆転してしまうと、本当はロックが解放されていても true (解放の書き込みより前)を読み込んでしまう可能性があり、ロックが解放されていないと勘違いしてしまうから。

おしぼりおしぼり

セマフォ

バイナリセマフォは Mutex なので、カウントセマフォを念頭に置く。セマフォは単にリソースに対してアクセス可能なスレッドの数を制限する。セマフォは排他的なアクセス(つまりある時点でアクセス可能なスレッドは一つ)を提供しており、複数スレッドからのアクセスを許可するかどうかが異なる。セマフォは数を制限するだけで、それ以上のことはしない。現在のアクセス数を数える変数に Mutex を使うくらい。

実装的には以下のようなイメージ。通知は条件変数を使う。

pub struct Semaphore {
    max: isize,
    cond: Condvar,
    mutex: Mutex<isize>,
}

セマフォを使うと送信側の数を制限したチャネルなどを実装できる。
こちらも実装のイメージだけ。

#[derive(Clone)]
pub struct Sender<T> {
    sem: Arc<Semaphore>,
    buf: Arc<Mutex<LinkedList<T>>>,
    cond: Arc<Condvar>,
}

pub struct Receiver<T> {
    sem: Arc<Semaphore>,
    buf: Arc<Mutex<LinkedList<T>>>,
    cond: Arc<Condvar>,
}
おしぼりおしぼり

デッドロック

  • 初期状態から遷移可能で、かつ次の状態がない = デッドロック
  • RWLock で RL を取得中に WL を取得するとデッドロックする
    • WL は RL の解放を待つが、RL の解放前に WL の待機があるため RL は解放されない
    • Rust だと RL を取得時に即座に解放する場合は問題ない
  • ライブロック はロックのデッドロックは起きていないが、リソースは獲得できない状態
  • 飢餓 は特定のプロセスだけがロックを獲得できない状態
  • 再帰ロック はロック取得中にそのロックを取得しようとすること
    • 普通の Mutex だとデッドロックになる
    • 処理継続可能なロックは 再入可能(reentrant) なロックと呼ばれる
    • 再入可能なロックのアルゴリズムは以下の通り
  1. ロック取得する
  2. 再度ロック取得する
      - この時自身がロック取得中ならカウント値をインクリメント
  3. ロック開放は自身のカウント値がゼロになったら
use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
   let val = Arc::new(RwLock::new(true));
   let t = thread::spawn(move || {
       let flag = val.read().unwrap(); // flag の drop まで RL 取得中
       // let flag = *val.read().unwrap(); なら即座に解放される
       // その場合 if flag ... となる
       if *flag {
           *val.write().unwrap() = false; 
           println!("flag is true");
       }
      });

t.join().unwrap();
}
おしぼりおしぼり

擬似覚醒

  • 条件変数を使ってる時に条件が満たされてないのに wait から帰ること
  • シグナルとマルチスレッドは相性悪い
    • メイン処理とシグナルハンドラどちらもロックを取得する関数だとする
    • メイン処理がロック取得後にシグナルハンドラが起動する
    • シグナルハンドラはロックを取得しようとするが、ロックはメイン処理が取得済み
    • しかもメイン処理はシグナルハンドラにブロックされるためロックを解放できない
    • デッドロックになる
    • シグナルハンドラがメイン処理をブロックするのが原因なので、シグナルハンドラ用のスレッドを別で用意してあげると解決できる
    • メインスレッドはシグナルをブロックして、シグナルハンドラ用のスレッドは同期的にシグナルを待つ
おしぼりおしぼり

グリーンスレッド

ユーザーランドで実装するスレッドのこと。コンテキストスイッチなどをユーザーランドからやる。