🐕

Kotlin Coroutine 入門2: 並列実行と Structured Concurrency と例外

2020/09/23に公開

前回は Kotlin の coroutine の基本として、起動と suspend 関数の解説をしました。今回は coroutine を並列で起動する場合に必要になってくる概念を解説していきます。

https://zenn.dev/wm3/articles/ef46f2329c37a19368ac

シナリオ: 最安値を見つけろ!

今回は「二つの販売店の API を使って商品の価格を比較し、最安値を取得する」というシナリオを考えます。動作を見やすくするために Store という抽象クラスを用意しました。

/** お店の商品情報を提供する抽象クラス */
abstract class Store(private val name: String) {
    /** 価格取得の実装。サブクラスが実装する。 */
    protected abstract suspend fun doGetPrice(itemCode: String): Int

    /** doGetPrice を呼び出し、取得開始と終了、エラーが出た時にログを出力します。 */
    suspend fun itemPrice(itemCode: String): Int {}
}

さらにサンプルのお店を二つ用意しました。

/** 普通の店。処理に2秒かかる。 */
object AStore : Store("AStore") {
    override suspend fun doGetPrice(itemCode: String): Int {
        delay(2000)
        return 49998
    }
}

/** ちょっと安い店。AStore と同じく処理に2秒かかる。 */
object BStore : Store("BStore") {
    override suspend fun doGetPrice(itemCode: String): Int {
        delay(2000)
        return 49800
    }
}

実際に二つのお店の商品価格を取得して表示してみます。

fun runMain(): Job = scope.launch {
    val price1 = AStore.itemPrice("4901170017583")
    val price2 = BStore.itemPrice("4901170017583")
    println("⭐⭐AStore: $price1, BStore: $price2⭐⭐")
}

▶️ 実行してみる

実行結果

 0.621: AStore: 「4901170017583」の価格を取得します
 2.723: AStore: 「4901170017583」の価格は 49998 でした
 2.724: BStore: 「4901170017583」の価格を取得します
 4.725: BStore: 「4901170017583」の価格は 49800 でした
⭐⭐AStore: 49998, BStore: 49800⭐⭐

各ストアの itemPrice() が呼び出されると、価格取得処理の実行前後にログが出力されます。ログは処理全体の開始何秒のイベントかが記録されています。例外が発生した時もログが出力されます。

さて、準備ができたので本題に入りましょう。

並列実行と Structured Concurrency

async と await

実際に AStoreBStore の価格を比較し、最安値を計算してみましょう。

fun runMain(): Job = scope.launch {
    val price1 = AStore.itemPrice("4901170017583")
    val price2 = BStore.itemPrice("4901170017583")
    val bestPrice = min(price1, price2)
    println("⭐⭐最安価格: $bestPrice⭐⭐")
}

▶️ 実行してみる

これを実行すると、以下のように出力されます。

 0.105: AStore: 「4901170017583」の価格を取得します
 2.129: AStore: 「4901170017583」の価格は 49998 でした
 2.130: BStore: 「4901170017583」の価格を取得します
 4.131: BStore: 「4901170017583」の価格は 49800 でした
⭐⭐最安価格: 49800⭐⭐

まず AStore の価格の取得のために2秒中断し、その後 BStore の価格の取得に2秒中断、合計処理に4秒かかっていて、少々効率的ではありません。同時に itemPrice() を呼び出せば2秒で両方取得する事ができるはずです。

asyncawait を使う事で並列実行が可能になります。

fun runMain(): Job = scope.launch {
    val price1 = async { AStore.itemPrice("4901170017583") }
    val price2 = async { BStore.itemPrice("4901170017583") }
    val bestPrice = min(price1.await(), price2.await())
    println("⭐⭐最安価格: $bestPrice⭐⭐")
}

▶️ 実行してみる

itemPrice()async { … } で囲む事で、処理を裏で実行させながら次の処理が実行できるようになります。async の結果に対して await() を呼ぶと、処理が終了するまで中断してからその結果を受け取ります。

実行結果

 0.140: AStore: 「4901170017583」の価格を取得します
 0.148: BStore: 「4901170017583」の価格を取得します
 2.165: AStore: 「4901170017583」の価格は 49998 でした
 2.166: BStore: 「4901170017583」の価格は 49800 でした
⭐⭐最安価格: 49800⭐⭐

最初に二つの処理がほぼ同時に開始され、それぞれ2秒後に処理が終了している事がわかると思います。これで2秒後に「最安価格: 49800」が出力されました。便利!! 🥳🥳

