Open5

Rustのaxum(tokio,hyper,tower)メモ

itoito

axumとは

RustのWebアプリケーションフレームワーク

Ergonomic and modular web framework built with Tokio, Tower, and Hyper

Tokio、Tower、Hyperで構築された人間工学に基づいたモジュール式ウェブフレームワーク

https://github.com/tokio-rs/axum

axumを支える技術

  • Tokio 非同期プログラミングをサポートするランタイム
  • Hyper TokioをベースとしたHTTPフレームワーク
  • Tower Rustでミドルウェアとなるサービスを書くためのフレームワーク

他のWebフレームワーク

Rust web framework comparison
https://github.com/flosse/rust-web-framework-comparison#server-frameworks

GitHubスター数
axtix-webも有名らしいがaxumの勢いがありそう
Star History Chart

Rust補足

手早くRustの構文覚えるには、Tour of Rustで慣れるのが良さそう
https://tourofrust.com/00_ja.html

他のドキュメント

- The Rust Programming Language 日本語版
https://doc.rust-jp.rs/book-ja/

- Rust入門 Zenn
https://zenn.dev/mebiusbox/books/22d4c1ed9b0003

プロダクションレベルのWeb APIコードの参考
(これはactix-web)
https://www.lpalmieri.com/posts/2020-06-06-zero-to-production-1-setup-toolchain-ides-ci/

itoito

tokioとは

A runtime for writing reliable asynchronous applications with Rust. Provides I/O, networking, scheduling, timers, ...

Rustで信頼性の高い非同期アプリケーションを書くためのランタイム。I/O、ネットワーク、スケジューリング、タイマー、...を提供します。

https://github.com/tokio-rs/tokio

非同期処理とはプログラムがタスクを完了を待たずに次のタスクを実行し、複数のタスクを並行して実行すること

Rustの非同期処理について

  • FutureはJavaScriptでいうPromiseみたいなもの?
  • async/await構文はFutureの糖衣構文。
  • asyncキーワードで関数を非同期関数に変換し、awaitは非同期処理を待つ。

Rustの非同期処理に関するドキュメント
https://rust-lang.github.io/async-book/

tokioサンプルコード

tokioを使ったTCPエコーサーバ

新規プロジェクト作成して、Cargo.tomlとmain.rsを更新

$ cargo new tokio-sample
Cargo.toml
Cargo.toml
[dependencies]
tokio = { version = "1.33.0", features = ["full"] }
log = "0.4"
env_logger = "0.10"
main.rs
// tokioを使ったTCPエコーサーバのサンプル
use log::{error, info};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

// #[tokio::main] はtokioランタイムで非同期コードを実行するためのエントリーポイントを定義する
// async fn mainは関数を非同期処理として定義する
// Result型は成功または失敗を表現する型。Rustのエラーハンドリングに関わる重要な機能
//   Box<dyn std::error::Error>は、さまざまなエラー型を一つの型として抽象化するための書き方
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Loggerのセットアップ
    // ログを表示したい場合は ```$ RUST_LOG=debug cargo run```で実行する
    env_logger::builder().format_timestamp(None).init();

    // TCPリスナーのバインド bindは非同期関数だがawaitで成功するまで待機
    // ?オペレータは、Result<T, E>がOK(T)成功の場合はTの値を抽出して変数にバインドし、
    //    Err(E)の場合は即座に関数からエラーを返して終了する。(main関数がErrを返すことになる)
    info!("Starting server on 127.0.0.1:8080");
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    // TCP接続を待って、接続のたびに新しい非同期タスクを生成するループ
    // 生成されたタスクは並行して実行されるので、複数のクライアントからの接続を同時に処理できる
    loop {
        // TCP接続を待ち受ける
        // awaitキーワードで一時停止して接続があるまで待ってる
        let (mut socket, addr) = listener.accept().await?;
        info!("Accepted connection from {}", addr);

        // tokio::spawn(async move { /*非同期で実行する処理*/ }); で非同期タスクを生成する
        // moveをつけとクロージャ内で使用される変数の所有権が、そのクロージャに移動することを示す
        //    この場合だと、socketとaddrの所有権がクロージャに移動する
        //    moveを使う理由は次の2点が考えられる
        //      1.クロージャと変数のライフタイムを一致させることでデータがタスク終了まで有効であることを保証する
        //      2.タスクが実行されている間に安全にアクセスできデータ競合のない並行処理を実現する
        tokio::spawn(async move {
            // 1024バイトのバッファを0で初期化して用意する
            let mut buf = [0; 1024];

            loop {
                // socket.readは非同期関数なのでawaitキーワードで一時停止してデータが届くのを待つ
                let n = match socket.read(&mut buf).await {
                    // 読み込みバイト数が0、つまり接続が閉じられたらループを抜ける
                    Ok(n) if n == 0 => {
                        info!("Connection from {} closed", addr);
                        return;
                    }
                    // 読み込み成功
                    Ok(n) => {
                        info!("Received {} bytes from {}", n, addr);
                        n
                    }
                    // 読み込み失敗
                    Err(e) => {
                        error!("Failed to read from {}; err = {:?}", addr, e);
                        return;
                    }
                };

                // socket.write_allはソケットに書き込む非同期処理
                if let Err(e) = socket.write_all(&buf[0..n]).await {
                    error!("Failed to write to {}; err = {:?}", addr, e);
                    return;
                } else {
                    info!("Sent {} bytes to {}", n, addr);
                }
            }
        });
    }
}

