reqwestを使ってSSEでデータを送受信する際の注意点
はじめに
最近reqwest
を使ってstreamの処理を実装したんですが、受け取ったバイト列をUTF8文字列に変換しようとすると切られてしまう事象に遭遇しました。
結論からいうとhyper
側でレスポンスボディをバッファリングするため、場合によってはUTF8文字列の変換が失敗することがわかりました。
本記事では、どのような実装で問題が発生したのか、reqwest
+hyper
の実装を追いかけつつ、最終的にどう対処するのかについて書いていきます。
問題の実装について
今回、問題が発生した実装は次のような感じとなっています。
特段変わったことはしておらず、stream.next(...)
で受け取ったバイト列からUTF8文字列に変換しているだけです。
use tokio_stream::StreamExt as _;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let resp = reqwest::get("http://localhost:9998/sse").await?;
let mut stream = resp.bytes_stream();
while let Some(s) = stream.next().await.transpose()? {
println!(
"length: {}, invalid utf8 char: {}",
s.len(),
String::from_utf8(s.to_vec()).is_err()
);
// do something
}
Ok(())
}
使用しているクレートとバージョンは次となっています。
reqwest
が使用しているhyper
のバージョンは0.14.28
です。
あとで動作検証するので、patch
でローカルにcloneしたhyper
を使うようにしています。
[dependencies]
reqwest = { version = "0.11.23", features = ["stream"] }
tokio = { version = "1.35.1", features = ["rt", "rt-multi-thread", "macros"] }
tokio-stream = { version = "0.1.14", default-features = false }
bytes = "1.5.0"
[patch.crates-io]
hyper = { path = "/Users/skanehira/dev/github.com/hyperium/hyper" }
一見特に問題ないように見えますが、冒頭にも書いたとおりUTF8文字列が切られてしまうことがあります。
reqwest
とhyper
の実装について
reqwest
はhyper
をwrapしてより使いやすくしたクレートで、リクエストの処理自体はhyper
が行っています。
reqwest::get(...)
の実装を見てみるとClient
構造体を生成してsend()
を呼ぶ出していることがわかります。
pub async fn get<T: IntoUrl>(url: T) -> crate::Result<Response> {
Client::builder().build()?.get(url).send().await
}
Client
は次のようになっていて、ClientRef
がhyper::Client
を持っています。
type HyperClient = hyper::Client<Connector, super::body::ImplStream>;
#[derive(Clone)]
pub struct Client {
inner: Arc<ClientRef>,
}
struct ClientRef {
...
hyper: HyperClient,
...
}
さらにhyper::Client
を追っておくとconn::Builder
というのがいます。
pub struct Client<C, B = Body> {
config: Config,
#[cfg_attr(feature = "deprecated", allow(deprecated))]
conn_builder: conn::Builder,
connector: C,
pool: Pool<PoolClient<B>>,
}
引き続きを追っておくとconn::Builder
のhandshake()
が呼ばれます。
このhandshake()
ではio(読み書きできるなにか)をproto::Conn::new()
に渡して、コネクション管理するConn
を生成しているようです。
pub fn handshake<T, B>(
&self,
io: T,
) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: HttpBody + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
let opts = self.clone();
async move {
trace!("client handshake {:?}", opts.version);
let (tx, rx) = dispatch::channel();
let proto = match opts.version {
#[cfg(feature = "http1")]
Proto::Http1 => {
let mut conn = proto::Conn::new(io);
...
ProtoClient::H1 { h1: dispatch }
}
...
};
Ok((
SendRequest { dispatch: tx },
Connection { inner: Some(proto) },
))
}
}
Conn
の定義は次のようになっていて、このio
にレスポンスボディがバッファリングされます。
pub(crate) struct Conn<I, B, T> {
io: Buffered<I, EncodedBuf<B>>,
state: State,
_marker: PhantomData<fn(T)>,
}
Buffered::new()
を見ていくと次のようになっています。
impl<T, B> Buffered<T, B>
where
T: AsyncRead + AsyncWrite + Unpin,
B: Buf,
{
pub(crate) fn new(io: T) -> Buffered<T, B> {
let strategy = if io.is_write_vectored() {
WriteStrategy::Queue
} else {
WriteStrategy::Flatten
};
let write_buf = WriteBuf::new(strategy);
Buffered {
flush_pipeline: false,
io,
read_blocked: false,
read_buf: BytesMut::with_capacity(0),
read_buf_strategy: ReadStrategy::default(),
write_buf,
}
}
...
}
どうやら、WriteStrategy::Queue
の場合はバッファリング、そうじゃない場合はバッファリングしないようです。
つづけてWriteBuf
が名前からバッファっぽいので見ていくと、次のようになっています。
// an internal buffer to collect writes before flushes
pub(super) struct WriteBuf<B> {
/// Re-usable buffer that holds message headers
headers: Cursor<Vec<u8>>,
max_buf_size: usize,
/// Deque of user buffers if strategy is Queue
queue: BufList<B>,
strategy: WriteStrategy,
}
impl<B: Buf> WriteBuf<B> {
fn new(strategy: WriteStrategy) -> WriteBuf<B> {
WriteBuf {
headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
queue: BufList::new(),
strategy,
}
}
}
デバッガーで実際の挙動を確認すると、このheaders
に一度データが入ったあとqueue
にためていくようです。
headers
のサイズがINIT_BUFFER_SIZE
、queue
のサイズがDEFAULT_MAX_BUFFER_SIZE
のようです。
/// The initial buffer size allocated before trying to read from IO.
pub(crate) const INIT_BUFFER_SIZE: usize = 81920;
/// The default maximum read buffer size. If the buffer gets this big and
/// a message is still not complete, a `TooLarge` error is triggered.
// Note: if this changes, update server::conn::Http::max_buf_size docs.
pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
検証
INIT_BUFFER_SIZE
/DEFAULT_MAX_BUFFER_SIZE
の値がどのように作用しているかを調べるため、Denoで簡単なSSE
サーバーを用意して確認してきます。
import { Application, Router } from "https://deno.land/x/oak@v12.6.1/mod.ts";
const app = new Application();
const router = new Router();
// 大量の「あ」が入っているテキストファイル
const file = "./data.txt";
const data = Deno.readTextFileSync(file).trim();
router.get("/sse", (ctx) => {
const target = ctx.sendEvents({
keepAlive: true,
});
target.dispatchMessage(data);
target.close();
});
app.use(router.routes());
await app.listen({ port: 9998 });
冒頭の実装を動かすと、次の結果を得られました。
1回目はINIT_BUFFER_SIZE
ではないが、ほぼ同じ値になっていますね。
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.04s
Running `target/debug/stream`
length: 81686, invalid utf8 char: true
length: 163840, invalid utf8 char: true
length: 128548, invalid utf8 char: false
1回目以降もサイズが増加していることがわかりました。
おそらくDEFAULT_MAX_BUFFER_SIZE
まで増加したら、増えなくなると思われます。
しかし、何回か実行してみるとINIT_BUFFER_SIZE
を大幅下回ることがあることがわかりました。
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.04s
Running `target/debug/stream`
length: 81686, invalid utf8 char: true
length: 163840, invalid utf8 char: true
length: 128548, invalid utf8 char: false
skanehira@godzilla sandbox/rust/stream
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.04s
Running `target/debug/stream`
length: 65094, invalid utf8 char: false
length: 81920, invalid utf8 char: true
length: 163840, invalid utf8 char: true
length: 63220, invalid utf8 char: false
次にINIT_BUFFER_SIZE
/DEFAULT_MAX_BUFFER_SIZE
の値をそれぞれ変更してみます。
pub(crate) const INIT_BUFFER_SIZE: usize = 8192;
pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 * 10;
結果は次のようになりました。
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.15s
Running `target/debug/stream`
length: 7958, invalid utf8 char: false
length: 16384, invalid utf8 char: false
length: 32768, invalid utf8 char: true
length: 65536, invalid utf8 char: true
length: 81920, invalid utf8 char: true
length: 81920, invalid utf8 char: true
length: 81920, invalid utf8 char: true
length: 5668, invalid utf8 char: false
やはりDEFAULT_MAX_BUFFER_SIZE
のサイズまで増加したらそれ以上は増えないようです。
対処法
変数名とコメント、検証結果から想像するに
-
INIT_BUFFER_SIZE
: 初回バッファリング時のバッファサイズ -
DEFAULT_MAX_BUFFER_SIZE
: バッファの上限サイズ
という感じでしょうか。
バッファ自体のサイズはDEFAULT_MAX_BUFFER_SIZE
ですが、初回はINIT_BUFFER_SIZE
なので、このサイズを超えるようなバイト列をUTF8文字列に変換しようとすると失敗する可能性があります。
また、初回は必ずINIT_BUFFER_SIZE
に到達するとは限らないこともわかりました。
streamを返すのが外部サービスの場合、受け取る側でUTF8文字列に変換する際はUTF8文字が切られていないかをチェックして、失敗したらバッファリングして次回受け取ったデータと結合してチェックして…を繰り返すのがよいかなと思います。
以前Goでブラウザをターミナル代わりにするrttyというものを作った時にも似たような問題に当たったんですが、そのときは上記のようなやり方で回避しました。
自分たちのサービスでstreamを返す場合は、データ次第ですが送る側で調整するのも1つの手です。
今回試した感じだと60kbくらいであれば切られることがなさそうなので、60kbを目安に考えればよいかもしれません。
さいごに
こういった問題をフレームワーク側で吸収してほしいなと個人的に思いますが、フレームワークの責務か?といわれると微妙なところとも思います。
ひとまず今回はreqwest
で起きた問題ですが、言語問わずstreamでUTF8文字列変換をする場合は、バッファリングのことを気にしたほうがよいかなと思います。
この記事が参考になれば幸いです。
Discussion
valid_up_toが役に立つかもしれません。
@Toru3
ありがとうございます
valid_up_to 知りませんでした
良さそうですね