Coroutine Builder と CoroutineScope

launchasync といった、coroutine を起動する関数を「coroutine builder」と呼びます。ほとんど[2]の coroutine builder は CoroutineScope の拡張関数で定義されています。そして先ほどの例の async はトップレベル関数のように見えて 実は CoroutineScope の拡張関数です。ややこしいですね。

このカラクリを理解するには、launch メソッドの定義を見ると良いです。公式ドキュメントを見ると launch メソッドは以下のように書かれてあります。

fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

(公式ドキュメント: launch - kotlinx-coroutines-coreより転載)

最初の二つは省略可能で、普段指定しているのは三つ目の block なので、その型に注目してみましょう。CoroutineScope.() -> Unit と書かれてあります。もしかしたら見慣れない人もいるかも知れませんが、これは「CoroutineScope を this に指定したレシーバ付き関数リテラル」です。apply { … } 関数を CoroutineScope に対して実行していると考えると良いと思います。suspend も指定されているので suspend 関数でもあります。

launch に限らず全ての coroutine builder のブロック引数は this に CoroutineScope が指定されます。そのためレシーバーを指定しないで launchasync を呼び出すと現在のスコープ (=this) に coroutine を生成させる事になります。特に async メソッドはレシーバーを指定しない方が良い事が多いでしょう。

後述しますが、この仕組みは Kotlin の coroutine の設計思想の重要な部分となっています。

Structured Concurrecy

coroutine builder によって起動された coroutine と、そのブロックで this として渡されたスコープは表裏一体の関係にあります。具体的にいうと、coroutine をキャンセルすればそのブロックのスコープにもキャンセルが発行されます。

これは何を指すのでしょうか。

試しに二つの処理を同時に実行してみます。この時片方は現在のスコープに対して async を、もう片方は scope.async を呼び出します。

fun runMain(): Job = scope.launch {
    try {
        // 片方は scope から呼び出す。
        val price1 = async { AStore.itemPrice("4901170017583") }
        val price2 = scope.async { BStore.itemPrice("4901170017583") }
        val bestPrice = min(price1.await(), price2.await())
        println("⭐️⭐️最安価格: $bestPrice⭐️⭐️")
    } catch (e: Exception) {
        println("🚨🚨取得に失敗しました: $e🚨🚨")
    }
}

この時に runMain() が終了する前にキャンセルするとどうなるでしょうか?

// 実行して即キャンセルする。
val job: Job = runMain()println("キャンセル呼び出し")
job.cancel()

▶️ 実行してみる

実行結果

 0.177: BStore: 「4901170017583」の価格を取得します
 0.169: AStore: 「4901170017583」の価格を取得します
キャンセル呼び出し
 0.724: AStore: 「4901170017583」の価格の取得がキャンセルされました
🚨🚨取得に失敗しました: kotlinx.coroutines.JobCancellationException: Job was cancelled; job="coroutine#2":StandaloneCoroutine{Cancelling}@6ac1c3f0🚨🚨
 2.181: BStore: 「4901170017583」の価格は 49800 でした

runMain() の処理をキャンセルすると、現在のスコープによって起動した AStore への async 処理もキャンセルされました。一方で scope.async によって起動した BStore への処理はキャンセルされず、結果 BStore の処理は正常終了します。

つまりキャンセルを発行すると、ブロックのスコープにもキャンセルが発行され、そのスコープによって起動した coroutine もキャンセルされる…という伝播が起こります。

coroutine のキャンセル

この仕組みを使えば、入れ子で呼び出した coroutine は外側の coroutine に連動して自動でキャンセルさせる事ができます。

この CoroutineScope の階層化の考えを structured concurrency と呼んでいます。structured concurrency における各スコープは以下のような関係を持っています。

  • 内側のスコープ(の coroutine) が全て終了するまで外側のスコープは終了しない
  • 外側のスコープに対してキャンセルが要求されれば、内側のスコープに対してもキャンセルが伝播する

逆に言えば、「内側のスコープが動いているのに外側の処理が終了している」といった事は起こらないようになっています。

内側のスコープの coroutine だけが動く事はない

実際に使用する際は、あえてこの階層化を超えて実行したい処理(ログや通知など)は viewModelScopeGlobalScope のような階層外にある CoroutineScope を使い、それ以外の場合は現在のスコープを使うのが一つの良い指針と言えると思います。

structured concurrecy に関するさらに詳しい話に興味がある方は、以下が参考になると思います。

