🐈

Rust でシングルWEBサーバーをマルチスレッド化

2021/12/24に公開

まず初めに

前回で作成したwebサーバーでは、リクエストを順番に処理をします。最初の接続が処理し終わるまで、次の処理は行わない状態です。
アクセス量が増えていくほど、連続的な実行は迅速に対応できなくなり、リクエストが完了するまでに待たなければいけなくなります。

実際にサーバーの処理を遅くしてみた

let sleep = b"GET /sleep HTTP/1.1\r\n";

ここでルーティングの設定をします。

 } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")

の部分で、/sleep にアクセスがあれば同じhello.htmlを返しますが、thread::sleep(Duration::from_secs(5)); の部分が5秒間サーバーの処理を一時停止しています。

main.rs
use std::thread;
use std::time::Duration;
// --snip--

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    // --snip--
}

スレッドプールでスループットを向上させる

Rust初学者なので説明文は自己解釈に近いものがあります。

スレッドプールとは

待機し、タスクを処理する準備のできた一塊りの大量に生成されたスレッドのことを指します。
高速などの料金所で処理が追いつかず渋滞する車のイメージで僕はいます(間違えていたらすみません)。
プール内のスレッドは、大量の悪意のある攻撃から保護するためにスレッドはあえて小さい数字に制限し、処理の停止などの混乱を招くことを防ぎます。

やったこと

ここまでだと /sleep にアクセスして、5秒経つまで / にアクセスしていても / 側の処理はブロックされてします。
これからのコードはスレッド作成し並行処理で /sleep アクセスし処理中でも / にアクセスでは即時レスポンスを返させることができるようにする。

まずはsrc/main.rs

main.rsを作成したsrc/binに移動します。そうすると、ライブラリクレート(Goでいうパッケージ)がhelloディレクトリ内で主要クレートになります。

extern crate hello;
use hello::ThreadPool;

の部分で、useでhelloのThreadPool構造体をスコープします。

src/bin/main.rsのコードは以下です。

src/bin/main.rs
extern crate hello;
use hello::ThreadPool;
use std::thread;
use std::thread::Thread;
use std::time::Duration;

use std::fs::File;
use std::io::prelude::*;
use std::net::TcpStream;
use std::net::TcpListener;


fn main() {
    let listener = TcpListener::bind("127.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);  
        });
    }
}


fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];

    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "index.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK\r\n\r\n", "index.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let mut file = File::open(filename).unwrap();
    let mut contents = String::new();

    file.read_to_string(&mut contents).unwrap();

    let response = format!("{}{}", status_line, contents);

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

次にsrc/lib.rs

処理の順ですが

src/bin/main.rs
let pool = ThreadPool::new(4);

の部分でThreadPoolのメソッドnew()に4←生成するスレッド数を引数として渡します。

そして、assert!マクロを使用して、ThreadPoolインスタンスを返す前に0よりも小さければプログラムを終了します。

assert!(size > 0);

mpsc::channel関数はタプルを返し、1つ目の要素は、送信側、2つ目の要素は受信側になります。
スレッドプールがワーカーを生成する際に各ワーカーにチャンネルの受信側を渡します。受信側はワーカーが大量生産するスレッド内で使用したいことがわかっているので、クロージャ内でreceiver引数を参照します。

let (sender, receiver) = mpsc::channel();

ThreadPoolにはsenderを、Workerにはreceiverを渡します。
複数のスレッドで所有権を共有しつつ、 スレッドに値を可変化させるためには、Arc<Mutex<T>>を使用する必要があります。Arc型は、 複数のワーカーに受信者を所有させ、Mutexにより、1度に受信者から1つの仕事をたった1つのワーカーが受け取ることを保証するようです。
with_capacity関数ですが、これはVec::newと同じ作業をしつつ、ベクタに予めスペースを確保します。

let receiver = Arc::new(Mutex::new(receiver));

let mut workers = Vec::with_capacity(size);

id番号を取り、idと空のクロージャで大量生産されるスレッドを保持するWorkerインスタンスを返すWorker::new関数を定義する。

for id in 0..size {
	workers.push(Worker::new(id, Arc::clone(&receiver)));
}

ここでWorker構造体の方へ

Worker::new関数が定義され、スレッドが作成され、loopを使用し、ロックと仕事をブロックの外ではなく、内側で獲得することで、 lockメソッドが返すMutexGuardはlet job文が終わると同時にドロップされます。 これにより、複数のリクエストを並行で提供し、ロックはrecvの呼び出しの間は保持されるけれども、 job.call_boxの呼び出しの前には解放されることを保証します。

fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
	let thread = thread::spawn(move || {
		loop {
			let job = receiver.lock().unwrap().recv().unwrap();

			println!("Worker {} got a job; executing.", id);

			job.call_box();
		}
	});

そしてWorkerインスタンスを返します。

Worker {
	id,
	thread,
}

for文内のpool.executeがmain.rsで定義され、

src/bin/main.rs
for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);  
        });
    }

ThreadPoolにsenderが返されます。

src/lib.rsの内容はこちらになります。

src/lib.rs
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::sync::mpsc;

pub struct ThreadPool {
	workers: Vec<Worker>,
	sender: mpsc::Sender<Job>,
}

trait FnBox {
	fn call_box(self: Box<Self>);
}

impl <F: FnOnce()> FnBox for F {
	fn call_box(self: Box<F>) {
		(*self)()
	}
}

// トレイトオブジェクトには`dyn`キーワードを追加しなければならないみたい
type Job = Box<dyn FnBox + Send + 'static>;

impl ThreadPool {

	pub fn new(size: usize) -> ThreadPool {
		assert!(size > 0);

		let (sender, receiver) = mpsc::channel();

		let receiver = Arc::new(Mutex::new(receiver));

		let mut workers = Vec::with_capacity(size);

		for id in 0..size {
			workers.push(Worker::new(id, Arc::clone(&receiver)));
		}

		ThreadPool {
			workers,
			sender,
		}
	}

	pub fn execute<F>(&self, f: F) 
		where
			F: FnOnce() + Send + 'static
	{
		let job = Box::new(f);

		self.sender.send(job).unwrap();
	}
}


struct Worker {
	id: usize,
	thread: thread::JoinHandle<()>,
}


impl Worker {
	fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
		let thread = thread::spawn(move || {
			loop {
				let job = receiver.lock().unwrap().recv().unwrap();

				println!("Worker {} got a job; executing.", id);

				job.call_box();
			    }
		});

		Worker {
			id,
			thread,
		}
	}
}

これで /sleep にアクセスして処理の間、 / にアクセスしても即レスポンスが返ってくるようになりました。
ありがとうございました。

Discussion