【翻訳】Parallel Programming with Swift — Part 2/4
この記事は、Swiftによる並列プログラミングのパート2です。パート1に引き続き、Dispatch Queueとシステムが提供するキューについて調べた。今回は、タスクを定義する別の方法と、GCDが提供する強力なAPIに焦点を当てます。
このシリーズの全てのパートをご覧になりたい方は、こちらをクリックしてください。
Concurrency & GCD — Parallel Programming with Swift — Part 1/4
アジェンダ
Target Queue
DispatchGroup
DispatchWorkItem
DispatchBarrier
DispatchSemaphore
DispatchSources
1.Target Queue
カスタムディスパッチキューは、いかなる作業も実行しません。デフォルトでは、カスタムディスパッチキューのターゲットキューは、デフォルト優先度のグローバルキューです。Swift 3 以降、ディスパッチキューは一度アクティブになると、それ以上変更することができません。カスタムキューのターゲットキューは、setTarget(queue:) 関数によって設定することができます。アクティブ化されたキューにターゲットを設定すると、コンパイル時にエラーになります。幸いなことに、 DispatchQueue イニシャライザは他の引数も受け付けます。何らかの理由で、すでに作成されたキューにターゲットを設定する必要がある場合は、iOS 10以降で利用可能な initiallyInactive 属性を使用することで、それを行うことができます。
DispatchQueue(label: "queue", attributes: .initiallyInactive)
そうすることで、アクティベートするまでの間、それを修正することができます。
DispatchQueueクラスのactivate()メソッドは、タスクを実行させます。キューは並行処理としてマークされていないので、直列順序で実行されます。キューを並行処理にするには、以下のように指定する必要があります。
DispatchQueue(label: "queue", attributes: [.initiallyInactive, .concurrent])
ターゲットキューには、サイクルを生成しない限り、他のディスパッチキューであっても、他のカスタムキューであっても渡すことができます。この関数は、ターゲットキューを別のグローバルキューに設定するだけで、カスタムキューの優先度を設定するために使用することができます。ブロックを実行できるのは、グローバルな同時実行キューとメインキューだけです。他のすべてのキューは、(最終的には)これらの特別なキューのいずれかをターゲットとしなければなりません。
ターゲット・キューについては、Dispatch APIや使用しているフレームワークでお気づきかもしれません。私はRxSwiftライブラリでターゲット・キューの使い方を見つけました。
import Foundation
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
var value: Int = 2
let serialQueue = DispatchQueue(label: "serialQueue")
let concurrentQueue = DispatchQueue(label: "concurrentQueue", attributes: [.initiallyInactive, .concurrent])
concurrentQueue.setTarget(queue: serialQueue)
concurrentQueue.activate()
concurrentQueue.async {
for j in 0...4 {
value = j
print("\(value) ✡️")
}
}
concurrentQueue.async {
for j in 5...7 {
value = j
print("\(value) ✴️")
}
}
ターゲット・キューの使用
・ カスタム・キューのターゲットを低優先度のグローバル・キューに設定した場合、カスタム・キュー上の全ての作業は低優先度で実行され、高優先度のグローバル・キューでも同様に実行されます。
・ カスタムキューのターゲットをメインキューに設定します。これにより、そのカスタムキューに投入された全てのブロックがメインスレッド上で実行されることになります。単にメインキューを直接使用する代わりにこれを行うことの利点は、カスタムキューを独立して中断・再開することができ、その後グローバルキューに再ターゲットできる可能性があることです。
・ カスタムキューを他のカスタムキューの対象とします。これは、複数のキューが互いに対してシリアライズされることを強制し、本質的に、対象となるキューを一時停止/再開することによって、すべてのキューを一緒に一時停止/再開することができるキューのグループを作成します。
現在のキュー名を取得
APIは、呼び出しがメインスレッドにあるかどうかをチェックする関数を提供してくれます。しかし、現在のキュー名を取得する方法はありません。以下のスニペットを使用すると、現在のキュー名を任意の場所に表示できます。
extension DispatchQueue {
static var currentLabel: String? {
let name = __dispatch_queue_get_label(nil)
return String(cString: name, encoding: .utf8)
}
}
2.DispatchGroup
ディスパッチ・グループを使うと、複数のタスクをグループ化して、それらの完了を待ったり、完了したら通知を受けたりすることができます。タスクは非同期でも同期でもよく、異なるキューで実行することもできます。ディスパッチグループは DispatchGroup オブジェクトによって管理されます。
いくつかの画像をダウンロードする必要があり、全ての画像のダウンロードが完了したらユーザーに通知したい、というシナリオを考えてみましょう。
import Foundation
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
let concurrentQueue = DispatchQueue(label: "com.queue.Concurrent", attributes: .concurrent)
func performAsyncTaskIntoConcurrentQueue(with completion: @escaping () -> ()) {
concurrentQueue.async {
for i in 1...5 {
if Thread.isMainThread {
print("\(i) task running in main thread")
} else{
print("\(i) task running in other thread")
}
concurrentQueue.async {
let imageURL = URL(string: "https://upload.wikimedia.org/wikipedia/commons/0/07/Huge_ball_at_Vilnius_center.jpg")!
let _ = try! Data(contentsOf: imageURL)
print("\(i) finished downloading")
}
}
DispatchQueue.main.async {
completion()
}
}
}
print("###### Download all images asynchronously and notify on completion ######")
print("############")
print("############\n")
performAsyncTaskIntoConcurrentQueue(with: {
print("\n############")
print("############")
print("###### All images are downloaded")
})
この実装では、タスクが非同期に実行されるため、完了時に通知を受け取ることが難しい。そこで DispatchGroup が登場します。
import Foundation
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
let concurrentQueue = DispatchQueue(label: "com.queue.Concurrent", attributes: .concurrent)
func performAsyncTaskIntoConcurrentQueue(with completion: @escaping () -> ()) {
let group = DispatchGroup()
for i in 1...5 {
group.enter()
concurrentQueue.async {
let imageURL = URL(string: "https://upload.wikimedia.org/wikipedia/commons/0/07/Huge_ball_at_Vilnius_center.jpg")!
let _ = try! Data(contentsOf: imageURL)
print("###### Image \(i) Downloaded ######")
group.leave()
}
}
/* Either write below code or group.notify() to execute completion block
group.wait()
DispatchQueue.main.async {
completion()
}
*/
group.notify(queue: DispatchQueue.main) {
completion()
}
}
print("###### Download all images asynchronously and notify on completion ######")
print("############")
print("############\n")
performAsyncTaskIntoConcurrentQueue(with: {
print("\n############")
print("############")
print("###### All images are downloaded")
})
DispatchGroup は、作業の集約的な同期を可能にします。これは、複数の異なる作業項目やブロックを投入し、それらが異なるキューで実行されていたとしても、それらがすべて完了したときに追跡するために使用することができます。必要なのは、タスクを同期させるために、ディスパッチグループ上で enter() と leave() をバランスよくコールすることだけです。 enter() を呼び出して、タスクが開始されたことをグループに手動で通知し、leave を呼び出して、作業が完了したことを通知します。 group.wait() も呼び出せます。 group.wait() は、グループのタスクが完了するまで現在のスレッドをブロックします。完了ブロックを呼び出すには2つの方法があります。
-
wait() を使用し、メイン・キューで完了ブロックを実行します。
-
group notify() を呼び出します。
-
wait(timeout:)。これは現在のスレッドをブロックしますが、指定されたタイムアウトの後は、とにかく続行します。DispatchTime 型のタイムアウト・オブジェクトを作成するには、.now() + 1 という構文を使うと、今から1秒後のタイムアウトが作成されます。
-
wait(timeout:) は、グループが完了したのかタイムアウトしたのかを判断するために使用できる列挙型を返します。
その他の使用例
・ 2つの異なるネットワークコールを実行する必要があります。両方の呼び出しが返された後に、レスポンスを解析するために必要なデータが得られます。
・ アニメーションが、長いデータベース呼び出しと並行して実行されている。その両方が終了したら、ローディング・スピナーを非表示にしたい。
・ 使用しているネットワークAPIは速すぎます。更新ジェスチャへのプルが機能しているように見えますが、機能していません。API 呼び出しがあまりにも速く返されるため、更新コントロールは外観のアニメーションが終わるとすぐにそれ自体を終了してしまいます。つまり、リフレッシュ・コントロールを非表示にする前に、ある最小限の時間しきい値とネットワーク呼び出しの両方を待つことができます。
2.DispatchWorkItem
GCD に関するよくある誤解のひとつに、「一度スケジュールしたタスクはキャンセルできないので、 Operation API を使う必要がある」というものがあります。iOS 8 と macOS 10.10 で DispatchWorkItem が導入され、使いやすい API でこの機能を提供するようになりました。
DispatchWorkItemは、実行可能な作業をカプセル化します。ワークアイテムは、 DispatchQueue や DispatchGroup 内にディスパッチすることができます。 DispatchWorkItem は、 DispatchSource イベント、登録、またはキャンセル ハンドラとして設定することもできます。
言い換えれば、 DispatchWorkItem は、任意のキューにディスパッチできるコードのブロックをカプセル化します。
ディスパッチワークアイテムは、キャンセルフラグを持っています。実行前にキャンセルされた場合、ディスパッチキューはそれを実行せず、スキップします。実行中にキャンセルされた場合、 cancel プロパティは true を返します。この場合、実行を中止することができます。また、ワークアイテムはタスクが完了したときにキューに通知することができます。
注意: GCD はプリエンプティブなキャンセルは行いません。すでに開始されたワークアイテムを停止させるには、自分でキャンセルをテストする必要があります。
ワークアイテムを即座に実行したくない場合は、 wait 関数があります。実行を遅らせるための時間間隔を渡すことができます。
OOPS、 DispatchWorkItem には2種類のwait関数があります。どちらを使えばいいのか?迷いましたか?
func wait(timeout: DispatchTime) -> DispatchTimeoutResult
func wait(wallTimeout: DispatchWallTime) -> DispatchTimeoutResult
DispatchTime は基本的にデバイスの時計に合わせた時間であり、デバイスがスリープすれば時計もスリープします。完璧な組み合わせです。
しかし、DispatchWallTime は壁掛け時計の時刻であり、壁掛け時計は全く眠りません。
実行後、func notify(qos.DispatchQo = default)を呼び出すことで、同じキューや他のキューに通知することができます:DispatchQoS = default, flags:DispatchWorkItemFlags = default, queue:DispatchQueue, execute:エスケープ () -> Void)
DispatchQoSクラス は DispatchQueue と連動し、重要度に応じてタスクを分類するのに役立ちます。最も優先順位の高いタスクが最初に実行されます。しかし、優先順位の低いタスクは、より少ないリソースとエネルギーしか必要としないため、後で実行されます。これにより、アプリケーションの応答性とエネルギー効率が向上します。
DispatchWorkItemFlags は基本的に、DispatchWorkItem の動作をカスタマイズできるユニークなオプションのセットです。指定したフラグの値によって、新しいスレッドを作成するか、バリアを作成する必要があるかを決定します。
import Foundation
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
let concurrentQueue = DispatchQueue(label: "com.queue.Concurrent", attributes: .concurrent)
func performAsyncTaskInConcurrentQueue() {
var task:DispatchWorkItem?
task = DispatchWorkItem {
for i in 1...5 {
if Thread.isMainThread {
print("task running in main thread")
} else{
print("task running in other thread")
}
if (task?.isCancelled)! {
break
}
let imageURL = URL(string: "https://upload.wikimedia.org/wikipedia/commons/0/07/Huge_ball_at_Vilnius_center.jpg")!
let _ = try! Data(contentsOf: imageURL)
print("\(i) finished downloading")
}
task = nil
}
/*
There are two ways to execute task on queue. Either by providing task to execute parameter or
within async block call perform() on task. perform() executes task on current queue.
*/
// concurrentQueue.async(execute: task!)
concurrentQueue.async {
task?.wait(wallTimeout: .now() + .seconds(2))
// task?.wait(timeout: .now() + .seconds(2))
task?.perform()
}
concurrentQueue.asyncAfter(deadline: .now() + .seconds(2), execute: {
task?.cancel()
})
task?.notify(queue: concurrentQueue) {
print("\n############")
print("############")
print("###### Work Item Completed")
}
}
performAsyncTaskInConcurrentQueue()
print("###### Download all images asynchronously and notify on completion ######")
print("############")
print("############\n")
1つのキューは2秒後にタスクをキャンセルしており、for ループ で isCancelled のチェックを行うため、画像ダウンロード・タスクが中断されます。
3.DispatchBarrier
シングルトンでよくある懸念として、シングルトンがスレッドセーフでないことがあります。シングルトンは、複数のコントローラから同時にシングルトンインスタンスにアクセスされることが多いからです。スレッドセーフなコードは、データの破損などの問題を起こすことなく、並行タスクから安全に呼び出すことができます。スレッドセーフでないコードは、一度に1つのコンテキストでしか実行できません。
考慮すべきスレッドセーフのケースは2つあります。シングルトンインスタンスの初期化中と、インスタンスへの読み書き中です。
初期化は、Swift が静的変数を初期化する方法のため、簡単なケースであることが分かります。静的変数は最初にアクセスされたときに初期化され、初期化はアトミックであることが保証されます。
クリティカルセクションは、同時に実行してはならないコードの一部です。これは通常、コードが変数のような共有リソースを操作するためで、並行プロセスからアクセスされると破損する可能性があります。
グローバルキューや .concurrent 属性で作成されていないキューに投入された場合、バリアブロックはasync()/sync()API で投入されたブロックと同じ動作をします。
ある問題を考えてみましょう。
import Foundation
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
var value: Int = 2
let concurrentQueue = DispatchQueue(label: "queue", attributes: .concurrent)
concurrentQueue.async {
for i in 0...3 {
value = i
print("\(value) ✴️")
}
}
concurrentQueue.async {
for j in 4...6 {
value = j
print("\(value) ✡️")
}
}
concurrentQueue.async {
value = 9
print(value)
}
concurrentQueue.async {
value = 14
print(value)
}
この例では、複数のスレッドが同時に value 変数 を操作しようとしており、そのために間違った値が出力されています。
これは競合状態、あるいは古典的なリーダ・ライタの問題で、多くのキュー上のブロックが変更可能な値を変更しようとしています。この結果、ブロックに間違った値が表示されます。 Swift は、 DispatchBarrier を使用して読み取り/書き込みロックを作成するエレガントなソリューションを提供します。
ディスパッチバリアによって、並行ディスパッチキュー内に同期ポイントを作成することができます。通常の動作では、キューは通常の並行キューと同じように動作します。しかし、バリアが実行されている時は、キューはシリアルキューとして動作します。バリアが終了すると、キューは通常の同時実行キューに戻ります。
GCDは、どのコードブロックがバリア呼び出しの前にキューに投入されたかを記録し、それらが全て完了したときに、渡されたバリアブロックを呼び出します。また、キューに投入されたブロックは、バリアブロックが完了するまで実行されません。しかし、バリアコールはすぐに戻り、このブロックを非同期に実行します。
技術的には、 DispatchWorkItem やブロックをディスパッチキューに投入するとき、その特定の時間に指定されたキューで実行される唯一の項目であるべきであることを示すフラグを設定します。この DispatchWorkItem が実行される前に、 ディスパッチバリアに先行してキューに投入されたすべての項目が完了しなければなりません。バリアが実行されるとき、それは実行される唯一のタスクであり、キューはその間他のタスクを実行しません。バリアが終了すると、キューはデフォルトの動作に戻ります。
キューがシリアルキューやグローバル並行キューの一つである場合、 バリアは動作しません。カスタム並行キューでバリアを使用することは、コードの重要な領域でスレッドセーフを扱うのに良い選択です。
import Foundation
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
var value: Int = 2
let concurrentQueue = DispatchQueue(label: "queue", attributes: .concurrent)
concurrentQueue.async(flags: .barrier) {
for i in 0...3 {
value = i
print("\(value) ✴️")
}
}
concurrentQueue.async {
print(value)
}
concurrentQueue.async(flags: .barrier) {
for j in 4...6 {
value = j
print("\(value) ✡️")
}
}
concurrentQueue.async {
value = 14
print(value)
}
4.DispatchSemaphore
マルチスレッド・プログラミングでは、スレッドを待たせることが重要です。スレッドはリソースへの排他的アクセスを待たなければなりません。スレッドを待たせ、カーネル内でスリープさせることで、スレッドにCPU時間を取らせないようにする方法の1つが、セマフォです。セマフォは1960年代初頭にダイクストラによって発明されました。
セマフォは、複数のスレッドによる共有リソースへのアクセスを制御する能力を与えてくれます。共有リソースとは、変数であったり、画像をURLからダウンロードしたり、データベースから読み込んだりといったタスクであったりりします。
セマフォは、スレッドキューとカウンタ値(Int型)から構成されます。
スレッドキューは、セマフォがFIFO順序で待機しているスレッドを追跡するために使用されます。
カウンタ値は、スレッドが共有リソースにアクセスできるかどうかを決定するためにセマフォが使用します。カウンタ値は、signal() や wait() 関数を呼び出すと変化します。
共有リソースを要求する
共有リソースを使用する前に、毎回 wait() を呼び出します。基本的には、共有リソースが利用可能かどうかをセマフォに問い合わせます。利用可能でなければ待ちます。
共有リソースを解放する
共有リソースを使用したら、その都度 signal() を呼び出します。基本的には、共有リソースとのやりとりが終わったことをセマフォに通知します。
wait() を呼び出すと、以下の処理を実行します。
・ セマフォ・カウンタを1デクリメントします。
・ 結果の値が0より小さい場合、スレッドはブロックされ、待機状態になります。
・ 結果の値が0より大きいか等しい場合、コードは待たずに実行されます。
signal() を呼び出すと、以下の処理が実行されます。
・ セマフォ・カウンタを1インクリメントします。
・ 前の値が0より小さければ、スレッド・キューで現在待機しているスレッドのブロックを解除します。
・ 前の値が0より大きいか等しい場合は、スレッド・キューが空で、誰も待っていないことを意味します。
下の画像はセマフォの完璧な例です。
セマフォを単一のキューで実装してみましょう
import Foundation
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
var value: Int = 2
let concurrentQueue = DispatchQueue(label: "queue", attributes: .concurrent)
let semaphore = DispatchSemaphore(value: 1)
for j in 0...4 {
concurrentQueue.async {
print("\(j) waiting")
semaphore.wait()
print("\(j) wait finished")
value = j
print("\(value) ✡️")
print("\(j) Done with assignment")
semaphore.signal()
}
}
上のコードでは、最初に wait() が呼ばれ、次に signal() がセマフォに対して呼ばれていることにお気づきでしょうか。
画像のダウンロード中にセマフォを使用する別の例を見てみましょう。
セマフォがどのように機能するかを理解したところで、アプリにとってより現実的なシナリオを考えてみましょう。
まず、画像をダウンロードするコードのブロックを実行するために使用する並行キューを作成します。
次に、一度に多くのCPU時間を消費しないように、一度に2枚の画像をダウンロードすることにしました。
第三に、forループを使って6回反復します。各繰り返しでは、次のようにします: wait() → 画像のダウンロード → signal()
import Foundation
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
let concurrentQueue = DispatchQueue(label: "com.queue.Concurrent", attributes: .concurrent)
let semaphore = DispatchSemaphore(value: 2)
func performAsyncTaskIntoConcurrentQueue() {
for i in 1...6 {
concurrentQueue.async {
print("###### Image \(i) waiting for download ######")
semaphore.wait()
print("###### Downloading Image \(i) ######")
let imageURL = URL(string: "https://upload.wikimedia.org/wikipedia/commons/0/07/Huge_ball_at_Vilnius_center.jpg")!
let _ = try! Data(contentsOf: imageURL)
print("###### Image \(i) Downloaded ######")
semaphore.signal()
}
}
}
print("###### Download all images asynchronously ######")
print("############")
print("############\n")
performAsyncTaskIntoConcurrentQueue()
理解を深めるために、セマフォ・カウンターを追跡してみましょう。
・ 2 (初期値)
・ 1 (画像1待ち。値 >= 0なので、画像ダウンロード開始)
・ 0 (画像2待ち、値 >= 0なので、画像ダウンロード開始)
・ -1 (画像 3 の待ち、値 < 0 なので、キューに追加)
・ -2(画像4の待ち、値 < 0、キューに追加)
・ -3 (画像 5 の待ち, 値 < 0 なので, キューに追加)
・ -4(画像6の待ち、値 < 0 以降、キューに追加)
・ -3 (画像1シグナル、最後の値 < 0、画像3をウェイクアップしてキューからポップ)
・ - 2 (画像2シグナル、最後の値 < 0、画像4をウェイクアップしてキューからポップ)
・ -1 (画像3信号、最後の値 < 0、画像5をウェイクアップしてキューからポップ)
・ 0 (画像4シグナル、最後の値 < 0、画像6をウェイクアップしてキューからポップ)
このシーケンスから、1つのスレッドがシーケンスを実行し始めると、もう1つのスレッドは最初のスレッドが終了するまで待たなければならないことが分かります。2番目のスレッドがシーケンスのどの時点で wait() リクエストを送信するかは関係なく、もう一方のスレッドが終了するまで常に待機しなければなりません。
同じ優先順位のスレッド間でしかセマフォを使用しない方がいいです。
DispatchSources
ディスパッチ・ソースは、イベント・ハンドラを使用して、カーネル・シグナルやシステム、ファイル、ソケット関連のイベントなど、システム・レベルの非同期イベントを処理する便利な方法です。
ディスパッチ・ソースは、以下のタイプのシステム・イベントを監視するために使用できます。
・ タイマー・ディスパッチ・ソース:定期的な通知(DispatchSourceTimer)の生成に使用されます。
・ シグナル・ディスパッチ・ソース:UNIX シグナルの処理に使用されます (DispatchSourceSignal)。
・ メモリ・ディスパッチ・ソース(Memory Dispatch Sources):メモリ使用状況に関する通知の登録に使用されます (DispatchSourceMemoryPressure)。
・ ディスクリプタ ディスパッチ ソース:ディスクリプターソースは、以下のような様々なファイルやソケットベースの操作に関連する通知を送信します:
- データが読み込み可能になったときのシグナル
- データの書き込みが可能になったときのシグナル
- ファイルの削除、移動、名前の変更
- ファイルのメタ情報の変更
(DispatchSourceFileSystemObject、DispatchSourceRead、DispatchSourceWrite)。
これにより、「ライブ編集」機能を持つ開発者ツールを簡単に構築することができます。
・ プロセスのディスパッチソース:外部プロセスの実行状態に関連するイベントを監視するために使用されます (DispatchSourceProcess)。プロセス関連のイベント
- プロセスの終了
- プロセスが fork または exec タイプのコールを発行する
- シグナルがプロセスに送られた場合。
・ マッハ関連のディスパッチソース:MachカーネルのIPC機能に関連するイベントを処理するために使用されます(DispatchSourceMachReceive、DispatchSourceMachSend)。
・ 私たちがトリガーするカスタムイベント:DispatchSourceProtocolに準拠することで、カスタムディスパッチソースを定義できます。
いくつかの使用例を見てみましょう。
・ NSTimerはメインスレッドで実行され、メインスレッドの実行ループが必要です。NSTimer
はメインスレッドで実行され、メイン実行ループを実行する必要があります。このような場合、DispatchSourceTimerを使うことができます。ディスパッチタイマーソースは、時間間隔が完了したときにイベントを発生させ、同じキューで事前に設定されたコールバックを発生させます。もっと読む....
・ ファイルの変更を監視する必要がある場合、DispatchSource Descriptor を使用する簡単な方法があります。
注意
いったんディスパッチタスクの実行が開始されると、タスク/キュー/ワークアイテムをキャンセルしたり中断したりしても、そのタスクは停止しません。キャンセルや一時停止の操作は、まだコールされていないタスクにのみ影響します。この操作を行う必要がある場合は、タスクの実行中に適切なタイミングでキャンセルされた状態を手動でチェックし、必要であればタスクを終了する必要があります。
【翻訳元の記事】
Parallel Programming with Swift — Part 2/4
Discussion