実行方法

実行

$ RUST_LOG=debug cargo run

動作確認

ncコマンドでテスト

$ nc 127.0.0.1 8080
hello
hello
world
world
^C

サーバ側ログ

[INFO  tokio_sample] Starting server on 127.0.0.1:8080
[INFO  tokio_sample] Accepted connection from 127.0.0.1:52795
[INFO  tokio_sample] Received 6 bytes from 127.0.0.1:52795
[INFO  tokio_sample] Sent 6 bytes to 127.0.0.1:52795
[INFO  tokio_sample] Received 6 bytes from 127.0.0.1:52795
[INFO  tokio_sample] Sent 6 bytes to 127.0.0.1:52795
[INFO  tokio_sample] Connection from 127.0.0.1:52795 closed
itoito

hyperとは

A fast and correct HTTP implementation for Rust.

Rust 用の高速かつ正確な HTTP 実装

https://github.com/hyperium/hyper

hyperはtokioを基本とした、非同期のHTTPサーバフレームワーク
tokioが非同期機能、hyperがHTTPレベルの機能

hyperサンプルコード

hyperを使ったURLのパス文字列を返すHTTPサーバ

Cargo.toml
Cargo.toml
[dependencies]
hyper = "0.14"
tokio = { version = "1", features = ["full"] }
env_logger = "0.10"
log = "0.4"
main.rs
// リクエストのパスを取得し、そのままレスポンスボディとして返すHTTPサーバのサンプル
use env_logger;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server, StatusCode};
use log::{error, info};
use std::convert::Infallible;

async fn echo(req: Request<Body>) -> Result<Response<Body>, Infallible> {
    // リクエストのパスを取得  to_owned() は String に変換するため
    let path = req.uri().path().to_owned();

    // レスポンスを作成
    let response = Response::builder()
        .status(StatusCode::OK)
        .body(Body::from(path))
        .unwrap();
    info!("Response: {:?}", response);
    Ok(response)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Loggerのセットアップ
    env_logger::builder().format_timestamp(None).init();

    // ローカルホストの8080ポートでリッスンするアドレス  intoはSocketAddrに変換するため
    let addr = ([127, 0, 0, 1], 8080).into();

    // サービスを作成する
    // make_service_fnは、リクエストを受け取った時に呼び出される関数を作成する。
    //    新しいTCP接続が確立されるたびにmake_service_fnは提供したクロージャを呼び出す
    // _connはクライアントからの新しい接続を表すオブジェクト
    let make_svc = make_service_fn(|_conn| {
        // echo関数を呼び出すサービスを作成する
        let svc = service_fn(echo);
        // async moveで非同期ブロック生成. moveしているのでsrvの所有権を奪う
        // Result<_, Infallible>はエラーが発生しないことを表す
        async move { Ok::<_, Infallible>(svc) }
    });

    // サーバインスタンス作成
    let server = Server::bind(&addr).serve(make_svc);
    info!("Listening on http://{}", addr);

    // サーバを実行する
    // エラーが発生した場合はエラーをログ出力して終了. ?はResultのエラーを返す
    server.await.map_err(|e| {
        error!("server error: {}", e);
        e
    })?;

    Ok(())
}

実行方法

実行

$ RUST_LOG=debug cargo run

動作確認

curlコマンドでテスト

$ curl -w "\n" 127.0.0.1:8080/
/
$ curl -w "\n" 127.0.0.1:8080/hello/world
/hello/world

サーバ側ログ

[INFO  hyper_sample] Listening on http://127.0.0.1:8080
[INFO  hyper_sample] Response: Response { status: 200, version: HTTP/1.1, headers: {}, body: Body(Full(b"/")) }
[INFO  hyper_sample] Response: Response { status: 200, version: HTTP/1.1, headers: {}, body: Body(Full(b"/hello/world")) }
itoito

towerとは

Tower is a library of modular and reusable components for building robust networking clients and servers.

Tower は、堅牢なネットワーキングクライアントとサーバーを構築するための、モジュール化された再利用可能なコンポーネントのライブラリです。

https://github.com/tower-rs/tower

