【Swift】Combine Operators
※ 作成中
一部のPublisher(Publishers.MulticastやTimer.Timerなど)は、最初からConnectable
autoconnectを利用することで、毎回connectを呼び出す必要がなくなる
multicast tryCatch
breakpoint
デバッグ時に便利なオペレータ。購読時、出力時、完了時に条件式を設定し、trueとなった場合に実行を停止し、アラートで知らせる。
import Combine
let subject = PassthroughSubject<Int, Never>()
var cancellables = Set<AnyCancellable>()
subject
.breakpoint(
receiveSubscription: nil,
receiveOutput: { num in num == 0 },
receiveCompletion: nil)
.sink { completion in
print(completion)
} receiveValue: { num in
print(num)
}
.store(in: &cancellables)
subject.send(100)
subject.send(200)
subject.send(0)
subject.send(300)
subject.send(400)
// 出力
// 100
// 200
// 0
// ※ 300も出力される場合がある
breakpointOnError
上流のパブリッシャがエラーを投げたとき、実行を停止する
"上流"のパブリッシャがエラーを投げたときなので、breakpointOnErrorオペレータの位置を間違えないようにする
import Combine
let subject = PassthroughSubject<Int, Never>()
var cancellables = Set<AnyCancellable>()
struct SomeError: Error {}
subject
.tryMap { num in
guard num != 0 else { throw SomeError() }
return num * 10
}
.breakpointOnError()
.sink { completion in
print(completion)
} receiveValue: { num in
print(num)
}
.store(in: &cancellables)
subject.send(1)
subject.send(2)
subject.send(0) // ここで止まる
subject.send(3)
subject.send(4)
switchToLatest
切り替え可能なPublisherを提供するって感じかな
import Combine
var cancellables = Set<AnyCancellable>()
let subject = PassthroughSubject<PassthroughSubject<String, Never>, Never>()
let 日本人 = PassthroughSubject<String, Never>()
let アメリカ人 = PassthroughSubject<String, Never>()
let フランス人 = PassthroughSubject<String, Never>()
subject
.switchToLatest()
.sink { completion in
print(completion)
} receiveValue: { num in
print(num)
}
.store(in: &cancellables)
日本人.send("こんにちは")
// 初期状態ではsubjectが何も購読していないため無意味
subject.send(日本人)
// subjectが日本人(Publisher)を購読開始
日本人.send("こんにちは")
// 「こんにちは」が出力される
subject.send(アメリカ人)
// subjectがアメリカ人(Publisher)を購読開始
日本人.send("こんにちは")
// この時点でsubjectは日本人(Publisher)を購読していないため無意味
アメリカ人.send("Hello")
// 「Hello」が出力される
subject.send(フランス人)
// subjectがフランス人(Publisher)を購読開始
日本人.send("こんにちは")
アメリカ人.send("Hello")
// subjectがフランス人(Publisher)を購読しているので無意味
フランス人.send("Bonjour")
// 「Bonjour」が出力される
subject.send(日本人)
// 日本人を再度購読
日本人.send("こんにちは")
// 「こんにちは」が再度出力される
prefix(_:)
上流から指定された数の要素を受け取りCompletionを流す
import Combine
var cancellables = Set<AnyCancellable>()
let subject = PassthroughSubject<Int, Never>()
subject
.prefix(3)
.sink { completion in
print(completion)
} receiveValue: { num in
print(num)
}
.store(in: &cancellables)
subject.send(1)
subject.send(2)
subject.send(3)
subject.send(4) // 無視
subject.send(5) // 無視
// 出力
// 1
// 2
// 3
// finished
prefix(while:)
指定する値が条件に合致していれば、その値を下流に流す。条件に合致しない値を受信したらcompletionを流す。
import Combine
var cancellables = Set<AnyCancellable>()
let numberPublisher = [1, 2, 3, 4, 5].publisher
numberPublisher
.prefix(while: {
$0 <= 3
})
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { num in
print(num)
})
.store(in: &cancellables)
// 出力
// 1
// 2
// 3
prefix(untilOutputFrom:)
指定したPublisherが何か値を放出したとき、Completionを流す
import Combine
var cancellables = Set<AnyCancellable>()
let firstSubject = PassthroughSubject<Int, Never>()
let secondSubject = PassthroughSubject<Int, Never>()
firstSubject
.prefix(untilOutputFrom: secondSubject)
.print()
.sink { completion in
print(completion)
} receiveValue: { num in
print(num)
}
.store(in: &cancellables)
firstSubject.send(1)
firstSubject.send(2)
secondSubject.send(999)
firstSubject.send(3) // 無視
firstSubject.send(4) // 無視
merge
複数のパブリッシャーからの出力を,一つのパブリッシャーに統合する
どこかしらのパブリッシャーからエラーが流れてきたらそこで終了
import Combine
var cancellables = Set<AnyCancellable>()
struct SomeError: Error {}
let subject1 = PassthroughSubject<Int, SomeError>()
let subject2 = PassthroughSubject<Int, SomeError>()
let subject3 = PassthroughSubject<Int, SomeError>()
let subject4 = PassthroughSubject<Int, SomeError>()
let subject5 = PassthroughSubject<Int, SomeError>()
subject1
.merge(with: subject2, subject3, subject4, subject5)
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { num in
print(num)
})
.store(in: &cancellables)
subject1.send(1)
subject2.send(2)
subject3.send(3)
subject4.send(4)
subject5.send(5)
subject1.send(completion: .failure(SomeError()))
subject1.send(1) // 無視
subject2.send(2) // 無視
subject3.send(3) // 無視
subject4.send(4) // 無視
subject5.send(5) // 無視
// 出力
// 1
// 2
// 3
// 4
// 5
// failure(__lldb_expr_189.SomeError())
drop(while:)
dropオペレータは指定した条件に合わなくなるまで、要素を放出しない。条件に合わなくなったあとは全て放出することに注意
import Combine
let publisher = [-3, -2, -1, 0, 1, 2, 3, -100].publisher
publisher
.drop(while: { num in
num < 0
})
.sink { completion in
print(completion)
} receiveValue: { element in
print(element)
}
// 0
// 1
// 2
// 3
// -100 負の値だが、一度条件が合わなくなったので放出されている
// finished
drop(untilOutputFrom:)
指定したPublisherが値を放出するまで、下流に何も流さないようにするオペレータ
import Combine
var cancellables = Set<AnyCancellable>()
let mainStream = PassthroughSubject<Int, Never>()
let subStream = PassthroughSubject<Void, Never>()
mainStream
.drop(untilOutputFrom: subStream)
.sink { completion in
print(completion)
} receiveValue: { num in
print(num)
}
.store(in: &cancellables)
mainStream.send(1)
mainStream.send(2)
subStream.send(())
mainStream.send(3)
mainStream.send(4)
// 出力
// 3
// 4
replaceError
エラーを指定された値に置き換え終了する
import Combine
var cancellables = Set<AnyCancellable>()
struct ZeroError: Error {}
let subject = PassthroughSubject<Int, ZeroError>()
subject
.tryMap {
if $0 == 0 {
throw ZeroError()
} else {
return $0
}
}
.replaceError(with: 999)
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { output in
print(output)
})
.store(in: &cancellables)
subject.send(1)
subject.send(2)
subject.send(0)
subject.send(4)
subject.send(5)
// 出力
// 1
// 2
// 999
// finished
replaceNil
nilを受信した時に、指定した値に置き換えてくれる
import Combine
var cancellables = Set<AnyCancellable>()
[1, nil, 3].publisher
.replaceNil(with: 2)
.sink(receiveValue: { value in
print(value)
})
.store(in: &cancellables)
// 出力
// Optional(1)
// Optional(2)
// Optional(3)
encode
受け取った要素をエンコードする。エンコードに失敗した場合はエラーを流す。
import Combine
struct User: Codable, Identifiable {
var id = UUID()
var name: String
}
var cancellables = Set<AnyCancellable>()
Just(User(name: "John Doe"))
.encode(encoder: JSONEncoder())
.sink(
receiveCompletion: { completion in
print("completion:", completion)
},
receiveValue: { data in
guard let jsonString = String(data: data, encoding: .utf8),
let decodedDataAsUser = try? JSONDecoder().decode(User.self, from: data)
else { return }
print("jsonStringは「\(jsonString)」です。")
print("decodedDataAsUserは「\(decodedDataAsUser)」です。")
}
)
.store(in: &cancellables)
// jsonStringは「{"name":"John Doe","id":"5718544D-4E25-48AB-9D18-48DEC5BE3AE6"}」です。
// decodedDataAsUserは「User(id: 5718544D-4E25-48AB-9D18-48DEC5BE3AE6, name: "John Doe")」です。
// completion: finished
makeConnectable
connectが呼ばれるまでデータの送信を待機できる。
import Combine
var cancellables = Set<AnyCancellable>()
let just = Just<String>("Hello, world.")
let publisher = just.makeConnectable()
DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
publisher.sink { print("【1】", $0) }.store(in: &cancellables)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
publisher.sink { print("【2】", $0) }.store(in: &cancellables)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
publisher.connect()
}
//【1】 Hello, world.
//【2】 Hello, world.
multicast
makeConnectableと似てるが、Subjectを介す必要あり。
あとで調べる
import Combine
var cancellables = Set<AnyCancellable>()
let subject = PassthroughSubject<Int, Never>()
let numbers = [1, 2, 3, 4, 5].publisher
let multicastPublisher = numbers
.multicast(subject: subject)
// 複数のサブスクライバーを追加
multicastPublisher
.sink(receiveValue: { print("【1】\($0)") })
.store(in: &cancellables)
multicastPublisher
.sink(receiveValue: { print("【2】\($0)") })
.store(in: &cancellables)
// パブリッシャーを接続
let connection = multicastPublisher.connect()
// リソースの解放
connection.cancel()
ignoreOutput
上流のパブリッシャーから流れてくる要素を全て無視し、completionを受け取った時のみそれを伝達する。
import Combine
var cancellables = Set<AnyCancellable>()
let numbers = [1, 2, 3, 4, 5].publisher
numbers
.ignoreOutput()
.sink(
receiveCompletion: { completion in
print("completion:", completion)
},
receiveValue: { value in
print("value:", value)
}
)
.store(in: &cancellables)
// 出力: Completed
count
上流から流れてくる要素の数を記録し、completion.finishedが流れてきたタイミングで流れてきた要素の総数を放出する。
import Combine
let subject = PassthroughSubject<Int, Never>()
let subscription = subject
.print()
.count()
.sink { completion in
print(completion)
} receiveValue: { output in
print(output)
}
subject.send(1)
subject.send(1)
subject.send(1)
subject.send(completion: .finished)
subject.send(1)
subject.send(1)
// receive subscription: (PassthroughSubject)
// request unlimited
// receive value: (1)
// receive value: (1)
// receive value: (1)
// receive error: (SomeError())
// failure(__lldb_expr_103.SomeError())
output
特定のインデックス、もしくは特定の範囲を出力するオペレーター。特定のインデックスの放出後、もしくは特定の範囲を出力後にcompletionを流す。
import Combine
let characters = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n"]
characters.publisher
.output(at: 3)
.sink { completion in
print(completion)
} receiveValue: { character in
print(character)
}
// 出力
// d
// finished
print("- - - - - -")
characters.publisher
.output(in: 3...6)
.sink { completion in
print(completion)
} receiveValue: { output in
print(output)
}
// 出力
// d
// e
// f
// g
// finished
print("- - - - - -")
// completionが流されるタイミングに関して
let subject = PassthroughSubject<Int, Never>()
let subscription = subject
.output(at: 2)
.sink { completion in
print(completion)
} receiveValue: { num in
print(num)
}
subject.send(10) // 何も起こらない
subject.send(20) // 何も起こらない
subject.send(30) // この直後にcompletionが流される
subject.send(40) // 何も起こらない
subject.send(50) // 何も起こらない
// 出力
// 30
// finished
reduce
受信した値を集約することができるオペレーター。scanとの違いは、scanは値を受信するたびに出力があるが、reduceはcompletionを受信したときに集約した値を出力する。
import Combine
var cancellables = Set<AnyCancellable>()
let numbers = [1, 2, 3, 4, 5].publisher
print("【reduce】")
numbers
.reduce(0, { accumulator, value in
return accumulator + value
})
.print()
.sink(receiveValue: { total in
print("Total is \(total)")
})
.store(in: &cancellables)
print("【scan】")
numbers
.scan(0, { accumulator, value in
return accumulator + value
})
.print()
.sink(receiveValue: { total in
print("Total is \(total)")
})
.store(in: &cancellables)
// 【reduce】
// receive subscription: (Once)
// request unlimited
// receive value: (15)
// Total is 15
// receive finished
// 【scan】
// receive subscription: ([1, 3, 6, 10, 15])
// request unlimited
// receive value: (1)
// Total is 1
// receive value: (3)
// Total is 3
// receive value: (6)
// Total is 6
// receive value: (10)
// Total is 10
// receive value: (15)
// Total is 15
// receive finished
tryReduce
エラーを投げることのできるreduceオペレーター
enum MyError: Error {
case zero
}
import Combine
var cancellables = Set<AnyCancellable>()
let numbers = [3, 2, 1, 0].publisher
numbers
.tryReduce(0) { accumulator, value in
if value != 0 {
return accumulator + value
} else {
throw MyError.zero
}
}
.sink(
receiveCompletion: { completion in
print("completion:", completion)
},
receiveValue: { total in
print("Total is \(total)")
}
)
.store(in: &cancellables)
timeout
指定された時間要素が流れてこなければタイムアウトとし、completion.finishedを流す。
import Combine
var cancellables = Set<AnyCancellable>()
let subject = PassthroughSubject<String, Never>()
subject
.timeout(3, scheduler: DispatchQueue.global())
.sink(
receiveCompletion: { completion in
print("completion:", completion)
},
receiveValue: { value in
print("value:", value)
}
)
.store(in: &cancellables)
DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
print("「a」を出力します。")
subject.send("a")
}
DispatchQueue.global().asyncAfter(deadline: .now() + 3) {
print("「b」を出力します。")
subject.send("b")
}
DispatchQueue.global().asyncAfter(deadline: .now() + 10) {
print("「c」を出力します。")
subject.send("c")
}
// 「a」を出力します。
// value: a
// 「b」を出力します。
// value: b
// completion: finished
// 「c」を出力します。
zip
2つのパブリッシャーからの要素をタプルにまとめ、下流に流す。3つでもOK
Publsher1とPublisher2があった場合、Publisher1のn番目とPublisher2のn番目がセットで放出される感じ
一つでもcompletionが流れてくるとcompletionを流す
import Combine
var cancellables = Set<AnyCancellable>()
let numbers1 = 1...10
let numbers2 = 97...100
Publishers.Zip(numbers1.publisher, numbers2.publisher)
.sink { completion in
print(completion)
// } receiveValue: { num1, num2 in
// print(num1, num2)
// }
} receiveValue: { output in
print(output)
}
.store(in: &cancellables)
// 出力
// (1, 97)
// (2, 98)
// (3, 99)
// (4, 100)
// finished
// ※ numbers2がcompletionを流したので、finished
print("- - - - - -")
let subject1 = PassthroughSubject<String, Never>()
let subject2 = PassthroughSubject<String, Never>()
Publishers.Zip(subject1, subject2)
.sink { completion in
print(completion)
} receiveValue: { 苗字, 名前 in
print(苗字, 名前)
}
.store(in: &cancellables)
subject1.send("田中")
subject2.send("太郎")
// "田中 太郎"が出力される
subject1.send("高橋")
subject1.send("佐藤")
subject2.send("二郎")
// "高橋 二郎"が出力される
// "佐藤"は次に名前が流れてきたときに使われる
collect
流れてきた要素を配列にまとめてから下流に流す。completionを受信するまで要素を受け取り続け配列を流さないパターンと、指定された数の要素を受け取ったらそれを配列にして流すパターンがある。
import Combine
var cancellbles = Set<AnyCancellable>()
// completionが流れたときに今までの要素を全てまとめ、配列として流す
let subject1 = PassthroughSubject<Int, Never>()
subject1
.collect()
.sink { completion in
print(completion)
} receiveValue: { output in
print(output)
}
.store(in: &cancellbles)
subject1.send(1) // 何も起こらない
subject1.send(2) // 何も起こらない
subject1.send(3) // 何も起こらない
subject1.send(completion: .finished) // この行が実行されたときに[1, 2, 3]が出力される
print("- - - - -")
// 指定された数の分の要素が流れてきたとき、配列にして流す。completionが流れてきたとき、中途半端に要素を持っていれば、それを配列として流し、completionを流す。
let subject2 = PassthroughSubject<Int, Never>()
subject2
.collect(3)
.sink { completion in
print(completion)
} receiveValue: { output in
print(output)
}
.store(in: &cancellbles)
subject2.send(1) // 何も起こらない
subject2.send(2) // 何も起こらない
subject2.send(3) // 出力:[1, 2, 3]
subject2.send(4) // 何も起こらない
subject2.send(5) // 何も起こらない
subject2.send(6) // 出力:[4, 5, 6]
subject2.send(7) // 何も起こらない
subject2.send(completion: .finished) // この行が実行されたときに[7]が出力される
prepend
Publisherの先頭に要素を付け加える。単一の要素だけでなく、複数な要素、またはPublisherでもOK
import Combine
var cancellables = Set<AnyCancellable>()
let numbers = [3, 4, 5].publisher
numbers
.prepend(2)
.sink(receiveValue: { print($0) })
.store(in: &cancellables)
// 出力
// 2
// 3
// 4
// 5
print("- - - - -")
numbers
.prepend(0, 1, 2)
.sink(receiveValue: { print($0) })
.store(in: &cancellables)
// 出力
// 0
// 1
// 2
// 3
// 4
// 5
print("- - - - -")
let prefixPublisher = [0, 1, 2].publisher
numbers
.prepend(prefixPublisher)
.sink(receiveValue: { print($0) })
.store(in: &cancellables)
// 出力
// 0
// 1
// 2
// 3
// 4
// 5
print("- - - - -")
// あくまで先頭に要素が加わるだけ。要素が流れるたびにprependで指定したものが流れるわけではない。
let subject = PassthroughSubject<Int, Never>()
subject.prepend(100)
.sink { num in
print(num)
}
.store(in: &cancellables)
subject.send(10)
subject.send(20)
subject.send(30)
// 出力
// 100
// 10
// 20
// 30
append
prependの逆。Publisherの最後に要素を付け加える。単一の要素だけでなく、複数な要素、またはPublisherでもOK。
※ completionが流れてくるまで何も起きない
import Combine
var cancellables = Set<AnyCancellable>()
let numbers = [1, 2, 3].publisher
numbers
.append(4)
.sink(receiveValue: { print($0) })
.store(in: &cancellables)
// 出力
// 1
// 2
// 3
// 4
print("- - - - -")
numbers
.append(4, 5, 6)
.sink(receiveValue: { print($0) })
.store(in: &cancellables)
// 出力
// 1
// 2
// 3
// 4
// 5
// 6
print("- - - - -")
let prefixPublisher = [4, 5, 6].publisher
numbers
.append(prefixPublisher)
.sink(receiveValue: { print($0) })
.store(in: &cancellables)
// 出力
// 0
// 1
// 2
// 3
// 4
// 5
first
Publisherが発行する最初の要素を出力し、その後completionを流す。where節で条件を指定可能(numbers2)
import Combine
var cancellables = Set<AnyCancellable>()
// 最初の要素を出力後、completionを流す。
let numbers1 = [1, 2, 3, 4, 5].publisher
numbers1
.first()
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { num in
print(num)
})
.store(in: &cancellables)
// 出力
// 1
// finished
// 条件に合致する最初の要素を出力後、completionを流す
let numbers2 = Array(1...10).publisher
numbers2
.first(where: { $0 >= 8 })
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { output in
print(output)
})
.store(in: &cancellables)
// 出力
// 8
// finished
last
// completionを受信するまで要素をバッファリングする。copletion受信後、最後に流れてきた要素を下流に流し、completionを流す。
where節も使える
import Combine
var cancellables = Set<AnyCancellable>()
let numbers = [1, 2, 3, 4, 5].publisher
numbers
.print("前")
.last()
.print("後")
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { num in
print(num)
})
.store(in: &cancellables)
// 前: receive subscription: ([1, 2, 3, 4, 5])
// 後: receive subscription: (Last)
// 後: request unlimited
// 前: request unlimited
// 前: receive value: (1)
// 前: receive value: (2)
// 前: receive value: (3)
// 前: receive value: (4)
// 前: receive value: (5)
// 前: receive finished
// 後: receive value: (5)
// 5
// 後: receive finished
// finished
dropFirst
指定された数の要素を無視してから下流に要素を流す。
import Combine
let subject = PassthroughSubject<Int, Never>()
var cancellables = Set<AnyCancellable>()
subject.dropFirst(3)
.sink { value in
print(value)
}
.store(in: &cancellables)
subject.send(1) // 無視
subject.send(2) // 無視
subject.send(3) // 無視
subject.send(4) // 出力される
subject.send(5) // 出力される
filter
シーケンスプロトコルに準拠したものが使用できるfilterと同じと思って問題なさそう
import Combine
let nums = Array(1...100)
nums.publisher
.filter { num in
num % 10 == 0
}
.sink { num in
print(num)
}
subscribe(on:)
subscribe(on:)
サブスクライブ自体がどのスレッドやキューで行われるかを制御
receive(on:)
データの出力や完了イベントがどのスレッドやキューで受け取られるかを制御
import Combine
let publisher = Just("Hello, World!")
.delay(for: .seconds(3), scheduler: DispatchQueue.global())
.map { str -> String in
return str.lowercased()
}
.subscribe(on: DispatchQueue.global()) // リクエストの受け取り。ここで指定したキューでサブスクライブ処理を開始
.receive(on: DispatchQueue.main) // 結果をメインスレッドで受け取る
let cancellable = publisher
.sink(receiveValue: { value in
print(value) // メインスレッドで実行される
})
share
通常、サブスクライバーが Publisher にサブスクライブすると、そのサブスクライバーごとに新しいデータストリームが開始されます。しかし、share() オペレータを使用すると、すべてのサブスクライバーが同じデータストリームを共有することができます。これは、特にデータの取得にコストがかかる操作(例えば、ネットワークリクエストなど)を行う場合や、サブスクライバーが同じデータセットにアクセスする必要がある場合に有用です。
Publishers.Share は Publishers.Multicast と PassthroughSubject パブリッシャーに暗黙の autoconnect() を組み合わせたもの
Publishers.Shareは他のパブリッシャーと同じ構造体ではなく、クラス
以下のサンプルコードは、appleのやつを少しいじったもの
shareされていることで、Stream1とStream2の値が同じになる
shareオペレータをコメントアウトするとよくわかる
import Combine
var cancellables = Set<AnyCancellable>()
let numberPublisher = (1...3).publisher
.delay(for: 1, scheduler: DispatchQueue.main)
.map( { _ in return Int.random(in: 0...100) } )
.print("Random")
.share()
numberPublisher
.sink { print ("Stream 1 received: \($0)")}
.store(in: &cancellables)
numberPublisher
.sink { print ("Stream 2 received: \($0)")}
.store(in: &cancellables)
// 出力結果(多少省略)
// Random: receive value: (35)
// Stream 1 received: 35
// Stream 2 received: 35
// Random: receive value: (90)
// Stream 1 received: 90
// Stream 2 received: 90
// Random: receive value: (76)
// Stream 1 received: 76
// Stream 2 received: 76
// Random: receive finished
setFailureType(to:)
Publisherのエラーの型を変更する。以下の例ではNeverからSomeErrorに変更している。
import Combine
struct SomeError: Error {}
let publisher1 = [0, 1, 2, 3, 4, 5].publisher
let publisher2 = CurrentValueSubject<Int, SomeError>(0)
let cancellable = publisher1
.setFailureType(to: SomeError.self)
.combineLatest(publisher2)
.sink(
receiveCompletion: { print ("completed: \($0)") },
receiveValue: { print ("value: \($0)")}
)
// 出力
// value: (5, 0)"
assertNoFailure
エラーがないことを保証させるオペレータ。うまく表現はできないが、fatalErrorみたいなものだと思えばいい
import Combine
struct SomeError: Error {}
let publisher = PassthroughSubject<Int, SomeError>()
publisher
.assertNoFailure()
.sink(receiveValue: { value in
print(value)
})
publisher.send(1)
publisher.send(2)
publisher.send(completion: .failure(SomeError())) // エラーが生じる
mapError
completion.failureが持つErrorプロトコルに準拠した値を、別のErrorプロトコルに準拠した値に変更できる
import Combine
struct Error1: Error {}
struct Error2: Error {}
let nums = [3, 2, 1, 0]
nums.publisher
.tryMap { num in
let num = num * 10
guard num > 0 else { throw Error1()}
return num
}
.mapError { completion in
print(completion)
return Error2()
}
.sink { completion in
switch completion {
case .finished:
break
case .failure(let error):
print(error)
}
} receiveValue: { value in
print(value)
}
// 出力結果
// 30
// 20
// 10
// Error1()
// Error2()
handleEvent
receiveSubscription: サブスクリプションが開始されたとき
receiveOutput: 値が発行されるたび
receiveCompletion: シーケンスが完了したとき(成功またはエラー)
receiveCancel: サブスクリプションがキャンセルされたとき
receiveRequest: 新しい値の要求があったとき
これはChatGPT
import Combine
let publisher = PassthroughSubject<Int, Never>()
let cancellable = publisher
.handleEvents(receiveSubscription: { _ in
print("Subscribed!")
}, receiveOutput: { value in
print("Received value: \(value)")
}, receiveCompletion: { _ in
print("Completed!")
}, receiveCancel: {
print("Cancelled!")
}, receiveRequest: { demand in
print("Demand: \(demand)")
})
.sink { value in
print("sink received value: \(value)")
}
publisher.send(1)
publisher.send(2)
cancellable.cancel()
デバッグ時に便利なオペレータ。流れてきた要素とイベンt(値の発行、完了、エラーなど)をコンソールに出力する
import Combine
let publisher = PassthroughSubject<Int, Never>()
let cancellable = publisher
.print("Publisher Events")
.sink { value in
print("受け取ったvalueは\(value)です。")
}
publisher.send(1)
publisher.send(2)
publisher.send(completion: .finished)
CombineLatest
複数のPublisherから値を受け取り、一つのセットとして下流へ流す。そのとき、全てのPublisherが一度は値を出力してから出ないと何も流せない。流すのはそれぞれのPublisherの最新のやつ
import Combine
let subject1 = PassthroughSubject<Int, Never>()
let subject2 = PassthroughSubject<Int, Never>()
var cancellables = Set<AnyCancellable>()
var publisher = Publishers.CombineLatest(subject1, subject2)
.sink { num1, num2 in
print(num1)
print(num2)
}
.store(in: &cancellables)
subject1.send(1)
subject1.send(2)
// ここまでは何も出力されない
subject2.send(2)
// ここで出力される
//// こっちでもOK
//subject1
// .combineLatest(subject2)
// .sink { num1, num2 in
// print(num1)
// print(num2)
// }
// .store(in: &cancellables)
map
通常のmap
import Combine
let publisher = PassthroughSubject<Int, Never>()
publisher
.map { $0 * 2 } // 値を2倍にする
.sink { print($0) }
publisher.send(3) // 6
publisher.send(6) // 12
publisher.send(9) // 18
compactMap
受け取った要素を変換し、その結果がnilでなければアンラップし、要素を下流に出力する。変換結果がnilの場合、その要素は無視される。
以下の例では「apple」がInt型に変換するとnilになるので無視される。
import Combine
let stringPublisher = PassthroughSubject<String, Never>()
stringPublisher
.compactMap { str in
let num: Optional<Int> = Int(str)
return num
}
.sink {
print("「\($0)」を受け取りました。")
}
stringPublisher.send("5")
stringPublisher.send("apple") // 変換結果がnilなので無視される。
stringPublisher.send("7")
// 出力結果
//「5」を受け取りました。
//「7」を受け取りました。
retry
上流のパブリッシャがエラーを発行したときに、指定された回数だけ再試行する
retry(3)なら3回連続で失敗した時にエラーを投げる
下の例は値が3回連続でアウトな値が流れてきたときにエラーを投げる
import Combine
let faultyPublisher = PassthroughSubject<Int, Error>()
struct ZeroError: Error {}
let cancellable = faultyPublisher
.tryMap { value -> Int in
if value == 0 {
print("値が0です。エラーを投げます。")
throw ZeroError()
} else {
return value
}
}
.retry(3) // 3回再試行(3回連続で失敗した時にエラーを投げる。)
.sink(receiveCompletion: { completion in
switch completion {
case .finished:
print("完了")
case .failure(let error):
print("エラーを受信:\(error)")
}
}, receiveValue: { value in
print("受信した値:\(value)")
})
faultyPublisher.send(1)
faultyPublisher.send(0) // ZeroErrorが投げられるが1回目ので問題ない
faultyPublisher.send(3)
faultyPublisher.send(0)
faultyPublisher.send(0)
faultyPublisher.send(0) // 3回連続で失敗したので下流のパブリッシャにエラーが投げられる
faultyPublisher.send(5)
throttle
import SwiftUI
import Combine
struct ContentView: View {
private let subject1 = PassthroughSubject<Date, Never>()
private let subject2 = PassthroughSubject<Date, Never>()
private var cancellables = Set<AnyCancellable>()
init() {
subject1
.throttle(for: 5, scheduler: DispatchQueue.main, latest: true)
.sink { print($0) }
.store(in: &cancellables)
subject2
.throttle(for: 5, scheduler: DispatchQueue.main, latest: false)
.sink { print($0) }
.store(in: &cancellables)
}
var body: some View {
VStack {
Button {
subject1.send(Date())
} label: {
Text("latest: true")
}
.buttonStyle(.borderedProminent)
Button {
subject2.send(Date())
} label: {
Text("latest: false")
}
.buttonStyle(.borderedProminent).padding(.top)
}
}
}
#Preview {
ContentView()
}
/// debounceは値を受け取り、指定された時間待って何も流れてこなければ、受け取った値を流す。
/// throttleは値を受け取りすぐに流す。そしてそのあとは、指定した時間休憩。休憩中に値が流れてきたら一つだけ保持して休憩後に値を流す。休憩中に3つの値が流れてきた場合、Latestがtrueならば3番目の値を休憩上がりに流す。latestがfalseならば1番目の値を休憩上がりに流す。
///
decode
流れてきたDataをWeatherData型にデコードしてくれる
import Combine
let openWeatherID = "自分のOpenWeatherID"
struct WeatherData: Decodable {
struct Weather: Decodable {
let description: String
}
struct Main: Decodable {
let feels_like: Double
}
let name: String
let weather: [Weather]
let main: Main
enum CodingKeys: String, CodingKey {
case name
case weather
case main
}
}
let url = URL(string: "https://api.openweathermap.org/data/2.5/weather?q=Tokyo,JP&appid=\(openWeatherID)&lang=ja&units=metric")!
var cancellables = Set<AnyCancellable>()
URLSession.shared.dataTaskPublisher(for: url)
// .map(\.data)
.map { output in output.data }
.decode(type: WeatherData.self, decoder: JSONDecoder())
.sink(receiveCompletion: { completion in
switch completion {
case .finished:
break
case .failure(let error):
print("Error: \(error)")
}
}, receiveValue: { value in
print("value: \(value)")
})
.store(in: &cancellables)
receive(on:)
特定のスケジューラ(DispatchQueue, RunLoop, OperationQueue)で出力を受信可能。UIの更新につながる処理はメインスレッドで行う必要があるので、その時などに使う
import Combine
let numbers = [1, 2, 3, 4, 5].publisher
numbers
.receive(on: DispatchQueue.main)
.sink(receiveValue: { value in
print(value)
})
tryFilter
エラーを投げることのできるfilterオペレータ
以下の例では、流れてきたString型の要素空文字だった場合にエラーを投げる
import Combine
struct EmptyError: Error {}
let passthroughSubject = PassthroughSubject<String, EmptyError>()
var cancellables = Set<AnyCancellable>()
passthroughSubject
.tryFilter { str in
if !str.isEmpty {
return true
} else {
throw EmptyError()
}
}
.sink { completion in
print("completion: \(completion)")
} receiveValue: { output in
print("output: \(output)")
}
.store(in: &cancellables)
passthroughSubject.send("A")
passthroughSubject.send("")
passthroughSubject.send("B") // completionが流れてしまっているので無意味
catch
catchはリカバリー用のパイプラインに切り替えるため、エラー処理後はcompletionが流れてしまう。
import Combine
struct EmptyError: Error {}
let passthroughSubject = PassthroughSubject<String, EmptyError>()
var cancellables = Set<AnyCancellable>()
passthroughSubject
.tryFilter { str in
if !str.isEmpty {
return true
} else {
throw EmptyError()
}
}
.catch { _ in
return Just("Recovery String")
}
.sink { completion in
print("completion: \(completion)")
} receiveValue: { output in
print("output: \(output)")
}
.store(in: &cancellables)
passthroughSubject.send("A")
passthroughSubject.send("")
passthroughSubject.send("B") // completionが流れてしまっているので無意味
flatMap
入力として受け取った各要素を新しいPublisherに変換し、それらのPublisherからのすべての要素を単一のストリームにマージして出力
catchオペレータと異なり、複数回のエラーを処理できる。
import Combine
struct EmptyError: Error {}
let passthroughSubject = PassthroughSubject<String, EmptyError>()
var cancellables = Set<AnyCancellable>()
passthroughSubject
.flatMap { str in
return Just(str)
.tryFilter { str in
if !str.isEmpty {
return true
} else {
throw EmptyError()
}
}
.catch { _ in
return Just("Recovery String")
}
}
.sink { completion in
print("completion: \(completion)")
} receiveValue: { output in
print("output: \(output)")
}
.store(in: &cancellables)
passthroughSubject.send("A")
passthroughSubject.send("")
passthroughSubject.send("B") // 問題なく流れる
scan
前回出力した値を保持、使用できるオペレータ
0は初期値、accumulatorは前回出力した値、newValueは新たに流れてきた値
import Combine
let numbersPublisher = PassthroughSubject<Int, Never>()
var cancellables: Set<AnyCancellable> = []
numbersPublisher
.scan(0) { accumulator, newValue in
return accumulator + newValue
}
.sink(receiveValue: { print($0) })
.store(in: &cancellables)
numbersPublisher.send(1) // 出力: 1 (0 + 1)
numbersPublisher.send(2) // 出力: 3 (1 + 2)
numbersPublisher.send(3) // 出力: 6 (3 + 3)
eraseToPublisher
SubjectをPublisherとして公開可能
Sublisherの実装を隠蔽可能
import Combine
// ①SubjectをPublisherとして公開可能
class Test {
private let subject = PassthroughSubject<Int, Never>()
var publisher: AnyPublisher<Int, Never> { subject.eraseToAnyPublisher() }
}
// ②Publisherの実装を隠蔽可能
let publisher1 = Array(1...10).publisher
.print()
.removeDuplicates()
.retry(2)
.debounce(for: 1, scheduler: DispatchQueue.main)
.map { _ in return true }
.filter { output in output }
print(type(of: publisher1))
// 出力結果
// Filter<Map<Debounce<Retry<RemoveDuplicates<Print<Sequence<Array<Int>, Never>>>>, OS_dispatch_queue>, Bool>>
let publisher2 = Array(1...10).publisher
.print()
.removeDuplicates()
.retry(2)
.debounce(for: 1, scheduler: DispatchQueue.main)
.map { _ in return true }
.filter { output in output }
.eraseToAnyPublisher()
print(type(of: publisher2))
// 出力結果
// AnyPublisher<Bool, Never>
tryMap
変換が失敗した場合にエラーを投げることのできるmap
import Combine
struct ZeroError: Error {}
let numbers = [1, 2, 0, 4, 5]
numbers.publisher
.tryMap { number in
if number != 0 {
return number * 10
} else {
throw ZeroError()
}
}
.sink { completion in
switch completion {
case .finished:
print("finished")
case .failure(let error):
print("error:", error)
}
} receiveValue: { num in
print("num:", num)
}
debounce
短時間の大量のAPIリクエストはスパム行為と捉えられる可能性があるため、debounceが便利
ChatGPTの説明が良かった
debounceオペレータは、Publisherから送信される値の間隔を制御します。指定された期間中に新しい値が送信されない場合に限り、最後に送信された値がDownstreamに伝播されます。このオペレータは、高頻度のイベントや更新をまとめて、そのフリークエンシーを制限するのに役立ちます。
class SearchViewModel: ObservableObject {
@Published var query: String = ""
private var cancellables: Set<AnyCancellable> = []
init() {
$query
.debounce(for: .seconds(0.5), scheduler: DispatchQueue.main)
.sink { searchText in
// ここでAPIリクエストなどのアクションを実行
print("Searching for \(searchText)")
}
.store(in: &cancellables)
}
}
removeDuplicates
重複はOK。連続で重複はNG
import Combine
let values = [1, 2, 3, 3, 3, 4, 5].publisher
values
.removeDuplicates()
.sink(receiveValue: { print($0) })
// 出力: 1, 2, 3, 4, 5
import Combine
enum Position {
case sales, accounting, engineering, administrative
}
struct Person {
let name: String
let position: Position
}
let people = [
Person(name: "A", position: .accounting),
Person(name: "B", position: .engineering),
Person(name: "C", position: .engineering), // 出力されない
Person(name: "D", position: .sales),
Person(name: "E", position: .accounting),
Person(name: "F", position: .accounting), // 出力されない
Person(name: "G", position: .sales)
].publisher
people
.removeDuplicates(by: { (prev, current) in
prev.position == current.position
})
.sink(receiveValue: { print($0.name) })
// 出力結果
// A
// B
// D
// E
// G
delay
その名の通り遅延を行う。Int型の配列のPublisherなどだとわかりにくいので、SwiftUIでサンプル
ボタン押下後1.5秒でHello, worldが出力される
import SwiftUI
import Combine
struct ContentView: View {
private let subject = PassthroughSubject<String, Never>()
private var cancellables = Set<AnyCancellable>()
init() {
subject
.delay(for: .seconds(1.5), scheduler: DispatchQueue.global())
.sink { print($0) }
.store(in: &cancellables)
}
var body: some View {
Button {
subject.send("Hello, world.")
} label: {
Text("Send")
}
.buttonStyle(.borderedProminent)
}
}
#Preview {
ContentView()
}
※ 間違ってる場所がありましたらコメントをお願いします。
Discussion