🚅

社内でKotlin Coroutines勉強会を開催しました!

2023/04/04に公開
2

こんにちは!アルダグラムでエンジニアをしている渡邊です!

先日社内で Kotlin Coroutines の勉強会を開催しました。
社内では定期的に勉強会を開催しており、知見の共有や技術力の向上を行っています。
せっかくなので今回開催した勉強会の内容について、以下に共有したいと思います!

コルーチンとは

コルーチンは、実行を中断して後で再開できる軽量なスレッドのようなもの

非同期処理による待機が必要な場合、従来のスレッドやコールバックで実装するとコードが複雑になりがちになる。

fun fetchData(onSuccess: (result: String) -> Unit, onError: (error: String?) -> Unit) {
    thread {
        try {
            // データ取得の処理(ここでは単純に2秒待機している)
            Thread.sleep(2000)
            val data = "data from network"
            // コールバックに取得したデータを渡す
            onSuccess(data)
        } catch (e: Exception) {
            // コールバックにエラーを渡す
            onError(e.message)
        }
    }
}

// データを取得してUIを更新する処理を行う
fetchData(
    onSuccess = { result ->
        // UIを更新する
    },
    onError = { error ->
        // エラー処理
    }
)

コルーチンを使うことで、非同期で実行する処理のコードを簡潔に書くことができる。

suspend fun fetchData(): String {
    // データ取得の処理(ここでは単純に2秒待機している)
    delay(2000)
    return "data from network"
}

// コルーチンを使ってデータを取得してUIを更新する処理を行う
GlobalScope.launch {
    try {
        val data = fetchData()
        // UIを更新する
    } catch (e: Exception) {
        // エラー処理
    }
}

Kotlin Coroutines は Kotlin の言語機能ではなくライブラリとして提供されている。
https://github.com/Kotlin/kotlinx.coroutines
(ただし suspend といった修飾子などは Kotlin の言語機能に組み込まれている)

Kotlin Coroutines には Channel や Flow といった機能もあるが、今回は説明しない。

コルーチンの概要

スレッドでタスクを実行する場合

例えば一つのスレッドで複数のタスクを実行する場合を考える。
その場合、以下のように実行する処理が直列に並ぶようなイメージとなる。


一つのスレッドでタスクを実行する場合

もちろん複数のスレッドでそれぞれのタスクを実行することもできる。


複数スレッドでタスクを実行する場合

ただ スレッドの生成や切り替えにはコストがかかるため、大量のスレッドを生成、使用するのは望ましくない場合がある

コルーチンでタスクを実行する場合

コルーチンでタスクを実行する場合は、処理の実行が中断、再開される。
なので、最初に実行した処理が途中で以下のように中断状態となる。


タスクが中断状態となる

中断状態のタスクがある場合、その後に続けて処理を実行することができる。


中断状態のタスクの後に続けて別のタスクを実行

以下は3つコルーチンでタスクを開始して、全て中断状態になった場合。


3つのタスクが中断状態になっている

処理が再開されるタイミングになったら、順次中断した箇所から処理が再開される。


タスクが再開されていく例

このようにコルーチンを使うことで、複数のタスクを一つのスレッドで管理するため、スレッドの生成や切り替えのコストが削減される。
したがって、リソースの効率的な使用と高い並行性を実現できる
これがコルーチンが 軽量なスレッドのようなもの と言われる理由。

説明を簡略するために一つのスレッドとしたが、コルーチンでは複数のスレッドで複数のタスクを実行することも可能。
その場合、スレッドの空き状況に応じて処理が実行されるようになる。
なので以下の図のように、中断前と中断後で処理が実行されるスレッドが変わることがある


中断前と中断後で実行されるスレッドが変わる例

コードで確認してみる

Kotlin Playground でコルーチンのコードを確認することができる。
https://pl.kotl.in/pmSwymFDg

import kotlinx.coroutines.*

