🐡

【Swift】Combine Operators

2023/11/04に公開

※ 作成中

一部のPublisher(Publishers.MulticastやTimer.Timerなど)は、最初からConnectable
autoconnectを利用することで、毎回connectを呼び出す必要がなくなる

multicast tryCatch

breakpoint

デバッグ時に便利なオペレータ。購読時、出力時、完了時に条件式を設定し、trueとなった場合に実行を停止し、アラートで知らせる。

import Combine
let subject = PassthroughSubject<Int, Never>()
var cancellables = Set<AnyCancellable>()

subject
    .breakpoint(
        receiveSubscription: nil,
        receiveOutput: { num in num == 0 },
        receiveCompletion: nil)
    .sink { completion in
        print(completion)
    } receiveValue: { num in
        print(num)
    }
    .store(in: &cancellables)

subject.send(100)
subject.send(200)
subject.send(0)
subject.send(300)
subject.send(400)

// 出力
// 100
// 200
// 0
// ※ 300も出力される場合がある

breakpointOnError

上流のパブリッシャがエラーを投げたとき、実行を停止する
"上流"のパブリッシャがエラーを投げたときなので、breakpointOnErrorオペレータの位置を間違えないようにする

import Combine
let subject = PassthroughSubject<Int, Never>()
var cancellables = Set<AnyCancellable>()
struct SomeError: Error {}

subject
    .tryMap { num in
        guard num != 0 else { throw SomeError() }
        return num * 10
    }
    .breakpointOnError()
    .sink { completion in
        print(completion)
    } receiveValue: { num in
        print(num)
    }
    .store(in: &cancellables)

subject.send(1)
subject.send(2)

subject.send(0) // ここで止まる

subject.send(3)
subject.send(4)

switchToLatest

切り替え可能なPublisherを提供するって感じかな

import Combine
var cancellables = Set<AnyCancellable>()

let subject = PassthroughSubject<PassthroughSubject<String, Never>, Never>()
let 日本人 = PassthroughSubject<String, Never>()
let アメリカ人 = PassthroughSubject<String, Never>()
let フランス人 = PassthroughSubject<String, Never>()

subject
    .switchToLatest()
    .sink { completion in
        print(completion)
    } receiveValue: { num in
        print(num)
    }
    .store(in: &cancellables)


日本人.send("こんにちは")
// 初期状態ではsubjectが何も購読していないため無意味

subject.send(日本人)
// subjectが日本人(Publisher)を購読開始

日本人.send("こんにちは")
// 「こんにちは」が出力される

subject.send(アメリカ人)
// subjectがアメリカ人(Publisher)を購読開始

日本人.send("こんにちは")
// この時点でsubjectは日本人(Publisher)を購読していないため無意味

アメリカ人.send("Hello")
// 「Hello」が出力される

subject.send(フランス人)
// subjectがフランス人(Publisher)を購読開始

日本人.send("こんにちは")
アメリカ人.send("Hello")
// subjectがフランス人(Publisher)を購読しているので無意味

フランス人.send("Bonjour")
// 「Bonjour」が出力される

subject.send(日本人)
// 日本人を再度購読

日本人.send("こんにちは")
// 「こんにちは」が再度出力される

prefix(_:)

上流から指定された数の要素を受け取りCompletionを流す

import Combine
var cancellables = Set<AnyCancellable>()
let subject = PassthroughSubject<Int, Never>()

subject
    .prefix(3)
    .sink { completion in
        print(completion)
    } receiveValue: { num in
        print(num)
    }
    .store(in: &cancellables)

subject.send(1)
subject.send(2)
subject.send(3)
subject.send(4) // 無視
subject.send(5) // 無視

// 出力
// 1
// 2
// 3
// finished

prefix(while:)

指定する値が条件に合致していれば、その値を下流に流す。条件に合致しない値を受信したらcompletionを流す。

import Combine
var cancellables = Set<AnyCancellable>()
let numberPublisher = [1, 2, 3, 4, 5].publisher

numberPublisher
    .prefix(while: {
        $0 <= 3
    })
    .sink(receiveCompletion: { completion in
        print(completion)
    }, receiveValue: { num in
        print(num)
    })
    .store(in: &cancellables)

// 出力
// 1
// 2
// 3

prefix(untilOutputFrom:)

指定したPublisherが何か値を放出したとき、Completionを流す

import Combine
var cancellables = Set<AnyCancellable>()

let firstSubject = PassthroughSubject<Int, Never>()
let secondSubject = PassthroughSubject<Int, Never>()

firstSubject
    .prefix(untilOutputFrom: secondSubject)
    .print()
    .sink { completion in
        print(completion)
    } receiveValue: { num in
        print(num)
    }
    .store(in: &cancellables)

firstSubject.send(1)
firstSubject.send(2)

secondSubject.send(999)

firstSubject.send(3) // 無視
firstSubject.send(4) // 無視

merge

複数のパブリッシャーからの出力を,一つのパブリッシャーに統合する
どこかしらのパブリッシャーからエラーが流れてきたらそこで終了

import Combine
var cancellables = Set<AnyCancellable>()
struct SomeError: Error {}

