🗽
URLSession(URLSessionDataDelegate)のストリームをAsyncStreamを通して使いやすくする
通常のストリーム
URLSessionDataDelegate
URLSessionDataDelegateを移譲するStreamProviderを用意します。
中身は固定なのでほぼ知らなくても大丈夫です。
StreamProvider
final class StreamProvider: NSObject, URLSessionDataDelegate {
private let handler: (Item) -> Void
private let errorHandle: ((any Error)?) -> Void
private var task: URLSessionDataTask!
init(
request: URLRequest,
handler: @escaping (Item) -> Void,
errorHandler: @escaping ((any Error)?) -> Void
) {
self.handler = handler
self.errorHandle = errorHandler
super.init()
let session = URLSession(configuration: .default, delegate: self, delegateQueue: nil)
self.task = session.dataTask(with: request)
}
func stop() {
task.cancel()
}
func start() {
task.resume()
}
func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
let item = try! JSONDecoder().decode(Item.self, from: data)
handler(item)
}
func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
errorHandle(error)
}
}
利用側
let url = URL(string: "https://api.twitter.com/2/tweets/search/stream")!
let request = URLRequest(url: url)
let streamProvider = StreamProvider(request: request) { item in
print(item)
} errorHandler: { error in
if let error {
print(error)
}
}
streamProvider.start()
// 10秒間ストリームする
try await Task.sleep(nanoseconds: 10_000_000_000)
streamProvider.stop()
AsyncStream(AsyncThrowingStream)
URLSessionDataDelegateを使うと取得したデータを見た目上遠くで管理しないといけないですが、AsyncStreamを使うと利用側がスッキリして見えます。
for await
という形で利用することができるようになります。
for try await item in stream {
print(item)
}
実装
let stream = AsyncThrowingStream { continuation in
let streamProvider = StreamProvider(request: request) { item in
continuation.yield(item)
} errorHandler: { error in
continuation.finish(throwing: error)
}
}
for try await item in stream {
print(item)
}
キャンセル
途中でストリームを止めたい場合は、Taskとして持っておいて、10秒後などに、終了するなども可能です。
let task = Task {
for try await item in stream {
print(item)
}
}
try await Task.sleep(nanoseconds: 10_000_000_000)
task.cancel()
Discussion