🌾

[Swift] [Combine] nil を含む CurrentValueSubject で nil 以外がくるまでのタイムアウト設定

2021/12/25に公開

やりたいこと

初期値が nil の CurrentValueSubject について nil 以外の値がくるまでのタイムアウトを設定すること。
環境:Xcode 12.5.1

結論

初期値が nil の CurrentValueSubject について nil 以外の値がくるまでのタイムアウトを設定するには、以下の順で処理してあげるとうまくいきます。

  1. compactMap()
  2. first()
  3. timeout()
サンプル
import Combine
import Foundation

enum TestError: Error {
    case minusError
    case nilError
    case timeoutError
    case otherError
}

func twice(num: Int) -> Int {
    return num * 2
}

var cancellables = Set<AnyCancellable>()

var subjectL = CurrentValueSubject<Int?, Never>(nil)
subjectL
    .compactMap { $0 } // ← nilを排除
    .first() // ← nil 以外の値がくれば completion が呼ばれる
    .setFailureType(to: TestError.self) // ← timeout の Failure を設定してあげる
    .timeout(
        .seconds(3), // 今回は3秒で設定
        scheduler: DispatchQueue.main,
        options: nil,
        customError: { .timeoutError }
    )
    .flatMap { num -> AnyPublisher<Int, TestError> in
        guard num > 0 else {
            return Fail(error: TestError.minusError)
                .eraseToAnyPublisher()
        }
        return Just(num)
            .setFailureType(to: TestError.self)
            .eraseToAnyPublisher()
    }
    .map { twice(num: $0) }
    .sink { completion in
        switch completion {
        case .finished:
            print("finished")
        case let .failure(error):
            print("error: \(error)")
        }
    } receiveValue: {
        print("receiveValue: \($0)")
    }
    .store(in: &cancellables)

subjectL.send(nil) // ← 即時実行
DispatchQueue.main.asyncAfter(deadline: .now() + 2.0) {
    subjectL.send(5) // ← その2秒後に実行
    DispatchQueue.main.asyncAfter(deadline: .now() + 4.0) {
        subjectL.send(6) // ← その4秒後に実行
    }
}

// 出力
// receiveValue: 10
// finished ← first()のおかげで 4 秒待つことはなく timeoutError にならない

もちろん、3 秒以内に nil 以外の値がこなければタイムアウトします↓

subjectL.send(nil) // ← 即時実行
DispatchQueue.main.asyncAfter(deadline: .now() + 4.0) { // ← 4秒待たせる
    subjectL.send(5) // ← その4秒後に実行
    DispatchQueue.main.asyncAfter(deadline: .now() + 4.0) {
        subjectL.send(6) // ← その4秒後に実行
    }
}

// 出力
// error: timeoutError

以上です。

上記のサンプル処理がよくわからない人へ↓

以降はサンプル処理の解説でないですが、Combine の挙動のポイントを書き記したものになります。

前提処理

以下の定義はすべてに共通します。

import Combine
import Foundation

enum TestError: Error {
    case minusError
    case nilError
    case timeoutError
    case otherError
}

func twice(num: Int) -> Int {
    return num * 2
}

var cancellables = Set<AnyCancellable>()
  • TestError という適当なエラーを用意
  • twice(num: Int) という数値を 2 倍する適当な関数を用意
  • cancellables という適当な AnyCancellable.store(in: &cancellables) できる変数を用意

お断り

所々で nil を返却しない処理で、compactMap() を使用してしまっておりますが、誤った使い方です。
そういう場合は map() を使いましょう。

また、map()AnyPublisher<Output, Failure> を返すような場合は、map() ではなく flatMap() としてあげた方が良いです。

申し訳ございません。

CurrentValueSubject の初期化

失敗例
var subjectA: CurrentValueSubject<Int?, Never>

// CurrentValueSubject<Output, Failure> は初期化が必要
subject.value = 0 // ← いきなり代入するとエラーになる
成功例
var subjectA: CurrentValueSubject<Int?, Never>

