サーバーの作成のチュートリアルをもとにRustのArc、Mutex、トレイト境界、ライフサイクル境界の説明をしてみる
はじめに
著者はRust初心者です。間違いなどあればご指摘いただければと存じます。
以下のハンズオンをもとに説明する
最後のプロジェクト: マルチスレッドのWebサーバを構築する
シングルスレッドサーバーを立てる
手順
- tcpListenerを127.0.0.1:7878に設定
- listenerでクライアントからのリクエストを処理
- handle_connectionでhttpのヘッダーを読み取ってその内容によって処理や表示ページを分ける
main(説明しませんので先述のサイトを見ていただければと存じます)
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::time::Duration;
use std::fs;
use std::thread;
fn main() {
// tcp接続をリッスンする。
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
// listenerで来たクライアントからのリクエストをfor文で処理し、確認して文をprintする
for stream in listener.incoming() {
let _stream = stream.unwrap();
handle_connection(_stream);
}
}
fn handle_connection(mut stream: TcpStream) {
// データ保持のためのbufferを用意、1024Byteとしている。0が1024個の配列。
// 十分な大きさとしているが、本来の実装では動的にバッファーを取れるようにする必要あり
let mut buffer = [0; 1024];
// // TcpStreamで読み取った値をbufferに保存する
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\r\n";
let sleep3sec = b"GET /sleep3sec HTTP/1.1\r\n";
let sleep2sec = b"GET /sleep2sec HTTP/1.1\r\n";
let sleep10sec = b"GET /sleep10sec HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK", "hello.html")
} else if buffer.starts_with(sleep3sec) {
thread::sleep(Duration::from_secs(3));
("HTTP/1.1 200 OK", "hello.html")
} else if buffer.starts_with(sleep2sec) {
thread::sleep(Duration::from_secs(2));
("HTTP/1.1 200 OK", "hello.html")
} else if buffer.starts_with(sleep10sec) {
thread::sleep(Duration::from_secs(10));
("HTTP/1.1 200 OK", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND", "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
let response = format!(
"{}\r\nContent-Length: {}\r\n\r\n{}",
status_line,
contents.len(),
contents
);
// byteに変換し、writeメソッドで接続に送信する
stream.write(response.as_bytes()).unwrap();
// flushで待機を行なって送信が終わるのを待つ
stream.flush().unwrap();
// bufferの値を文字列に変更する。lossyは読めない文字列を変換する
// println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
}
マルチスレッドサーバーを立てる
手順
- tcpListenerを127.0.0.1:7878に設定
- ThreadPool関数でThread数を指定して並列処理ができるスレッドを作成
- listenerでクライアントからのリクエストを処理
- ThreadPool関数にhandle_connectionを渡してリクエスト分のスレッドにhandle_connectionの実行権限を与える。
- ThreadPool内で並列処理を行うためのライブラリが作成したsenderを使ってreceiverを持ったスレッド内のworkerにリクエストの処理を送信する。
- workerは受け取ったリクエストを処理していく(終わった段階で順次次のリクエストを処理する)
main.rs
extern crate webserver;
use webserver::ThreadPool;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::time::Duration;
use std::fs;
use std::thread;
fn main() {
// tcp接続をリッスンする。
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(6);
// listenerで来たクライアントからのリクエストをfor文で処理し、確認して文をprintする
for stream in listener.incoming() {
let _stream = stream.unwrap();
pool.execute(|| {
handle_connection(_stream);
});
// thread::spawn(|| {
// handle_connection(_stream);
// });
}
}
// fn main() {
// // tcp接続をリッスンする。
// let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
// // listenerで来たクライアントからのリクエストをfor文で処理し、確認して文をprintする
// for stream in listener.incoming() {
// let _stream = stream.unwrap();
// handle_connection(_stream);
// }
// }
// streamの内部状態が変化するためmutで可変とする
fn handle_connection(mut stream: TcpStream) {
// データ保持のためのbufferを用意、1024Byteとしている。0が1024個の配列。
// 十分な大きさとしているが、本来の実装では動的にバッファーを取れるようにする必要あり
let mut buffer = [0; 1024];
// // TcpStreamで読み取った値をbufferに保存する
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\r\n";
let sleep3sec = b"GET /sleep3sec HTTP/1.1\r\n";
let sleep2sec = b"GET /sleep2sec HTTP/1.1\r\n";
let sleep10sec = b"GET /sleep10sec HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK", "hello.html")
} else if buffer.starts_with(sleep3sec) {
thread::sleep(Duration::from_secs(3));
("HTTP/1.1 200 OK", "hello.html")
} else if buffer.starts_with(sleep2sec) {
thread::sleep(Duration::from_secs(2));
("HTTP/1.1 200 OK", "hello.html")
} else if buffer.starts_with(sleep10sec) {
thread::sleep(Duration::from_secs(10));
("HTTP/1.1 200 OK", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND", "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
let response = format!(
"{}\r\nContent-Length: {}\r\n\r\n{}",
status_line,
contents.len(),
contents
);
// byteに変換し、writeメソッドで接続に送信する
stream.write(response.as_bytes()).unwrap();
// flushで待機を行なって送信が終わるのを待つ
stream.flush().unwrap();
// bufferの値を文字列に変更する。lossyは読めない文字列を変換する
// println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
}
lib.rs
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::sync::mpsc;
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
pub fn new(_size: usize) -> ThreadPool {
assert!(_size > 0);
// mpscで並列処理をするためのチャンネルを作成する。senderはメッセージを送る側。reciverは受け取る側になる
let (sender, receiver) = mpsc::channel();
// Arc<Mutex<Receiver<Job>>>とすることでreceiverに所有権の複製とその所有権をもつスレッドで更新ができることを許す。
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(_size);
for id in 0.._size {
// workersに所有権の複製をしたreceiverを複数作成しpushする
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
println!("{:?}",workers.len());
ThreadPool{workers,sender}
}
pub fn execute<F>(&self, f: F)
where F: FnOnce() + Send + 'static,
{
// ヒープメモリにデータを格納する
// (ヒープメモリでは任意のメモリ領域を確保しその先頭ポインタでアクセスするためサイズが決まってない場合に有用)
let job = Box::new(f);
// senderを用いてjobをreceiverにsendする
println!("test");
self.sender.send(job).unwrap();
}
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
// joinHandleを使用することで現在の処理が終わるまで現在のスレッドをブロックして占有する。
// while letでreceiverが受け取ったジョブがなくなるまでジョブを継続的に受け取る
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} got a job; executing.", id);
job();
}
});
Worker{id,thread}
}
}
ここから説明です
大いに参考にしたサイト様(コードも拝借させていただいています))
ArcとMutexについて(以下のコードについてです
let receiver = Arc::new(Mutex::new(receiver));
receiverは複数のスレッド間で使用されるクライアントからのレスポンス処理をsenderから受け取るものです。
ここでは、スレッドでメッセージを受け取るreceiverをArcとMutexというオブジェクトでラッパーしています。
なぜそうしないといけないのかを説明します。
Arc
Rustでは所有権という概念があります
例えば、以下のような場合はコンパイルエラーになります。
use std::thread;
fn main()
{
let v = vec![1,2,3];
//threadでvをプリントする
let hdl0 = thread::spawn(move || {
println!("{:?}", v);
});
//threadでvをプリントする
let hdl1 = thread::spawn(move || {
println!("{:?}", v);
});
hdl0.join().unwrap();
hdl1.join().unwrap();
}
これはvの所有権をhdl0が持ってしまうためhdl1はその値を所有できないためです。
そのためArcを使います
Arcを使用すると所有権をコピーし値の参照ができるようになります。
use std::thread;
fn main(){
let v = Arc::new(vec![1,2,3]);
// vec![]の所有権のクローン
let v0 = Arc::clone(&v);
let hdl0 = thread::spawn(move || {
println!("{:?}", v);
});
let hdl1 = thread::spawn(move || {
// クローンした所有権を使用して参照を行い値を読む
println!("{:?}", v0);
});
hdl0.join().unwrap();
hdl1.join().unwrap();
}
Mutex
しかし、この状態ではvの変更はできません。あくまで参照権限があるだけだからです。
これを変更できるように解決するものがMutexです。
Mutexは所有権を持っているスレッドに対して一時的に変更権限を与えます。
ここでの一時的というのはカラオケのマイクを考えてもらうとわかりやすいです。
カラオケのマイクがひとつしかない場合、歌を歌える人は一人だけです。そのため、マイクを持っている時のみ歌が歌えます。しかし、カラオケで同じ部屋にいる人たちは歌えないわけではないです。お金を払っているのでそのマイクを持つことができます。
Mutexはここでいうマイクを持たせるだけではなく使わせることができるように権限を与えるようなものです。
マイクを受け取った人が歌を歌い、それ以外の人は自分の番をまち、順番が来たらマイクを受け取り歌を歌う。
権限を持つ人たちの間で変更できる権利を受け渡していくことで、参照するだけでなく実行したりすることができるようになります。
このようにしてレシーバーを使いまわせるようにしているわけです。
use std::thread;
fn main()
{
let v = Arc::new(Mutex::new(vec![1,2,3]));
// vec![]の所有権のクローン
let v0 = Arc::clone(&v);
let v1 = Arc::clone(&v);
let hdl0 = thread::spawn(move || {
//vを書き換えることが可能になる。(裏では一つのスレッドのみが変更できる)
v0.push(4);
println!("{:?}", v);
});
let hdl1 = thread::spawn(move || {
v0.push(5);
// クローンした所有権を使用して参照を行い値を読む
println!("{:?}", v0);
});
hdl0.join().unwrap();
hdl1.join().unwrap();
}
以上の二つはスマートポインタというものに該当する。
スマートポインタはメモリのそのオブジェクトを使用するときだけメモリを取り、終わった後にそれを自動的に破棄してくれます。
スマートポインタは、ポインタのように振る舞うだけでなく、追加のメタデータと能力があるデータ構造です。 スマートポインタという概念は、Rustに特有のものではありません。スマートポインタは、C++に端を発し、 他の言語にも存在しています。
参照はデータを借用するだけのポインタなのです。 対照的に多くの場合、スマートポインタは指しているデータを所有します
Arcが参照のためのスマートポインタ、Mutexが所有のためのスマートポインタと言える(と思う)
トレイト境界とライフサイクル境界について
以下の部分です
pub fn execute<F>(&self, f: F)
where F: FnOnce() + Send + 'static,
{...
}
トレイトの概念
rustにはトレイトというものがあります。
これはある型に対しての共通の振る舞いを定義するものです。
例えば、以下のように形にある関数を使用することを指定できます
// 型の定義
pub struct Tweet {
pub username: String,
pub content: String,
pub reply: bool,
pub retweet: bool,
}
// トレイトを指定する
impl Summary for Tweet {
fn summarize(&self) -> String {
format!("{}: {}", self.username, self.content)
}
}
上記の関数ではジェネリクスを使って任意の型の引数を受け取ることを指定しています
fn execute<F>(&self, f: F)
この型は何でも良いのですが、ある型のトレイトで指定したメソッドを使う関数だからトレイトに絶対このメソッドが入っているというような指定をしたいときがあるかもしれません。
そのような時に役立つのがトレイト境界です
トレイト境界
トレイト境界は上のコードの中でwhereの部分になります。
where F: FnOnce() + Send + 'static
このコードではexcuteの引数の値が、FnOnce()とSendのトレイトを実装している必要があることを示しています。
ライフサイクル境界
では'staticとは何なのでしょうか?
これがライフサイクル境界というものです
rustにはライフサイクルという概念があります。
これは、ある参照した変数がいつまで参照されるかを示したものです。
例えば、以下のような場合、vのライフサイクルはtest関数内ということになります
つまり、test関数の処理が終わった段階でこの変数は死にます。
そのため、基本的に変数はある関数内でしか使えません。
fn test(){
let v = 0
// vをreturnする(vのライフサイクルはこの関数内なのでエラーになる)
v
}
しかしstaticをつけると使えるようになります
fn test() -> &'static i32{
let v: &'static i32 = &0;
// vをreturnする(staticであることを指定しているので使える)
v
}
fn main(){
println!("{}",test());
}
これはstaticがそのプログラムが走っている間ずっと有効なライフサイクルであることを指定するものであるためです。
staticライフサイクル境界
さて今回の処理のstaticに戻ります
where F: FnOnce() + Send + 'static
上のコードではトレイト境界と同じ部分に'staticが存在しています。
これは、渡される引数のトレイトがそのプログラムが走っている間ずっと有効なライフサイクルを持つ参照のみ持つことを指定しているものになります。
例えば以下のような場合です(以下のコードはこちらのブログから拝借させていただきました)
// 'static ライフタイム境界を満たす型Tなら何でも受け付ける
fn i_need_static_bound_type<T: 'static>(v: T) {}
// 'staticライフサイクルの参照だけを持つことを指定
struct IHaveStaticRef(&'static str);
fn main() {
i_need_static_bound_type(IHaveStaticRef("abc"));
}
逆に以下はエラーです
// 'static ライフタイム境界を満たす型Tなら何でも受け付ける
fn i_need_static_bound_type<T: 'static>(v: T) {}
// 'a というライフタイムの参照だけ含む
struct IHaveNonStaticRef<'a>(&'a str);
fn main() {
{
// local_stringのライフサイクルはこの{}スコープ内であるため'aはこのスコープ内のライフサイクルになる
let local_string: String = format!("abc");
i_need_static_bound_type(IHaveNonStaticRef(&local_string));
}
}
つまり以下のコードの'staticは取りうる引数の中でプログラミングで走っている時に永続でライフサイクルをもつ参照のみをしている引数しか取らないことを指定しているわけです。
where F: FnOnce() + Send + 'static
まとめ
マルチスレッドのサーバー構築の例をもとにrustの概念を説明しました。
難しいと思うと同時に、どの概念も低級言語で使われるものであるため、自分がいかに内部処理を理解せずにプログラミングをしているか思い知らされました。
Discussion