📡

JuliaでServer-Sent Eventsを使ってストリーミングレスポンスを実装する

2024/11/09に公開

はじめに

ChatGPTのようなAIアプリケーションの普及により、処理に時間がかかる結果を段階的に返すUIが一般的になってきました。

本記事では、JuliaでServer-Sent Events (SSE) を使用してストリーミングレスポンスを実装する方法についてご紹介します。Juliaにおいても、数値計算の逐次配信といったユースケースに応用できます。

Server-Sent Eventsとは

Server-Sent Events (SSE)は、サーバーからクライアントへ一方向のリアルタイム通信を実現するためのプロトコルです。WebSocketと比べて、

  • より軽量
  • HTTP上で動作
  • 自動再接続機能を標準でサポート

という特徴があります。

実装の概要

今回は以下のような機能をHTTP.jlを用いて実装します。

  1. クライアントからPOSTリクエストでJSONデータを受信
  2. 受け取ったデータを基に処理
  3. SSEを使用して計算結果を逐次的に返信

なお、サンプルコードではテキストを逐次的に返すモックを実装しています。同様のサンプルコードをGitHubで公開しています。

サーバーの実装

以下のパッケージを使用します:

using Pkg
Pkg.add([
    "HTTP",
    "JSON3",
    "StructTypes"
])

実装コード

using HTTP
using JSON3
using StructTypes

const WORD_DELAY = 0.2  # 疑似的な遅延間隔

struct Request
    message::String
end
StructTypes.StructType(::Type{Request}) = StructTypes.Struct()

function stream(stream::HTTP.Stream)
    HTTP.setheader(stream, "Access-Control-Allow-Origin" => "*")
    HTTP.setheader(stream, "Access-Control-Allow-Methods" => "POST, OPTIONS")
    HTTP.setheader(stream, "Access-Control-Allow-Headers" => "Content-Type")
    HTTP.setheader(stream, "Content-Type" => "text/event-stream")
    HTTP.setheader(stream, "Cache-Control" => "no-cache")

    startwrite(stream)

    if HTTP.method(stream.message) == "OPTIONS"
        return nothing
    end

    request = JSON3.read(String(readavailable(stream)), Request)

    message = "Hello! I am an AI assistant. " *
               "You said: $(request.message). " *
               "I am here to help you with your questions and provide assistance."

    for word in split(message)
        write(stream, "data: $(word)\n\n")
        sleep(WORD_DELAY)
    end

    write(stream, "data: [DONE]\n\n")
    closewrite(stream)

    return nothing
end

const ROUTER = HTTP.Router()
HTTP.register!(ROUTER, "/event", stream)

HTTP.serve(ROUTER, "0.0.0.0", 8080; stream = true)

実装コードの解説

1. ヘッダーの設定

HTTP.setheader(stream, "Access-Control-Allow-Origin" => "*")
HTTP.setheader(stream, "Access-Control-Allow-Methods" => "POST, OPTIONS")
HTTP.setheader(stream, "Access-Control-Allow-Headers" => "Content-Type")
HTTP.setheader(stream, "Content-Type" => "text/event-stream")
HTTP.setheader(stream, "Cache-Control" => "no-cache")

レスポンスヘッダーには大きく分けてCORS関連とSSE関連の2種類を設定しています。SSE関連のヘッダーでは、ストリーミングに必要な設定を行っています。

2. OPTIONSリクエストの処理

if HTTP.method(stream.message) == "OPTIONS"
    return nothing
end

ブラウザは、実際のPOSTリクエストの前にサーバーの許可を確認するためのOPTIONSリクエスト(プリフライトリクエスト)を送信します。これに対応するためにCORSヘッダーをレスポンスとして返しておきます。

3. リクエストデータの受け取り

struct Request
    message::String
end
StructTypes.StructType(::Type{Request}) = StructTypes.Struct()

request = JSON3.read(String(readavailable(stream)), Request)
  • Request構造体でPOSTリクエストのボディの構造を定義します
  • StructTypes.Struct()でJSON3による自動パースを利用可能にします
  • readavailableでリクエストボディを読み込み、JSON3.readでパースします

4. 計算結果のストリーミング処理

startwrite(stream)

response = "Hello! I am an AI assistant. " *
          "You said: $(request.message). " *
          "I am here to help you with your questions."

for word in split(response)
    write(stream, "data: $(word)\n\n")
    sleep(0.3)
end

write(stream, "data: [DONE]\n\n")
closewrite(stream)
  • write: データを送信します(SSEのフォーマットに従ってdata: プレフィックスと改行を付加します)
  • [DONE]マーカーでストリームの完了を通知します

5. ルーティングの設定

const ROUTER = HTTP.Router()
HTTP.register!(ROUTER, "/event", stream)

HTTP.serve(ROUTER, "0.0.0.0", 8080; stream = true)
  • /eventエンドポイントにストリーミング処理用のstream関数を登録します
  • stream = trueでストリーミング機能を有効化します

Channelを用いたストリーミング処理

前述の実装では、テキスト生成とストリーミング配信のロジックが一体となっていました。Pythonではジェネレーターを使うことでロジックを分離できます。JuliaにおいてもChannelを使うことで、これらのロジックを分離することができます。

テキスト生成をChannelとして切り出します。

function text_generator(message::String)
    channel = Channel{String}() do ch
        response = "Hello! I am an AI assistant. " *
                  "You said: $(message). " *
                  "I am here to help you with your questions."
        
        for word in split(response)
            put!(ch, word)
            sleep(WORD_DELAY)
        end
    end
    
    return channel
end

このChannelを使うことで、ストリーミング配信をシンプルに書くことができます。

startwrite(stream)

request = JSON3.read(String(readavailable(stream)), Request)

for word in text_generator(request.message)
    write(stream, "data: $(word)\n\n")
end

write(stream, "data: [DONE]\n\n")
closewrite(stream)

このように、計算とストリーミング配信のロジックを分離することで、計算処理とストリーミング処理を疎結合にできます。

クライアントの実装

Server-Sent Eventsを受信するクライアントサイド(JavaScript)の基本的な実装も合わせて紹介しておきます。ここでは、サーバーからストリーミングされるテキストをconsole.logに出力する最小限の実装を示します。

async function sendMessage() {
    const response = await fetch('http://localhost:8080/event', {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json',
        },
        body: JSON.stringify({
            message: "How are you?"
        })
    });

    const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
    const decoder = new TextDecoder();

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        
        const lines = value.split('\n');
        
        for (const line of lines) {
            if (line.startsWith('data: ')) {
                const data = message.substring(6).trim();
                if (data === '[DONE]') {
                    console.log('Stream completed');
                    break;
                }
                console.log('Received:', data);
            }
        }
    }
}

この JavaScript実装では、

  • fetch APIでPOSTリクエストを送信
  • Response Streamの受信データをUTF-8テキストにデコードするReaderを取得
  • SSEフォーマットのデータを解析して単語を表示
  • [DONE]マーカーを検知してストリームを終了

というシンプルな流れでストリーミングデータを処理しています。

まとめ

本記事では、JuliaでServer-Sent Eventsを使用したストリーミングレスポンスの実装方法について解説しました。HTTP.jlを用いることで簡単なテキストストリーミングを実装することができました。

参考リンク

Discussion