Swift Concurrency: 同期関数で非同期処理を待ちたい
Swift Concurrencyは便利なんですが、いったん関数をasyncにしてしまうと、もう同期関数から同期的に呼ぶということができなくなってしまうという問題があります(ないですか?)
もうちょっと具体的に。
asyncとマークされた関数は、asyncとマークされた関数か、同期関数内で生成されたTask
のクロージャからしか呼べません。同期関数側はTask
のクロージャの完了を待ってくれず、Task
を生成したら即座に帰って行ってしまいます。何だったらクロージャの開始すら待ちません。
ちょっとasyncってマークしただけなのに…。もう別世界に行ってしまったのね…
って感じなんですよね。
例えば、asyncなfuncA
とfuncB
を並列に呼び出し、両方の結果をawait
して、その結果を使って何かを行う。
その間、呼び出し元は待つ。
それだけがしたいんですよね。
呼び出し元もasyncだと、話は簡単です。
func funcX() async {
NSLog("funcX start")
async let a = funcA()
async let b = funcB()
NSLog("a x b = %ld", await a * b)
NSLog("funcX finish")
}
呼び出し側をasyncにしない場合は以下のようにTask
を導入してそこでasync関数を呼び出すことになりますが、こいつはTask
の完了を待たないので、例えばmain
からの呼び出しがこのfuncY
だけだと、結果を出力する前にプログラムが終わってしまいます。
func funcY() {
NSLog("funcY start")
Task {
async let a = funcA()
async let b = funcB()
NSLog("a x b = %ld", await a * b)
}
NSLog("funcY finish")
}
なので、以下のように待ちたいのです。
func funcY() {
NSLog("funcY start")
let waiting = TaskWaiting()
Task {
waiting.makeWaiting()
defer {
waiting.releaseWaiting()
}
async let a = funcA()
async let b = funcB()
NSLog("a x b = %ld", await a * b)
}
waiting.wait()
NSLog("funcY finish")
}
同期関数側にセマフォもしくはミューテックス的なものを導入したら良いだろうということで調べました。
まず、Swift Concurrencyには同期関数側で待つ仕組みはないらしい、ということが分かりました。
いくつか記事やコードもありました。
正直、これで良いと思うんですが、一方でDispatchSemaphore
を使っているというのが気になります。
WWDC21の「Swift concurrency: Behind the scenes」によると、
Swift Concurrencyでは協調的スケジューリングが必要なので、DispatchSemaphore
やpthread_cond
などのUnsafeなプリミティブは使ったらダメということなんですね。
Swift Concurrencyでは呼び出し元の同期関数と呼び出し先の非同期関数とが同じスレッドで実行されるケースとかもあります。DispatchSemaphore
はそういうことを知らないので、場合によってはデッドロックが発生してしまう、ということなのかなと思っています。
(こういう非同期かつ環境依存の問題ってテストして再現するかというとそうでもないのでつらいですね)
上記のWWDC動画で「環境変数LIBDISPATCH_COOPERATIVE_POOL_STRICT=1にして実行すると問題があれば分かるよ」みたいなことを言ってますが、この記事のDispatchSemaphore
の処理ではそれをしても特に問題は検出されませんでした(簡単なことしか試してないんで絶対大丈夫ということでもないのですが)。
セマフォをDispatchSemaphore
でない仕組みで実装できないか、という話なんですが、asyncなんですよね。
こっちも同様です。asyncでないwait
が欲しいんです。
自分で書いたのは以下のようなものです。
class TaskWaiting {
let startBlocker: NSLock
let taskBlocker: NSLock
init() {
self.startBlocker = NSLock()
self.taskBlocker = NSLock()
self.startBlocker.lock()
}
func makeWaiting() {
self.taskBlocker.lock()
self.startBlocker.unlock()
}
func wait() {
self.startBlocker.lock()
self.taskBlocker.lock()
}
func releaseWaiting() {
self.taskBlocker.unlock()
}
}
func funcY() {
NSLog("funcY start")
let waiting = TaskWaiting()
Task {
waiting.makeWaiting()
defer {
waiting.releaseWaiting()
}
async let a = funcA()
async let b = funcB()
NSLog("a x b = %ld", await a * b)
}
waiting.wait()
NSLog("funcY finish")
}
NSLock
を二つ使います。
最初は普通にNSLock
を1つだけ使って以下のように書きました。
func funcW() {
NSLog("funcW start")
let lock = NSLock()
Task {
lock.lock() // [1]
defer {
lock.unlock()
}
async let a = funcA()
async let b = funcB()
NSLog("a x b = %ld", await a * b)
}
//sleep(1) // [3]
lock.lock() // [2]
NSLog("funcW finish")
}
ロック1つだけだと、Task
の開始時のlock()
つまり[1]の呼び出しと、呼び出し元側のlock()
つまり[2]の呼び出しが逆転してしまうことがあって、その場合すり抜けてしまってTask
の終了まで待たないんですね。
sleep
のような少し待つ処理を[3]に入れれば良いのですが、ほとんどの場合で待ちすぎるくせに、すり抜けない保証が全くありません。
このため、ロック生成の時点で[2]の箇所をブロックするようにして、Task
の開始時に[1]のロックが済んでから[2]のブロックを解除する、というように2個のロックを導入しました。
Swift Concurrency的にOKなのかはよく分かりません。
WWDCの動画では、DispatchSemaphore
は「Unsafe Primitives」となってるのに対して、NSLock
は「Caution Required」ではありますが。
先にも挙げた記事の方法だと、返値も受け取ることができますし、書き方がasyncとほぼ同じように書けるというのもいいですね。
(返値をどうしても同期関数で受け取らなければならないケースがあるのかよく分かりませんが)
func funcZ() {
NSLog("funcZ start")
Task {
async let a = funcA()
async let b = funcB()
NSLog("a x b = %ld", await a * b)
}.await
NSLog("funcZ finish")
}
ロックを生成したり、defer
でアンロックしたり、というのがありません。
先の記事のコードを、DispatchSemaphore
でなくNSLock
バージョン(=TaskWaiting
)に差し替えると、以下のようにできます。
extension Task where Failure == Never {
var await: Success {
var target: Success!
withoutActuallyEscaping({ target = $0 }) { setter in
let waiting = TaskWaiting()
Task<Void, Never> {
waiting.makeWaiting()
defer {
waiting.releaseWaiting()
}
await setter(value)
}
waiting.wait()
}
return target
}
}
extension Task where Failure : Error {
var await: Success {
get throws {
var target: Result<Success, Failure>!
withoutActuallyEscaping({ target = $0 }) { setter in
let waiting = TaskWaiting()
Task<Void, Never> {
waiting.makeWaiting()
defer {
waiting.releaseWaiting()
}
await setter(result)
}
waiting.wait()
}
return try target.get()
}
}
}
Swift Concurrency CheckingをCompleteにすると、先のコードでは警告が出ますね…
target
がTask
をまたいでキャプチャされているところの警告が出るのを回避できません。
(何か方法があるのかも知れませんが)
そもそも返値まで同期関数側に戻さなくても良い気がするので、それをやめてawait
をプロパティから関数に変更してみたところ、警告はなくなりました。
final class TaskWaiting: Sendable {
private let startBlocker: NSLock
private let taskBlocker: NSLock
init() {
self.startBlocker = NSLock()
self.taskBlocker = NSLock()
self.startBlocker.lock()
}
func makeWaiting() {
self.taskBlocker.lock()
self.startBlocker.unlock()
}
func wait() {
self.startBlocker.lock()
self.taskBlocker.lock()
}
func releaseWaiting() {
self.taskBlocker.unlock()
}
}
extension Task where Failure == Never {
func await() {
let waiting = TaskWaiting()
Task<Void, Never> {
waiting.makeWaiting()
defer {
waiting.releaseWaiting()
}
let _ = await value
}
waiting.wait()
}
}
extension Task where Failure : Error {
func await() throws {
let waiting = TaskWaiting()
Task<Void, Never> {
waiting.makeWaiting()
defer {
waiting.releaseWaiting()
}
let _ = await result
}
waiting.wait()
}
}
呼び出すときは以下のようになります。
func funcZ() {
NSLog("funcZ start")
Task {
async let a = funcA()
async let b = funcB()
NSLog("a x b = %ld", await a * b)
}.await()
NSLog("funcZ finish")
}
ちなみに?@MainActor
でマークしてしまうとTask
の中と外がともに同じメインスレッドになってしまうので、デッドロックします。
func funcZ() {
NSLog("funcZ start")
Task { @MainActor in // ←この場合はawait()したらダメ
async let a = funcA()
async let b = funcB()
NSLog("a x b = %ld", await a * b)
}.await()
NSLog("funcZ finish")
}