let subject1 = PassthroughSubject<Int, SomeError>()
let subject2 = PassthroughSubject<Int, SomeError>()
let subject3 = PassthroughSubject<Int, SomeError>()
let subject4 = PassthroughSubject<Int, SomeError>()
let subject5 = PassthroughSubject<Int, SomeError>()


subject1
    .merge(with: subject2, subject3, subject4, subject5)
    .sink(receiveCompletion: { completion in
        print(completion)
    }, receiveValue: { num in
        print(num)
    })
    .store(in: &cancellables)

subject1.send(1)
subject2.send(2)
subject3.send(3)
subject4.send(4)
subject5.send(5)

subject1.send(completion: .failure(SomeError()))

subject1.send(1) // 無視
subject2.send(2) // 無視
subject3.send(3) // 無視
subject4.send(4) // 無視
subject5.send(5) // 無視

// 出力
// 1
// 2
// 3
// 4
// 5
// failure(__lldb_expr_189.SomeError())

drop(while:)

dropオペレータは指定した条件に合わなくなるまで、要素を放出しない。条件に合わなくなったあとは全て放出することに注意

import Combine
let publisher = [-3, -2, -1, 0, 1, 2, 3, -100].publisher

publisher
    .drop(while: { num in
        num < 0
    })
    .sink { completion in
        print(completion)
    } receiveValue: { element in
        print(element)
    }

// 0
// 1
// 2
// 3
// -100  負の値だが、一度条件が合わなくなったので放出されている
// finished

drop(untilOutputFrom:)

指定したPublisherが値を放出するまで、下流に何も流さないようにするオペレータ

import Combine
var cancellables = Set<AnyCancellable>()

let mainStream = PassthroughSubject<Int, Never>()
let subStream = PassthroughSubject<Void, Never>()

mainStream
    .drop(untilOutputFrom: subStream)
    .sink { completion in
        print(completion)
    } receiveValue: { num in
        print(num)
    }
    .store(in: &cancellables)

mainStream.send(1)
mainStream.send(2)

subStream.send(())

mainStream.send(3)
mainStream.send(4)

// 出力
// 3
// 4

replaceError

エラーを指定された値に置き換え終了する

import Combine
var cancellables = Set<AnyCancellable>()
struct ZeroError: Error {}
let subject = PassthroughSubject<Int, ZeroError>()

subject
    .tryMap {
        if $0 == 0 {
            throw ZeroError()
        } else {
            return $0
        }
    }
    .replaceError(with: 999)
    .sink(receiveCompletion: { completion in
        print(completion)
    }, receiveValue: { output in
        print(output)
    })
    .store(in: &cancellables)

subject.send(1)
subject.send(2)
subject.send(0)
subject.send(4)
subject.send(5)

// 出力
// 1
// 2
// 999
// finished

replaceNil

nilを受信した時に、指定した値に置き換えてくれる

import Combine
var cancellables = Set<AnyCancellable>()

[1, nil, 3].publisher
    .replaceNil(with: 2)
    .sink(receiveValue: { value in
        print(value)
    })
    .store(in: &cancellables)

// 出力
// Optional(1)
// Optional(2)
// Optional(3)

encode

受け取った要素をエンコードする。エンコードに失敗した場合はエラーを流す。

import Combine
struct User: Codable, Identifiable {
    var id = UUID()
    var name: String
}
var cancellables = Set<AnyCancellable>()

Just(User(name: "John Doe"))
    .encode(encoder: JSONEncoder())
    .sink(
        receiveCompletion: { completion in
            print("completion:", completion)
        },
        receiveValue: { data in
            guard let jsonString = String(data: data, encoding: .utf8),
                  let decodedDataAsUser = try? JSONDecoder().decode(User.self, from: data)
            else { return }
            print("jsonStringは「\(jsonString)」です。")
            print("decodedDataAsUserは「\(decodedDataAsUser)」です。")
        }
    )
    .store(in: &cancellables)

// jsonStringは「{"name":"John Doe","id":"5718544D-4E25-48AB-9D18-48DEC5BE3AE6"}」です。
// decodedDataAsUserは「User(id: 5718544D-4E25-48AB-9D18-48DEC5BE3AE6, name: "John Doe")」です。
// completion: finished

makeConnectable

connectが呼ばれるまでデータの送信を待機できる。

import Combine
var cancellables = Set<AnyCancellable>()
let just = Just<String>("Hello, world.")
let publisher = just.makeConnectable()


DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
    publisher.sink { print("【1】", $0) }.store(in: &cancellables)
}


DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    publisher.sink { print("【2】", $0) }.store(in: &cancellables)
}

DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    publisher.connect()
}

//【1】 Hello, world.
//【2】 Hello, world.

multicast

makeConnectableと似てるが、Subjectを介す必要あり。
あとで調べる

import Combine
var cancellables = Set<AnyCancellable>()
let subject = PassthroughSubject<Int, Never>()
let numbers = [1, 2, 3, 4, 5].publisher

let multicastPublisher = numbers
    .multicast(subject: subject)

// 複数のサブスクライバーを追加
multicastPublisher
    .sink(receiveValue: { print("【1】\($0)") })
    .store(in: &cancellables)
multicastPublisher
    .sink(receiveValue: { print("【2】\($0)") })
    .store(in: &cancellables)