tokioと同じとこが作ってる
Hyperの内部で使われてるミドルウェア的なもの

itoito

axumサンプルコード

axumを使ったHTTP APIのサンプル

公式GitHubにexamplesがあるが、ブランチを指定しないとバージョンが古くて動かないことがある
ver 0.6.xのサンプル https://github.com/tokio-rs/axum/tree/v0.6.x/examples/readme

Cargo.toml
Cargo.toml
[dependencies]
axum = "0.6.20"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.68"
tokio = { version = "1.0", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
main.rs
use axum::{
    http::StatusCode,
    response::IntoResponse,
    routing::{get, post},
    Json, Router,
};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;

#[tokio::main]
async fn main() {
    // トレーシング(ログ、イベントトラッキング)の初期化
    tracing_subscriber::fmt::init();

    // アプリケーションの構築(ルーティングの設定)
    let app = Router::new()
        // `GET /` に対して `root`ハンドラを設定
        .route("/", get(root))
        // `POST /users` に対して `create_user`ハンドラを設定
        .route("/users", post(create_user));

    // アドレスのバインド
    // これはlet addr: SocketAddr = ([127, 0, 0, 1], 3000).into(); とも書ける
    //   詳しくはFromトレイト、Intoトレイトで調べれば出てくる
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    tracing::debug!("listening on {}", addr);

    // hyperを使ってアプリケーション実行
    //   axum::Serverは実際にはhyper::Serverと同等
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}

// `GET /` に対するハンドラだが、単なる非同期関数であり、Webフレームワークと直接の依存はない
// &'static strはプログラムの生存期間全体で有効な文字列の型を表す
//    "Hello, World!"は文字列リテラルであり、コンパイル時にバイナリに埋め込まれるので、この型になる
async fn root() -> &'static str {
    "Hello, World!"
}

// `POST /users` に対するハンドラ
//   リクエストボディを受け取って、レスポンスボディを返す
// 引数の部分(Json(payload): Json<CreateUser>)はAxumがよしなにやってくれてる
// impl IntoResponseは、IntoResponseトレイトを実装している型を返すという意味
//   この関数の戻り値(StatusCode, impl IntoResponse)もIntoResponseトレイトを実装している
//   IntoResponseトレイトを実装している型は、axumがよしなにレスポンスに変換してくれる
//   トレイトは、抽象化というかインターフェース的なもの
async fn create_user(
    // リクエストボディをJSONとしてパースしてCreateUser構造体に変換する
    Json(payload): Json<CreateUser>,
) -> impl IntoResponse {
    // リエストボディを受け取って、レスポンスボディを作る
    let user = User {
        id: 1337,
        username: payload.username,
    };

    // (ステータスコード 201 Created, Jsonに変換されたuser)
    // このタプルはIntoResponseトレイトが実装されている
    (StatusCode::CREATED, Json(user))
}

// リクエストボディのデシリアライズ用の構造体
// リクエストボディはJSON形式だが、これを構造体に変換するためにDeserializeを使う
// #[derive(Deserialize)]を書くことでDeserializeトレイトが構造体に実装される
#[derive(Deserialize)]
struct CreateUser {
    username: String,
}

// レスポンスボディのシリアライズ用の構造体
// 構造体からJSON形式のレスポンスボディを作るためにSerializeを使う
#[derive(Serialize)]
struct User {
    id: u64,
    username: String,
}
実行方法

実行

$ RUST_LOG=debug cargo run

動作確認

curlコマンドでテスト

$ curl -w "\n" http://127.0.0.1:3000/
Hello, World!

$ curl -w "\n" -X POST -H "Content-Type: application/json" -d '{"username": "example_user"}' http://127.0.0.1:3000/users
{"id":1337,"username":"example_user"}

サーバ側ログ

2023-10-11T15:58:21.343739Z DEBUG axum_sample: listening on 127.0.0.1:3000
2023-10-11T15:58:32.843611Z DEBUG hyper::proto::h1::io: parsed 3 headers
2023-10-11T15:58:32.843660Z DEBUG hyper::proto::h1::conn: incoming body is empty
2023-10-11T15:58:32.843899Z DEBUG hyper::proto::h1::io: flushed 130 bytes
2023-10-11T15:58:32.844270Z DEBUG hyper::proto::h1::conn: read eof
2023-10-11T15:58:39.765773Z DEBUG hyper::proto::h1::io: parsed 5 headers
2023-10-11T15:58:39.765805Z DEBUG hyper::proto::h1::conn: incoming body is content-length (28 bytes)
2023-10-11T15:58:39.765888Z DEBUG hyper::proto::h1::conn: incoming body completed
2023-10-11T15:58:39.766086Z DEBUG hyper::proto::h1::io: flushed 150 bytes
2023-10-11T15:58:39.766265Z DEBUG hyper::proto::h1::conn: read eof