🦀

RustでマルチスレッドのWebサーバを立ち上げ、example.comのHTMLを返す

2022/05/31に公開

やりたかったこと

Rustで立ち上げたWebサーバーの http://localhost:7878/ にアクセスしたら、example.comHTMLを表示したい。

背景

RustでWebアプリケーションの開発をしたいという思いがあり、The Rust Programming Language 日本語版にて、マルチスレッドのWebサーバ構築までは実装できました。

次のステップの練習として、http://localhost:7878/ にアクセスしたら、example.comを表示できるようになりたいと考えました。

マルチスレッドのWebサーバ構築

The Rust Programming Language 日本語版にわかりやすく書いてあるので、参考にしてください。

ファイル構成

📦src
 ┣ 📂bin
 ┃ ┗ 📜main.rs
 ┗ 📜lib.rs

src/bin/main.rs

今回The Rust Programming Language 日本語版にてマルチスレッドのWebサーバ構築をした後に、変更したのは以下3箇所です。

http_example_get関数の呼び出し
http_example_get関数は、http://example.com にアクセスして内容を取得する。
handle_connection関数内で同期関数として呼ぶ。

main.rs
fn handle_connection(mut stream: TcpStream) {
  // ... 省略
  let body = http_example_get();
  // ... 省略
}

http_example_get関数の実装
http://example.com にアクセスして内容を取得する処理を実装。
非同期関数のため、関数定義にはasyncを付与。

main.rs
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

#[tokio::main]
// async処理はランタイムの上でしか動かないから、tokioランタイムを準備
async fn http_example_get() -> Result<String> {
    let resp = reqwest::get("https://www.example.com/").await?;
    let body = resp.text().await?;
    Ok(body)
}

http://example.comHTMLをRustWebサーバーのレスポンスとして返す
追加したのは、★部分のみです。

main.rs
fn handle_connection(mut stream: TcpStream) {
  // ... 省略
  let body = http_example_get();
  // ★http_example_get関数の戻り値をcontentsに格納
  let contents = match body {
  Ok(v) => v,
  Err(_) => panic!("エラー")
  };

  // 以下変更なし
  let response = format!("{}{}", status_line, contents);

  // TcpStream::write関数に文字列のバッファを渡すとクライアント側に送信する
  stream.write(response.as_bytes()).unwrap();
  // flush:バイトが全て接続に書き込まれるまでプログラムが継続するのを防ぐ
  stream.flush().unwrap();
}

変更した箇所は、以上となります。

src/bin/main.rs

わかりやすいように、ソースコード全体を載せます。

main.rs
extern crate a_chat_api;
use a_chat_api::ThreadPool;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;