fun main() {   
    runBlocking {
	launch {
            println("処理1-1")
            // delay で処理は中断され、1000ms後に続きの処理が再開される
            delay(1000)
            println("処理1-2")
        }
        launch {
            println("処理2-1")
            // delay で処理は中断され、500ms後に続きの処理が再開される
            delay(500)
            println("処理2-2")
        }
        launch {
            println("処理3-1")
            // delay で処理は中断され、1500ms後に続きの処理が再開される
            delay(1500)
            println("処理3-2")
        }
    }
}

上記のコードを実行すると、以下のようにログが出力される。

処理1-1
処理2-1
処理3-1
処理2-2
処理1-2
処理3-2

後述するが launch でコルーチンを開始することができる。
delay() を呼び出すと指定した時間(ミリ秒)だけ処理が中断され、指定した時間が経過すると続きから処理が再開される。

suspend 関数

先ほど出てきた delay() 関数を呼び出すと処理が中断されると説明したが、なぜ中断されるかというと delay()suspend 関数 となっているためである。

public suspend fun delay(timeMillis: Long) {
    ...
}

suspend 関数とは関数を宣言する際に suspend 修飾子をつけたもので、その関数は 中断される可能性 があることを意味する。
逆に suspend 関数の呼び出しがなければ処理が中断されることはない。
suspend 関数はプログラマが独自に作成することができる。

suspend 関数にはルールがあり、他の suspend 関数内からしか呼び出すことができない

suspend fun execAsyncTask() {
    ...
}

suspend fun anotherAsyncMethod() {
    // 🙆‍♂️ suspend 関数内では、suspend 関数を呼び出せる
    execAsyncTask()
}

fun anotherMethod() {
    // 🙅‍♂️ suspend 関数でない関数内では、suspend 関数を呼び出せない => ビルドエラーになる
    execAsyncTask()
}

先ほどのコードで delay() を呼び出せているのは launch を呼び出す際に引数に指定したラムダが suspend ラムダとなっているため。

launch の定義は以下のようになっている。

public fun CoroutineScope.launch(
    ...
    block: suspend CoroutineScope.() -> Unit
): Job {
    ...
}

launch {}{} の部分は上記の block 引数のラムダであり、これに suspend が付いているためこのラムダ内では suspend 関数が呼び出せる。

コルーチンの開始方法

launch もしくは async という関数を使うことで、コルーチンを開始することができる。(他にもあるが、この2つがよく使われる)
このようにコルーチンを開始させるものをコルーチンビルダーと呼ぶ。
(説明は省略するが、先ほどのコードに出てきた runBlocking もコルーチンビルダーの一つ)

launchasyncCoroutineScope の拡張関数として定義されているため、これらの関数を使うためには CoroutineScope のインスタンスが必要になる。

import kotlinx.coroutines.*
import kotlin.coroutines.EmptyCoroutineContext

fun main() {
    // CoroutineScope を作成
    val scope = CoroutineScope(EmptyCoroutineContext)

    // launch でコルーチンを開始
    scope.launch {
        // このブロック内では suspend 関数を呼び出すことができる
        delay(500)
    }

    Thread.sleep(1000L)
}

ちなみに launchasync は suspend 関数ではないので、呼び出した時に処理が中断されることはないことに注意。

https://pl.kotl.in/FqFOQEyrn

import kotlinx.coroutines.*
import kotlin.coroutines.EmptyCoroutineContext

fun main() {
    val scope = CoroutineScope(EmptyCoroutineContext)

    println("before launch") // 1
    scope.launch {
	println("before delay") // 3
        delay(500)
	println("after delay")  // 4
    }
    println("after launch") // 2

    Thread.sleep(1000L)
}

例えば上記のコードを実行した場合、以下のようにログが出力される。
launch を呼び出した後、すぐにその下のコードが実行されているのがわかる。

before launch
after launch
before delay
after delay

launch と async の違い

処理の結果が必要な場合は async を使い、そうでない場合は launch を使う

async 関数を呼び出した結果、Deferred<T> という型のインスタンスが返ってくる。
これを使って async 関数で開始したコルーチンの結果を受け取ることができる。