subjectA = CurrentValueSubject<Int?, Never>(nil) // nilで初期化
subjectA.value = 1 // 初期化後ならエラーにならない
  • CurrentValueSubject<Output, Failure> は初期化が必要

sink の挙動

var subjectB = CurrentValueSubject<Int?, Never>(nil)
subjectB
    .sink { completion in
        switch completion {
        case .finished:
            print("finished")
        case let .failure(error):
            print("error: \(error)")
        }
    } receiveValue: {
        print("receiveValue: \(String(describing: $0))")
    }
    .store(in: &cancellables)

// 出力↓
// receiveValue: nil ← いきなりnilでながれる
  • sink するといきなり処理が流れる → 初期値が流れる

dropFirst の挙動

var subjectC = CurrentValueSubject<Int?, Never>(nil)
subjectC
    .dropFirst() // ← 追加
    .sink { completion in
        switch completion {
        case .finished:
            print("finished")
        case let .failure(error):
            print("error: \(error)")
        }
    } receiveValue: {
        print("receiveValue: \(String(describing: $0))")
    }
    .store(in: &cancellables)

// 出力↓
// なし ← .dropFirst()のおかげで初回(初期値)のストリームはながれない
  • dropFirst で初回(初期値)のストリームを流れるのを防ぐ

send の挙動

var subjectC = CurrentValueSubject<Int?, Never>(nil)
subjectC
    .dropFirst()
    .sink { completion in
        switch completion {
        case .finished:
            print("finished")
        case let .failure(error):
            print("error: \(error)")
        }
    } receiveValue: {
        print("receiveValue: \(String(describing: $0))")
    }
    .store(in: &cancellables)

/// sendまたはvalueの代入によってストリームが走る
subjectC.send(1)
subjectC.send(nil)
subjectC.send(2)
subjectC.value = 3

// 出力↓
// receiveValue: Optional(1)
// receiveValue: nil
// receiveValue: Optional(2)
// receiveValue: Optional(3)
  • send または value の代入によってストリームが走る

cancel の挙動

var subjectC = CurrentValueSubject<Int?, Never>(nil)
subjectC
    .dropFirst()
    .sink { completion in
        switch completion {
        case .finished:
            print("finished")
        case let .failure(error):
            print("error: \(error)")
        }
    } receiveValue: {
        print("receiveValue: \(String(describing: $0))")
    }
    .store(in: &cancellables)

/// cancel の処理の挙動
subjectC.send(1)
subjectC.send(nil)
cancellables.forEach{ $0.cancel() } // cancel()の実施
subjectC.send(2) // cancel()以降処理しなくなる
subjectC.value = 3 // cancel()以降処理しなくなる

// 出力↓
// receiveValue: Optional(1)
// receiveValue: nil
  • cancel 以降のストリームは流れない

completion: .finished の挙動

var subjectC = CurrentValueSubject<Int?, Never>(nil)
subjectC
    .dropFirst()
    .sink { completion in
        switch completion {
        case .finished:
            print("finished")
        case let .failure(error):
            print("error: \(error)")
        }
    } receiveValue: {
        print("receiveValue: \(String(describing: $0))")
    }
    .store(in: &cancellables)

subjectC.send(1)
subjectC.send(completion: .finished) // completion
subjectC.send(nil) // sinkのcompletionの処理に入ったためsinkは走らない
subjectC.send(2) // sinkのcompletionの処理に入ったためsinkは走らない
subjectC.value = 3 // sinkのcompletionの処理に入ったためsinkは走らない

// 出力↓
// receiveValue: Optional(1)
// finished
  • completion の処理に入るとストリームに値は走らなくなる

map の挙動

var subjectD = CurrentValueSubject<Int?, Never>(nil)
subjectD
    .dropFirst()
    .map { twice(num: $0 ?? 0) } // ← 追加
    .sink { completion in
        switch completion {
        case .finished:
            print("finished")
        case let .failure(error):
            print("error: \(error)")
        }
    } receiveValue: {
        print("receiveValue: \(String(describing: $0))")
    }
    .store(in: &cancellables)

