Open12

cloudflare workersで動作するrustプログラム実装

yunayuna

安価で、動的に設定が変更できるロードバランサーを作りたく
いろんな技術スタックを検証しています。

今回は、cloudflare workers上で動かすrust (webassembly)で、
ロードバランサーが作れるか検証してみます。

選択理由

  • cloudflare workersは、小さなアプリ向けの無料プランがある
  • KV(key value) storeが備わっていて、動的な設定変更が用意にできる
  • rustでの実装が可能

検証ポイント

  • rustでwebassemblyを作ったことがないので、どんな制約があるのか
  • cloudflare workersでできることと、制約

上記をある程度把握することで、どんなプロダクトに使えるか、ざっくりしたイメージを創りたい。

他の候補

yunayuna

前提知識と方針

workersへのデプロイやconfig操作などは、Wrangler というtypescriptライブラリを通して行います
wranglerなどのjs/typescriptを扱うためにbun をエンジンとして利用します
rustは、webassemblyとしてデプロイを行い、workers上で動作させます

やったこと

前提

開発環境構築

#プロエジェクトディレクトリ作成
mkdir wasm-lb

# bun初期化(package名などは、全てデフォルトでEnter)
bun init

# 必要モジュールを入れる
bun add -D wrangler

# プロジェクトをテンプレから作成して移動
bun wrangler generate my-project https://github.com/cloudflare/workers-sdk/templates/experimental/worker-rust

cd my-project

生成されたプロジェクト内のdependenciesを最新化

package.json
{
	"name": "template-worker-rust",
	"version": "0.0.0",
	"private": true,
	"scripts": {
		"deploy": "wrangler deploy",
		"dev": "wrangler dev --local"
	},
	"devDependencies": {
		"wrangler": "^3.21.0"
	}
}
Cargo.toml

[dependencies]
worker = "0.0.18"

初期のモジュール本体はこちら

lib.rs
use worker::*;

#[event(fetch)]
async fn main(req: Request, env: Env, ctx: Context) -> Result<Response> {
    Response::ok("Hello, World!")
}
#開発環境での稼働(my-projectディレクトリ内で)
bun run dev

[INFO]: ⬇️  Installing wasm-bindgen...
[INFO]: Optional fields missing from Cargo.toml: 'description', 'repository', and 'license'. These are not necessary, but recommended
[INFO]: ✨   Done in 0.19s
[INFO]: 📦   Your wasm pkg is ready to publish at /home/masatoyuna/Project/wasm-lb/wasm-lb-rs/build.

  shim.mjs  12.4kb

⚡ Done in 5ms
⎔ Starting local server...
[wrangler:inf] Ready on http://localhost:44615

動きました。
http://localhost:44615/

-> Hello, World!

デプロイ

bun run deploy
yunayuna

もう少し色々触ってみる

本家サイトを参考に、hello worldの次の処理を実装してみる。

アクセス元のメソッド、パス、ロケーション(緯度経度)、region(東京、とか)がわかったり、
Postされたファイルのバイト数を計算して返したり、

lib.rs
use worker::*;

#[event(fetch)]
pub async fn main(mut req: Request, env: Env, _ctx: worker::Context) -> Result<Response> {
    console_log!(
        "{} {}, located at: {:?}, within: {}",
        req.method().to_string(),
        req.path(),
        req.cf().coordinates().unwrap_or_default(),
        req.cf().region().unwrap_or("unknown region".into())
    );

    if !matches!(req.method(), Method::Post) {
        return Response::error("Method Not Allowed", 405);
    }

    if let Some(file) = req.form_data().await?.get("file") {
        return match file {
            FormEntry::File(buf) => {
                Response::ok(&format!("size = {}", buf.bytes().await?.len()))
            }
            _ => Response::error("`file` part of POST form must be a file", 400),
        };
    }

    Response::error("Bad Request", 400)
}

router実装したり、色々と便利なモジュールが揃っているみたい
(exampleで、tokioがdev-dependenciesに含まれてたのでどうやら必要らしい)

Cargo.toml

[dev-dependencies]
tokio = { version = "1.35", features = ["full"] }

lib.rs
use serde::{Deserialize, Serialize};
use worker::*;

#[derive(Debug, Deserialize, Serialize)]
struct GenericResponse {
    status: u16,
    message: String,
}

#[event(fetch)]
async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
    Router::new()
        .get_async("/foo", handle_get)
        .post_async("/bar", handle_post)
        .delete_async("/baz", handle_delete)
        .run(req, env)
        .await
}

pub async fn handle_get(_: Request, _ctx: RouteContext<()>) -> worker::Result<Response> {
    Response::from_json(&GenericResponse {
        status: 200,
        message: "You reached a GET route!".to_string(),
    })
}

pub async fn handle_post(_: Request, _ctx: RouteContext<()>) -> worker::Result<Response> {
    Response::from_json(&GenericResponse {
        status: 200,
        message: "You reached a POST route!".to_string(),
    })
}

