Rust HyperプロキシーとRedisでレートリミットを実装
この記事ではRust Hyper
のリバースプロキシーとRedis
を使用してレートリミットを実装してみます。ソースコードをgistに置きます。
1 Redis レートリミット仕様
Redisの公式サイトのサンプルにあるようにIncr
を使用して実装します。
レートリミットの仕様は1秒当たりのリクエスト数をIPアドレス毎に設定します。
Redisのキーは、 RATELIMIT:{リモートIPアドレス}:{Unixタイムスタンプ}
の形式になります。
例えばIPv4であれば以下のようにRedisキーと値が作成されていきます。
RATELIMIT:40.234.76.183:1655340291 = 1
RATELIMIT:40.234.76.183:1655340291 = 2
RATELIMIT:40.234.76.183:1655340291 = 3
RATELIMIT:40.234.76.183:1655340292 = 1
RATELIMIT:60.11.37.54:1655340292 = 1
RATELIMIT:40.234.76.183:1655340293 = 1
Rust実装のコードです。
リクエスト数が設定した閾値を超えると、exceed_rate_limit()
はtrue
を返します。
// returns true if it exceeds the rate limit.
fn exceed_rate_limit(pool: Pool, ipaddr: String) -> Result<bool> {
let ut = unix_time(); // Unixタイムスタンプ秒
let key = format!("RATELIMIT:{ipaddr}:{ut}");
let mut con = pool.get()?; // Redisコネクションの取得
let count: i32 = con.incr(&key, 1)?;
let _: () = con.expire(&key, 10)?; // delete the key in 10s
let ret = count > RATE_LIMIT; // 閾値の判定
println!("Redis key>{} count>{} Above the limit?>{}", key, count, ret);
Ok(ret)
}
trueが返ると、Ok(x) if x
にマッチします。処理を中断し、429エラーを返します。
match exceed_rate_limit(ctx.pool.clone(), client_ip_addr) {
Ok(x) if x => {
return Ok(Response::builder()
.status(StatusCode::TOO_MANY_REQUESTS) // status=429
.body(Body::empty())
.expect("failed to get Result"));
}
R2D2 コネクションプール
R2D2コネクションプールのredis実装です。
let redis_endpoint = env::var("REDIS_ENDPOINT").expect("plz set REDIS_ENDPOINT");
let redis_url = format!("redis://{}", redis_endpoint);
let manager =
RedisConnectionManager::new(redis_url).expect("failed to create a connection manager");
let pool = r2d2::Pool::builder()
.max_size(CONNECTION_POOL_SIZE)
.build(manager)
.expect("pool");
生成したプール(pool)をContextに入れてハンドラー(handle)に渡しています。
type Pool = r2d2::Pool<RedisConnectionManager>;
#[derive(Clone)]
struct AppContext {
pool: Pool,
upstream: String,
}
async fn handle(
ctx: AppContext,
client_ip: IpAddr,
req: Request<Body>,
) -> Result<Response<Body>, Infallible> {
2 Hyperリバースプロキシー
レートリミットを無事通過した場合は、リバースプロキシーでリクエストをupstream(バックエンド)に転送します。Hyperリバースプロキシーをそのまま利用します。
hyper_reverse_proxy::call
を呼び出すだけです。注意点としてupstreamで発生したエラーはOk(response)
にマッチします。一方、プロキシー側の呼び出しで発生した場合はErr(e)にマッチします。どちら側でコケたのか分かりやすいように、BAD_GATEWAY(502)
を返しています。
// Lastly, dispatch a request to the upstream server asynchronously.
match hyper_reverse_proxy::call(client_ip, &ctx.upstream, req).await {
Ok(response) => Ok(response), // status could not be 200.
Err(e) => {
let msg = format!("{:?}", e);
println!("{:?}", msg);
Ok(Response::builder()
.status(StatusCode::BAD_GATEWAY) // error occurred in proxy side.
.body(Body::from(msg))
.unwrap())
}
}
hyperライブラリの中身のコードです(私のではなく)。hop-by-hop
ヘッダーの削除をしてくれます。
lazy_static! {
static ref HOP_HEADERS: Vec<Ascii<&'static str>> = vec![
Ascii::new("Connection"),
Ascii::new("Keep-Alive"),
Ascii::new("Proxy-Authenticate"),
Ascii::new("Proxy-Authorization"),
Ascii::new("Te"),
Ascii::new("Trailers"),
Ascii::new("Transfer-Encoding"),
Ascii::new("Upgrade"),
];
}
HOP_HEADERS.iter().any(|h| h == &name)
クラウド環境やIPv4でのはまりポイント
GCPやAWSなどのクラウド環境にデプロイする場合はロードバランサー(LB)が前段にきます。したがって、remote_addr
の値はLBのIPが返されてしまいます。GCPの場合は、x-forwarded-for
にLBの手前のサーバのIPがセットされますのでそれを利用する必要があります。
// On GCP, end user's ip is set to x-forwarded-for.
let x_forward = req.headers().get("x-forwarded-for");
let client_ip_addr: String = match x_forward {
None => client_ip.to_string(),
Some(x) => x.to_str().unwrap_or("").into(),
};
println!("provider global IP>{}", client_ip_addr);
GCP Cloud Runにデプロイしたインスタンスにソフトバンク光経由IPv6でcURLアクセスした場合のダンプの例です。x-forwarded-for
にIPv6テンポラリアドレスがセットされたのがわかります。
Request { method: GET, uri: /, version: HTTP/1.1, headers: {"host": "test-xxx.a.run.app", "user-agent": "curl/7.79.1", "accept": "*/*", "x-cloud-trace-context": "xxx", "traceparent": "00-xxx-01", "x-forwarded-for": "2400:2651:xxx", "x-forwarded-proto": "https", "forwarded": "for=\"[2400:2651:xxx]\";proto=https"}, body: Body(Empty) }
3 Upstreamサーバー
リバースプロキシーの裏に隠れているサーバをupstreamと言います。検証用にRustの公式チュートリアルのTcpListener
のコードを拝借します。リバースプロキシーから送信されるデータをそのまま出力できるためです。
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 1024]; // read upto 1024
let size = stream.read(&mut buffer).unwrap();
println!("{:?}", std::str::from_utf8(&buffer[0..size]));
4 テストする
では実際にcURLでアクセスしてレートリミットが作動するかテストしてみましょう。
Redisサーバを適宜インストールしてください(説明略)。ここではローカルのmacにインストールしポート6380で起動します。
redis-server --port 6380
Upstreamサーバーを起動します。ポート7878で起動します。
cargo run --bin tcp1
最後にリバースプロキシーを起動します。ポート8080で起動します。
export REDIS_ENDPOINT=localhost:6380
export UPSTREAM_ENDPOINT=http://localhost:7878
cargo run --bin ratelimit
cURLでアクセスしてみましょう。
curl -v http://localhost:8080/
ローカルループバックを使用しているので、127.0.0.1がセットされます。
Running server on 0.0.0.0:8080
Redis key>RATELIMIT:127.0.0.1:1655334614 count>1 Above the limit?>false
無事、Upstreamにリクエストが届きました。
Ok("GET / HTTP/1.1\r\nhost: localhost:8080\r\nuser-agent: curl/7.79.1\r\naccept: */*\r\nx-forwarded-for: 127.0.0.1\r\n\r\n")
vegetaで負荷をかける
さて本題のレートリミットの検証をします。
vegeta
でレートリミットを超過するリクエストを投げてみます。
リクエストの半分がレートリミットを超えるように1秒間に20リクエスト送信してみましょう。
-rate=20
❯ echo 'GET http://localhost:8080/' | vegeta attack -rate=20 -duration=10s | tee results.bin | vegeta report
Requests [total, rate, throughput] 200, 20.10, 10.45
Duration [total, attack, wait] 9.951s, 9.95s, 1.215ms
Latencies [min, mean, 50, 90, 95, 99, max] 872.854µs, 2.478ms, 2.818ms, 4.011ms, 4.804ms, 6.388ms, 9.806ms
Bytes In [total, mean] 16640, 83.20
Bytes Out [total, mean] 0, 0.00
Success [ratio] 52.00%
Status Codes [code:count] 200:104 429:96
Error Set:
429 Too Many Requests
綺麗にHTTPステータス200と429がほぼ半数になりました。理論値に近い結果になりました。実際にはインスタンス自体がボトルネックにならないように、オートスケールする環境にデプロイする必要がありますし、そもそもシングルインスタンスの場合にはわざわざRedisを使用する必要はなくグローバル変数で事足ります。
また、Rustのスレッド数に関しても注意が必要です。こちらの記事も参考にしてください。
Discussion