subjectD.send(1)
subjectD.send(nil)
subjectD.value = 2

// 出力
// receiveValue: 2 ← map の処理が適応
// receiveValue: 0 ← nilは $0 ?? 0 によって0に変換されて処理された
// receiveValue: 4 ← map の処理が適応
  • 配列操作の map とほぼ同じように処理される
  • 関数を通して値を変換させるのに相性が良い
  • 場合に応じて型も変えることができる

compactMap の挙動

var subjectE = CurrentValueSubject<Int?, Never>(nil)
subjectE
    .dropFirst()
    .compactMap { $0 } // ← 追加
    .map { twice(num: $0 )} // 先にcompactMap()することによってアンラップ処理を省略
    .sink { completion in
        switch completion {
        case .finished:
            print("finished")
        case let .failure(error):
            print("error: \(error)")
        }
    } receiveValue: {
        print("receiveValue: \(String(describing: $0))")
    }
    .store(in: &cancellables)

subjectE.send(1)
subjectE.send(nil) // ← compactMapによりnilは出力なし
subjectE.value = 2

// 出力
// receiveValue: 2 ← オプショナルが外れる
// receiveValue: 4 ← オプショナルが外れる
  • compactMap() は nil の際に処理を流れなくする
  • compactMap { $0 } は Combine ならではの guard let hoge = hoge else { return } のようなイメージ

compactMap { $0 } は書き換えると以下のようになる。

var subjectF = CurrentValueSubject<Int?, Never>(nil)
subjectF
    .dropFirst()
    .compactMap { num -> Int? in // ← compactMap { $0 } と同等の処理
        guard let num = num else {
            return nil
        }
        return num
    }
    .map { twice(num: $0) }
    .sink { completion in
        switch completion {
        case .finished:
            print("finished")
        case let .failure(error):
            print("error: \(error)")
        }
    } receiveValue: {
        print("receiveValue: \(String(describing: $0))")
    }
    .store(in: &cancellables)

FailJusteraseToAnyPublishersetFailureTypeflatMap の合わせ技

var subjectG = CurrentValueSubject<Int?, Never>(nil)
subjectG
    .dropFirst()
    .flatMap { num -> AnyPublisher<Int, TestError> in // 型を指定
        guard let num = num else {
            return Fail(error: TestError.nilError) // Fail(error: Error)を返す
                .eraseToAnyPublisher()
        }
        return Just(num)
            .setFailureType(to: TestError.self) // AnyPublisher<Int, TestError> にあわせる
            .eraseToAnyPublisher()
    }
    .map { twice(num: $0) }
    .sink { completion in
        switch completion {
        case .finished:
            print("finished")
        case let .failure(error):
            print("error: \(error)")
        }
    } receiveValue: {
        print("receiveValue: \($0)")
    }
    .store(in: &cancellables)

subjectG.send(1)
subjectG.send(nil) // ←  nil のため、Fail(error: TestError.nilError) 発生
subjectG.value = 2 // ← 上の処理により sink の completion ブロックに入ったため処理が走らない

