☘️

【Swift 5.6】Concurrency の応答を非同期でないコンテキストで拾いたい

2022/06/24に公開

みなさん Swift Concurrency していますか(挨拶)

更新履歴

22/06/25 Taskのextensionではないやり方を追記しました
22/06/27 import Foundationimport class Dispatch.DispatchSemaphoreへ修正.

この記事の要約

Swift Concurrencyは便利だが,非同期(concurrent)なコンテキストに閉じ込めなければならない.
そのため,非同期でない処理の中の一部にSwift concurrencyを用いたい場合などで不便である.
この記事では非同期でない処理の一部にSwift concurrencyを組み込むためのextensionを提案する.
(追記)extensionでないやり方も追記しました

やりたいこと(イメージ) (Our purpose)

func asyncHeavyFunc() async -> some value {}
func notAsyncFunc() -> some value { // ここがasyncでない
    async let x = asyncHeavyFunc()
    async let y = asyncHeavyFunc()
    return await merge(x, y)
}

Summary (Eng.)

Swift-concurrency is powerful feature. It can be however only used in concurrency context such as func somefunc() async {} or Task {}.
Some of us would like partially employ them in procedual context for parallel evaluation like #pragma omp parallel for.
In this article, an extension is proposed to obtain responses of Swift-concurrency-function in procedual context.

この記事の背景

Swift Concurrency は多くの非同期処理を簡潔に書けて大変良いです.
Structured concurrency な点がとても素晴らしいです.

ところで,稀に非同期でない処理の一部を並列化してマルチコアの恩恵を受けたい場合があります.
他言語で言えば for-loop の各要素をマルチコアで走らせる #pragma omp parallel for などが相当します.

(OpenMPの例 in C++)
before

for ( int i = 0 ; i < LARGE_NUMBER_ITERATION ; ++ i ) {
    result[i] = someHeavyFunc(i);
}

after

#pragma omp parallel for // これを書くだけでfor-loopの中がマルチコアで並列に評価される
for ( int i = 0 ; i < LARGE_NUMBER_ITERATION ; ++ i ) { 
    result[i] = someHeavyFunc(i);
}

#pragma omp parallel for は非同期ではないコンテキストで,指定したfor-loopのみ局所的に並列化させることが可能です.コード修正も最小限で済みます.
今回はこの局所的な並列化をSwift concurrencyで書きたいというモチベーションです.
#pragma omp parallel for 自体はSwift Concurrencyを使わずSwift+GCDで書くこともできます.

private let queue = DispatchQueue(label: "some-concurrent-queue", qos: .default, attributes: .concurrent, autoreleaseFrequency: .workItem, target: .none)
func parallel<Source: Sequence, Target>(for source: Source, closure: (Source.Element) -> Target) -> Array<Target> {
    withoutActuallyEscaping(closure) { closure in
        var result: Array<(Int, Target)> = []
        let group = DispatchGroup()
        for (i, v) in source.enumerated() {
            queue.async(group: group) {
                let v = closure(v)
                queue.async(group: group, qos: .default, flags: .barrier) {
                    result.append((i, v))
                }
            }
        }
        group.wait()
        result.sort { $0.0 < $1.0 }
        return result.map { $1 }
    }
}

上記は #pragma omp parallel for 相当の機能を提供できています.しかしOpenMPもSwift+GCDも structured concurrency ではないため完璧ではありません.
どういうことかというと,例えば以下のような場合

let x: some Sequence
let y: some Sequence
let processed_x = parallel(for: x, closure: someHeavyProcessA)
let processed_y = parallel(for: y, closure: someHeavyProcessB)
let result_xy = mergeProcess(processed_x, processed_y)

を考えます.この例ではsomeHeavyProcessBの評価はsomeHeavyProcessAの完了まで開始されず非効率です.
できればsomeHeavyProcessAとsomeHeavyProcessBの評価は並列で同時に行い,両方の評価が終わってからmergeProcessに繋げたいです.
その際には structured concurrency である Swift Concurrency が便利です..

func asyncParallel<Source: Sequence, Target>(for source: Source, closure: @escaping(Source.Element) -> Target) async -> Array<Target> {
    await withTaskGroup(of: Target.self) { group in
        for v in source {
            group.addTask {
                closure(v)
            }
        }
        return await group.reduce(into: []) { $0.append($1) } // 順序が変わる点に注意.
    }
}
let x: some Sequence
let y: some Sequence
async let processed_x = asyncParallel(for: x, closure: someHeavyProcessA)
async let processed_y = asyncParallel(for: y, closure: someHeavyProcessB)
let result_xy = await mergeProcess(processed_x, processed_y)

このように書くとsomeHeavyProcessAとsomeHeavyProcessBが並列で評価されて両方の結果が揃ってからmergeProcessを行うことを(コールバック地獄にせず) 自然に 記述することができます.
つまりSwift Concurrency最高です.

ですが,不便な点として

func asyncHoge() async { // asyncな関数
// ここか
}
Task {
// ここ
}

のように,非同期なコンテキスト中にしか記述できないこと が挙げられます.
そのため先に挙げた #pragma omp parallel for のように 非同期でないコンテキストで局所的に並列化する といったことが実現できません.

func hoge() -> some Sequence { // asyncでない
    let x = some Sequence
    let y = some Sequence
    async let px = asyncParallel(for: x, closure: someHevyFunctionA) // asyncでないのでできない
    async let py = asyncParallel(for: y, closure: someHevyFunctionB) // asyncでないのでできない
    return await merge(px, py) // asyncでないので略
}
// hoge() の呼び出し元もasyncでない

