🌾
[Swift] [Combine] nil を含む CurrentValueSubject で nil 以外がくるまでのタイムアウト設定
やりたいこと
初期値が nil の CurrentValueSubject について nil 以外の値がくるまでのタイムアウトを設定すること。
環境:Xcode 12.5.1
結論
初期値が nil の CurrentValueSubject について nil 以外の値がくるまでのタイムアウトを設定するには、以下の順で処理してあげるとうまくいきます。
compactMap()
first()
timeout()
サンプル
import Combine
import Foundation
enum TestError: Error {
case minusError
case nilError
case timeoutError
case otherError
}
func twice(num: Int) -> Int {
return num * 2
}
var cancellables = Set<AnyCancellable>()
var subjectL = CurrentValueSubject<Int?, Never>(nil)
subjectL
.compactMap { $0 } // ← nilを排除
.first() // ← nil 以外の値がくれば completion が呼ばれる
.setFailureType(to: TestError.self) // ← timeout の Failure を設定してあげる
.timeout(
.seconds(3), // 今回は3秒で設定
scheduler: DispatchQueue.main,
options: nil,
customError: { .timeoutError }
)
.flatMap { num -> AnyPublisher<Int, TestError> in
guard num > 0 else {
return Fail(error: TestError.minusError)
.eraseToAnyPublisher()
}
return Just(num)
.setFailureType(to: TestError.self)
.eraseToAnyPublisher()
}
.map { twice(num: $0) }
.sink { completion in
switch completion {
case .finished:
print("finished")
case let .failure(error):
print("error: \(error)")
}
} receiveValue: {
print("receiveValue: \($0)")
}
.store(in: &cancellables)
subjectL.send(nil) // ← 即時実行
DispatchQueue.main.asyncAfter(deadline: .now() + 2.0) {
subjectL.send(5) // ← その2秒後に実行
DispatchQueue.main.asyncAfter(deadline: .now() + 4.0) {
subjectL.send(6) // ← その4秒後に実行
}
}
// 出力
// receiveValue: 10
// finished ← first()のおかげで 4 秒待つことはなく timeoutError にならない
もちろん、3 秒以内に nil 以外の値がこなければタイムアウトします↓
subjectL.send(nil) // ← 即時実行
DispatchQueue.main.asyncAfter(deadline: .now() + 4.0) { // ← 4秒待たせる
subjectL.send(5) // ← その4秒後に実行
DispatchQueue.main.asyncAfter(deadline: .now() + 4.0) {
subjectL.send(6) // ← その4秒後に実行
}
}
// 出力
// error: timeoutError
以上です。
上記のサンプル処理がよくわからない人へ↓
以降はサンプル処理の解説でないですが、Combine の挙動のポイントを書き記したものになります。
前提処理
以下の定義はすべてに共通します。
import Combine
import Foundation
enum TestError: Error {
case minusError
case nilError
case timeoutError
case otherError
}
func twice(num: Int) -> Int {
return num * 2
}
var cancellables = Set<AnyCancellable>()
-
TestError
という適当なエラーを用意 -
twice(num: Int)
という数値を 2 倍する適当な関数を用意 -
cancellables
という適当なAnyCancellable
を.store(in: &cancellables)
できる変数を用意
お断り
所々で nil
を返却しない処理で、compactMap()
を使用してしまっておりますが、誤った使い方です。
そういう場合は map()
を使いましょう。
また、map()
で AnyPublisher<Output, Failure>
を返すような場合は、map()
ではなく flatMap()
としてあげた方が良いです。
申し訳ございません。
CurrentValueSubject
の初期化
失敗例
var subjectA: CurrentValueSubject<Int?, Never>
// CurrentValueSubject<Output, Failure> は初期化が必要
subject.value = 0 // ← いきなり代入するとエラーになる
成功例
var subjectA: CurrentValueSubject<Int?, Never>
subjectA = CurrentValueSubject<Int?, Never>(nil) // nilで初期化
subjectA.value = 1 // 初期化後ならエラーにならない
-
CurrentValueSubject<Output, Failure>
は初期化が必要
sink
の挙動
var subjectB = CurrentValueSubject<Int?, Never>(nil)
subjectB
.sink { completion in
switch completion {
case .finished:
print("finished")
case let .failure(error):
print("error: \(error)")
}
} receiveValue: {
print("receiveValue: \(String(describing: $0))")
}
.store(in: &cancellables)
// 出力↓
// receiveValue: nil ← いきなりnilでながれる
-
sink
するといきなり処理が流れる → 初期値が流れる
dropFirst
の挙動
var subjectC = CurrentValueSubject<Int?, Never>(nil)
subjectC
.dropFirst() // ← 追加
.sink { completion in
switch completion {
case .finished:
print("finished")
case let .failure(error):
print("error: \(error)")
}
} receiveValue: {
print("receiveValue: \(String(describing: $0))")
}
.store(in: &cancellables)
// 出力↓
// なし ← .dropFirst()のおかげで初回(初期値)のストリームはながれない
-
dropFirst
で初回(初期値)のストリームを流れるのを防ぐ
send
の挙動
var subjectC = CurrentValueSubject<Int?, Never>(nil)
subjectC
.dropFirst()
.sink { completion in
switch completion {
case .finished:
print("finished")
case let .failure(error):
print("error: \(error)")
}
} receiveValue: {
print("receiveValue: \(String(describing: $0))")
}
.store(in: &cancellables)
/// sendまたはvalueの代入によってストリームが走る
subjectC.send(1)
subjectC.send(nil)
subjectC.send(2)
subjectC.value = 3
// 出力↓
// receiveValue: Optional(1)
// receiveValue: nil
// receiveValue: Optional(2)
// receiveValue: Optional(3)
-
send
またはvalue
の代入によってストリームが走る
cancel
の挙動
var subjectC = CurrentValueSubject<Int?, Never>(nil)
subjectC
.dropFirst()
.sink { completion in
switch completion {
case .finished:
print("finished")
case let .failure(error):
print("error: \(error)")
}
} receiveValue: {
print("receiveValue: \(String(describing: $0))")
}
.store(in: &cancellables)
/// cancel の処理の挙動
subjectC.send(1)
subjectC.send(nil)
cancellables.forEach{ $0.cancel() } // cancel()の実施
subjectC.send(2) // cancel()以降処理しなくなる
subjectC.value = 3 // cancel()以降処理しなくなる
// 出力↓
// receiveValue: Optional(1)
// receiveValue: nil
-
cancel
以降のストリームは流れない
completion: .finished
の挙動
var subjectC = CurrentValueSubject<Int?, Never>(nil)
subjectC
.dropFirst()
.sink { completion in
switch completion {
case .finished:
print("finished")
case let .failure(error):
print("error: \(error)")
}
} receiveValue: {
print("receiveValue: \(String(describing: $0))")
}
.store(in: &cancellables)
subjectC.send(1)
subjectC.send(completion: .finished) // completion
subjectC.send(nil) // sinkのcompletionの処理に入ったためsinkは走らない
subjectC.send(2) // sinkのcompletionの処理に入ったためsinkは走らない
subjectC.value = 3 // sinkのcompletionの処理に入ったためsinkは走らない
// 出力↓
// receiveValue: Optional(1)
// finished
-
completion
の処理に入るとストリームに値は走らなくなる
map
の挙動
var subjectD = CurrentValueSubject<Int?, Never>(nil)
subjectD
.dropFirst()
.map { twice(num: $0 ?? 0) } // ← 追加
.sink { completion in
switch completion {
case .finished:
print("finished")
case let .failure(error):
print("error: \(error)")
}
} receiveValue: {
print("receiveValue: \(String(describing: $0))")
}
.store(in: &cancellables)
subjectD.send(1)
subjectD.send(nil)
subjectD.value = 2
// 出力
// receiveValue: 2 ← map の処理が適応
// receiveValue: 0 ← nilは $0 ?? 0 によって0に変換されて処理された
// receiveValue: 4 ← map の処理が適応
- 配列操作の
map
とほぼ同じように処理される - 関数を通して値を変換させるのに相性が良い
- 場合に応じて型も変えることができる
compactMap
の挙動
var subjectE = CurrentValueSubject<Int?, Never>(nil)
subjectE
.dropFirst()
.compactMap { $0 } // ← 追加
.map { twice(num: $0 )} // 先にcompactMap()することによってアンラップ処理を省略
.sink { completion in
switch completion {
case .finished:
print("finished")
case let .failure(error):
print("error: \(error)")
}
} receiveValue: {
print("receiveValue: \(String(describing: $0))")
}
.store(in: &cancellables)
subjectE.send(1)
subjectE.send(nil) // ← compactMapによりnilは出力なし
subjectE.value = 2
// 出力
// receiveValue: 2 ← オプショナルが外れる
// receiveValue: 4 ← オプショナルが外れる
-
compactMap()
は nil の際に処理を流れなくする -
compactMap { $0 }
は Combine ならではのguard let hoge = hoge else { return }
のようなイメージ
compactMap { $0 }
は書き換えると以下のようになる。
var subjectF = CurrentValueSubject<Int?, Never>(nil)
subjectF
.dropFirst()
.compactMap { num -> Int? in // ← compactMap { $0 } と同等の処理
guard let num = num else {
return nil
}
return num
}
.map { twice(num: $0) }
.sink { completion in
switch completion {
case .finished:
print("finished")
case let .failure(error):
print("error: \(error)")
}
} receiveValue: {
print("receiveValue: \(String(describing: $0))")
}
.store(in: &cancellables)
Fail
、Just
、eraseToAnyPublisher
、setFailureType
、flatMap
の合わせ技
var subjectG = CurrentValueSubject<Int?, Never>(nil)
subjectG
.dropFirst()
.flatMap { num -> AnyPublisher<Int, TestError> in // 型を指定
guard let num = num else {
return Fail(error: TestError.nilError) // Fail(error: Error)を返す
.eraseToAnyPublisher()
}
return Just(num)
.setFailureType(to: TestError.self) // AnyPublisher<Int, TestError> にあわせる
.eraseToAnyPublisher()
}
.map { twice(num: $0) }
.sink { completion in
switch completion {
case .finished:
print("finished")
case let .failure(error):
print("error: \(error)")
}
} receiveValue: {
print("receiveValue: \($0)")
}
.store(in: &cancellables)
subjectG.send(1)
subjectG.send(nil) // ← nil のため、Fail(error: TestError.nilError) 発生
subjectG.value = 2 // ← 上の処理により sink の completion ブロックに入ったため処理が走らない
// 出力
// receiveValue: 2
// error: nilError
-
Fail
はエラーを返す Publisher(struct Fail<Output, Failure> : Publisher where Failure : Error
) -
Just
はエラーを返さない Publisher(struct Just<Output> : Publisher
) -
.setFailureType(to: Error.self)
によって、Publisher に任意のエラーを付与できる ← これをつかってJust
にエラーを付与することも可能 -
AnyPublisher<Output, Error>
のような publisher を返す場合は.eraseToAnyPublisher()
をつけてあげると型が揃う -
flatMap()
はAnyPublisher<Output, Failure>
を流すストリームを作成する (ここをmap()
にするとAnyPublisher<Output, Failure>
の型であってもストリームは流れない) -
Fail
を検知するとsink
のcompletion
ブロックに入りそれ以降のストリームに流れてきた値は処理しない
errorMap
の挙動
var subjectH = CurrentValueSubject<Int?, Never>(nil)
subjectH
.dropFirst()
.compactMap { num -> AnyPublisher<Int, TestError> in // 型を指定
guard let num = num else {
return Fail(error: TestError.nilError) // Fail(error: Error)を返す
.eraseToAnyPublisher()
}
return Just(num)
.setFailureType(to: TestError.self) // AnyPublisher<Int, TestError> にあわせる
.eraseToAnyPublisher()
}
.flatMap { $0 } // AnyPublisher<Int, TestError> を Int と TestError に分ける
.map { twice(num: $0) }
.mapError { error -> TestError in
print("before error: \(error)")
return TestError.otherError // すべてTestError.otherErrorに変換する
}
.sink { completion in
switch completion {
case .finished:
print("finished")
case let .failure(error):
print("error: \(error)")
}
} receiveValue: {
print("receiveValue: \($0)")
}
.store(in: &cancellables)
subjectH.send(1)
subjectH.send(nil) // ← nil のため、Fail(error: TestError.nilError) 発生
subjectH.value = 2 // ← 上の処理により sink の completion ブロックに入ったため処理が走らない
// 出力
// receiveValue: 2
// before error: nilError
// error: otherError // ← nilError だったエラーが otherError に変更されている
-
map
はOutput
に対してのみ処理をしないし、errorMap
はFailure
にのみ処理を適応する
first
の挙動
var subjectI = CurrentValueSubject<Int?, Never>(nil)
subjectI
.dropFirst()
.first() // ← 追加
.compactMap { num -> AnyPublisher<Int, TestError> in
guard let num = num else {
return Fail(error: TestError.nilError)
.eraseToAnyPublisher()
}
return Just(num)
.setFailureType(to: TestError.self)
.eraseToAnyPublisher()
}
.flatMap { $0 }
.map { twice(num: $0) }
.mapError { error -> TestError in
print("before error: \(error)")
return TestError.otherError // すべてTestError.otherErrorに変換する
}
.sink { completion in
switch completion {
case .finished:
print("finished")
case let .failure(error):
print("error: \(error)")
}
} receiveValue: {
print("receiveValue: \($0)")
}
.store(in: &cancellables)
subjectI.send(1) // ← first()によって1つの処理が終了したらsinkのcompletionブロックに入る
subjectI.send(nil) // ← すでにsinkのcompletionブロックに入っているため処理が走らない
subjectI.value = 2 // ← すでにsinkのcompletionブロックに入っているため処理が走らない
// 出力
// receiveValue: 2
-
first()
はひとつのストリームを受け取ったらcompletion
となる - ひとつめで
completion
するのではなく、N 回目でcompletion
するかをコントロールするにはprefix()
を用いる↓
prefix
の挙動
【補足】 var subjectI = CurrentValueSubject<Int?, Never>(nil)
subjectI
.dropFirst()
.prefix(2) // ← 追加
.compactMap { num -> AnyPublisher<Int, TestError> in
guard let num = num else {
return Fail(error: TestError.nilError)
.eraseToAnyPublisher()
}
return Just(num)
.setFailureType(to: TestError.self)
.eraseToAnyPublisher()
}
.flatMap { $0 }
.map { twice(num: $0) }
.mapError { error -> TestError in
print("before error: \(error)")
return TestError.otherError // すべてTestError.otherErrorに変換する
}
.sink { completion in
switch completion {
case .finished:
print("finished")
case let .failure(error):
print("error: \(error)")
}
} receiveValue: {
print("receiveValue: \($0)")
}
.store(in: &cancellables)
subjectI.send(1) // 1つめ
subjectI.send(2) // 2つめ ← prefix(2)によりこの時点でcompletionする
subjectI.value = 3 // 3つめ ← completionブロックに入ったため処理されない
// 出力
// receiveValue: 2
// receiveValue: 4
// finished
-
prefix()
で、値が来てから N 回目でcompletion
するかをコントロールできる - つまり、
first()
とprefix(1)
は同じ挙動(のはず)
timeout
の挙動
基本形
var subjectJ = CurrentValueSubject<Int?, Never>(nil)
subjectJ
.dropFirst()
.compactMap { num -> AnyPublisher<Int, TestError> in
guard let num = num else {
return Fail(error: TestError.nilError)
.eraseToAnyPublisher()
}
return Just(num)
.setFailureType(to: TestError.self)
.eraseToAnyPublisher()
}
.flatMap { $0 }
.map { twice(num: $0) }
.timeout( // ← 追加
.seconds(3),
scheduler: DispatchQueue.main,
options: nil,
customError: { .timeoutError }
)
.sink { completion in
switch completion {
case .finished:
print("finished")
case let .failure(error):
print("error: \(error)")
}
} receiveValue: {
print("receiveValue: \($0)")
}
.store(in: &cancellables)
subjectJ.send(5) // ← 即時実行
DispatchQueue.main.asyncAfter(deadline: .now() + 2.0) {
subjectJ.send(6) // ← send(5)の2秒後に実行
DispatchQueue.main.asyncAfter(deadline: .now() + 4.0) {
subjectJ.send(7) // ← send(6)の4秒後に実行 ← timeoutが3秒のため実行されない
}
}
// 出力
// receiveValue: 10
// receiveValue: 12 // ← subjectJ.send(5)実行から2秒後
// error: timeoutError // ← subjectJ.send(6)実行から3秒後
-
timeout()
で設定する期間は、値を最初に受け始めてからcompletion
するまでの時間ではなく、ストリームが流れていない期間を指定したこととなる - 一度タイムアウトすると
completion
処理が実行されるため、ストリームは走らなくなる
わざとタイムアウトさせる方法
var subjectK = CurrentValueSubject<Int?, Never>(nil)
subjectK
.compactMap { _ -> Int? in return nil } // ← わざと全てnilに変換する
.compactMap { num -> AnyPublisher<Int, TestError>? in
guard num > 0 else {
return Fail(error: TestError.minusError)
.eraseToAnyPublisher()
}
return Just(num)
.setFailureType(to: TestError.self)
.eraseToAnyPublisher()
}
.flatMap { $0 }
.map { twice(num: $0) }
.timeout(
.seconds(3),
scheduler: DispatchQueue.main,
options: nil,
customError: { .timeoutError }
)
.sink { completion in
switch completion {
case .finished:
print("finished")
case let .failure(error):
print("error: \(error)")
}
} receiveValue: {
print("receiveValue: \($0)")
}
.store(in: &cancellables)
subjectK.send(7)
// 出力
// error: timeoutError
-
.compactMap { _ -> Int? in return nil }
で、わざと全てnil
に変換する処理をcompactMap()
内で行うと、ストリームが流れないので timeout する
タイトル回収
初期値が nil の CurrentValueSubject について nil 以外の値がくるまでのタイムアウトを設定する
うまくいく例
初期値が nil の CurrentValueSubject があるとき、nil 以外の最初の一つ目の値がくるまでタイムアウトを設定するには、以下の順で処理してあげるとうまくいく。
compactMap()
first()
timeout()
var subjectL = CurrentValueSubject<Int?, Never>(nil)
subjectL
.compactMap { $0 } // ← nilを排除
.first() // ← nil 以外の値がくれば completion が呼ばれる
.setFailureType(to: TestError.self) // ← timeout の Failure を設定してあげる
.timeout(
.seconds(3),
scheduler: DispatchQueue.main,
options: nil,
customError: { .timeoutError }
)
.compactMap { num -> AnyPublisher<Int, TestError> in
guard num > 0 else {
return Fail(error: TestError.minusError)
.eraseToAnyPublisher()
}
return Just(num)
.setFailureType(to: TestError.self)
.eraseToAnyPublisher()
}
.flatMap { $0 }
.map { twice(num: $0) }
.sink { completion in
switch completion {
case .finished:
print("finished")
case let .failure(error):
print("error: \(error)")
}
} receiveValue: {
print("receiveValue: \($0)")
}
.store(in: &cancellables)
subjectL.send(nil) // ← 即時実行
DispatchQueue.main.asyncAfter(deadline: .now() + 2.0) {
subjectL.send(5) // ← その2秒後に実行
DispatchQueue.main.asyncAfter(deadline: .now() + 4.0) {
subjectL.send(6) // ← その4秒後に実行
}
}
// 出力
// receiveValue: 10
// finished ← first()のおかげで 4 秒待つことはなく timeoutError にならない
うまくいかない例1
first()
を抜いてみた場合↓
var subjectL = CurrentValueSubject<Int?, Never>(nil)
subjectL
.compactMap { $0 } // ← nilを排除
// .first() // ← nil 以外の値がくれば completion が呼ばれる
.setFailureType(to: TestError.self) // ← timeout の Failure を設定してあげる
.timeout(
.seconds(3),
scheduler: DispatchQueue.main,
options: nil,
customError: { .timeoutError }
)
.compactMap { num -> AnyPublisher<Int, TestError> in
guard num > 0 else {
return Fail(error: TestError.minusError)
.eraseToAnyPublisher()
}
return Just(num)
.setFailureType(to: TestError.self)
.eraseToAnyPublisher()
}
.flatMap { $0 }
.map { twice(num: $0) }
.sink { completion in
switch completion {
case .finished:
print("finished")
case let .failure(error):
print("error: \(error)")
}
} receiveValue: {
print("receiveValue: \($0)")
}
.store(in: &cancellables)
subjectL.send(nil) // ← 即時実行
DispatchQueue.main.asyncAfter(deadline: .now() + 2.0) {
subjectL.send(5) // ← その2秒後に実行
DispatchQueue.main.asyncAfter(deadline: .now() + 4.0) {
subjectL.send(6) // ← その4秒後に実行
}
}
// 出力
// receiveValue: 10
// receiveValue: 12
// error: timeoutError
-
first()
がないため、明確にストリームを止めない限りcompletion
のブロックには行かないためタイムアウトエラーとなる
うまくいかない例2
first()
のあとに compactMap()
した場合↓
var subjectL = CurrentValueSubject<Int?, Never>(nil)
subjectL
.dropFirst()
.first() // ← 値がくれば completion が呼ばれる
.compactMap { $0 } // ← nilを排除
.setFailureType(to: TestError.self) // ← timeout の Failure を設定してあげる
.timeout(
.seconds(3),
scheduler: DispatchQueue.main,
options: nil,
customError: { .timeoutError }
)
.compactMap { num -> AnyPublisher<Int, TestError> in
guard num > 0 else {
return Fail(error: TestError.minusError)
.eraseToAnyPublisher()
}
return Just(num)
.setFailureType(to: TestError.self)
.eraseToAnyPublisher()
}
.flatMap { $0 }
.map { twice(num: $0) }
.sink { completion in
switch completion {
case .finished:
print("finished")
case let .failure(error):
print("error: \(error)")
}
} receiveValue: {
print("receiveValue: \($0)")
}
.store(in: &cancellables)
subjectL.send(nil) // ← 即時実行
DispatchQueue.main.asyncAfter(deadline: .now() + 2.0) {
subjectL.send(5) // ← その2秒後に実行
DispatchQueue.main.asyncAfter(deadline: .now() + 4.0) {
subjectL.send(6) // ← その4秒後に実行
}
}
// 出力
// finished
-
subjectL.send(nil)
で nil が来て、first()
を通過するので、その時点でcompletion
することが確定し、compactMap { $0 }
で nil が排除されてすぐにcompletion
のブロックに入る
Discussion