// パブリッシャーを接続
let connection = multicastPublisher.connect()
// リソースの解放
connection.cancel()

ignoreOutput

上流のパブリッシャーから流れてくる要素を全て無視し、completionを受け取った時のみそれを伝達する。

import Combine
var cancellables = Set<AnyCancellable>()
let numbers = [1, 2, 3, 4, 5].publisher

numbers
    .ignoreOutput()
    .sink(
        receiveCompletion: { completion in
            print("completion:", completion)
        },
        receiveValue: { value in
            print("value:", value)
        }
    )
    .store(in: &cancellables)

// 出力: Completed

count

上流から流れてくる要素の数を記録し、completion.finishedが流れてきたタイミングで流れてきた要素の総数を放出する。

import Combine

let subject = PassthroughSubject<Int, Never>()

let subscription = subject
    .print()
    .count()
    .sink { completion in
        print(completion)
    } receiveValue: { output in
        print(output)
    }

subject.send(1)
subject.send(1)
subject.send(1)
subject.send(completion: .finished)
subject.send(1)
subject.send(1)

// receive subscription: (PassthroughSubject)
// request unlimited
// receive value: (1)
// receive value: (1)
// receive value: (1)
// receive error: (SomeError())
// failure(__lldb_expr_103.SomeError())

output

特定のインデックス、もしくは特定の範囲を出力するオペレーター。特定のインデックスの放出後、もしくは特定の範囲を出力後にcompletionを流す。

import Combine

let characters = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n"]

characters.publisher
    .output(at: 3)
    .sink { completion in
        print(completion)
    } receiveValue: { character in
        print(character)
    }

// 出力
// d
// finished

print("- - - - - -")

characters.publisher
    .output(in: 3...6)
    .sink { completion in
        print(completion)
    } receiveValue: { output in
        print(output)
    }

// 出力
// d
// e
// f
// g
// finished

print("- - - - - -")

// completionが流されるタイミングに関して

let subject = PassthroughSubject<Int, Never>()

let subscription = subject
    .output(at: 2)
    .sink { completion in
    print(completion)
} receiveValue: { num in
    print(num)
}

subject.send(10) // 何も起こらない
subject.send(20) // 何も起こらない
subject.send(30) // この直後にcompletionが流される
subject.send(40) // 何も起こらない
subject.send(50) // 何も起こらない

// 出力
// 30
// finished

reduce

受信した値を集約することができるオペレーター。scanとの違いは、scanは値を受信するたびに出力があるが、reduceはcompletionを受信したときに集約した値を出力する。

import Combine
var cancellables = Set<AnyCancellable>()
let numbers = [1, 2, 3, 4, 5].publisher

print("【reduce】")
numbers
    .reduce(0, { accumulator, value in
        return accumulator + value
    })
    .print()
    .sink(receiveValue: { total in
        print("Total is \(total)")
    })
    .store(in: &cancellables)


print("【scan】")
numbers
    .scan(0, { accumulator, value in
        return accumulator + value
    })
    .print()
    .sink(receiveValue: { total in
        print("Total is \(total)")
    })
    .store(in: &cancellables)


// 【reduce】
// receive subscription: (Once)
// request unlimited
// receive value: (15)
// Total is 15
// receive finished
// 【scan】
// receive subscription: ([1, 3, 6, 10, 15])
// request unlimited
// receive value: (1)
// Total is 1
// receive value: (3)
// Total is 3
// receive value: (6)
// Total is 6
// receive value: (10)
// Total is 10
// receive value: (15)
// Total is 15
// receive finished

tryReduce

エラーを投げることのできるreduceオペレーター

enum MyError: Error {
    case zero
}
import Combine
var cancellables = Set<AnyCancellable>()
let numbers = [3, 2, 1, 0].publisher

numbers
    .tryReduce(0) { accumulator, value in
        if value != 0 {
            return accumulator + value
        } else {
            throw MyError.zero
        }
    }
    .sink(
        receiveCompletion: { completion in
            print("completion:", completion)
        },
        receiveValue: { total in
            print("Total is \(total)")
        }
    )
    .store(in: &cancellables)

timeout

指定された時間要素が流れてこなければタイムアウトとし、completion.finishedを流す。

import Combine
var cancellables = Set<AnyCancellable>()
let subject = PassthroughSubject<String, Never>()

subject
    .timeout(3, scheduler: DispatchQueue.global())
    .sink(
        receiveCompletion: { completion in
            print("completion:", completion)
        },
        receiveValue: { value in
            print("value:", value)
        }
    )
    .store(in: &cancellables)


DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
    print("「a」を出力します。")
    subject.send("a")
}

DispatchQueue.global().asyncAfter(deadline: .now() + 3) {
    print("「b」を出力します。")
    subject.send("b")
}

DispatchQueue.global().asyncAfter(deadline: .now() + 10) {
    print("「c」を出力します。")
    subject.send("c")
}

// 「a」を出力します。
// value: a
// 「b」を出力します。
// value: b
// completion: finished
// 「c」を出力します。

zip

2つのパブリッシャーからの要素をタプルにまとめ、下流に流す。3つでもOK
Publsher1とPublisher2があった場合、Publisher1のn番目とPublisher2のn番目がセットで放出される感じ
一つでもcompletionが流れてくるとcompletionを流す