suspend fun fetchData(): String {
    // Web API を実行...
}

val deferred: Deferred<String> = scope.async {
    fetchData()
}

// Deferred には await という関数が用意されている.
// これは suspend 関数になっている.
val result: String = deferred.await()

// 「await() の処理が完了」 = 「fetchData() の処理が完了」したらここのコードが実行される.

上記のコードにおいて await() は suspend 関数になっている。
もし await() を呼び出した時点ですでに fetchData() の処理が完了していれば、呼び出し元が中断されることなく結果が受け取れる。
await() を呼び出した時点でまだ fetchData() の処理が完了してなければ、fetchData() が完了するまで呼び出し元は中断され、fetchData() の処理が完了したら結果を受け取って後続の処理が実行される。

await() は suspend 関数のため suspend 関数内からしか呼び出せない。
以下のような感じで launch や async をネストすることで、await() を呼び出すことができる。
https://pl.kotl.in/wiXX5O424

import kotlinx.coroutines.*
import kotlin.coroutines.EmptyCoroutineContext

fun main() {
    val scope = CoroutineScope(EmptyCoroutineContext)

    scope.launch {
        val result1 = async { 
            delay(1000L)
            System.currentTimeMillis()
        }
        val result2 = async {
            delay(500L)
            System.currentTimeMillis()
        }
        val result = result1.await() - result2.await()
        println("$result")
    }

    Thread.sleep(1100L)
}

CoroutineScope

コルーチンの実行キャンセル

コルーチンスコープの主な機能としては、これまで見てきたようにコルーチンを作成することができる。
また、作成したコルーチンの実行をキャンセルすることもできる。

https://pl.kotl.in/KtZX0zX1S

import kotlinx.coroutines.*
import kotlin.coroutines.EmptyCoroutineContext

fun main() {
    val scope = CoroutineScope(EmptyCoroutineContext)
    scope.launch {
        println("before delay")
        delay(1000)
        println("after delay")
    }

    Thread.sleep(500L)

    // コルーチンをキャンセルする
    scope.cancel()
    
    Thread.sleep(600L)
    println("end")
}

上記のコードを実行すると、以下のように出力される。

before delay
end

after delay が出力されずに、処理が途中で抜けていることがわかる。

コルーチンがキャンセルされた際には suspend 関数で CancellationException の例外がスローされるので、try ~ catch を使うことでキャンセルされたことを判別することもできる。

scope.launch {
    println("before delay")
    try {
        delay(1000)
    } catch (e: CancellationException) {
        // CancellationException でコルーチンの実行がキャンセルされたことが判別できる.
        println("cancelled")
        throw e
    }
    println("after delay")
}

また一度コルーチンスコープをキャンセルすると、それ以降そのコルーチンスコープから新しいコルーチンを起動できなくなるので注意

コルーチンスコープの階層構造

コルーチンスコープは親子関係を作ることができ、それによって階層構造が作られる。
またコルーチンを開始する時にはコルーチンスコープのインスタンスが必要なため、コルーチンは必ずいずれかのコルーチンスコープに紐づく

val scope = CoroutineScope(EmptyCoroutineContext) // 親スコープ
scope.launch { // 子スコープ1
    launch {  // 孫スコープ1
        // ...
        
    }
    launch { // 孫スコープ2
        // ...
    }
}
scope.launch { // 子スコープ2
    launch { // 孫スコープ3
        // ...
    }
}

例えば上記のようにコルーチンを開始すると、以下のような階層構造になる。


コルーチンスコープの階層構造

特定のコルーチンスコープがキャンセルされると、その子孫のスコープが全てキャンセルされる。
これによって適切なタイミングでスコープをキャンセルすることにより、特定のスコープで処理が実行され続けてしまうといったことを防止できる


キャンセルが伝搬していく例

またどこかのコルーチンで例外が発生した場合、繋がっている全てのコルーチンスコープがキャンセルされる。
以下は子のコルーチンで例外が発生した場合の例。


