Open7

Swift Concurrency: 同期関数で非同期処理を待ちたい

kabeyakabeya

Swift Concurrencyは便利なんですが、いったん関数をasyncにしてしまうと、もう同期関数から同期的に呼ぶということができなくなってしまうという問題があります(ないですか?)

もうちょっと具体的に。

asyncとマークされた関数は、asyncとマークされた関数か、同期関数内で生成されたTaskのクロージャからしか呼べません。同期関数側はTaskのクロージャの完了を待ってくれず、Taskを生成したら即座に帰って行ってしまいます。何だったらクロージャの開始すら待ちません。

ちょっとasyncってマークしただけなのに…。もう別世界に行ってしまったのね…
って感じなんですよね。

例えば、asyncなfuncAfuncBを並列に呼び出し、両方の結果をawaitして、その結果を使って何かを行う。
その間、呼び出し元は待つ。
それだけがしたいんですよね。

呼び出し元もasyncだと、話は簡単です。

func funcX() async {
    NSLog("funcX start")
    async let a = funcA()
    async let b = funcB()
    NSLog("a x b = %ld", await a * b)
    NSLog("funcX finish")
}

呼び出し側をasyncにしない場合は以下のようにTaskを導入してそこでasync関数を呼び出すことになりますが、こいつはTaskの完了を待たないので、例えばmainからの呼び出しがこのfuncYだけだと、結果を出力する前にプログラムが終わってしまいます。

func funcY() {
    NSLog("funcY start")
    Task {
        async let a = funcA()
        async let b = funcB()
        NSLog("a x b = %ld", await a * b)
    }
    NSLog("funcY finish")
}

なので、以下のように待ちたいのです。

func funcY() {
    NSLog("funcY start")
    let waiting = TaskWaiting()
    Task {
        waiting.makeWaiting()
        defer {
            waiting.releaseWaiting()
        }
        async let a = funcA()
        async let b = funcB()
        NSLog("a x b = %ld", await a * b)
    }
    waiting.wait()
    
    NSLog("funcY finish")
}
kabeyakabeya

同期関数側にセマフォもしくはミューテックス的なものを導入したら良いだろうということで調べました。
まず、Swift Concurrencyには同期関数側で待つ仕組みはないらしい、ということが分かりました。

いくつか記事やコードもありました。

https://zenn.dev/xsgn/articles/25c4e3a0d4bba9

正直、これで良いと思うんですが、一方でDispatchSemaphoreを使っているというのが気になります。

WWDC21の「Swift concurrency: Behind the scenes」によると、
Swift Concurrencyでは協調的スケジューリングが必要なので、DispatchSemaphorepthread_condなどのUnsafeなプリミティブは使ったらダメということなんですね。

Swift Concurrencyでは呼び出し元の同期関数と呼び出し先の非同期関数とが同じスレッドで実行されるケースとかもあります。DispatchSemaphoreはそういうことを知らないので、場合によってはデッドロックが発生してしまう、ということなのかなと思っています。
(こういう非同期かつ環境依存の問題ってテストして再現するかというとそうでもないのでつらいですね)

上記のWWDC動画で「環境変数LIBDISPATCH_COOPERATIVE_POOL_STRICT=1にして実行すると問題があれば分かるよ」みたいなことを言ってますが、この記事のDispatchSemaphoreの処理ではそれをしても特に問題は検出されませんでした(簡単なことしか試してないんで絶対大丈夫ということでもないのですが)。

https://techlife.cookpad.com/entry/2022/10/24/090000

セマフォをDispatchSemaphoreでない仕組みで実装できないか、という話なんですが、asyncなんですよね。

https://github.com/groue/Semaphore

こっちも同様です。asyncでないwaitが欲しいんです。

kabeyakabeya

自分で書いたのは以下のようなものです。

class TaskWaiting {
    let startBlocker: NSLock
    let taskBlocker: NSLock
    
    init() {
        self.startBlocker = NSLock()
        self.taskBlocker = NSLock()
        
        self.startBlocker.lock()
    }
    
    func makeWaiting() {
        self.taskBlocker.lock()
        self.startBlocker.unlock()
    }
    
    func wait() {
        self.startBlocker.lock()
        self.taskBlocker.lock()
    }
    
    func releaseWaiting() {
        self.taskBlocker.unlock()
    }
}

func funcY() {
    NSLog("funcY start")
    let waiting = TaskWaiting()
    Task {
        waiting.makeWaiting()
        defer {
            waiting.releaseWaiting()
        }
        async let a = funcA()
        async let b = funcB()
        NSLog("a x b = %ld", await a * b)
    }
    waiting.wait()
    
    NSLog("funcY finish")
}

NSLockを二つ使います。
最初は普通にNSLockを1つだけ使って以下のように書きました。