import Combine
var cancellables = Set<AnyCancellable>()

let numbers1 = 1...10
let numbers2 = 97...100

Publishers.Zip(numbers1.publisher, numbers2.publisher)
    .sink { completion in
        print(completion)
//    } receiveValue: { num1, num2 in
//        print(num1, num2)
//    }
    } receiveValue: { output in
        print(output)
    }
    .store(in: &cancellables)

// 出力
// (1, 97)
// (2, 98)
// (3, 99)
// (4, 100)
// finished

// ※ numbers2がcompletionを流したので、finished


print("- - - - - -")

let subject1 = PassthroughSubject<String, Never>()
let subject2 = PassthroughSubject<String, Never>()

Publishers.Zip(subject1, subject2)
    .sink { completion in
        print(completion)
    } receiveValue: { 苗字, 名前 in
        print(苗字, 名前)
    }
    .store(in: &cancellables)

subject1.send("田中")
subject2.send("太郎")
// "田中 太郎"が出力される
subject1.send("高橋")
subject1.send("佐藤")
subject2.send("二郎")
// "高橋 二郎"が出力される
// "佐藤"は次に名前が流れてきたときに使われる

collect

流れてきた要素を配列にまとめてから下流に流す。completionを受信するまで要素を受け取り続け配列を流さないパターンと、指定された数の要素を受け取ったらそれを配列にして流すパターンがある。

import Combine
var cancellbles = Set<AnyCancellable>()

// completionが流れたときに今までの要素を全てまとめ、配列として流す
let subject1 = PassthroughSubject<Int, Never>()
subject1
    .collect()
    .sink { completion in
        print(completion)
    } receiveValue: { output in
        print(output)
    }
    .store(in: &cancellbles)
    
subject1.send(1) // 何も起こらない
subject1.send(2) // 何も起こらない
subject1.send(3) // 何も起こらない
subject1.send(completion: .finished) // この行が実行されたときに[1, 2, 3]が出力される

print("- - - - -")

// 指定された数の分の要素が流れてきたとき、配列にして流す。completionが流れてきたとき、中途半端に要素を持っていれば、それを配列として流し、completionを流す。
let subject2 = PassthroughSubject<Int, Never>()
subject2
    .collect(3)
    .sink { completion in
        print(completion)
    } receiveValue: { output in
        print(output)
    }
    .store(in: &cancellbles)

subject2.send(1) // 何も起こらない
subject2.send(2) // 何も起こらない
subject2.send(3) // 出力:[1, 2, 3]
subject2.send(4) // 何も起こらない
subject2.send(5) // 何も起こらない
subject2.send(6) // 出力:[4, 5, 6]
subject2.send(7) // 何も起こらない
subject2.send(completion: .finished) // この行が実行されたときに[7]が出力される

prepend

Publisherの先頭に要素を付け加える。単一の要素だけでなく、複数な要素、またはPublisherでもOK

import Combine
var cancellables = Set<AnyCancellable>()
let numbers = [3, 4, 5].publisher

numbers
    .prepend(2)
    .sink(receiveValue: { print($0) })
    .store(in: &cancellables)

// 出力
// 2
// 3
// 4
// 5

print("- - - - -")

numbers
    .prepend(0, 1, 2)
    .sink(receiveValue: { print($0) })
    .store(in: &cancellables)

// 出力
// 0
// 1
// 2
// 3
// 4
// 5

print("- - - - -")

let prefixPublisher = [0, 1, 2].publisher
numbers
    .prepend(prefixPublisher)
    .sink(receiveValue: { print($0) })
    .store(in: &cancellables)

// 出力
// 0
// 1
// 2
// 3
// 4
// 5

print("- - - - -")

// あくまで先頭に要素が加わるだけ。要素が流れるたびにprependで指定したものが流れるわけではない。

let subject = PassthroughSubject<Int, Never>()

subject.prepend(100)
    .sink { num in
        print(num)
    }
    .store(in: &cancellables)

subject.send(10)
subject.send(20)
subject.send(30)

// 出力
// 100
// 10
// 20
// 30

append

prependの逆。Publisherの最後に要素を付け加える。単一の要素だけでなく、複数な要素、またはPublisherでもOK。
※ completionが流れてくるまで何も起きない

import Combine
var cancellables = Set<AnyCancellable>()
let numbers = [1, 2, 3].publisher

numbers
    .append(4)
    .sink(receiveValue: { print($0) })
    .store(in: &cancellables)

// 出力
// 1
// 2
// 3
// 4


print("- - - - -")

numbers
    .append(4, 5, 6)
    .sink(receiveValue: { print($0) })
    .store(in: &cancellables)

// 出力
// 1
// 2
// 3
// 4
// 5
// 6

print("- - - - -")

let prefixPublisher = [4, 5, 6].publisher
numbers
    .append(prefixPublisher)
    .sink(receiveValue: { print($0) })
    .store(in: &cancellables)

// 出力
// 0
// 1
// 2
// 3
// 4
// 5

first

Publisherが発行する最初の要素を出力し、その後completionを流す。where節で条件を指定可能(numbers2)

import Combine
var cancellables = Set<AnyCancellable>()

