cloudflare workersで動作するrustプログラム実装
安価で、動的に設定が変更できるロードバランサーを作りたく
いろんな技術スタックを検証しています。
今回は、cloudflare workers上で動かすrust (webassembly)で、
ロードバランサーが作れるか検証してみます。
選択理由
- cloudflare workersは、小さなアプリ向けの無料プランがある
- KV(key value) storeが備わっていて、動的な設定変更が用意にできる
- rustでの実装が可能
検証ポイント
- rustでwebassemblyを作ったことがないので、どんな制約があるのか
- cloudflare workersでできることと、制約
上記をある程度把握することで、どんなプロダクトに使えるか、ざっくりしたイメージを創りたい。
他の候補
- aws lambda
- Google Cloud Run
この記事参考になりそう
https://qiita.com/manymanyuni/items/85ca8c642b78a7181baa - shuttle (https://www.shuttle.rs/)
参考にさせていただいた資料
workersのrustモジュール本家
workersのrust実装テンプレート
前提知識と方針
workersへのデプロイやconfig操作などは、Wrangler というtypescriptライブラリを通して行います
wranglerなどのjs/typescriptを扱うためにbun をエンジンとして利用します
rustは、webassemblyとしてデプロイを行い、workers上で動作させます
やったこと
前提
- bun のインストール
https://bun.sh/docs/installation - rust のインストール
https://www.rust-lang.org/tools/install
開発環境構築
#プロエジェクトディレクトリ作成
mkdir wasm-lb
# bun初期化(package名などは、全てデフォルトでEnter)
bun init
# 必要モジュールを入れる
bun add -D wrangler
# プロジェクトをテンプレから作成して移動
bun wrangler generate my-project https://github.com/cloudflare/workers-sdk/templates/experimental/worker-rust
cd my-project
生成されたプロジェクト内のdependenciesを最新化
{
"name": "template-worker-rust",
"version": "0.0.0",
"private": true,
"scripts": {
"deploy": "wrangler deploy",
"dev": "wrangler dev --local"
},
"devDependencies": {
"wrangler": "^3.21.0"
}
}
[dependencies]
worker = "0.0.18"
初期のモジュール本体はこちら
use worker::*;
#[event(fetch)]
async fn main(req: Request, env: Env, ctx: Context) -> Result<Response> {
Response::ok("Hello, World!")
}
#開発環境での稼働(my-projectディレクトリ内で)
bun run dev
[INFO]: ⬇️ Installing wasm-bindgen...
[INFO]: Optional fields missing from Cargo.toml: 'description', 'repository', and 'license'. These are not necessary, but recommended
[INFO]: ✨ Done in 0.19s
[INFO]: 📦 Your wasm pkg is ready to publish at /home/masatoyuna/Project/wasm-lb/wasm-lb-rs/build.
shim.mjs 12.4kb
⚡ Done in 5ms
⎔ Starting local server...
[wrangler:inf] Ready on http://localhost:44615
動きました。
-> Hello, World!
デプロイ
bun run deploy
もう少し色々触ってみる
本家サイトを参考に、hello worldの次の処理を実装してみる。
アクセス元のメソッド、パス、ロケーション(緯度経度)、region(東京、とか)がわかったり、
Postされたファイルのバイト数を計算して返したり、
use worker::*;
#[event(fetch)]
pub async fn main(mut req: Request, env: Env, _ctx: worker::Context) -> Result<Response> {
console_log!(
"{} {}, located at: {:?}, within: {}",
req.method().to_string(),
req.path(),
req.cf().coordinates().unwrap_or_default(),
req.cf().region().unwrap_or("unknown region".into())
);
if !matches!(req.method(), Method::Post) {
return Response::error("Method Not Allowed", 405);
}
if let Some(file) = req.form_data().await?.get("file") {
return match file {
FormEntry::File(buf) => {
Response::ok(&format!("size = {}", buf.bytes().await?.len()))
}
_ => Response::error("`file` part of POST form must be a file", 400),
};
}
Response::error("Bad Request", 400)
}
router実装したり、色々と便利なモジュールが揃っているみたい
(exampleで、tokioがdev-dependenciesに含まれてたのでどうやら必要らしい)
[dev-dependencies]
tokio = { version = "1.35", features = ["full"] }
use serde::{Deserialize, Serialize};
use worker::*;
#[derive(Debug, Deserialize, Serialize)]
struct GenericResponse {
status: u16,
message: String,
}
#[event(fetch)]
async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
Router::new()
.get_async("/foo", handle_get)
.post_async("/bar", handle_post)
.delete_async("/baz", handle_delete)
.run(req, env)
.await
}
pub async fn handle_get(_: Request, _ctx: RouteContext<()>) -> worker::Result<Response> {
Response::from_json(&GenericResponse {
status: 200,
message: "You reached a GET route!".to_string(),
})
}
pub async fn handle_post(_: Request, _ctx: RouteContext<()>) -> worker::Result<Response> {
Response::from_json(&GenericResponse {
status: 200,
message: "You reached a POST route!".to_string(),
})
}
pub async fn handle_delete(_: Request, _ctx: RouteContext<()>) -> worker::Result<Response> {
Response::from_json(&GenericResponse {
status: 200,
message: "You reached a DELETE route!".to_string(),
})
}
kvの操作事例
durable_objectの操作事例
ロードバランサーの要件整理
まず、ここでロードバランサーの実装要件を整理する
要件
- layer7(http) でヘッダー情報に応じて、予め設定しておいたサーバーへ処理を振り分ける
- 振り分け先のサーバー構成や、ヘッダーによる分岐条件は、api経由で動的に変更できる
設計
- 分岐先のサーバー情報を、定期的に(恐らく比較的レイテンシ遅延が大きい)オブジェクトから読み取り、比較的レイテンシが低いと思われる場所(kv?)に保存する
- リクエストに対し、ヘッダーを解析してkvを元に分岐先のサーバーを決定し、リクエストを再送信する。
- 再送信したリクエストからレスポンスを受取り、元のリクエストに対してレスポンスを返す
課題
cloudflare workersの pricingを観ると、フリーで使える機能は以下の通り
- Request: 100,000 per day
- CPU time: 10 milliseconds of CPU time per invocation
- 1GB - Key-valueストレージスペース
(KV) - 10万 - キー値の読み取り(1日あたり)
- 1,000 - キー値の書き込み(1日あたり)
- 1,000 - キー値の削除(1日あたり)
- 1,000 - キー値のリスト(1日あたり)
KVは問題ないが、1リクエストに対してCPU利用時間が10ms、これはフリーで使うのは無理か?
となると有料版だが、
- 5$ / month for subscription (定額料金)
- Request: 10 million included per month +$0.30 per additional million (これは大丈夫そう)
- 30 million CPU milliseconds incude per month (これ足りるかはやってみないと分からない)
なんだかんだで、普通のAWSロードバランサー並の料金はかかる気がする。
現段階の考察
- workersは、エッジで完結させる比較的軽微なアプリケーションに向いている
- kvへのアクセス速度が数msでないとロードバランサーとしての利用は難しいと思われる
- kv以外で、動的情報を変数などを使って、瞬時に取得する方法はある?
- フリー枠でロードバランサーとして使うのは無理?(10 milliseconds of CPU time per invocation って、await中の時間は入らない?)
いくつかの課題
cloudflare worrkers上のリクエストは worker::Request
で扱う必要がある。
また、レスポンスはworker::Response
を返す。
本来、hyper::Request
/ hyper::Response
などで扱えると、その後のリクエストにそのまま使えて便利なのだが、
これはwebAssemblyからデータを受信するために独自の処理が必要なため、現状、worker::Request
である必要があるため、自分で詰め直さなければいけない。
※議論もされてるが、現時点では未対応
とりあえずサンプルなどを参考にしながら書いたhyper0.14版はこちら。
通常のapi requestは捌ける。
SSE(server sent events)やファイルアップロード・ダウンロードなどのstream処理やwebsocketは未確認。
※注
開発時の localhost:xxx へのアクセスは、httpsとして転送されるので、ssl認証でエラーが生じる。
接続先でssl認証を行わないようにするか、httpで転送することで回避できる。
.secure_transport(SecureTransport::Off)
.connect(SERVER_IP, 80)?;
[package]
name = "worker-rust"
version = "0.1.0"
edition = "2021"
# https://github.com/rustwasm/wasm-pack/issues/1247
[package.metadata.wasm-pack.profile.release]
wasm-opt = false
[lib]
crate-type = ["cdylib"]
[dependencies]
cfg-if = "1.0.0"
serde = "1.0.193"
worker = { version = "0.0.18" }
tokio = { version = "1.35", features = ['io-util', 'macros'] }
hyper = { version = "0.14.27", default-features = false, features = [
'http1',
'http2',
'client',
] }
console_error_panic_hook = { version = "0.1.7" }
wasm-bindgen-futures = "0.4"
tracing = "0.1.40"
tracing-web = "0.1.3"
tracing-subscriber = { version = "0.3", features = [
'time',
'json',
"env-filter",
"registry",
"fmt",
"std",
] }
time = { version = "0.3", features = ['wasm-bindgen'] }
use std::str::Utf8Error;
use tracing_subscriber::fmt::format::Pretty;
use tracing_subscriber::fmt::time::UtcTime;
use tracing_subscriber::prelude::*;
use tracing_web::{performance_layer, MakeConsoleWriter};
use tracing_subscriber::{layer::SubscriberExt, EnvFilter};
use worker::*;
use tracing::{error, info, Level};
pub use console_error_panic_hook::set_once as set_panic_hook;
const SERVER_DOMAIN: &str = "sample.com";
const SERVER_IP: &str = "111.111.111.111"; //対象サーバーのip
async fn make_request(
mut sender: hyper::client::conn::SendRequest<hyper::Body>,
request: hyper::Request<hyper::Body>,
) -> Result<Response> {
// Send and recieve HTTP request
let hyper_response = sender
.send_request(request)
.await
.map_err(map_hyper_error)?;
// Convert back to worker::Response
let hyper_headers = hyper_response.headers().clone();
let buf = hyper::body::to_bytes(hyper_response)
.await
.map_err(map_hyper_error)?;
let body = buf.as_ref().to_vec();
let mut response = Response::from_bytes(body)?;
for (name, value) in hyper_headers.iter() {
info!("response header: {:?}: {:?}", name, value);
response.headers_mut().append(name.as_str(), value.to_str().unwrap())?;
}
// let text = std::str::from_utf8(&buf).map_err(map_utf8_error)?;
// response.headers_mut().append("Content-Type", "text/html")?;
Ok(response)
}
async fn conv(mut req: Request) -> Result<hyper::Request<hyper::Body>> {
let mut url = req.url().unwrap();
info!("url.domain(): {:?}", url.domain());
info!("url.fragment(): {:?}", url.fragment());
info!("url.host(): {:?}", url.host());
info!("url.origin(): {:?}", url.origin());
info!("url.password(): {:?}", url.password());
info!("url.path(): {:?}", url.path());
info!("method: {:?}", req.method().to_string().to_uppercase());
// let cf = req.cf();
let mut builder = hyper::Request::builder().method(req.method().to_string().to_uppercase().as_str()).uri(url.path());
// builder.extensions_mut().insert(cf);
for (name, value) in req.headers().into_iter() {
if name != "host" {
builder = builder.header(name.as_str(), value.as_str());
} else if name == "host" {
builder = builder.header(name.as_str(), SERVER_IP);
}
}
// builder = builder.header("Host", "example.com");
let body_bytes = req.bytes().await.unwrap();
let body = hyper::Body::from(body_bytes);
let hyper_request = builder.body(body).map_err(map_hyper_http_error)?;
Ok(hyper_request)
}
// Multiple calls to `init` will cause a panic as a tracing subscriber is already set.
// So we use the `start` event to initialize our tracing subscriber when the worker starts.
#[event(start)]
fn start() {
let fmt_layer = tracing_subscriber::fmt::layer()
.json()
.with_ansi(false) // Only partially supported across JavaScript runtimes
.with_timer(UtcTime::rfc_3339()) // std::time is not available in browsers
.with_writer(MakeConsoleWriter); // write events to the console
let perf_layer = performance_layer().with_details_from_fields(Pretty::default());
tracing_subscriber::registry()
.with(EnvFilter::from_default_env().add_directive(Level::INFO.into()))
.with(fmt_layer)
.with(perf_layer)
.init();
}
#[event(fetch)]
async fn main(mut _req: Request, _env: Env, _ctx: Context) -> worker::Result<Response> {
set_panic_hook();
let request = conv(_req).await.unwrap();
let socket = Socket::builder()
.secure_transport(SecureTransport::On)
.connect(SERVER_IP, 443)?;
let (sender, connection) = hyper::client::conn::handshake(socket)
.await
.map_err(map_hyper_error)?;
tokio::select!(
res = connection => {
console_error!("Connection exited: {:?}", res);
Err(worker::Error::RustError("Connection exited".to_string()))
},
result = make_request(sender, request) => result
)
}
fn map_utf8_error(error: Utf8Error) -> worker::Error {
worker::Error::RustError(format!("Utf8Error: {:?}", error))
}
fn map_hyper_error(error: hyper::Error) -> worker::Error {
worker::Error::RustError(format!("hyper::Error: {:?}", error))
}
fn map_hyper_http_error(error: hyper::http::Error) -> worker::Error {
worker::Error::RustError(format!("hyper::http::Error: {:?}", error))
}
ここまでで分かったこと
WebAssembly上で処理を行うため、OSやハードウェアに依存するモジュールは使えない
- rustの、native-tlsを使ったモジュール(例:hyper-tls)は使えない(ので、hyper_tls::HttpsConnector が使えない)
- tokioでfeatureをfullにすると、どれかのfeatureが引っかかりエラーが出る。必要なfeatureのみを個別で指定
結論
無料での利用は難しそう。
有料でもリクエストあたりの実時間:30秒以内、とのことで、
SSEとかロングポーリングがあるアプリケーションにはtimeoutを長く取れるというのも必要な要件になるので、そのようなアプリケーションのロードバランサーとして使うには不向きかもしれない。