🦍

reqwestを使ってSSEでデータを送受信する際の注意点

はじめに

最近reqwestを使ってstreamの処理を実装したんですが、受け取ったバイト列をUTF8文字列に変換しようとすると切られてしまう事象に遭遇しました。
結論からいうとhyper側でレスポンスボディをバッファリングするため、場合によってはUTF8文字列の変換が失敗することがわかりました。

本記事では、どのような実装で問題が発生したのか、reqwest +hyperの実装を追いかけつつ、最終的にどう対処するのかについて書いていきます。

問題の実装について

今回、問題が発生した実装は次のような感じとなっています。
特段変わったことはしておらず、stream.next(...)で受け取ったバイト列からUTF8文字列に変換しているだけです。

use tokio_stream::StreamExt as _;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let resp = reqwest::get("http://localhost:9998/sse").await?;
    let mut stream = resp.bytes_stream();

    while let Some(s) = stream.next().await.transpose()? {
        println!(
            "length: {}, invalid utf8 char: {}",
            s.len(),
            String::from_utf8(s.to_vec()).is_err()
        );
        // do something
    }

    Ok(())
}

使用しているクレートとバージョンは次となっています。
reqwestが使用しているhyperのバージョンは0.14.28です。
あとで動作検証するので、patchでローカルにcloneしたhyperを使うようにしています。

[dependencies]
reqwest = { version = "0.11.23", features = ["stream"] }
tokio = { version = "1.35.1", features = ["rt", "rt-multi-thread", "macros"] }
tokio-stream = { version = "0.1.14", default-features = false }
bytes = "1.5.0"

[patch.crates-io]
hyper = { path = "/Users/skanehira/dev/github.com/hyperium/hyper" }

一見特に問題ないように見えますが、冒頭にも書いたとおりUTF8文字列が切られてしまうことがあります。

reqwesthyperの実装について

reqwesthyperをwrapしてより使いやすくしたクレートで、リクエストの処理自体はhyperが行っています。

reqwest::get(...)の実装を見てみるとClient構造体を生成してsend()を呼ぶ出していることがわかります。

pub async fn get<T: IntoUrl>(url: T) -> crate::Result<Response> {
    Client::builder().build()?.get(url).send().await
}

Clientは次のようになっていて、ClientRefhyper::Clientを持っています。

type HyperClient = hyper::Client<Connector, super::body::ImplStream>;

#[derive(Clone)]
pub struct Client {
    inner: Arc<ClientRef>,
}

struct ClientRef {
    ...
    hyper: HyperClient,
    ...
}

さらにhyper::Clientを追っておくとconn::Builderというのがいます。

pub struct Client<C, B = Body> {
    config: Config,
    #[cfg_attr(feature = "deprecated", allow(deprecated))]
    conn_builder: conn::Builder,
    connector: C,
    pool: Pool<PoolClient<B>>,
}

