♻️
RxSwift: ObservableをSequenceで同期処理に変更する
class SyncFIFO<T> {
var buf = Array<T>()
let lock = DispatchSemaphore(value: 1)
let sem = DispatchSemaphore(value: 0)
func push(_ val: T) {
self.lock.wait()
defer { self.lock.signal() }
self.buf.append(val)
self.sem.signal()
}
func popSync() -> T {
sem.wait()
lock.wait()
defer { lock.signal() }
return buf.removeFirst()
}
}
extension ObservableType {
func toSequenceSync<T>() -> AnySequence<Event<T>> where T == E {
return AnySequence { () -> AnyIterator<Event<T>> in
let buf = SyncFIFO<Event<T>?>()
let disposable = self.subscribe(buf.push)
return AnyIterator {
guard let ev = buf.popSync() else {
disposable.dispose()
return nil
}
switch ev {
case .error, .completed:
buf.push(nil)
fallthrough
case .next:
return ev
}
}
}
}
}
-
AnySequence
の.makeIterator()
でsubscribe
-
AnyIterator
の.next()
でEvent
が流れてくるまでブロックします
※ デッドロックに注意
Discussion