// 最初の要素を出力後、completionを流す。
let numbers1 = [1, 2, 3, 4, 5].publisher

numbers1
    .first()
    .sink(receiveCompletion: { completion in
        print(completion)
    }, receiveValue: { num in
        print(num)
    })
    .store(in: &cancellables)
// 出力
// 1
// finished


// 条件に合致する最初の要素を出力後、completionを流す
let numbers2 = Array(1...10).publisher

numbers2
    .first(where: { $0 >= 8 })
    .sink(receiveCompletion: { completion in
        print(completion)
    }, receiveValue: { output in
        print(output)
    })
    .store(in: &cancellables)
// 出力
// 8
// finished

last

// completionを受信するまで要素をバッファリングする。copletion受信後、最後に流れてきた要素を下流に流し、completionを流す。
where節も使える

import Combine
var cancellables = Set<AnyCancellable>()
let numbers = [1, 2, 3, 4, 5].publisher

numbers
    .print("前")
    .last()
    .print("後")
    .sink(receiveCompletion: { completion in
        print(completion)
    }, receiveValue: { num in
        print(num)
    })
    .store(in: &cancellables)
    
// 前: receive subscription: ([1, 2, 3, 4, 5])
// 後: receive subscription: (Last)
// 後: request unlimited
// 前: request unlimited
// 前: receive value: (1)
// 前: receive value: (2)
// 前: receive value: (3)
// 前: receive value: (4)
// 前: receive value: (5)
// 前: receive finished
// 後: receive value: (5)
// 5
// 後: receive finished
// finished

dropFirst

指定された数の要素を無視してから下流に要素を流す。

import Combine

let subject = PassthroughSubject<Int, Never>()
var cancellables = Set<AnyCancellable>()

subject.dropFirst(3)
    .sink { value in
        print(value)
    }
    .store(in: &cancellables)

subject.send(1) // 無視
subject.send(2) // 無視
subject.send(3) // 無視
subject.send(4) // 出力される
subject.send(5) // 出力される

filter

シーケンスプロトコルに準拠したものが使用できるfilterと同じと思って問題なさそう

import Combine
let nums = Array(1...100)

nums.publisher
    .filter { num in
        num % 10 == 0
    }
    .sink { num in
        print(num)
    }

subscribe(on:)

subscribe(on:)
サブスクライブ自体がどのスレッドやキューで行われるかを制御
receive(on:)
データの出力や完了イベントがどのスレッドやキューで受け取られるかを制御

import Combine

let publisher = Just("Hello, World!")
    .delay(for: .seconds(3), scheduler: DispatchQueue.global())
    .map { str -> String in
        return str.lowercased()
    }
    .subscribe(on: DispatchQueue.global()) // リクエストの受け取り。ここで指定したキューでサブスクライブ処理を開始
    .receive(on: DispatchQueue.main)       // 結果をメインスレッドで受け取る

let cancellable = publisher
    .sink(receiveValue: { value in
        print(value) // メインスレッドで実行される
    })

share

通常、サブスクライバーが Publisher にサブスクライブすると、そのサブスクライバーごとに新しいデータストリームが開始されます。しかし、share() オペレータを使用すると、すべてのサブスクライバーが同じデータストリームを共有することができます。これは、特にデータの取得にコストがかかる操作(例えば、ネットワークリクエストなど)を行う場合や、サブスクライバーが同じデータセットにアクセスする必要がある場合に有用です。

Publishers.Share は Publishers.Multicast と PassthroughSubject パブリッシャーに暗黙の autoconnect() を組み合わせたもの

Publishers.Shareは他のパブリッシャーと同じ構造体ではなく、クラス

以下のサンプルコードは、appleのやつを少しいじったもの
shareされていることで、Stream1とStream2の値が同じになる
shareオペレータをコメントアウトするとよくわかる

import Combine
var cancellables = Set<AnyCancellable>()


let numberPublisher = (1...3).publisher
    .delay(for: 1, scheduler: DispatchQueue.main)
    .map( { _ in return Int.random(in: 0...100) } )
    .print("Random")
    .share()


numberPublisher
    .sink { print ("Stream 1 received: \($0)")}
    .store(in: &cancellables)
numberPublisher
    .sink { print ("Stream 2 received: \($0)")}
    .store(in: &cancellables)
    
// 出力結果(多少省略)
// Random: receive value: (35)
// Stream 1 received: 35
// Stream 2 received: 35
// Random: receive value: (90)
// Stream 1 received: 90
// Stream 2 received: 90
// Random: receive value: (76)
// Stream 1 received: 76
// Stream 2 received: 76
// Random: receive finished

setFailureType(to:)

Publisherのエラーの型を変更する。以下の例ではNeverからSomeErrorに変更している。

import Combine

struct SomeError: Error {}

let publisher1 = [0, 1, 2, 3, 4, 5].publisher
let publisher2 = CurrentValueSubject<Int, SomeError>(0)

let cancellable = publisher1
    .setFailureType(to: SomeError.self)
    .combineLatest(publisher2)
    .sink(
        receiveCompletion: { print ("completed: \($0)") },
        receiveValue: { print ("value: \($0)")}
     )


// 出力
// value: (5, 0)"

assertNoFailure