Structured Concurrency と suspend 関数

さて、suspend 関数から coroutine builder を呼び出す事を考えてみます。AStoreBStore の安い方の価格で商品を扱う CheapestStore を作ってみます。

/** 最安価格で取り扱うストア */
object CheapestStore : Store("CheapestStore") {
    override suspend fun doGetPrice(itemCode: String): Int {
        val price1 = async { AStore.itemPrice(itemCode) }
        val price2 = async { BStore.itemPrice(itemCode) }
        return min(price1.await(), price2.await())
    }
}

// メイン処理
fun runMain(): Job = viewModelScope.launch {
    println("⭐️⭐️最安価格: ${CheapestStore.itemPrice("4901170017583")}⭐️⭐️")
}

なんと、コンパイルエラーになります。というのも、suspend 関数は launch { … } の中のブロックとは違い現在のスコープが指定されてないため、coroutine builder を呼び出す事ができないのです。

suspend 関数から coroutine builder を呼び出すためには、coroutineScope { … } (最初の c は小文字)関数を使います。

/** 最安価格で取り扱うストア */
object CheapestStore : Store("CheapestStore") {
    override suspend fun doGetPrice(itemCode: String): Int = coroutineScope {
        val price1 = async { AStore.itemPrice(itemCode) }
        val price2 = async { BStore.itemPrice(itemCode) }
        min(price1.await(), price2.await())
    }
}

▶️ 実行してみる

coroutineScope { … } のブロックにスコープが割り当てられ、coroutine builder を呼び出す事ができるようになります。

coroutineScope { … }launchasync と違い、新しく coroutine を起動せずに同期的に実行します。つまり suspend 関数の呼び出しが終わって値を返すタイミングで、その中で起動した(階層外の CoroutineScope で起動したもの以外の) coroutine は全て終了している事が保証されます。

Structured Concurrency と例外

Kotlin の structured concurrency を考える際、例外の扱いに関する理解は欠かせません。直感的でない面もあるのでここで解説します。

実際に例外を投げてみます。今回は商品情報を取得しようとするとエラーが起きる XStore というのを用意しました。

/** 例外を投げてしまうストア */
object XStore : Store("XStore") {
    override suspend fun doGetPrice(itemCode: String): Int {
        error("サーバーダウン")
    }
}

今まで使っていた AStore と今回の XStore に対して非同期で itemPrice を投げてみます。動作の詳細が分かるように、今回は各行にログを仕込んでおきます。

// メイン処理。片方のストアは例外を投げる。
fun runMain(): Job = scope.launch {
    try {
        println("フェーズ1: AStore にアクセス")
        val price1 = async { AStore.itemPrice("4901170017583") }
        println("フェーズ2: XStore にアクセス")
        val price2 = async { XStore.itemPrice("4901170017583") }
        println("フェーズ3: AStore の結果を取得")
        val p1 = price1.await()
        println("フェーズ4: XStore の結果を取得")
        val p2 = price2.await()
        println("フェーズ5: 最安値を計算")
        val bestPrice = min(p1, p2)
        println("⭐️⭐️最安価格: $bestPrice⭐️⭐️")
    } catch (e: Exception) {
        println("🚨🚨取得に失敗しました: $e🚨🚨")
    }
}

さあ、実行すると起動した各 coroutine に何が起きるでしょうか?特に、メインの処理はどのフェーズで落ちる(あるいは落ちないで完了する)でしょうか?一度考えてみましょう!

▶️ 実行してみる

実行結果

フェーズ1: AStore にアクセス
フェーズ2: XStore にアクセス
フェーズ3: AStore の結果を取得
 0.131: AStore: 「4901170017583」の価格を取得します
 0.132: XStore: 「4901170017583」の価格を取得します
 0.142: XStore: 「4901170017583」の価格の取得が失敗しました: (java.lang.IllegalStateException: サーバーダウン)
🚨🚨取得に失敗しました: kotlinx.coroutines.JobCancellationException: Parent job is Cancelling; job="coroutine#2":StandaloneCoroutine{Cancelling}@5cf18424🚨🚨
 0.247: AStore: 「4901170017583」の価格の取得がキャンセルされました

実行結果をみると以下の事がわかります。

  • メイン処理だけでなく、無関係の AStore の処理にもキャンセルが発行される
  • メイン処理はフェーズ 3 の無関係の AStore の処理に対する await() でキャンセルが発行される

