reqwestを使ってSSEでデータを送受信する際の注意点
はじめに
最近reqwestを使ってstreamの処理を実装したんですが、受け取ったバイト列をUTF8文字列に変換しようとすると切られてしまう事象に遭遇しました。
結論からいうとhyper側でレスポンスボディをバッファリングするため、場合によってはUTF8文字列の変換が失敗することがわかりました。
本記事では、どのような実装で問題が発生したのか、reqwest +hyperの実装を追いかけつつ、最終的にどう対処するのかについて書いていきます。
問題の実装について
今回、問題が発生した実装は次のような感じとなっています。
特段変わったことはしておらず、stream.next(...)で受け取ったバイト列からUTF8文字列に変換しているだけです。
use tokio_stream::StreamExt as _;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let resp = reqwest::get("http://localhost:9998/sse").await?;
let mut stream = resp.bytes_stream();
while let Some(s) = stream.next().await.transpose()? {
println!(
"length: {}, invalid utf8 char: {}",
s.len(),
String::from_utf8(s.to_vec()).is_err()
);
// do something
}
Ok(())
}
使用しているクレートとバージョンは次となっています。
reqwestが使用しているhyperのバージョンは0.14.28です。
あとで動作検証するので、patchでローカルにcloneしたhyperを使うようにしています。
[dependencies]
reqwest = { version = "0.11.23", features = ["stream"] }
tokio = { version = "1.35.1", features = ["rt", "rt-multi-thread", "macros"] }
tokio-stream = { version = "0.1.14", default-features = false }
bytes = "1.5.0"
[patch.crates-io]
hyper = { path = "/Users/skanehira/dev/github.com/hyperium/hyper" }
一見特に問題ないように見えますが、冒頭にも書いたとおりUTF8文字列が切られてしまうことがあります。
reqwestとhyperの実装について
reqwestはhyperをwrapしてより使いやすくしたクレートで、リクエストの処理自体はhyperが行っています。
reqwest::get(...)の実装を見てみるとClient構造体を生成してsend()を呼ぶ出していることがわかります。
pub async fn get<T: IntoUrl>(url: T) -> crate::Result<Response> {
Client::builder().build()?.get(url).send().await
}
Clientは次のようになっていて、ClientRefがhyper::Clientを持っています。
type HyperClient = hyper::Client<Connector, super::body::ImplStream>;
#[derive(Clone)]
pub struct Client {
inner: Arc<ClientRef>,
}
struct ClientRef {
...
hyper: HyperClient,
...
}
さらにhyper::Clientを追っておくとconn::Builderというのがいます。
pub struct Client<C, B = Body> {
config: Config,
#[cfg_attr(feature = "deprecated", allow(deprecated))]
conn_builder: conn::Builder,
connector: C,
pool: Pool<PoolClient<B>>,
}
引き続きを追っておくとconn::Builderのhandshake()が呼ばれます。
このhandshake()ではio(読み書きできるなにか)をproto::Conn::new()に渡して、コネクション管理するConnを生成しているようです。
pub fn handshake<T, B>(
&self,
io: T,
) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: HttpBody + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
let opts = self.clone();
async move {
trace!("client handshake {:?}", opts.version);
let (tx, rx) = dispatch::channel();
let proto = match opts.version {
#[cfg(feature = "http1")]
Proto::Http1 => {
let mut conn = proto::Conn::new(io);
...
ProtoClient::H1 { h1: dispatch }
}
...
};
Ok((
SendRequest { dispatch: tx },
Connection { inner: Some(proto) },
))
}
}
Connの定義は次のようになっていて、このioにレスポンスボディがバッファリングされます。
pub(crate) struct Conn<I, B, T> {
io: Buffered<I, EncodedBuf<B>>,
state: State,
_marker: PhantomData<fn(T)>,
}
Buffered::new()を見ていくと次のようになっています。
impl<T, B> Buffered<T, B>
where
T: AsyncRead + AsyncWrite + Unpin,
B: Buf,
{
pub(crate) fn new(io: T) -> Buffered<T, B> {
let strategy = if io.is_write_vectored() {
WriteStrategy::Queue
} else {
WriteStrategy::Flatten
};
let write_buf = WriteBuf::new(strategy);
Buffered {
flush_pipeline: false,
io,
read_blocked: false,
read_buf: BytesMut::with_capacity(0),
read_buf_strategy: ReadStrategy::default(),
write_buf,
}
}
...
}
どうやら、WriteStrategy::Queueの場合はバッファリング、そうじゃない場合はバッファリングしないようです。
つづけてWriteBufが名前からバッファっぽいので見ていくと、次のようになっています。
// an internal buffer to collect writes before flushes
pub(super) struct WriteBuf<B> {
/// Re-usable buffer that holds message headers
headers: Cursor<Vec<u8>>,
max_buf_size: usize,
/// Deque of user buffers if strategy is Queue
queue: BufList<B>,
strategy: WriteStrategy,
}
impl<B: Buf> WriteBuf<B> {
fn new(strategy: WriteStrategy) -> WriteBuf<B> {
WriteBuf {
headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
queue: BufList::new(),
strategy,
}
}
}
デバッガーで実際の挙動を確認すると、このheadersに一度データが入ったあとqueueにためていくようです。
headersのサイズがINIT_BUFFER_SIZE、queueのサイズがDEFAULT_MAX_BUFFER_SIZEのようです。
/// The initial buffer size allocated before trying to read from IO.
pub(crate) const INIT_BUFFER_SIZE: usize = 81920;
/// The default maximum read buffer size. If the buffer gets this big and
/// a message is still not complete, a `TooLarge` error is triggered.
// Note: if this changes, update server::conn::Http::max_buf_size docs.
pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
検証
INIT_BUFFER_SIZE/DEFAULT_MAX_BUFFER_SIZEの値がどのように作用しているかを調べるため、Denoで簡単なSSEサーバーを用意して確認してきます。
import { Application, Router } from "https://deno.land/x/oak@v12.6.1/mod.ts";
const app = new Application();
const router = new Router();
// 大量の「あ」が入っているテキストファイル
const file = "./data.txt";
const data = Deno.readTextFileSync(file).trim();
router.get("/sse", (ctx) => {
const target = ctx.sendEvents({
keepAlive: true,
});
target.dispatchMessage(data);
target.close();
});
app.use(router.routes());
await app.listen({ port: 9998 });
冒頭の実装を動かすと、次の結果を得られました。
1回目はINIT_BUFFER_SIZEではないが、ほぼ同じ値になっていますね。
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.04s
Running `target/debug/stream`
length: 81686, invalid utf8 char: true
length: 163840, invalid utf8 char: true
length: 128548, invalid utf8 char: false
1回目以降もサイズが増加していることがわかりました。
おそらくDEFAULT_MAX_BUFFER_SIZEまで増加したら、増えなくなると思われます。
しかし、何回か実行してみるとINIT_BUFFER_SIZEを大幅下回ることがあることがわかりました。
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.04s
Running `target/debug/stream`
length: 81686, invalid utf8 char: true
length: 163840, invalid utf8 char: true
length: 128548, invalid utf8 char: false
skanehira@godzilla sandbox/rust/stream
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.04s
Running `target/debug/stream`
length: 65094, invalid utf8 char: false
length: 81920, invalid utf8 char: true
length: 163840, invalid utf8 char: true
length: 63220, invalid utf8 char: false
次にINIT_BUFFER_SIZE/DEFAULT_MAX_BUFFER_SIZEの値をそれぞれ変更してみます。
pub(crate) const INIT_BUFFER_SIZE: usize = 8192;
pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 * 10;
結果は次のようになりました。
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.15s
Running `target/debug/stream`
length: 7958, invalid utf8 char: false
length: 16384, invalid utf8 char: false
length: 32768, invalid utf8 char: true
length: 65536, invalid utf8 char: true
length: 81920, invalid utf8 char: true
length: 81920, invalid utf8 char: true
length: 81920, invalid utf8 char: true
length: 5668, invalid utf8 char: false
やはりDEFAULT_MAX_BUFFER_SIZEのサイズまで増加したらそれ以上は増えないようです。
対処法
変数名とコメント、検証結果から想像するに
-
INIT_BUFFER_SIZE: 初回バッファリング時のバッファサイズ -
DEFAULT_MAX_BUFFER_SIZE: バッファの上限サイズ
という感じでしょうか。
バッファ自体のサイズはDEFAULT_MAX_BUFFER_SIZEですが、初回はINIT_BUFFER_SIZEなので、このサイズを超えるようなバイト列をUTF8文字列に変換しようとすると失敗する可能性があります。
また、初回は必ずINIT_BUFFER_SIZEに到達するとは限らないこともわかりました。
streamを返すのが外部サービスの場合、受け取る側でUTF8文字列に変換する際はUTF8文字が切られていないかをチェックして、失敗したらバッファリングして次回受け取ったデータと結合してチェックして…を繰り返すのがよいかなと思います。
以前Goでブラウザをターミナル代わりにするrttyというものを作った時にも似たような問題に当たったんですが、そのときは上記のようなやり方で回避しました。
自分たちのサービスでstreamを返す場合は、データ次第ですが送る側で調整するのも1つの手です。
今回試した感じだと60kbくらいであれば切られることがなさそうなので、60kbを目安に考えればよいかもしれません。
さいごに
こういった問題をフレームワーク側で吸収してほしいなと個人的に思いますが、フレームワークの責務か?といわれると微妙なところとも思います。
ひとまず今回はreqwestで起きた問題ですが、言語問わずstreamでUTF8文字列変換をする場合は、バッファリングのことを気にしたほうがよいかなと思います。
この記事が参考になれば幸いです。
Discussion
valid_up_toが役に立つかもしれません。
@Toru3
ありがとうございます
valid_up_to 知りませんでした
良さそうですね