エラーがないことを保証させるオペレータ。うまく表現はできないが、fatalErrorみたいなものだと思えばいい

import Combine

struct SomeError: Error {}

let publisher = PassthroughSubject<Int, SomeError>()

publisher
    .assertNoFailure()
    .sink(receiveValue: { value in
        print(value)
    })

publisher.send(1)
publisher.send(2)
publisher.send(completion: .failure(SomeError())) // エラーが生じる

mapError

completion.failureが持つErrorプロトコルに準拠した値を、別のErrorプロトコルに準拠した値に変更できる

import Combine

struct Error1: Error {}
struct Error2: Error {}

let nums = [3, 2, 1, 0]

nums.publisher
    .tryMap { num in
        let num = num * 10
        guard num > 0 else { throw Error1()}
        return num
    }
    .mapError { completion in
        print(completion)
        return Error2()
    }
    .sink { completion in
        switch completion {
        case .finished:
            break
        case .failure(let error):
            print(error)
        }
    } receiveValue: { value in
        print(value)
    }

// 出力結果
// 30
// 20
// 10
// Error1()
// Error2()

handleEvent

receiveSubscription: サブスクリプションが開始されたとき
receiveOutput: 値が発行されるたび
receiveCompletion: シーケンスが完了したとき(成功またはエラー)
receiveCancel: サブスクリプションがキャンセルされたとき
receiveRequest: 新しい値の要求があったとき
これはChatGPT

import Combine

let publisher = PassthroughSubject<Int, Never>()

let cancellable = publisher
    .handleEvents(receiveSubscription: { _ in
        print("Subscribed!")
    }, receiveOutput: { value in
        print("Received value: \(value)")
    }, receiveCompletion: { _ in
        print("Completed!")
    }, receiveCancel: {
        print("Cancelled!")
    }, receiveRequest: { demand in
        print("Demand: \(demand)")
    })
    .sink { value in
        print("sink received value: \(value)")
    }

publisher.send(1)
publisher.send(2)
cancellable.cancel()

print

デバッグ時に便利なオペレータ。流れてきた要素とイベンt(値の発行、完了、エラーなど)をコンソールに出力する

import Combine

let publisher = PassthroughSubject<Int, Never>()

let cancellable = publisher
    .print("Publisher Events")
    .sink { value in
        print("受け取ったvalueは\(value)です。")
    }

publisher.send(1)
publisher.send(2)
publisher.send(completion: .finished)

CombineLatest

複数のPublisherから値を受け取り、一つのセットとして下流へ流す。そのとき、全てのPublisherが一度は値を出力してから出ないと何も流せない。流すのはそれぞれのPublisherの最新のやつ

import Combine

let subject1 = PassthroughSubject<Int, Never>()
let subject2 = PassthroughSubject<Int, Never>()

var cancellables = Set<AnyCancellable>()
var publisher = Publishers.CombineLatest(subject1, subject2)
    .sink { num1, num2 in
        print(num1)
        print(num2)
    }
    .store(in: &cancellables)

subject1.send(1)
subject1.send(2)
// ここまでは何も出力されない

subject2.send(2)
// ここで出力される


//// こっちでもOK
//subject1
//    .combineLatest(subject2)
//    .sink { num1, num2 in
//        print(num1)
//        print(num2)
//    }
//    .store(in: &cancellables)

map

通常のmap

import Combine

let publisher = PassthroughSubject<Int, Never>()
publisher
    .map { $0 * 2 } // 値を2倍にする
    .sink { print($0) }

publisher.send(3) // 6
publisher.send(6) // 12
publisher.send(9) // 18

compactMap

受け取った要素を変換し、その結果がnilでなければアンラップし、要素を下流に出力する。変換結果がnilの場合、その要素は無視される。
以下の例では「apple」がInt型に変換するとnilになるので無視される。

import Combine

let stringPublisher = PassthroughSubject<String, Never>()
stringPublisher
    .compactMap { str in
        let num: Optional<Int> = Int(str)
        return num
    }
    .sink {
        print("「\($0)」を受け取りました。")
    }

stringPublisher.send("5")
stringPublisher.send("apple") // 変換結果がnilなので無視される。
stringPublisher.send("7")

// 出力結果
//「5」を受け取りました。
//「7」を受け取りました。

retry

上流のパブリッシャがエラーを発行したときに、指定された回数だけ再試行する
retry(3)なら3回連続で失敗した時にエラーを投げる
下の例は値が3回連続でアウトな値が流れてきたときにエラーを投げる

import Combine

let faultyPublisher = PassthroughSubject<Int, Error>()
struct ZeroError: Error {}

let cancellable = faultyPublisher
    .tryMap { value -> Int in
        if value == 0 {
            print("値が0です。エラーを投げます。")
            throw ZeroError()
        } else {
            return value
        }
    }
    .retry(3)  // 3回再試行(3回連続で失敗した時にエラーを投げる。)
    .sink(receiveCompletion: { completion in
        switch completion {
        case .finished:
            print("完了")
        case .failure(let error):
            print("エラーを受信:\(error)")
        }
    }, receiveValue: { value in
        print("受信した値:\(value)")
    })