コルーチンで例外が発生した例

特定のコルーチンのキャンセル

launchasync でコルーチンを開始した後、そのコルーチンをキャンセルすることができる。
launch の場合は以下のように、launch の戻り値の Job のインスタンスを使ってキャンセルする。

// launch の戻り値として Job のインスタンスが返される
val job: Job = scope.launch { // ①
   ...
}

...

job.cancel() // ① のコルーチンがキャンセルされる

この場合先ほどの CoroutineScope をキャンセルする場合と異なり、新しくコルーチンを開始させることができる。
async の場合は先ほど説明したように Deferred<T> のインスタンスが返ってくるので、これを使ってキャンセルする。
ちなみに Deferred<T>Job を継承している。

val deferred: Deferred<String> = scope.async { // ①
    fetchData()
}

...

deferred.cancel() // ① のコルーチンがキャンセルされる

コルーチンの実行スレッド

CoroutineDispatcher を使うことで、コルーチンの実行スレッドを指定でき、スレッドはスレッドプールによって管理される。
CoroutineDispatcher はコルーチンスコープを作成する時に指定できる。
Kotlin Coroutines では以下の CoroutineDispatcher が提供されている。

  • Dispatchers.Default : CPU バウンドなタスクを実行する際に指定する
  • Dispatchers.IO : I/O バウンドなタスクを実行する際に指定する
  • Dispatchers.Main : メインスレッドで実行する際に指定する(Android などの特定のプラットフォームでのみ提供されている)
  • Dispatchers.Unconfined : 現在のスレッドで実行する際に指定する(詳細は後述のおまけで)
val scope = CoroutineScope(Dispatchers.IO)
scope.launch {
    println(Thread.currentThread().name) // DefaultDispatcher-worker-1
}

前述のコードでは CoroutineScope の作成時に EmptyCoroutineContext を指定していたが、これは CoroutineDispatcher を指定していない。
この場合、デフォルトで CoroutineDispatchers.Default の CoroutineDispatcher が使われることになる。

Dispatchers.Default や Dispatchers.IO は裏側でスレッドプールによってスレッドが管理されているため、複数のコルーチンを開始した場合、それぞれ複数の異なるスレッドで実行される可能性がある。
実行するスレッドを制限したい場合には、Java の ExecutorService から CoroutineDispatcher への変換がサポートされている。

// 処理が実行されるスレッドを一つに制限する.
val dispatcher = Executors.newFixedThreadPool(1).asCoroutineDispatcher()
val scope = CoroutineScope(dispatcher)

実行スレッドの切り替え

launchasync では CoroutineDispatcher を指定することができるので、以下のように実行するスレッドを切り替えることが可能。

val scope = CoroutineScope(Dispatchers.Main)
scope.launch {
    println(Thread.currentThread().name) // main
    launch(Dispatchers.IO) {
        println(Thread.currentThread().name) // DefaultDispatcher-worker-1
    }
}

ただし上記の場合新しくコルーチンを開始しており、コルーチンの開始は少なからずコストがかかる。
もし単にスレッドを切り替えたいだけであれば withContext を使う。

val scope = CoroutineScope(Dispatchers.Main)
scope.launch {
    println(Thread.currentThread().name) // main
    // withContext は suspend 関数となっており、ブロック内の処理が完了するまで中断し、処理が全て完了した後に再開される
    withContext(Dispatchers.IO) {
        // このブロック内では IO スレッドで処理を実行
        println(Thread.currentThread().name) // DefaultDispatcher-worker-1
        delay(100L)
    }
    // これ以降は引き続き main スレッドで実行される
    println(Thread.currentThread().name) // main
}

エラーハンドリング

エラーハンドリングはいくつか方法がありちょっと難しいが、よく使われて一番シンプルだと思われる try ~ catch の方法を紹介。
以下のように launch 内で try ~ catch を使ってエラーをハンドリングできる。