引き続きを追っておくとconn::Builderhandshake()が呼ばれます。
このhandshake()ではio(読み書きできるなにか)をproto::Conn::new()に渡して、コネクション管理するConnを生成しているようです。

    pub fn handshake<T, B>(
        &self,
        io: T,
    ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
    where
        T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
        B: HttpBody + 'static,
        B::Data: Send,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
    {
        let opts = self.clone();

        async move {
            trace!("client handshake {:?}", opts.version);

            let (tx, rx) = dispatch::channel();
            let proto = match opts.version {
                #[cfg(feature = "http1")]
                Proto::Http1 => {
                    let mut conn = proto::Conn::new(io);

                    ...

                    ProtoClient::H1 { h1: dispatch }
                }
                ...
            };

            Ok((
                SendRequest { dispatch: tx },
                Connection { inner: Some(proto) },
            ))
        }
    }

Connの定義は次のようになっていて、このioにレスポンスボディがバッファリングされます。

pub(crate) struct Conn<I, B, T> {
    io: Buffered<I, EncodedBuf<B>>,
    state: State,
    _marker: PhantomData<fn(T)>,
}

Buffered::new()を見ていくと次のようになっています。

impl<T, B> Buffered<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf,
{
    pub(crate) fn new(io: T) -> Buffered<T, B> {
        let strategy = if io.is_write_vectored() {
            WriteStrategy::Queue
        } else {
            WriteStrategy::Flatten
        };
        let write_buf = WriteBuf::new(strategy);
        Buffered {
            flush_pipeline: false,
            io,
            read_blocked: false,
            read_buf: BytesMut::with_capacity(0),
            read_buf_strategy: ReadStrategy::default(),
            write_buf,
        }
    }
    ...
}

どうやら、WriteStrategy::Queueの場合はバッファリング、そうじゃない場合はバッファリングしないようです。
つづけてWriteBufが名前からバッファっぽいので見ていくと、次のようになっています。

// an internal buffer to collect writes before flushes
pub(super) struct WriteBuf<B> {
    /// Re-usable buffer that holds message headers
    headers: Cursor<Vec<u8>>,
    max_buf_size: usize,
    /// Deque of user buffers if strategy is Queue
    queue: BufList<B>,
    strategy: WriteStrategy,
}

impl<B: Buf> WriteBuf<B> {
    fn new(strategy: WriteStrategy) -> WriteBuf<B> {
        WriteBuf {
            headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
            max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
            queue: BufList::new(),
            strategy,
        }
    }
}

デバッガーで実際の挙動を確認すると、このheadersに一度データが入ったあとqueueにためていくようです。
headersのサイズがINIT_BUFFER_SIZEqueueのサイズがDEFAULT_MAX_BUFFER_SIZEのようです。

/// The initial buffer size allocated before trying to read from IO.
pub(crate) const INIT_BUFFER_SIZE: usize = 81920;

/// The default maximum read buffer size. If the buffer gets this big and
/// a message is still not complete, a `TooLarge` error is triggered.
// Note: if this changes, update server::conn::Http::max_buf_size docs.
pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;

検証

INIT_BUFFER_SIZE/DEFAULT_MAX_BUFFER_SIZEの値がどのように作用しているかを調べるため、Denoで簡単なSSEサーバーを用意して確認してきます。

import { Application, Router } from "https://deno.land/x/oak@v12.6.1/mod.ts";

const app = new Application();
const router = new Router();

// 大量の「あ」が入っているテキストファイル
const file = "./data.txt";
const data = Deno.readTextFileSync(file).trim();

router.get("/sse", (ctx) => {
  const target = ctx.sendEvents({
    keepAlive: true,
  });
  target.dispatchMessage(data);
  target.close();
});

app.use(router.routes());
await app.listen({ port: 9998 });

冒頭の実装を動かすと、次の結果を得られました。
1回目はINIT_BUFFER_SIZEではないが、ほぼ同じ値になっていますね。

$ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.04s
     Running `target/debug/stream`
length: 81686, invalid utf8 char: true
length: 163840, invalid utf8 char: true
length: 128548, invalid utf8 char: false

1回目以降もサイズが増加していることがわかりました。
おそらくDEFAULT_MAX_BUFFER_SIZEまで増加したら、増えなくなると思われます。

しかし、何回か実行してみるとINIT_BUFFER_SIZEを大幅下回ることがあることがわかりました。

$ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.04s
     Running `target/debug/stream`
length: 81686, invalid utf8 char: true
length: 163840, invalid utf8 char: true
length: 128548, invalid utf8 char: false
skanehira@godzilla sandbox/rust/stream
$ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.04s
     Running `target/debug/stream`
length: 65094, invalid utf8 char: false
length: 81920, invalid utf8 char: true
length: 163840, invalid utf8 char: true
length: 63220, invalid utf8 char: false

次にINIT_BUFFER_SIZE/DEFAULT_MAX_BUFFER_SIZEの値をそれぞれ変更してみます。

pub(crate) const INIT_BUFFER_SIZE: usize = 8192;
pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 * 10;

結果は次のようになりました。

$ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.15s
     Running `target/debug/stream`
length: 7958, invalid utf8 char: false
length: 16384, invalid utf8 char: false
length: 32768, invalid utf8 char: true
length: 65536, invalid utf8 char: true
length: 81920, invalid utf8 char: true
length: 81920, invalid utf8 char: true
length: 81920, invalid utf8 char: true
length: 5668, invalid utf8 char: false

やはりDEFAULT_MAX_BUFFER_SIZEのサイズまで増加したらそれ以上は増えないようです。

対処法

変数名とコメント、検証結果から想像するに

  • INIT_BUFFER_SIZE: 初回バッファリング時のバッファサイズ
  • DEFAULT_MAX_BUFFER_SIZE: バッファの上限サイズ

という感じでしょうか。

バッファ自体のサイズはDEFAULT_MAX_BUFFER_SIZEですが、初回はINIT_BUFFER_SIZEなので、このサイズを超えるようなバイト列をUTF8文字列に変換しようとすると失敗する可能性があります。
また、初回は必ずINIT_BUFFER_SIZEに到達するとは限らないこともわかりました。

streamを返すのが外部サービスの場合、受け取る側でUTF8文字列に変換する際はUTF8文字が切られていないかをチェックして、失敗したらバッファリングして次回受け取ったデータと結合してチェックして…を繰り返すのがよいかなと思います。
以前Goでブラウザをターミナル代わりにするrttyというものを作った時にも似たような問題に当たったんですが、そのときは上記のようなやり方で回避しました。

自分たちのサービスでstreamを返す場合は、データ次第ですが送る側で調整するのも1つの手です。
今回試した感じだと60kbくらいであれば切られることがなさそうなので、60kbを目安に考えればよいかもしれません。

さいごに

こういった問題をフレームワーク側で吸収してほしいなと個人的に思いますが、フレームワークの責務か?といわれると微妙なところとも思います。
ひとまず今回はreqwestで起きた問題ですが、言語問わずstreamでUTF8文字列変換をする場合は、バッファリングのことを気にしたほうがよいかなと思います。
この記事が参考になれば幸いです。

FRAIMテックブログ

Discussion