🗽

URLSession(URLSessionDataDelegate)のストリームをAsyncStreamを通して使いやすくする

2023/04/20に公開

通常のストリーム

URLSessionDataDelegate

URLSessionDataDelegateを移譲するStreamProviderを用意します。
中身は固定なのでほぼ知らなくても大丈夫です。

StreamProvider
final class StreamProvider: NSObject, URLSessionDataDelegate {
  private let handler: (Item) -> Void
  private let errorHandle: ((any Error)?) -> Void

  private var task: URLSessionDataTask!

  init(
    request: URLRequest,
    handler: @escaping (Item) -> Void,
    errorHandler: @escaping ((any Error)?) -> Void
  ) {
    self.handler = handler
    self.errorHandle = errorHandler

    super.init()

    let session = URLSession(configuration: .default, delegate: self, delegateQueue: nil)
    self.task = session.dataTask(with: request)
  }

  func stop() {
    task.cancel()
  }
  
  func start() {
    task.resume()
  }

  func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
    let item = try! JSONDecoder().decode(Item.self, from: data)
    handler(item)
  }

  func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
    errorHandle(error)
  }
}

利用側

let url = URL(string: "https://api.twitter.com/2/tweets/search/stream")!
let request = URLRequest(url: url)

let streamProvider = StreamProvider(request: request) { item in
  print(item)
} errorHandler: { error in
  if let error {
    print(error)
  }
}

streamProvider.start()
// 10秒間ストリームする
try await Task.sleep(nanoseconds: 10_000_000_000)
streamProvider.stop()

AsyncStream(AsyncThrowingStream)

URLSessionDataDelegateを使うと取得したデータを見た目上遠くで管理しないといけないですが、AsyncStreamを使うと利用側がスッキリして見えます。

for awaitという形で利用することができるようになります。

for try await item in stream {
  print(item)
}

実装

let stream = AsyncThrowingStream { continuation in
  let streamProvider = StreamProvider(request: request) { item in
    continuation.yield(item)
  } errorHandler: { error in
    continuation.finish(throwing: error)
  }
}

for try await item in stream {
  print(item)
}

キャンセル

途中でストリームを止めたい場合は、Taskとして持っておいて、10秒後などに、終了するなども可能です。

let task = Task {
  for try await item in stream {
    print(item)
  }
}

try await Task.sleep(nanoseconds: 10_000_000_000)

task.cancel()

Discussion