faultyPublisher.send(1)
faultyPublisher.send(0) // ZeroErrorが投げられるが1回目ので問題ない
faultyPublisher.send(3)
faultyPublisher.send(0)
faultyPublisher.send(0)
faultyPublisher.send(0) // 3回連続で失敗したので下流のパブリッシャにエラーが投げられる
faultyPublisher.send(5)

throttle

import SwiftUI
import Combine

struct ContentView: View {
    private let subject1 = PassthroughSubject<Date, Never>()
    private let subject2 = PassthroughSubject<Date, Never>()
    private var cancellables = Set<AnyCancellable>()
    init() {
        subject1
            .throttle(for: 5, scheduler: DispatchQueue.main, latest: true)
            .sink { print($0) }
            .store(in: &cancellables)
        
        subject2
            .throttle(for: 5, scheduler: DispatchQueue.main, latest: false)
            .sink { print($0) }
            .store(in: &cancellables)
    }
    
    var body: some View {
        VStack {
            Button {
                subject1.send(Date())
            } label: {
                Text("latest: true")
            }
            .buttonStyle(.borderedProminent)
            
            
            Button {
                subject2.send(Date())
            } label: {
                Text("latest: false")
            }
            .buttonStyle(.borderedProminent).padding(.top)
        }
    }
}

#Preview {
    ContentView()
}


/// debounceは値を受け取り、指定された時間待って何も流れてこなければ、受け取った値を流す。
/// throttleは値を受け取りすぐに流す。そしてそのあとは、指定した時間休憩。休憩中に値が流れてきたら一つだけ保持して休憩後に値を流す。休憩中に3つの値が流れてきた場合、Latestがtrueならば3番目の値を休憩上がりに流す。latestがfalseならば1番目の値を休憩上がりに流す。
///

decode

流れてきたDataをWeatherData型にデコードしてくれる

import Combine

let openWeatherID = "自分のOpenWeatherID"

struct WeatherData: Decodable {
    struct Weather: Decodable {
        let description: String
    }
    
    struct Main: Decodable {
        let feels_like: Double
    }
    
    let name: String
    let weather: [Weather]
    let main: Main
    
    enum CodingKeys: String, CodingKey {
        case name
        case weather
        case main
    }
}

let url = URL(string: "https://api.openweathermap.org/data/2.5/weather?q=Tokyo,JP&appid=\(openWeatherID)&lang=ja&units=metric")!
var cancellables = Set<AnyCancellable>()

URLSession.shared.dataTaskPublisher(for: url)
//    .map(\.data)
    .map { output in output.data }
    .decode(type: WeatherData.self, decoder: JSONDecoder())
    .sink(receiveCompletion: { completion in
        switch completion {
        case .finished:
            break
        case .failure(let error):
            print("Error: \(error)")
        }
    }, receiveValue: { value in
        print("value: \(value)")
    })
    .store(in: &cancellables)

receive(on:)

特定のスケジューラ(DispatchQueue, RunLoop, OperationQueue)で出力を受信可能。UIの更新につながる処理はメインスレッドで行う必要があるので、その時などに使う

import Combine

let numbers = [1, 2, 3, 4, 5].publisher

numbers
    .receive(on: DispatchQueue.main)
    .sink(receiveValue: { value in
        print(value)
    })

tryFilter

エラーを投げることのできるfilterオペレータ
以下の例では、流れてきたString型の要素空文字だった場合にエラーを投げる

import Combine
struct EmptyError: Error {}

let passthroughSubject = PassthroughSubject<String, EmptyError>()
var cancellables = Set<AnyCancellable>()

passthroughSubject
    .tryFilter { str in
        if !str.isEmpty {
            return true
        } else {
            throw EmptyError()
        }
    }
    .sink { completion in
        print("completion: \(completion)")
    } receiveValue: { output in
        print("output: \(output)")
    }
    .store(in: &cancellables)


passthroughSubject.send("A")
passthroughSubject.send("")
passthroughSubject.send("B") // completionが流れてしまっているので無意味

catch

catchはリカバリー用のパイプラインに切り替えるため、エラー処理後はcompletionが流れてしまう。

import Combine
struct EmptyError: Error {}

let passthroughSubject = PassthroughSubject<String, EmptyError>()
var cancellables = Set<AnyCancellable>()

passthroughSubject
    .tryFilter { str in
        if !str.isEmpty {
            return true
        } else {
            throw EmptyError()
        }
    }
    .catch { _ in
        return Just("Recovery String")
    }
    .sink { completion in
        print("completion: \(completion)")
    } receiveValue: { output in
        print("output: \(output)")
    }
    .store(in: &cancellables)


passthroughSubject.send("A")
passthroughSubject.send("")
passthroughSubject.send("B") // completionが流れてしまっているので無意味

flatMap

入力として受け取った各要素を新しいPublisherに変換し、それらのPublisherからのすべての要素を単一のストリームにマージして出力
catchオペレータと異なり、複数回のエラーを処理できる。

import Combine
struct EmptyError: Error {}

let passthroughSubject = PassthroughSubject<String, EmptyError>()
var cancellables = Set<AnyCancellable>()

passthroughSubject
    .flatMap { str in
        return Just(str)
            .tryFilter { str in
                if !str.isEmpty {
                    return true
                } else {
                    throw EmptyError()
                }
            }
            .catch { _ in
                return Just("Recovery String")
            }
    }
    .sink { completion in
        print("completion: \(completion)")
    } receiveValue: { output in
        print("output: \(output)")
    }
    .store(in: &cancellables)