scope.launch {
    try {
        fetchData()
    } catch (e: Throwable) {
        // エラーハンドリング
    }
}

ちなみに launch の外側を try ~ catch で囲んでも例外をキャッチすることはできないので注意。

try {
    // 🙅‍♂️ これは例外をキャッチできない
    scope.launch {
        fetchData()
    }
} catch (e: Throwable) {
}

async の場合のエラーハンドリングはちょっと難しい。
説明にかなり時間がかかってしまうので今回は割愛。
エラーハンドリングについては以下の記事を読むことをおすすめ。

おまけ

Android でサポートされている CoroutineScope

  • viewModelScope
    • ViewModel でサポートされているコルーチンスコープ
    • ViewModel が破棄されるタイミングで、このコルーチンスコープから起動されたコルーチンは全てキャンセルされる
  • lifecycleScope
    • Activity / Fragment でサポートされているコルーチンスコープ
    • Activity / Fragment が破棄されるタイミングで、このコルーチンスコープから起動されたコルーチンは全てキャンセルされる

Dispatchers.Unconfined の挙動について

Dispatchers.Unconfined を使った場合、呼び出した後は元々実行されていたスレッドで処理が実行される。
ただし、途中でスレッドが変わった場合、それ以降はそのスレッドで処理が実行される。

val scope = CoroutineScope(Dispatchers.Unconfined)
scope.launch {
    println(Thread.currentThread().name) // main
    // withContext で実行スレッドを変える
    withContext(Dispatchers.Default) {
        println(Thread.currentThread().name) // DefaultDispatcher-worker-1
        delay(100L)
    }
    // 上記でスレッドが変わったため、これ以降の処理が実行されるスレッドは変わる
    println(Thread.currentThread().name) // DefaultDispatcher-worker-1
}

例えば上記のコードを実行すると、以下のようにログが出力される。

main
DefaultDispatcher-worker-1
DefaultDispatcher-worker-1

協調的マルチタスク

Kotlin のコルーチンは協調的マルチタスク(non-preemptive multitasking)になっている。
これはどういうことかというと、各コルーチンで実行されるタスクは実行を占有せずに、適切なタイミングで他のタスクに実行を空け渡すようにしなければならない。

例えば以下のように一つのコルーチンで実行されるタスクが無限ループするような場合、もう片方のコルーチンではタスクが実行されなくなってしまう。

val dispatcher = Executors.newFixedThreadPool(1).asCoroutineDispatcher()
val scope = CoroutineScope(dispatcher)
scope.launch {
    // 無限ループでスレッドを占有してやるぜ😈
    while (isActive) {
        ...
    }
}

scope.launch {
    // ずっと実行されない...🥲
}

このような場合、yield() を使って他のコルーチンでタスクが実行されるようにする必要がある。

scope.launch {
    // 無限ループでスレッドを占有してやるぜ😈
    while (isActive) {
        ...
        yield() // 適切なタイミングで yield() を呼んで他のコルーチンにタスクを空け渡します👼
    }
}

scope.launch {
    // yield() の呼び出し後に実行された!🥰
}

ちなみにコルーチン内で無限ループする場合、条件に isActive を設定しておくとコルーチンがキャンセルされた場合に false になるので、コルーチンのキャンセルに対応できる。

scope.launch {
    while (isActive) {
        ...
    }
}

最後に

以上が社内勉強会で説明した内容になります。

社内のエンジニアのほとんどが参加してくださり、Kotlin を触ったことがない方からも勉強になったと言っていただけて非常に嬉しかったです!

この記事がどなたかの参考になれば幸いです。

アルダグラム Tech Blog

Discussion

Miyuki OnumaMiyuki Onuma
処理1-1
処理2-1
処理3-1
処理2-1
処理1-1
処理3-1

上記は以下の間違いでしょうか

処理1-1
処理2-1
処理3-1
処理2-2
処理1-2
処理3-2
watabeewatabee

おっしゃる通り間違っていましたので修正いたしました。
ご指摘いただきありがとうございました!