Server-Side Swift Vaporでロングポーリングを実装してみる

2024/08/22に公開

概要

Webアプリケーションにおける双方向通信には、「ポーリング」、「ロングポーリング」、「WebSocket」、「WebRTC」などの手法があります。その中で、ロングポーリングの仕組みは理解しているものの、具体的な実装方法がわからなかったため、自己流で実装してみました。

この記事では、クライアントからGET /waitにアクセスした場合にそのレスポンスを保留し、別クライアントからPOST /sendにメッセージが送られた際にアクセス保留中のクライアントにそのメッセージをレスポンスとして返却するというWeb APIサーバーを実装します。

ロングポーリング(Commet)とは

ロングポーリング(Comet)は、WebSocketがない時代に、既存技術だけでサーバーからの通信を実現する一般的な手法です。ポーリングのようにクライアントが定期的にリクエストを投げるのではなく、一度クライアントがリクエストを投げ、サーバーがそのリクエストに即時にレスポンスを返すのではなく保留状態にしておき、サーバーからの通信が必要になった際にレスポンスとしてクライアントに送信するというものです。

結論

以下のコードは、Vaporを使用してロングポーリングを実装した例です。

routes.swift
import Vapor

actor PollingStore {
    var pollings: [@Sendable (String) -> Void] = []
    
    init() { }
    
    func subscribe() async -> String {
        await withCheckedContinuation { continuation in
            pollings.append { str in
                continuation.resume(returning: str)
            }
        }
    }
    
    func distribute(str: String) {
        for polling in pollings {
            polling(str)
        }
        pollings = []
    }
}

let store = PollingStore()

func routes(_ app: Application) throws {
    /// サーバーにPOST /sendにメッセージが送られるまで待機
    app.get("wait") { req async -> String in
        await store.subscribe()
    }

    /// メッセージをポーリング中のクライアント全員へサーバーから送信する
    app.post("send") { req async -> HTTPStatus in
        guard let str = req.body.string else { return .badRequest }
        await store.distribute(str: str)
        return .ok
    }
}

実装の過程

WebSocketからロングポーリングへ

WebSocketを使用する場合、接続を配列で管理し、イベントが発生したときにソケット経由でメッセージを送信します。これと同様に、ロングポーリングでも接続を管理するために、クロージャの配列を使用するようにしました。

import Vapor

actor SocketStore {
    private var sockets: [WebSocket] = []
    
    func append(_ ws: WebSocket) {
        sockets.append(ws)
    }
    
    func distribute(str: String) {
        for socket in sockets {
            socket.send(str)
        }
    }
}

let store = SocketStore()

func routes(_ app: Application) throws {
    /// WebSocketを接続
    app.webSocket("ws") { req, ws in
        await store.append(ws)
    }

    // WebSocketが接続中のユーザーに"Hello, World!"を送信する
    app.post("send") { req -> HTTPStatus in
        await store.distribute(str: "Hello, World!")
        return .ok
    }
}

このWebSocketの処理をロングポーリングに置き換えるため、クロージャの配列を保持するように変更しました。

routes.swift
import Vapor

actor PollingStore {
    var pollings: [@Sendable (String) -> Void] = []
    
    init() { }
}

非同期関数による待機処理

Swift 5.5から導入されたasync/awaitを使うことで、従来のコールバック方式の非同期処理をより簡潔に表現できるようになりました。これを活用し、待機する際にはクロージャを配列に追加し、クロージャが実行されたときに非同期関数が適切に結果を返すように実装します。

以下のコードでは、コールバック方式の非同期関数をwithCheckedThrowingContinuationを用いてasync/await方式に変換する方法を例示しています。

import Vapor

// Callback
func readFileContent(req: Request) -> EventLoopFuture<String> {
    req.fileio.collectFile(at: "/path/to/file").map { byteBuffer in
        String(buffer: byteBuffer)
    }
}

// Async
func readFileContent(req: Request) async throws -> String {
    try await withCheckedThrowingContinuation { continuation in
        let future = readFileContent(req: req)
        future.whenSuccess { str in
            continuation.resume(returning: str)
        }
        future.whenFailure { error in
            continuation.resume(throwing: error)
        }
    }
}

withCheckedContinuationを利用して、実行された際に非同期関数の結果が返るクロージャを作成して、pollings配列に入れるようにします。

これで、subscribe()非同期関数は、polling配列に入れたクロージャが実行するまで完了にならず、レスポンスは保留されます。

routes.swift
import Vapor

actor PollingStore {
    var pollings: [@Sendable (String) -> Void] = []
    
    init() { }
    
+   func subscribe() async -> String {
+       await withCheckedContinuation { continuation in
+           pollings.append { str in
+               continuation.resume(returning: str)
+           }
+       }
+   }
}

+let store = PollingStore()

+func routes(_ app: Application) throws {
+   /// サーバーにPOST /sendにメッセージが送られるまで待機
+   app.get("wait") { req async -> String in
+       await store.subscribe()
+   }
+}

メッセージの配信とクロージャの消去

メッセージを配信する際に、クロージャを実行して待機中のリクエストにレスポンスを返却し、その後クロージャの配列をクリアします。

routes.swift
import Vapor

actor PollingStore {
    var pollings: [@Sendable (String) -> Void] = []
    
    init() { }
    
    func subscribe() async -> String {
        await withCheckedContinuation { continuation in
            pollings.append { str in
                continuation.resume(returning: str)
            }
        }
    }
    
+   func distribute(str: String) {
+       for polling in pollings {
+           polling(str)
+       }
+       pollings = []
+   }
}

let store = PollingStore()

func routes(_ app: Application) throws {
    /// サーバーにPOST /sendにメッセージが送られるまで待機
    app.get("wait") { req async -> String in
        await store.subscribe()
    }
    
+   /// メッセージをポーリング中のクライアント全員へサーバーから送信する
+   app.post("send") { req async -> HTTPStatus in
+       guard let str = req.body.string else { return .badRequest }
+       await store.distribute(str: str)
+       return .ok
+   }
}

あとがき

ロングポーリングは、コネクションを長時間維持するためサーバーに負荷がかかりやすく、現在ではあまり使用されていません。しかし、WebSocketと比較してクライアント側の実装がシンプルで、扱いやすい点があります。ハッカソンやプロトタイプの開発では、手軽にリアルタイム通信を実現する手段として、今でも有用だと感じました。

また、並行処理では競合などが懸念されますが、Swift 6でのStrict Concurrencyによって型的に保証されているため、安全に開発が進められるのは非常に素晴らしいと感じています。

nextbeat Tech Blog

Discussion