pub async fn handle_delete(_: Request, _ctx: RouteContext<()>) -> worker::Result<Response> {
    Response::from_json(&GenericResponse {
        status: 200,
        message: "You reached a DELETE route!".to_string(),
    })
}
yunayuna

ロードバランサーの要件整理

まず、ここでロードバランサーの実装要件を整理する

要件

  • layer7(http) でヘッダー情報に応じて、予め設定しておいたサーバーへ処理を振り分ける
  • 振り分け先のサーバー構成や、ヘッダーによる分岐条件は、api経由で動的に変更できる

設計

  • 分岐先のサーバー情報を、定期的に(恐らく比較的レイテンシ遅延が大きい)オブジェクトから読み取り、比較的レイテンシが低いと思われる場所(kv?)に保存する
  • リクエストに対し、ヘッダーを解析してkvを元に分岐先のサーバーを決定し、リクエストを再送信する。
  • 再送信したリクエストからレスポンスを受取り、元のリクエストに対してレスポンスを返す

課題

cloudflare workersの pricingを観ると、フリーで使える機能は以下の通り

  • Request: 100,000 per day
  • CPU time: 10 milliseconds of CPU time per invocation
  • 1GB - Key-valueストレージスペース
    (KV)
  • 10万 - キー値の読み取り(1日あたり)
  • 1,000 - キー値の書き込み(1日あたり)
  • 1,000 - キー値の削除(1日あたり)
  • 1,000 - キー値のリスト(1日あたり)

KVは問題ないが、1リクエストに対してCPU利用時間が10ms、これはフリーで使うのは無理か?
となると有料版だが、

  • 5$ / month for subscription (定額料金)
  • Request: 10 million included per month +$0.30 per additional million (これは大丈夫そう)
  • 30 million CPU milliseconds incude per month (これ足りるかはやってみないと分からない)

なんだかんだで、普通のAWSロードバランサー並の料金はかかる気がする。

yunayuna

現段階の考察

  • workersは、エッジで完結させる比較的軽微なアプリケーションに向いている
  • kvへのアクセス速度が数msでないとロードバランサーとしての利用は難しいと思われる
  • kv以外で、動的情報を変数などを使って、瞬時に取得する方法はある?
  • フリー枠でロードバランサーとして使うのは無理?(10 milliseconds of CPU time per invocation って、await中の時間は入らない?)
yunayuna

いくつかの課題

cloudflare worrkers上のリクエストは worker::Requestで扱う必要がある。
また、レスポンスはworker::Response を返す。
本来、hyper::Request / hyper::Responseなどで扱えると、その後のリクエストにそのまま使えて便利なのだが、
これはwebAssemblyからデータを受信するために独自の処理が必要なため、現状、worker::Requestである必要があるため、自分で詰め直さなければいけない。

※議論もされてるが、現時点では未対応
https://github.com/cloudflare/workers-rs/issues/13

yunayuna

とりあえずサンプルなどを参考にしながら書いたhyper0.14版はこちら。
通常のapi requestは捌ける。
SSE(server sent events)やファイルアップロード・ダウンロードなどのstream処理やwebsocketは未確認。

※注
開発時の localhost:xxx へのアクセスは、httpsとして転送されるので、ssl認証でエラーが生じる。
接続先でssl認証を行わないようにするか、httpで転送することで回避できる。

        .secure_transport(SecureTransport::Off)
        .connect(SERVER_IP, 80)?;
Cargo.toml
[package]
name = "worker-rust"
version = "0.1.0"
edition = "2021"

# https://github.com/rustwasm/wasm-pack/issues/1247
[package.metadata.wasm-pack.profile.release]
wasm-opt = false

[lib]
crate-type = ["cdylib"]

[dependencies]
cfg-if = "1.0.0"
serde = "1.0.193"
worker = { version = "0.0.18" }
tokio = { version = "1.35", features = ['io-util', 'macros'] }
hyper = { version = "0.14.27", default-features = false, features = [
    'http1',
    'http2',
    'client',
] }
console_error_panic_hook = { version = "0.1.7" }
wasm-bindgen-futures = "0.4"
tracing = "0.1.40"
tracing-web = "0.1.3"
tracing-subscriber = { version = "0.3", features = [
    'time',
    'json',
    "env-filter",
    "registry",
    "fmt",
    "std",
] }
time = { version = "0.3", features = ['wasm-bindgen'] }

lib.rs
use std::str::Utf8Error;
use tracing_subscriber::fmt::format::Pretty;
use tracing_subscriber::fmt::time::UtcTime;
use tracing_subscriber::prelude::*;
use tracing_web::{performance_layer, MakeConsoleWriter};
use tracing_subscriber::{layer::SubscriberExt, EnvFilter};

use worker::*;
use tracing::{error, info, Level};

pub use console_error_panic_hook::set_once as set_panic_hook;