passthroughSubject.send("A")
passthroughSubject.send("")
passthroughSubject.send("B") // 問題なく流れる

scan

前回出力した値を保持、使用できるオペレータ
0は初期値、accumulatorは前回出力した値、newValueは新たに流れてきた値

import Combine

let numbersPublisher = PassthroughSubject<Int, Never>()
var cancellables: Set<AnyCancellable> = []

numbersPublisher
    .scan(0) { accumulator, newValue in
        return accumulator + newValue
    }
    .sink(receiveValue: { print($0) })
    .store(in: &cancellables)

numbersPublisher.send(1) // 出力: 1 (0 + 1)
numbersPublisher.send(2) // 出力: 3 (1 + 2)
numbersPublisher.send(3) // 出力: 6 (3 + 3)

eraseToPublisher

SubjectをPublisherとして公開可能
Sublisherの実装を隠蔽可能

import Combine

// ①SubjectをPublisherとして公開可能
class Test {
    private let subject = PassthroughSubject<Int, Never>()
    var publisher: AnyPublisher<Int, Never> { subject.eraseToAnyPublisher() }
}


// ②Publisherの実装を隠蔽可能
let publisher1 = Array(1...10).publisher
    .print()
    .removeDuplicates()
    .retry(2)
    .debounce(for: 1, scheduler: DispatchQueue.main)
    .map { _ in return true }
    .filter { output in output }
    
print(type(of: publisher1))
// 出力結果
// Filter<Map<Debounce<Retry<RemoveDuplicates<Print<Sequence<Array<Int>, Never>>>>, OS_dispatch_queue>, Bool>>

let publisher2 = Array(1...10).publisher
    .print()
    .removeDuplicates()
    .retry(2)
    .debounce(for: 1, scheduler: DispatchQueue.main)
    .map { _ in return true }
    .filter { output in output }
    .eraseToAnyPublisher()
    
print(type(of: publisher2))
// 出力結果
// AnyPublisher<Bool, Never>

tryMap

変換が失敗した場合にエラーを投げることのできるmap

import Combine
struct ZeroError: Error {}

let numbers = [1, 2, 0, 4, 5]

numbers.publisher
    .tryMap { number in
        if number != 0 {
            return number * 10
        } else {
            throw ZeroError()
        }
    }
    .sink { completion in
        switch completion {
        case .finished:
            print("finished")
        case .failure(let error):
            print("error:", error)
        }
    } receiveValue: { num in
        print("num:", num)
    }

debounce

短時間の大量のAPIリクエストはスパム行為と捉えられる可能性があるため、debounceが便利

ChatGPTの説明が良かった
debounceオペレータは、Publisherから送信される値の間隔を制御します。指定された期間中に新しい値が送信されない場合に限り、最後に送信された値がDownstreamに伝播されます。このオペレータは、高頻度のイベントや更新をまとめて、そのフリークエンシーを制限するのに役立ちます。

class SearchViewModel: ObservableObject {
    @Published var query: String = ""
    private var cancellables: Set<AnyCancellable> = []

    init() {
        $query
            .debounce(for: .seconds(0.5), scheduler: DispatchQueue.main)
            .sink { searchText in
                // ここでAPIリクエストなどのアクションを実行
                print("Searching for \(searchText)")
            }
            .store(in: &cancellables)
    }
}

removeDuplicates

重複はOK。連続で重複はNG

import Combine

let values = [1, 2, 3, 3, 3, 4, 5].publisher

values
    .removeDuplicates()
    .sink(receiveValue: { print($0) })

// 出力: 1, 2, 3, 4, 5
import Combine

enum Position {
    case sales, accounting, engineering, administrative
}

struct Person {
    let name: String
    let position: Position
}

let people = [
    Person(name: "A", position: .accounting),
    Person(name: "B", position: .engineering),
    Person(name: "C", position: .engineering), // 出力されない
    Person(name: "D", position: .sales),
    Person(name: "E", position: .accounting),
    Person(name: "F", position: .accounting), // 出力されない
    Person(name: "G", position: .sales)
].publisher

people
    .removeDuplicates(by: { (prev, current) in
        prev.position == current.position
    })
    .sink(receiveValue: { print($0.name) })

// 出力結果
// A
// B
// D
// E
// G

delay

その名の通り遅延を行う。Int型の配列のPublisherなどだとわかりにくいので、SwiftUIでサンプル
ボタン押下後1.5秒でHello, worldが出力される

import SwiftUI
import Combine

struct ContentView: View {
    private let subject = PassthroughSubject<String, Never>()
    private var cancellables = Set<AnyCancellable>()
    init() {
        subject
            .delay(for: .seconds(1.5), scheduler: DispatchQueue.global())
            .sink { print($0) }
            .store(in: &cancellables)
    }
    
    var body: some View {
        Button {
            subject.send("Hello, world.")
        } label: {
            Text("Send")
        }
        .buttonStyle(.borderedProminent)
    }
}

#Preview {
    ContentView()
}

※ 間違ってる場所がありましたらコメントをお願いします。

Discussion