fn main() {
    // TcpListenerはサーバーが公開しているIPアドレスとポートへクライアントが接続する許可を与える
    // TcpListenerのbind関数にサーバ側のIPアドレスとオープンされているポート番号を指定する
    // 許可されたIPアドレスと公開されたポートである場合、そのポートへの接続をリクエストしたクライアントをキャッチするリスナーオブジェクトができる。
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    let pool = ThreadPool::new(4);

    //  TcpListenerのincomingメソッドは、一連のストリームを与えるイテレータを返す
    for stream in listener.incoming() {
        // streamは内部状態を変更するためmutをつけてあげる
        // 以下で成功した時点でTcpListenerの役目は終わりで、後はTcpStreamを扱うだけとなる
        // TcpListenerはこのまま再利用可能で、他のクライアントをaccept関数で受けるようにできる
        let stream = stream.unwrap();

        // 各ストリームに対して新しいスレッドを立ち上げる
        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    // この時点で、streamは既に外部からアクセスしてきたクライアントと会話ができる状態
    // クライアント側が何らかの文字列を送信した場合、それがTcpStreamオブジェクト内に保持される
    let mut buffer = [0; 1024]; // バッファを作成
    // クライアントから送られてきたデータを読み込むには、TcpStream::read関数を使う
    stream.read(&mut buffer).unwrap(); // クライアントが送信したものを受信するまで待機

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let status_line = if buffer.starts_with(get) {
        "HTTP/1.1 200 OK\r\n\r\n"
    } else if buffer.starts_with(sleep){
        thread::sleep(Duration::from_secs(5));
        "HTTP/1.1 200 OK\r\n\r\n"
    } else {
        "HTTP/1.1 404 NOT FOUND\r\n\r\n"
    };

    let body = http_example_get();
    let contents = match body {
        Ok(v) => v,
        Err(_) => panic!("エラー")
    };

    let response = format!("{}{}", status_line, contents);

    // TcpStream::write関数に文字列のバッファを渡すとクライアント側に送信する
    stream.write(response.as_bytes()).unwrap();
    // flush:バイトが全て接続に書き込まれるまでプログラムが継続するのを防ぐ
    stream.flush().unwrap();
}

type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

#[tokio::main]
// async処理はランタイムの上でしか動かないから、tokioランタイムを準備
// awaitを使う関数の上には必ず「#[tokio::main]」を設置
async fn http_example_get() -> Result<String> {
    let resp = reqwest::get("https://www.example.com/").await?;
    let body = resp.text().await?;
    Ok(body)
}

src/lib.rs

The Rust Programming Language 日本語版の内容から変更したところはありません。

lib.rs
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;


trait FnBox {
    fn call_box(self: Box<Self>);
}

impl<F:FnOnce()> FnBox for F {
    fn call_box(self: Box<F>){
        (*self)()
    }
}

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// ThreadPoolはチャネルを生成し、チャネルの送信側に就く
impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        // 複数のsender、単独のreceiver
        let (sender, receiver) = mpsc::channel();

        // Arc<Mutex<T>>: 複数のスレッドで所有権を共有しつつ、スレッドに値を可変化させる
        // Rc: 単独の値が複数の所有者を持つ場合があり、Rc<T>型は、値がまだ使用中かどうか決定する値への参照の数を追跡する
        // Arc: Rc<T>はスレッド間で共有するには安全ではない。Arc<T>はスレッドセーフに行える。
        // ただ、スレッド安全性は本当に必要な時だけ支払いたいパフォーマンスの犠牲とともに得られるものだから、標準ライブラリはRc<T>を使っている
        // Mutex: どんな時も1つのスレッドにしか何らかのデータへのアクセスを許可しない
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            // スレッドを生成してベクタに格納する
            // スレッドプールにJoinHandle<()>インスタンスのベクタを格納する代わりに、Worker構造体のインスタンスを格納し
            // 各Workerが単独のJoinHandle<()>インスタンスを格納する
            // Workerに実行するコードのクロージャを取り、既に走っているスレッドに実行してもらうために送信するメソッドを実装する
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
        ThreadPool {
            workers,
            sender
        }
    }

    pub fn execute<F>(&self, f:F)
    // where句を使った明確なトレイト境界では、ジェネリック型に対して「このトレイトを実装していなければならない」という成約を課すもの
    // トレイト境界により、ジェネリック型は指定されたトレイトのメソッド等を使用できるようになる
    where
        F: FnOnce() + Send + 'static
    {
        // 各クロージャを保持するBoxに対してJob型エイリアスを生成し、そこからチャネルに仕事を送信する
        let job = Box::new(f);
        self.sender.send(job).unwrap();

    }
}

type Job = Box<dyn FnBox + Send + 'static>;


// ThreadPoolからスレッドにコードを送信する責任を負うWorker構造体
// Workerはチャネルの受信側
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

// Workerのスレッドで仕事を受け取り、実行する
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // thread::spawn: クロージャを渡して、スレッドを立ち上げる
        let thread = thread::spawn(move||{
            loop {
                // recv():チャネルからJobを受け取る
                let job = receiver.lock().unwrap().recv().unwrap();

                // ワーカー{}は仕事を得ました; 実行します
                println!("Worker {} got a job; executing.", id);
                job.call_box();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}

起動コマンド

$ cargo run

全体の感想

私が詰まったのは、Rustのasync/awaitFutureをしっかり勉強したことがなく、非同期処理の実装の仕方がわからなかったことです。

まずは、非同期処理の学ぶべく、以下のサイトで勉強しました。
Rust の Future に入門した
Async/Await

特に興味深かったのは以下3点です。

  • Futureはランタイムを利用もしくは実装しないと処理されない(poll関数を呼ばないといけないから)。故に、今回はtokioランタイムを使用し、非同期処理を書きたい関数定義の上には必ず#[tokio::main]という注釈を入れた
  • 関数を async fn のように非同期として宣言すると、その中で .await 構文を使うことができる
  • Rust の Futureawait したタイミングではじめて実行される

また、Futureを勉強する中で、Pinについて疑問に思い、以下のYoutubeで勉強しました。

Pinを知るにはRustを追わねば - Writing an OS in Rust 輪読会 (by hsjoihs)

とても詳しく、わかりやすく説明いただいているので理解に困っている方はぜひ御覧ください。

Discussion