const SERVER_DOMAIN: &str = "sample.com";
const SERVER_IP: &str = "111.111.111.111"; //対象サーバーのip

async fn make_request(
    mut sender: hyper::client::conn::SendRequest<hyper::Body>,
    request: hyper::Request<hyper::Body>,
) -> Result<Response> {
    // Send and recieve HTTP request
    let hyper_response = sender
        .send_request(request)
        .await
        .map_err(map_hyper_error)?;

    // Convert back to worker::Response
    let hyper_headers = hyper_response.headers().clone();
    let buf = hyper::body::to_bytes(hyper_response)
        .await
        .map_err(map_hyper_error)?;
    let body = buf.as_ref().to_vec();
    let mut response = Response::from_bytes(body)?;
    for (name, value) in hyper_headers.iter() {
        info!("response header: {:?}: {:?}", name, value);
        response.headers_mut().append(name.as_str(), value.to_str().unwrap())?;
    }
    // let text = std::str::from_utf8(&buf).map_err(map_utf8_error)?;
    
    // response.headers_mut().append("Content-Type", "text/html")?;
    Ok(response)
}

async fn conv(mut req: Request) -> Result<hyper::Request<hyper::Body>> {
    let mut url = req.url().unwrap();
    info!("url.domain(): {:?}", url.domain());
    info!("url.fragment(): {:?}", url.fragment());
    info!("url.host(): {:?}", url.host());
    info!("url.origin(): {:?}", url.origin());
    info!("url.password(): {:?}", url.password());
    info!("url.path(): {:?}", url.path());
    info!("method: {:?}", req.method().to_string().to_uppercase());
    // let cf = req.cf();
    

    let mut builder = hyper::Request::builder().method(req.method().to_string().to_uppercase().as_str()).uri(url.path());
    // builder.extensions_mut().insert(cf);
    for (name, value) in req.headers().into_iter() {
        if name != "host" {
            builder = builder.header(name.as_str(), value.as_str());
        } else if name == "host" {
            builder = builder.header(name.as_str(), SERVER_IP);
        }
    }
    // builder = builder.header("Host", "example.com");
    let body_bytes = req.bytes().await.unwrap();
    let body = hyper::Body::from(body_bytes);
    let hyper_request = builder.body(body).map_err(map_hyper_http_error)?;

    Ok(hyper_request)
}

// Multiple calls to `init` will cause a panic as a tracing subscriber is already set.
// So we use the `start` event to initialize our tracing subscriber when the worker starts.
#[event(start)]
fn start() {
    let fmt_layer = tracing_subscriber::fmt::layer()
        .json()
        .with_ansi(false) // Only partially supported across JavaScript runtimes
        .with_timer(UtcTime::rfc_3339()) // std::time is not available in browsers
        .with_writer(MakeConsoleWriter); // write events to the console
    let perf_layer = performance_layer().with_details_from_fields(Pretty::default());
    tracing_subscriber::registry()
        .with(EnvFilter::from_default_env().add_directive(Level::INFO.into()))
        .with(fmt_layer)
        .with(perf_layer)
        .init();
}


#[event(fetch)]
async fn main(mut _req: Request, _env: Env, _ctx: Context) -> worker::Result<Response> {
    set_panic_hook();

    let request = conv(_req).await.unwrap();
    let socket = Socket::builder()
        .secure_transport(SecureTransport::On)
        .connect(SERVER_IP, 443)?;

    let (sender, connection) = hyper::client::conn::handshake(socket)
        .await
        .map_err(map_hyper_error)?;

    tokio::select!(
        res = connection => {
            console_error!("Connection exited: {:?}", res);
            Err(worker::Error::RustError("Connection exited".to_string()))
        },
        result = make_request(sender, request) => result
    )
}

fn map_utf8_error(error: Utf8Error) -> worker::Error {
    worker::Error::RustError(format!("Utf8Error: {:?}", error))
}

fn map_hyper_error(error: hyper::Error) -> worker::Error {
    worker::Error::RustError(format!("hyper::Error: {:?}", error))
}

fn map_hyper_http_error(error: hyper::http::Error) -> worker::Error {
    worker::Error::RustError(format!("hyper::http::Error: {:?}", error))
}
yunayuna

ここまでで分かったこと

WebAssembly上で処理を行うため、OSやハードウェアに依存するモジュールは使えない

  • rustの、native-tlsを使ったモジュール(例:hyper-tls)は使えない(ので、hyper_tls::HttpsConnector が使えない)
  • tokioでfeatureをfullにすると、どれかのfeatureが引っかかりエラーが出る。必要なfeatureのみを個別で指定
yunayuna

結論

無料での利用は難しそう。

有料でもリクエストあたりの実時間:30秒以内、とのことで、
SSEとかロングポーリングがあるアプリケーションにはtimeoutを長く取れるというのも必要な要件になるので、そのようなアプリケーションのロードバランサーとして使うには不向きかもしれない。