なんと、XStore の処理や await() だけでなく、全く無関係の AStore に対する挙動にも影響を与えています!この挙動は JavaScript などの言語における async/await と全く異なります。

coroutine で発生した例外(CancellationException 以外)をキャッチしなかった場合は、後述する一部の例外を除き一番外側の coroutine (およびその内側にある coroutine)に対してキャンセルが発行されます。

coroutine のキャンセル

このような挙動になっている理由は、失敗した一連の処理を速やかに終了させるためのようです[3]。実際今回の例で言えば、XStore が失敗している事がわかっているのに AStore の処理を待つのは時間の無駄なので、全てキャンセルしてしまった方が都合が良いです。

coroutine の例外をハンドリングする方法

しかし例外を全くハンドリングできないというのでは困ってしまいます。代表的な対応策はいくつかあります。

  1. 入れ子で coroutine を使わない
  2. coroutineScope { … } を使う
  3. CoroutineExceptionHandlerSupervisorJob を使う

一つ目は特に特別な仕組みを知らずともできる方法です。並列実行して高速化したい理由が特にない場合に推奨される方法と言えるでしょう。

今回は二つ目の方法を紹介します。三つ目の方法は今回は説明しません。

coroutineScope { … } と例外

coroutineScope { … } は先ほど suspend 関数で coroutine builder を使うための方法として紹介しました。ではこの状態で例外が発生したらどうなるでしょうか?

/** 最安価格で取り扱うストア。扱うストアの片方が例外を投げる。 */
object CheapestStore : Store("CheapestStore") {
    override suspend fun doGetPrice(itemCode: String): Int = coroutineScope {
        val price1 = async { AStore.itemPrice(itemCode) }
        val price2 = async { XStore.itemPrice(itemCode) }
        min(price1.await(), price2.await())
    }
}

// メイン処理
fun runMain(): Job = scope.launch {
    try {
        val bestPrice = CheapestStore.itemPrice("4901170017583")
        println("⭐️⭐️最安価格: $bestPrice⭐️⭐️")
    } catch (e: Exception) {
        println("🚨🚨取得に失敗しました: $e🚨🚨")
    }
}

▶️ 実行してみる

実行結果

 0.213: CheapestStore: 「4901170017583」の価格を取得します
 0.236: XStore: 「4901170017583」の価格を取得します
 0.236: AStore: 「4901170017583」の価格を取得します
 0.236: XStore: 「4901170017583」の価格の取得が失敗しました: (java.lang.IllegalStateException: サーバーダウン)
 0.299: AStore: 「4901170017583」の価格の取得がキャンセルされました
 0.302: CheapestStore: 「4901170017583」の価格の取得が失敗しました: (java.lang.IllegalStateException: サーバーダウン)
🚨🚨取得に失敗しました: java.lang.IllegalStateException: サーバーダウン🚨🚨

前回と変わらず無関係な AStore にキャンセルが発行されますが、runMain() 自体に CancellationException が発生する事はなく、CheapestStore.itemPrice()XStore が投げた例外になります。

coroutineScope のスコープの中にある coroutine でキャッチされない例外が発生した場合、coroutineScope はその例外を投げます。この際、先ほどと同様にスコープ内の他の処理をキャンセルし、完了するのを待ちます。

coroutineScope のこの挙動のため、例外によるキャンセル伝播は suspend 関数のスコープがキャンセルされるだけで、呼び出し元の coroutine までキャンセルされたりはしません

なお coroutineScope はただの suspend 関数なので、launch { … } の中でも呼び出す事が可能です。例外が発生した時のキャンセル伝播を途中で止めてリカバリしたい時などは coroutineScope を使うと良いでしょう。

まとめ

ここまで Kotlin の coroutine で並列実行をする際に重要となる structured concurrency と、例外が起きた時の動作を解説しました。structured concurrency は Kotlin の coroutine において重要な考えであり、また個人的には非常に面白い機能だと思います。しかしそれなりに複雑ではあるので、慣れるまでは入れ子の async などを使わないというのも一つの手だと個人的には思います。

次回は今まであまり話して来なかった、viewModelScopeGlobalScope のようなトップレベルの CoroutineScope を作る方法について解説しようと思っています。

脚注
  1. 公式ドキュメントにも非推奨と書かれてあります。 ↩︎

  2. runBlocking { … } といった、呼び出し元をブロックする coroutine builder は CoroutineScope が不要です。 ↩︎

  3. Exception handling with structured concurrency (rx & async) · Issue #691 · Kotlin/kotlinx.coroutines ↩︎

Discussion