♻️

RxSwift: ObservableをSequenceで同期処理に変更する

2020/09/26に公開
class SyncFIFO<T> {
    var buf = Array<T>()
    let lock = DispatchSemaphore(value: 1)
    let sem = DispatchSemaphore(value: 0)

    func push(_ val: T) {
        self.lock.wait()
        defer { self.lock.signal() }
        self.buf.append(val)
        self.sem.signal()
    }
    
    func popSync() -> T {
        sem.wait()
        lock.wait()
        defer { lock.signal() }
        return buf.removeFirst()
    }
}

extension ObservableType {
    func toSequenceSync<T>() -> AnySequence<Event<T>> where T == E {
        return AnySequence { () -> AnyIterator<Event<T>> in
            let buf = SyncFIFO<Event<T>?>()
            let disposable = self.subscribe(buf.push)
            return AnyIterator {
                guard let ev = buf.popSync() else {
                    disposable.dispose()
                    return nil
                }

                switch ev {
                case .error, .completed:
                    buf.push(nil)
                    fallthrough
                case .next:
                    return ev
                }
            }
        }
    }
}
  • AnySequence.makeIterator()subscribe
  • AnyIterator.next()Eventが流れてくるまでブロックします

※ デッドロックに注意

Discussion