HonoのStreaming Helperについて調べた
開発でストリーミングしたい実装があったのでHonoのStreaming Helperについて調べました。
記事を書く中で前提知識が欠けてるのに気付いたので一緒に書いてあります。
前提知識
ストリーミングを利用することで、以下のようなメリットを享受できます。
- サーバーの負荷軽減: サーバーは全データを一度にメモリに保持する必要がなくなり、メモリ使用量を節約できます。
- クライアントのリアルタイムデータ取得: クライアントはデータをリアルタイムで受信し、全データが揃うのを待たずに処理を開始できます。
ストリーミングを行うために必要な要素は以下の2つです。両方知っておくと今後混乱がなくなると思います。
Transfer-Encoding: chunked
HTTP/1.1以降では、ヘッダーのTransfer-Encodingをchunkedに設定することで、サーバーはコンテンツの全長を事前に知らなくてもデータを送信できます。データは「チャンク」と呼ばれる小さなセグメントに分割され、各チャンクにはサイズ情報が含まれています。クライアントはこの情報を基にデータの終端を判断します。
Content-Type: text/event-stream
Content-Typeヘッダーがtext/event-streamである場合、サーバーからのレスポンスはServer-Sent Events (SSE) のストリーム、つまりイベントストリームであることを示しています。SSEはHTML5で導入され、HTTP/1.1の持続的な接続を利用してサーバーからクライアントへリアルタイムのデータを一方向に送信する仕組みです。
ChatGPTの会話をChromeの検証モードで確認したらこっちが使われているようでした。
SSEはHTTP/1.1からサポートされており、HTTP/2でも引き続き利用可能です。HTTP/2では通信の効率化やパフォーマンスの向上が図られていますが、SSEそのものはHTTP/1.1の機能であり、長い間広く使用されています。これにより、クライアントはサーバーから継続的にイベントを受信し、リアルタイムでの更新が可能になります。
stream()
最も汎用的なヘルパーです。
1秒ごとにバイナリとして'Hello'をストリーミングする簡単な例を作成しました。
import { Hono } from 'hono'
import { stream } from 'hono/streaming'
const app = new Hono()
app.get('/stream', (c) => {
return stream(c, async (stream) => {
await stream.write(new Uint8Array([0x48, 0x65, 0x6c, 0x6c, 0x6f]))
await stream.sleep(1000)
await stream.write(new Uint8Array([0x48, 0x65, 0x6c, 0x6c, 0x6f]))
await stream.sleep(1000)
await stream.write(new Uint8Array([0x48, 0x65, 0x6c, 0x6c, 0x6f]))
})
})
export default app
ターミナルからエンドポイントにアクセスします。
curl http://localhost:8787/stream -N
1秒ごとに'Hello'が配信されているのが確認できました!
ヘッダーはこんな感じでした。
ちゃんとTransfer-Encoding:chunked
ですね!
逆にこれしかデフォルトでは設定してもらえないのでヘッダーを追加する場合は以下のようにコールバック内でヘッダーの値をコンテキストに設定しています。
app.get('/stream', (c) => {
return stream(c, async (stream) => {
c.header('Content-Type', 'text/plain')
// 省略....
})
})
調べるとContextの型定義にヘッダーが含まれていることが確認できました。
(ほとんど全てのプロパティやメソッドに使用例やドキュメントのリンクが貼ってあった...感激)
export declare class Context<E extends Env = any, P extends string = any, I extends Input = {}> {
/**
* `.header()` can set headers.
*
* @see {@link https://hono.dev/docs/api/context#body}
*
* @example
* ```ts
* app.get('/welcome', (c) => {
* // Set headers
* c.header('X-Message', 'Hello!')
* c.header('Content-Type', 'text/plain')
*
* return c.body('Thank you for coming')
* })
* ```
*/
header: SetHeaders;
// 省略...
}
interface SetHeaders {
(name: 'Content-Type', value?: BaseMime, options?: SetHeadersOptions): void;
(name: ResponseHeader, value?: string, options?: SetHeadersOptions): void;
(name: string, value?: string, options?: SetHeadersOptions): void;
}
streamText()
前に紹介したstream()
でテキストを返すパターンに特化してるヘルパーです。
サンプルはこんな感じ。
コールバック内でwriteIn()
を呼び出せるのが特徴です。
writeIn()
は出力するテキストの末尾に改行を追加してくれます。
import { Hono } from 'hono'
import { streamText } from 'hono/streaming'
const app = new Hono()
app.get('/streamText', (c) => {
return streamText(c, async (stream) => {
await stream.write('Who...')
await stream.sleep(1000)
await stream.writeln('are you?')
await stream.sleep(1000)
await stream.write(`I'm...`)
await stream.sleep(1000)
await stream.write(`SuperKojima.`)
})
})
export default app
curl http://localhost:8787/streamText -N
writeIn()
を実行したタイミングでテキストが改行されています!
streamSSE()
全部使ってみてこれが一番実務で使いそうだなって思いました。
個人的にdata
にjson文字列を入れて最終的にパースしなければSSEMessage
の型があるのが良いですね。
SSEで送れる文字列のパターンにはルールがあるみたいなのでチェックしてもらうと理解が進みます。
SSEのフィールドで列を構成したデータベースがあればいい感じのものが作れそう。
export interface SSEMessage {
data: string | Promise<string>;
event?: string;
id?: string;
retry?: number;
}
export declare class SSEStreamingApi extends StreamingApi {
constructor(writable: WritableStream, readable: ReadableStream);
writeSSE(message: SSEMessage): Promise<void>;
}
公式の実装例がわかりやすかったのでそのまま実行してみました。
実際のデータはこちら、、、
event: time-update
data: It is 2024-09-19T07:00:17.762Z
id: 25
event: time-update
data: It is 2024-09-19T07:00:18.761Z
id: 26
event: time-update
data: It is 2024-09-19T07:00:19.762Z
id: 27
時間の掛かってユーザー操作でプロセスが追えなくなるのを避けるのにすごく使えそうでした。
今後の開発に活かして行けたらと思います。
では、また。
Discussion