func funcW() {
    NSLog("funcW start")
    let lock = NSLock()
    Task {
        lock.lock() // [1]
        defer {
            lock.unlock()
        }
        async let a = funcA()
        async let b = funcB()
        NSLog("a x b = %ld", await a * b)
    }
    //sleep(1)  // [3]
    lock.lock() // [2]
    
    NSLog("funcW finish")
}

ロック1つだけだと、Taskの開始時のlock()つまり[1]の呼び出しと、呼び出し元側のlock()つまり[2]の呼び出しが逆転してしまうことがあって、その場合すり抜けてしまってTaskの終了まで待たないんですね。
sleepのような少し待つ処理を[3]に入れれば良いのですが、ほとんどの場合で待ちすぎるくせに、すり抜けない保証が全くありません。

このため、ロック生成の時点で[2]の箇所をブロックするようにして、Taskの開始時に[1]のロックが済んでから[2]のブロックを解除する、というように2個のロックを導入しました。

Swift Concurrency的にOKなのかはよく分かりません。
WWDCの動画では、DispatchSemaphoreは「Unsafe Primitives」となってるのに対して、NSLockは「Caution Required」ではありますが。

kabeyakabeya

先にも挙げた記事の方法だと、返値も受け取ることができますし、書き方がasyncとほぼ同じように書けるというのもいいですね。
(返値をどうしても同期関数で受け取らなければならないケースがあるのかよく分かりませんが)

func funcZ() {
    NSLog("funcZ start")
    Task {
        async let a = funcA()
        async let b = funcB()
        NSLog("a x b = %ld", await a * b)
    }.await
    
    NSLog("funcZ finish")
}

ロックを生成したり、deferでアンロックしたり、というのがありません。

kabeyakabeya

先の記事のコードを、DispatchSemaphoreでなくNSLockバージョン(=TaskWaiting)に差し替えると、以下のようにできます。

extension Task where Failure == Never {
    var await: Success {
        var target: Success!
        withoutActuallyEscaping({ target = $0 }) { setter in
            let waiting = TaskWaiting()
            Task<Void, Never> {
                waiting.makeWaiting()
                defer {
                    waiting.releaseWaiting()
                }
                await setter(value)
            }
            waiting.wait()
        }
        return target
    }
}
extension Task where Failure : Error {
    var await: Success {
        get throws {
            var target: Result<Success, Failure>!
            withoutActuallyEscaping({ target = $0 }) { setter in
                let waiting = TaskWaiting()
                Task<Void, Never> {
                    waiting.makeWaiting()
                    defer {
                        waiting.releaseWaiting()
                    }
                    await setter(result)
                }
                waiting.wait()
            }
            return try target.get()
        }
    }
}
kabeyakabeya

Swift Concurrency CheckingをCompleteにすると、先のコードでは警告が出ますね…
targetTaskをまたいでキャプチャされているところの警告が出るのを回避できません。
(何か方法があるのかも知れませんが)

そもそも返値まで同期関数側に戻さなくても良い気がするので、それをやめてawaitをプロパティから関数に変更してみたところ、警告はなくなりました。

final class TaskWaiting: Sendable {
    private let startBlocker: NSLock
    private let taskBlocker: NSLock
    
    init() {
        self.startBlocker = NSLock()
        self.taskBlocker = NSLock()
        
        self.startBlocker.lock()
    }
    
    func makeWaiting() {
        self.taskBlocker.lock()
        self.startBlocker.unlock()
    }
    
    func wait() {
        self.startBlocker.lock()
        self.taskBlocker.lock()
    }
    
    func releaseWaiting() {
        self.taskBlocker.unlock()
    }
}

extension Task where Failure == Never {
    func await() {
        let waiting = TaskWaiting()
        Task<Void, Never> {
            waiting.makeWaiting()
            defer {
                waiting.releaseWaiting()
            }
            let _ = await value
        }
        waiting.wait()
    }
}

extension Task where Failure : Error {
    func await() throws {
        let waiting = TaskWaiting()
        Task<Void, Never> {
            waiting.makeWaiting()
            defer {
                waiting.releaseWaiting()
            }
            let _ = await result
        }
        waiting.wait()
    }
}

呼び出すときは以下のようになります。

func funcZ() {
    NSLog("funcZ start")
    Task {
        async let a = funcA()
        async let b = funcB()
        NSLog("a x b = %ld", await a * b)
    }.await()
    
    NSLog("funcZ finish")
}
kabeyakabeya

ちなみに?@MainActorでマークしてしまうとTaskの中と外がともに同じメインスレッドになってしまうので、デッドロックします。

func funcZ() {
    NSLog("funcZ start")
    Task { @MainActor in // ←この場合はawait()したらダメ
        async let a = funcA()
        async let b = funcB()
        NSLog("a x b = %ld", await a * b)
    }.await()
    
    NSLog("funcZ finish")
}