呼び出し元を含めて後段の処理を全てasyncにすれば解決できるのですが,局所的に並列化させたいためだけにそのコストは掛けられません.
そこで非同期でないコンテキストでSwift Concurrencyの応答を拾えるようにTask型へextensionを生やします.

Taskの結果を非同期でないコンテキストで得る

以下のextensionを書いてみました.

import class Dispatch.DispatchSemaphore
extension Task where Failure == Never {
    var await: Success {
        var target: Success!
        withoutActuallyEscaping({ target = $0 }) { setter in
            let semaphore = DispatchSemaphore(value: 0)
            Task<Void, Never> {
                defer {
                    semaphore.signal()
                }
                await setter(value)
            }
            semaphore.wait()
        }
        return target
    }
}
extension Task where Failure : Error {
    var await: Success {
        get throws {
            var target: Result<Success, Failure>!
            withoutActuallyEscaping({ target = $0 }) { setter in
                let semaphore = DispatchSemaphore(value: 0)
                Task<Void, Never> {
                    defer {
                        semaphore.signal()
                    }
                    await setter(result)
                }
                semaphore.wait()
            }
            return try target.get()
        }
    }
}

主なアイデアは

  • Task型に渡す処理ブロックの中で値をreturnするとTask.resultとしてそれを得ることができる
  • ただしTask.result自体がasyncなのでConcurrency contextの中でしか使えない
  • そこでcomputed property の内部で Task を新たに作り,その中でTask.resultをTaskの外に渡す.
  • 内側のTaskが完了するまではDispatchSemaphoreでロックする.

です.
上記が満たされればほぼ目的は達成です.それ以外の処理は以下の目的で採用しました.

Success型はSendableでない可能性もあるので,以下のようにTaskに渡すクロージャで直接更新ができません.(コンパイラがおこ)

var target: Success!
Task {
  target = await result // 安全性を保証できないので怒られる.
  semaphore.signal()
}
semaphore.wait()

しかしこのスコープでしか扱われない変数なので変更は安全です.
コンパイラを騙すためにsetterクロージャでラップします

{ result = $0 } // setter

また,Task の中に渡されるクロージャは @escaping でなければなりません.(本来はこのスコープを抜けてから使われる可能性もあるため.)
しかしスコープを抜けてから呼ばれないことをセマフォで保証できるため@escapingの必要はありません.そこでパフォーマンスを優先し@nonescapingのままwithoutActuallyEscapingで偽装します.
まとめると上記のソースコードになります.

このextensionを生やすと,以下のように非同期でない処理の一部分だけにSwift concurrencyを用いることができます.

.awaitでTask型の結果を得る

func foo() async -> String {
  "foo"
}
let foobar = Task { await foo() }.await

.awaitでTask型の結果を得る(複数のasyncを結合)

func foo() async -> String {
  "foo"
}
func bar() async -> String {
  "bar"
}
func somefunc() -> String {
  let foobar: String = Task {
    async let hoge = foo()
    async let fuga = bar()
    return await hoge + fuga
  }.await
  return foobar
}
print(somefunc())

例外を投げるasyncの結果をtryを置きつつ得る.

func foo() async throws -> String {
  "foo"
}
func bar() async throws -> String {
  "bar"
}
func somefunc() throws -> String {
  let foobar: String = try Task {
    async let hoge = foo()
    async let fuga = bar()
    return try await hoge + fuga
  }.await
  return foobar
}
try print(somefunc())

最初に書いたやりたいことも実現できました.

func asyncHeavyFunc() async -> some value {}
func notAsyncFunc() -> some value { // ここがasyncでない
  Task {
    async let x = asyncHeavyFunc()
    async let y = asyncHeavyFunc()
    return await merge(x, y)
  }.await
}

感想

  • Taskが入れ子になっているところがType Erasureみたいですね
  • プロパティ名を思いつかなかったので .await にしましたが,より適した名称がありそう
  • こんなextensionを生やさなくて済むように言語機能でサポートされてほしい

追記

以下のように関数で書いた方が autoreleasepool のように使えてシンプルな気がした.

import class Dispatch.DispatchSemaphore
func async<Success>(await body: () async throws -> Success) rethrows -> Success {
    try withoutActuallyEscaping(body) { body in
        var result: Result<Success, Error>!
        withoutActuallyEscaping({ result = $0 }) { setter in
            let semaphore = DispatchSemaphore(value: 0)
            Task<Void, Never> {
                defer {
                    semaphore.signal()
                }
                do {
                    try await setter(.success(body()))
                } catch {
                    setter(.failure(error))
                }
            }
            semaphore.wait()
        }
        return try result.get()
    }
}

ベースアイデアは上述Task.await同じ.これだとTask型の入れ子が発生しない上,ErrorとNeverをまとめて定義できる.
使い方もシンプル

func hoge() async -> String {
  "hoge"
}
func fuga() async throws -> String {
  "fuga"
}
let foo = async {
  await hoge()
}
let bar = try async {
  try await fuga()
}
let foobar: String = try async {
  async let foo = hoge()
  async let bar = fuga()
  return try await foo + bar
}
print(foobar == foo + bar)

ただ,extensionと違ってErrorの具体型を定義できないことと名前空間の汚染が気になる.

Discussion