// 出力
// receiveValue: 2
// error: nilError
  • Fail はエラーを返す Publisher(struct Fail<Output, Failure> : Publisher where Failure : Error
  • Just はエラーを返さない Publisher(struct Just<Output> : Publisher
  • .setFailureType(to: Error.self) によって、Publisher に任意のエラーを付与できる ← これをつかって Just にエラーを付与することも可能
  • AnyPublisher<Output, Error> のような publisher を返す場合は .eraseToAnyPublisher() をつけてあげると型が揃う
  • flatMap()AnyPublisher<Output, Failure> を流すストリームを作成する (ここを map() にすると AnyPublisher<Output, Failure> の型であってもストリームは流れない)
  • Fail を検知すると sinkcompletion ブロックに入りそれ以降のストリームに流れてきた値は処理しない

errorMap の挙動

var subjectH = CurrentValueSubject<Int?, Never>(nil)
subjectH
    .dropFirst()
    .compactMap { num -> AnyPublisher<Int, TestError> in // 型を指定
        guard let num = num else {
            return Fail(error: TestError.nilError) // Fail(error: Error)を返す
                .eraseToAnyPublisher()
        }
        return Just(num)
            .setFailureType(to: TestError.self) // AnyPublisher<Int, TestError> にあわせる
            .eraseToAnyPublisher()
    }
    .flatMap { $0 } // AnyPublisher<Int, TestError> を Int と TestError に分ける
    .map { twice(num: $0) }
    .mapError { error -> TestError in
        print("before error: \(error)")
        return TestError.otherError // すべてTestError.otherErrorに変換する
    }
    .sink { completion in
        switch completion {
        case .finished:
            print("finished")
        case let .failure(error):
            print("error: \(error)")
        }
    } receiveValue: {
        print("receiveValue: \($0)")
    }
    .store(in: &cancellables)

subjectH.send(1)
subjectH.send(nil) // ←  nil のため、Fail(error: TestError.nilError) 発生
subjectH.value = 2 // ← 上の処理により sink の completion ブロックに入ったため処理が走らない

// 出力
// receiveValue: 2
// before error: nilError
// error: otherError // ← nilError だったエラーが otherError に変更されている
  • mapOutput に対してのみ処理をしないし、errorMapFailure にのみ処理を適応する

first の挙動

var subjectI = CurrentValueSubject<Int?, Never>(nil)
subjectI
    .dropFirst()
    .first() // ← 追加
    .compactMap { num -> AnyPublisher<Int, TestError> in
        guard let num = num else {
            return Fail(error: TestError.nilError)
                .eraseToAnyPublisher()
        }
        return Just(num)
            .setFailureType(to: TestError.self)
            .eraseToAnyPublisher()
    }
    .flatMap { $0 }
    .map { twice(num: $0) }
    .mapError { error -> TestError in
        print("before error: \(error)")
        return TestError.otherError // すべてTestError.otherErrorに変換する
    }
    .sink { completion in
        switch completion {
        case .finished:
            print("finished")
        case let .failure(error):
            print("error: \(error)")
        }
    } receiveValue: {
        print("receiveValue: \($0)")
    }
    .store(in: &cancellables)

subjectI.send(1) // ← first()によって1つの処理が終了したらsinkのcompletionブロックに入る
subjectI.send(nil) // ← すでにsinkのcompletionブロックに入っているため処理が走らない
subjectI.value = 2 // ← すでにsinkのcompletionブロックに入っているため処理が走らない

// 出力
// receiveValue: 2
  • first() はひとつのストリームを受け取ったら completion となる
  • ひとつめで completion するのではなく、N 回目で completion するかをコントロールするには prefix() を用いる↓

【補足】 prefix の挙動

var subjectI = CurrentValueSubject<Int?, Never>(nil)
subjectI
    .dropFirst()
    .prefix(2) // ← 追加
    .compactMap { num -> AnyPublisher<Int, TestError> in
        guard let num = num else {
            return Fail(error: TestError.nilError)
                .eraseToAnyPublisher()
        }
        return Just(num)
            .setFailureType(to: TestError.self)
            .eraseToAnyPublisher()
    }
    .flatMap { $0 }
    .map { twice(num: $0) }
    .mapError { error -> TestError in
        print("before error: \(error)")
        return TestError.otherError // すべてTestError.otherErrorに変換する
    }
    .sink { completion in
        switch completion {
        case .finished:
            print("finished")
        case let .failure(error):
            print("error: \(error)")
        }
    } receiveValue: {
        print("receiveValue: \($0)")
    }
    .store(in: &cancellables)

subjectI.send(1) // 1つめ
subjectI.send(2) // 2つめ ← prefix(2)によりこの時点でcompletionする
subjectI.value = 3 // 3つめ ← completionブロックに入ったため処理されない

// 出力
// receiveValue: 2
// receiveValue: 4
// finished
  • prefix() で、値が来てから N 回目で completion するかをコントロールできる
  • つまり、first()prefix(1) は同じ挙動(のはず)

timeout の挙動

基本形

var subjectJ = CurrentValueSubject<Int?, Never>(nil)
subjectJ
    .dropFirst()
    .compactMap { num -> AnyPublisher<Int, TestError> in
        guard let num = num else {
            return Fail(error: TestError.nilError)
                .eraseToAnyPublisher()
        }
        return Just(num)
            .setFailureType(to: TestError.self)
            .eraseToAnyPublisher()
    }
    .flatMap { $0 }
    .map { twice(num: $0) }
    .timeout( // ← 追加
        .seconds(3),
        scheduler: DispatchQueue.main,
        options: nil,
        customError: { .timeoutError }
    )
    .sink { completion in
        switch completion {
        case .finished:
            print("finished")
        case let .failure(error):
            print("error: \(error)")
        }
    } receiveValue: {
        print("receiveValue: \($0)")
    }
    .store(in: &cancellables)

subjectJ.send(5) // ← 即時実行
DispatchQueue.main.asyncAfter(deadline: .now() + 2.0) {
    subjectJ.send(6) // ← send(5)の2秒後に実行
    DispatchQueue.main.asyncAfter(deadline: .now() + 4.0) {
        subjectJ.send(7) // ← send(6)の4秒後に実行 ← timeoutが3秒のため実行されない
    }
}


// 出力
// receiveValue: 10
// receiveValue: 12 // ← subjectJ.send(5)実行から2秒後
// error: timeoutError // ← subjectJ.send(6)実行から3秒後
  • timeout() で設定する期間は、値を最初に受け始めてから completion するまでの時間ではなく、ストリームが流れていない期間を指定したこととなる
  • 一度タイムアウトすると completion 処理が実行されるため、ストリームは走らなくなる

わざとタイムアウトさせる方法

var subjectK = CurrentValueSubject<Int?, Never>(nil)
subjectK
    .compactMap { _ -> Int? in return nil } // ← わざと全てnilに変換する
    .compactMap { num -> AnyPublisher<Int, TestError>? in
        guard num > 0 else {
            return Fail(error: TestError.minusError)
                .eraseToAnyPublisher()
        }
        return Just(num)
            .setFailureType(to: TestError.self)
            .eraseToAnyPublisher()
    }
    .flatMap { $0 }
    .map { twice(num: $0) }
    .timeout(
        .seconds(3),
        scheduler: DispatchQueue.main,
        options: nil,
        customError: { .timeoutError }
    )
    .sink { completion in
        switch completion {
        case .finished:
            print("finished")
        case let .failure(error):
            print("error: \(error)")
        }
    } receiveValue: {
        print("receiveValue: \($0)")
    }
    .store(in: &cancellables)

subjectK.send(7)

// 出力
// error: timeoutError
  • .compactMap { _ -> Int? in return nil } で、わざと全て nil に変換する処理を compactMap() 内で行うと、ストリームが流れないので timeout する

タイトル回収

初期値が nil の CurrentValueSubject について nil 以外の値がくるまでのタイムアウトを設定する

うまくいく例

初期値が nil の CurrentValueSubject があるとき、nil 以外の最初の一つ目の値がくるまでタイムアウトを設定するには、以下の順で処理してあげるとうまくいく。

  1. compactMap()
  2. first()
  3. timeout()
var subjectL = CurrentValueSubject<Int?, Never>(nil)
subjectL
    .compactMap { $0 } // ← nilを排除
    .first() // ← nil 以外の値がくれば completion が呼ばれる
    .setFailureType(to: TestError.self) // ← timeout の Failure を設定してあげる
    .timeout(
        .seconds(3),
        scheduler: DispatchQueue.main,
        options: nil,
        customError: { .timeoutError }
    )
    .compactMap { num -> AnyPublisher<Int, TestError> in
        guard num > 0 else {
            return Fail(error: TestError.minusError)
                .eraseToAnyPublisher()
        }
        return Just(num)
            .setFailureType(to: TestError.self)
            .eraseToAnyPublisher()
    }
    .flatMap { $0 }
    .map { twice(num: $0) }
    .sink { completion in
        switch completion {
        case .finished:
            print("finished")
        case let .failure(error):
            print("error: \(error)")
        }
    } receiveValue: {
        print("receiveValue: \($0)")
    }
    .store(in: &cancellables)

subjectL.send(nil) // ← 即時実行
DispatchQueue.main.asyncAfter(deadline: .now() + 2.0) {
    subjectL.send(5) // ← その2秒後に実行
    DispatchQueue.main.asyncAfter(deadline: .now() + 4.0) {
        subjectL.send(6) // ← その4秒後に実行
    }
}

// 出力
// receiveValue: 10
// finished ← first()のおかげで 4 秒待つことはなく timeoutError にならない

うまくいかない例1

first() を抜いてみた場合↓

var subjectL = CurrentValueSubject<Int?, Never>(nil)
subjectL
    .compactMap { $0 } // ← nilを排除
//    .first() // ← nil 以外の値がくれば completion が呼ばれる
    .setFailureType(to: TestError.self) // ← timeout の Failure を設定してあげる
    .timeout(
        .seconds(3),
        scheduler: DispatchQueue.main,
        options: nil,
        customError: { .timeoutError }
    )
    .compactMap { num -> AnyPublisher<Int, TestError> in
        guard num > 0 else {
            return Fail(error: TestError.minusError)
                .eraseToAnyPublisher()
        }
        return Just(num)
            .setFailureType(to: TestError.self)
            .eraseToAnyPublisher()
    }
    .flatMap { $0 }
    .map { twice(num: $0) }
    .sink { completion in
        switch completion {
        case .finished:
            print("finished")
        case let .failure(error):
            print("error: \(error)")
        }
    } receiveValue: {
        print("receiveValue: \($0)")
    }
    .store(in: &cancellables)

subjectL.send(nil) // ← 即時実行
DispatchQueue.main.asyncAfter(deadline: .now() + 2.0) {
    subjectL.send(5) // ← その2秒後に実行
    DispatchQueue.main.asyncAfter(deadline: .now() + 4.0) {
        subjectL.send(6) // ← その4秒後に実行
    }
}

// 出力
// receiveValue: 10
// receiveValue: 12
// error: timeoutError
  • first() がないため、明確にストリームを止めない限り completion のブロックには行かないためタイムアウトエラーとなる

うまくいかない例2

first() のあとに compactMap() した場合↓

var subjectL = CurrentValueSubject<Int?, Never>(nil)
subjectL
    .dropFirst()
    .first() // ← 値がくれば completion が呼ばれる
    .compactMap { $0 } // ← nilを排除
    .setFailureType(to: TestError.self) // ← timeout の Failure を設定してあげる
    .timeout(
        .seconds(3),
        scheduler: DispatchQueue.main,
        options: nil,
        customError: { .timeoutError }
    )
    .compactMap { num -> AnyPublisher<Int, TestError> in
        guard num > 0 else {
            return Fail(error: TestError.minusError)
                .eraseToAnyPublisher()
        }
        return Just(num)
            .setFailureType(to: TestError.self)
            .eraseToAnyPublisher()
    }
    .flatMap { $0 }
    .map { twice(num: $0) }
    .sink { completion in
        switch completion {
        case .finished:
            print("finished")
        case let .failure(error):
            print("error: \(error)")
        }
    } receiveValue: {
        print("receiveValue: \($0)")
    }
    .store(in: &cancellables)

subjectL.send(nil) // ← 即時実行
DispatchQueue.main.asyncAfter(deadline: .now() + 2.0) {
    subjectL.send(5) // ← その2秒後に実行
    DispatchQueue.main.asyncAfter(deadline: .now() + 4.0) {
        subjectL.send(6) // ← その4秒後に実行
    }
}

// 出力
// finished
  • subjectL.send(nil) で nil が来て、first() を通過するので、その時点で completion することが確定し、compactMap { $0 } で nil が排除されてすぐに completion のブロックに入る
GitHubで編集を提案

Discussion