🚚

内部実装から理解するKotlin Coroutines:CoroutineDispatcher編

に公開

本記事の目的・対象者

本記事は、Kotlin Coroutines の基礎をすでに理解している中級者以上の開発者を対象に、さらに理解を深めることを目的としたシリーズの一部です。
今回は、CoroutineDispatcherに焦点を当て、その内部的な仕組みについて解説していきます。

Kotlin Coroutinesの初歩は、すでに理解していることを前提としています。
公式ドキュメントの説明[1]や、私が以前執筆した別の記事[2]を先に参照されることをおすすめします。

https://zenn.dev/kaseken/articles/99d92a128cbc9a

CoroutineDispatcherはどこで利用されるのか

CoroutineDispatcherとは、コルーチンがどのスレッドまたはスレッドプールで実行されるかを指定する、CoroutineContextの一要素です。

コンピュータサイエンスの分野において「dispatch」とは、「命令・イベント・関数呼び出しなどをハンドラに割り当てる」という意味で使われる用語です。CoroutineDispatcherも同様に、処理をスレッドあるいはスレッドプールへ割り当てるという役割を担っています。

CoroutineDispatcherは、以下の2つのタイミングで利用されます。

  1. Coroutineの起動時
  2. Coroutineの再開時

以下に示すコード例を用いて、具体的に見ていきましょう (Kotlin Playground)。

まず、「1. Coroutineの起動時」は、Coroutine BuilderがCoroutineを作成・起動したタイミングです。以下のサンプルコードでは、launchの部分に当たります。

import kotlinx.coroutines.*

suspend fun someSuspendFunction(functionId: Int) {
    val firstThread = Thread.currentThread().id
    delay(100) // 2. Coroutineの再開時 (= delayの完了時)
    val secondThread = Thread.currentThread().id
    println("Thread switched from $firstThread to $secondThread.")
}

suspend fun main() {
    coroutineScope {
        for (i in 0..<5) {
            launch(Dispatchers.IO) { // 1. Coroutineの起動時
                someSuspendFunction(i)
            }
        }
    }
}

出力:

Thread switched from 15 to 13.
Thread switched from 13 to 17.
Thread switched from 14 to 15.
Thread switched from 12 to 15.
Thread switched from 17 to 17.

suspend関数内では、処理が一時停止・再開するポイントがあります。このポイントのことをsuspension pointと呼びます。上記のサンプルコードにおいては、delay(100)がsuspension pointに相当します。
delay(100)の処理が完了し、後続の処理が再開するタイミングが「2. Coroutineの再開時」に相当します。

suspension pointに達すると、suspend関数内の処理が一時停止するとともに、実行スレッドが解放されます。そして、再開時には、停止以前と同じスレッドで再開するとは限りません。
上記のサンプルコードにおいては、CoroutineDispatcherとしてDispatchers.IOを使用しています。出力からも分かる通り、スレッドプール内のスレッドで処理が実行されるため、suspension pointの前後で実行スレッドが切り替わることがあります。

CoroutineDispatcherの内部実装 ー dispatchが呼ばれるまで

前項で見たように、CoroutineDispatcherは、「Coroutineの起動時」と「Coroutineの再開時」に処理を実行するスレッドを決定する役割を担っています。
ここからは、どのようにして所定のスレッドへと処理が割り当てられるのか、具体的にソースコードを追っていきましょう。

まずは、Coroutine BuilderがCoroutineを起動する際のCoroutineDispatcherの関与を追ってみましょう。
代表的なCoroutine Builderであるlaunchのソースコードを以下に示します[3]

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

Coroutineを起動するためのcoroutines.startが呼ばれていることが分かります。なお、同じく代表的なCoroutine Builderであるasyncメソッドでも、同様にcoroutines.startが呼ばれています[4]

startメソッドは、AbstractCoroutineに対して定義されています[5]
コメントからも、Coroutineを起動するためのメソッドであることが分かります。

    /**
     * Starts this coroutine with the given code [block] and [start] strategy.
     * This function shall be invoked at most once on this coroutine.
     * 
     * - [DEFAULT] uses [startCoroutineCancellable].
     * - [ATOMIC] uses [startCoroutine].
     * - [UNDISPATCHED] uses [startCoroutineUndispatched].
     * - [LAZY] does nothing.
     */
    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        start(block, receiver, this)
    }

AbstractCoroutine.startの内部のstart(block, receiver, this)では、CoroutineStart.invoke[6]が呼ばれます。

public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            ATOMIC -> block.startCoroutine(receiver, completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            LAZY -> Unit // will start lazily
        }

基本的には、CoroutineStartはDEFAULTとなり、block.startCoroutineCancellable(receiver, completion)が呼ばれます。

public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
    createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}

startCoroutineCancellableの内部では、createCoroutineUninterceptedinterceptedresumeという3つのメソッドが続けて呼ばれています。それぞれの役割を見ていきましょう。

1. createCoroutineUninterceptedの役割

createCoroutineUninterceptedは、Coroutineの動作の要である「Continuation」を初期化します。Continuationは、Coroutineの状態を管理するためのインスタンスです。suspend関数内で処理を一時停止できる仕組みは、Continuationによって実現されています。
なお、Continuationの詳細に関しては、別で解説記事を書いたので、そちらをご参照ください[7]

https://zenn.dev/kaseken/articles/a50fd3f5e6e2ba

CoroutineDispatcherは、作成されたContinuationのcontext: CoroutineContextフィールド内に保持されます[8]

public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

2. interceptedの役割

interceptedメソッドは、前のステップで初期化されたContinuationを、CoroutineDispatcherに対して送信可能とします。より厳密に言えば、CoroutineContextに登録されたContinuationInterceptorを用いて、Continuationに変換を加えます。
「intercept」とは傍受するという意味の単語ですが、Coroutineの実行を傍受して、変換を加えるというイメージです。

JVMプラットフォームにおけるinterceptedのソースコードを以下に示します [9]

    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

このように、ContinuationInterceptorCoroutineContextに存在する場合、interceptContinuationが呼び出されてContinuationが変換されます。
そして、CoroutineDispatcherContinuationInterceptorを継承しているため、DispatcherがContextに含まれていれば、そのDispatcherのinterceptContinuationが呼ばれます。

interceptContinuationのソースコードを以下に示します [10]

public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)

このメソッドは、渡されたContinuationDispatchedContinuationにラップして返します。

DispatchedContinuationは、元のContinuationをCoroutineDispatcherによってスケジューリング可能な形に変換するラッパーです。以下にソースコードの一部を抜粋します[11]

internal class DispatchedContinuation<in T>(
    @JvmField internal val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
}

このように、DispatchedContinuationdispatcherを内部に保持し、Coroutineの起動・再開処理をDispatcherに委ねることができます。

3. resumeの役割

前のステップで返されたDispatchedContinuationに対して、resumeが呼び出されます。
resumeは、Coroutineを再開させるためのメソッドです。ここから分かるように、resumeというメソッド名であるものの、実は初回起動時にもresumeメソッドが呼ばれます。

resumeのソースコードを以下に示します [12]

/**
 * Resumes the execution of the corresponding coroutine passing [value] as the return value of the last suspension point.
 */
@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))

resumeは、DispatchedContinuation.resumeWithを呼び出します。resumeWithのソースコードを以下に示します[13]

    override fun resumeWith(result: Result<T>) {
        val state = result.toState()
        if (dispatcher.safeIsDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_ATOMIC
            dispatcher.safeDispatch(context, this)
        } else {
            executeUnconfined(state, MODE_ATOMIC) {
                withCoroutineContext(context, countOrElement) {
                    continuation.resumeWith(result)
                }
            }
        }
    }

resumeWithは、まずsafeIsDispatchNeededメソッドで、CoroutineDispatcherによるdispatch (スレッドあるいはスレッドプールへの割り当て) が必要かをチェックします。
そして、dispatchが必要であれば、dispatcher.safeDispatchを呼び出し、CoroutineDispatcherに処理を送信します。
safeDispatchメソッド内では、以下のようにdispatchが呼ばれます[14]

internal fun CoroutineDispatcher.safeDispatch(context: CoroutineContext, runnable: Runnable) {
    try {
        dispatch(context, runnable)
    } catch (e: Throwable) {
        throw DispatchException(e, this, context)
    }
}

基本的にはsafeIsDispatchNeededはtrueとなる、すなわちCoroutineDispatcherのdispatchが呼ばれるケースが大半です。
例外として、「Dispatchers.Unconfinedを使用している」あるいは「Dispatchers.Main.immediateを使用している、かつ既にメインスレッドで実行されている」の場合のように、dispatchがスキップされるケースもあります。

ここまでで、Coroutine Builderのlaunchメソッドを出発点として、CoroutineDispatcherのdispatchが呼ばれるまでの流れを追うことができました。

他方、Coroutineの再開時には、Continuationに対してresumeが呼び出されます。そのため、同様にresumeWith内でCoroutineDispatcher.dispatchが呼ばれることになります。

CoroutineDispatcherの内部実装 ー dispatchが呼ばれた後

続いて、CoroutineDispatcher.dispatchが呼ばれた後の動作を追っていきます。
dispatchは、CoroutineDispatcherに対して抽象メソッドとして定義されています [15]

public abstract class CoroutineDispatcher {
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
}

具体的な実装はCoroutineDispatcherの各サブクラスで行われています。すなわち、dispatchの具体的な内部実装は、「CoroutineDispatcherの種類」ならびに「動作プラットフォーム」によって異なります。

そこで、以降のセクションでは、実行環境として最も典型的である「JVM/Androidプラットフォーム」を想定した上で、各CoroutineDispatcherごとの内部実装を追っていきます。

CoroutineDispatcherの種類

JVM/Androidプラットフォームでは、以下の4種類のCoroutineDispatcherがあらかじめ定義されています。

  • Dispatchers.Default
  • Dispatchers.IO
  • Dispatchers.Main
  • Dispatchers.Unconfined

Dispatchers.Default

Dispatchers.Defaultでは、CPUのコア数と同数のスレッドを持つスレッドプールが、処理の実行に用いられます[16]。スレッド数に上限が存在するため、CPU-boundな処理に向いています。
Dispatchers.Defaultは、その名の通り、CoroutineDispatcherがCoroutineContextで指定されていない場合にデフォルトで使用されるCoroutineDispatcherです。

では、Android/JVMプラットフォームにおけるDispatchers.Defaultの具体的な実装を見ていきましょう。

Android/JVMプラットフォーム上のDispatchers.Defaultの 実装

Dispatchers.Defaultの実装は、Dispatchers.ktにあります[17]

public actual object Dispatchers {
    @JvmStatic
    public actual val Default: CoroutineDispatcher = DefaultScheduler
}

ここで用いられているDefaultSchedulerは、SchedulerCoroutineDispatcherを継承しており、dispatchメソッドはそちらで定義されています[18]

以下に、SchedulerCoroutineDispatcherdispatchメソッドを抜粋して示します[19]

internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
    private var coroutineScheduler = createScheduler()

    private fun createScheduler() =
        CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
}

dispatchメソッドが呼び出されると、CoroutineSchedulerdispatchメソッドが呼ばれることが分かります。
この**CoroutineSchelulerこそが、CoroutineDispatcherの内部実装の中核**といえます。

CoroutineSchedulerは、その名の通り、実行タスクのスケジューリングを行います。
具体的には、「タスクのスケジューリング (スレッドへのタスクの割り当てや実行順の制御)」ならびに「スレッドプール内のスレッド数の調整」を行っています。

CoroutineSchedulerは、Dispatchers.DefaultDispatchers.IOの両方で利用するための共通のスレッドプールを管理しています。
ただし、スレッドプールは共通であるものの、「CPUバウンドな処理を実行するスレッド」と「IOバウンドなブロッキングタスクを実行するスレッド」は、別々にカウントされています。Dispatchers.Defaultでは、前者のスレッドが利用されており、こちらのスレッド数は上限に収まるようになっています。以下に模式図を示します。

これら2種類のスレッドは、以下のような違いが存在します。

  • CPUバウンドな処理を実行するスレッド
    ワーカースレッドの数は、corePoolSizeパラメータで決まる。基本的には、CPUのコア数と同一である (ただし、最低でも2以上の値を取り、例外的にシングルコアのCPUでは2となる)。Dispatchers.Defaultでは、こちらが利用される。
  • ブロッキング処理を実行するスレッド
    ワーカースレッドは必要に応じて作成され、最大でmaxPoolSizeパラメータで指定された個数までスレッドが作成される。デフォルト値のMAX_POOL_SIZEは1 << 21 = 2,097,152で、実質的には無制限にブロッキングタスク用のバックグラウンドスレッドが作られる。Dispatchers.IOでは、こちらが利用される。

以下のサンプルコードで、Dispatchers.DefaultDispatchers.IOで使用されるスレッド数の上限を確認してみましょう (Kotlin Playground)。

import kotlinx.coroutines.*

suspend fun main() {
    println("コア数: ${Runtime.getRuntime().availableProcessors()}")
    coroutineScope {
        for (i in 0..<8) {
            launch(Dispatchers.Default) {
                println("Current thread ID: ${Thread.currentThread().id}")
                Thread.sleep(10)
            }
        }
    }
    println("`Dispatchers.Default` finished.")
    coroutineScope {
        for (i in 0..<8) {
            launch(Dispatchers.IO) {
                println("Current thread ID: ${Thread.currentThread().id}")
                Thread.sleep(10)
            }
        }
    }
    println("`Dispatchers.IO` finished.")
}

出力の例:

コア数: 2
Current thread ID: 12
Current thread ID: 13
Current thread ID: 13
Current thread ID: 12
Current thread ID: 13
Current thread ID: 12
Current thread ID: 13
Current thread ID: 12
`Dispatchers.Default` finished.
Current thread ID: 13
Current thread ID: 14
Current thread ID: 16
Current thread ID: 15
Current thread ID: 17
Current thread ID: 18
Current thread ID: 20
Current thread ID: 12
`Dispatchers.IO` finished.

Dispatchers.Defaultの場合には、スレッド数に上限が存在しており、かつその上限が、コア数 (= 2) と合致していることが分かります。
他方、Dispatchers.IOでは、スレッド数に上限がない (正確には上限が非常に大きい) ことが分かるます。また、Dispatchers.DefaultDispatchers.IOで同じスレッドが使用されており、共通のスレッドプールに基づいていることも示されています。


CoroutineDispatcherの全体像は、タスクスケジューリングの機構など、非常に複雑です。ここではdispatchメソッドで行われていることを理解するに留めます。

以下にCoroutineScheduler.dispatchのソースコードを抜粋し、簡略化したものを示します[20]
補足として、Dispatchers.IOでもこのメソッドが使用されますが、説明のために、ここではDispatchers.IOに対応するためのコードを省いています。

    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
        val task = createTask(block, taskContext)
        val currentWorker = currentWorker()
        val notAdded = currentWorker.submitToLocalQueue(task, fair)
        if (notAdded != null) {
            addToGlobalQueue(notAdded)
        }
        signalCpuWork()
    }

CoroutineScheduler.dispatch内には、主に3つのことが行われています。。

  1. 処理 (block: Runnable) を、スケジュール可能な形 (Task) へと変換する。
  2. タスクキューにタスクを追加する。
    なお、タスクキューには、「スレッドごとのローカルキュー」と「全スレッドが参照できるグローバルキュー」の2種類が存在する。まずは、優先的にローカルキューへのプッシュし、所定のスレッドに対して処理を割り当てる。それに失敗した場合には、グローバルキューに追加する。
  3. signalCpuWorkを実行する。これにより、スレッドにタスクが追加されたことが通知され、処理の実行が促される。なお、タスクは同期的に実行されるわけではなく、各スレッドが非同期的に実行する。

signalCpuWorkのコードは以下のようになっています。
tryUnparkは、idle状態 (Parked状態) のスレッドがあれば、それを呼び起こす処理です。
tryCreateWorkerは、idle状態のスレッドが存在せず、かつスレッド数が未だ上限に達していなかった場合に、新たにスレッドを作成する処理です。

    fun signalCpuWork() {
        if (tryUnpark()) return
        if (tryCreateWorker()) return
        tryUnpark()
    }

このようにして、スレッドが空いている、あるいはスレッドの追加作成の余地がある場合には、できるだけ速やかに処理が実行されるようになっています。

なお、スレッドは必要に応じて追加されるだけではなく、スレッドが一定時間idle状態であった場合には、tryTerminateWorkerメソッドが呼び出され、停止するようになっています [21]

Dispatchers.IO

Dispatchers.IOは、IO-boundなブロッキングタスクの実行に適したCoroutineDispatcherです。
裏側では、Dispatchers.Defaultと共通のスレッドプール上で処理を実行します。
ただし、前述したように、Dispatchers.Defaultとは別でスレッド数の上限が管理されており、より多くのスレッドを起動することが可能です。

では、上記の仕様が、どのように実現されているのかを内部実装から探っていきましょう。

JVMにおいては、Dispatchers.IOとしてDefaultIoSchedulerが使用されます。

public actual object Dispatchers {
    @JvmStatic
    public val IO: CoroutineDispatcher get() = DefaultIoScheduler
}

DefaultIoScheduler.dispatchのソースコードを以下に抜粋します。

internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {

    private val default = UnlimitedIoScheduler.limitedParallelism(
        systemProp(
            IO_PARALLELISM_PROPERTY_NAME,
            64.coerceAtLeast(AVAILABLE_PROCESSORS)
        )
    )

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        default.dispatch(context, block)
    }
}

DefaultIoScheduler.dispatch内では、UnlimitedIoScheduler.limitedParallelismから得られるLimitedDispatcher (CoroutineDispatcherのサブクラス) に対してdispatchが呼ばれています。

なお、ここでUnlimitedIoScheduler.limitedParallelismparallelismパラメータに64.coerceAtLeast(AVAILABLE_PROCESSORS)が渡されています。Dispatchers.IOが同時並行に実行できるスレッド数の上限は、デフォルトで64に設定されていることが分かります。

LimitedDispatcherとは、同時並行に実行されるスレッド数を制限するための仕組みを持ったCoroutineDispatcherです。LimitedDispatcherの内部実装に関しては、後ほど別のセクションで詳細に見るため、ここでは詳細な説明は省かせていただきます。

LimitedDispatcher.dispatchを経由して、UnlimitedIoSchedulerdispatchメソッドが呼ばれます [22]

UnlimitedIoSchedulerdispatchメソッドを以下に示します [23]

private object UnlimitedIoScheduler : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        DefaultScheduler.dispatchWithContext(block, BlockingContext, false)
    }
}

DefaultScheduler.dispatchWithContextが呼ばれています。
注目すべき点として、2つ目の引数にBlockingContextが渡されています。この意味については後ほど解説します。

これは、基底クラスであるSchedulerCoroutineDispatcherdispatchWithContextメソッドを呼び出し、最終的にSchedulerCoroutineDispatcher.dispatchが呼ばれます[24]
SchedulerCoroutineDispatcher.dispatchは、Dispatchers.Defalultにおいてもタスクスケジューリングの根本を担っていたメソッドです。

internal open class SchedulerCoroutineDispatcher() {
    internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean) {
        coroutineScheduler.dispatch(block, context, fair)
    }

ただし、Dispatchers.DefaultDispatchers.IOは同じメソッドを利用しているものの、先述したBlockingContextが渡されている点が大きな違いを生じます。

以下に、Dispatchers.IOから呼ばれる場合のSchedulerCoroutineDispatcher.dispatchのコードを簡略化して示します[25]

    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
        val task = createTask(block, taskContext)
        val stateSnapshot = incrementBlockingTasks()
        val currentWorker = currentWorker()
        val notAdded = currentWorker.submitToLocalQueue(task, fair)
        if (notAdded != null) {
            addToGlobalQueue(notAdded)
        }
        signalBlockingWork(stateSnapshot)
    }

Dispatchers.Defaultとの違いとして、「incrementBlockingTasks()を呼ぶことで、ブロッキングタスクの個数をインクリメントしていること」および「signalCpuWorkの代わりにsignalBlockingWorkを呼んでいること」の2点があります。

signalBlockingWorkメソッドのソースコードを以下に示します[26]

    private fun signalBlockingWork(stateSnapshot: Long) {
        if (tryUnpark()) return
        if (tryCreateWorker(stateSnapshot)) return
        tryUnpark()
    }

tryUnparkは、Dispatchers.Defaultのセクションでも説明したように、idle状態のスレッドがあれば、それを確保するためのものです。

tryCreateWorkerは、idle状態のスレッドがない場合に、新たにスレッドを追加します。tryCreateWorkerのソースコードを以下に示します[27]

    private fun tryCreateWorker(state: Long = controlState.value): Boolean {
        val created = createdWorkers(state)
        val blocking = blockingTasks(state)
        val cpuWorkers = (created - blocking).coerceAtLeast(0)
        if (cpuWorkers < corePoolSize) {
            val newCpuWorkers = createNewWorker()
            if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
            if (newCpuWorkers > 0) return true
        }
        return false
    }

tryCreateWorkerのソースコードから、「全スレッド数 - ブロッキングタスク用のスレッド数」が、CPUタスク用のスレッド数の上限 (基本的にはCPUのコア数) を下回る場合には、新たに共通スレッドプールに対してスレッドが追加されることが分かります。Dispatchers.IO経由でdispatchを実行する際には、「ブロッキングタスクの個数」が加算されるため、tryCreateWorkerが呼ばれた際には新しいスレッドが1つ作られることになります。

このようなメカニズムによって、「CPUバウンドな処理用のスレッド (上限はCPUのコア数)」と「IOバウンドなブロッキングタスク用のスレッド」のスレッド数を、裏側では共通スレッドプールを用いつつ、別々に管理することが可能となっています。

Dispatchers.Main

Dispatchers.Mainは、メインスレッド (Androidの場合はUIスレッド) で実行するためのCoroutineDispatcherです。

以下に、JVMプラットフォームにおけるDispatchers.Mainのソースコードを示します[28]

public actual object Dispatchers {
    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

MainDispatcherLoader.dispatcherが使用されています。
そこで、MainDispatcherLoaderのソースコードを以下に示します[29]

internal object MainDispatcherLoader {

    private val FAST_SERVICE_LOADER_ENABLED = systemProp(FAST_SERVICE_LOADER_PROPERTY_NAME, true)

    @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

    private fun loadMainDispatcher(): MainCoroutineDispatcher {
        return try {
            val factories = if (FAST_SERVICE_LOADER_ENABLED) {
                FastServiceLoader.loadMainDispatcherFactory()
            } else {
                // We are explicitly using the
                // `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
                // form of the ServiceLoader call to enable R8 optimization when compiled on Android.
                ServiceLoader.load(
                        MainDispatcherFactory::class.java,
                        MainDispatcherFactory::class.java.classLoader
                ).iterator().asSequence().toList()
            }
            @Suppress("ConstantConditionIf")
            factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
                ?: createMissingDispatcher()
        } catch (e: Throwable) {
            // Service loader can throw an exception as well
            createMissingDispatcher(e)
        }
    }
}

MainDispatcherLoader.dispatcherが、MainDispatcherFactoryで作られることが分かります。
MainDispatcherFactoryの実装は、実行環境ごとに定義されています。例えば、kotlinx-coroutines-android内のAndroidDispatcherFactorykotlinx-coroutines-javafx内のJavaFxDispatcherFactorykotlinx-coroutines-swing内のSwingDispatcherFactoryなどが存在します。

AndroidDispatcherFactoryをソースコードの一部を抜粋します[30]

internal class AndroidDispatcherFactory : MainDispatcherFactory {
    override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispclass Handler(atcher {
        val mainLooper = Looper.getMainLooper() ?: throw IllegalStateException("The main looper is not available")
        return HandlerContext(mainLooper.asHandler(async = true))
    }
}

Looper.getMainLooper()[31]から作成されたHandler[32]を使って、HandlerContextが作られています。
補足すると、Looperとは、Android SDKで定義されているクラスで、スレッド上でメッセージループを実行するためのオブジェクトです。また、HandlerもAndroid SDKで定義されているクラスで、メッセージループのキューに対して処理 (Runnable) を渡すためのオブジェクトです。

HandlerContextの一部を以下に示します[33]

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        if (!handler.post(block)) {
            cancelOnRejection(context, block)
        }
    }
}

HandlerContextは、Kotlin CoroutineからHandlerに対して処理を送るためのWrapperです。つまり、HandlerContextを経由して、AndroidのUIスレッドに対して、処理の実行を依頼することが可能です。

dispatchが呼ばれると、handler.postblockが渡されることが分かります。Handlerのpostメソッドは、UIスレッド用の実行キューに対して処理を渡します。

このようにして、Dispatchers.Mainは実行環境に応じたメインスレッドに対して、処理の依頼を可能としています。

Dispatchers.Main.immediateについて

Dispatchers.Mainの代わりに、Dispatchers.Main.immediateと指定することもできます。
この場合、すでに実行スレッドがUIスレッドだった場合には、dispatchをスキップして、そのままUIスレッド上で処理が実行されます。これにより、dispatchにかかるオーバーヘッドを削減し、パフォーマンスを改善することができます。

Dispatchers.Main.immediateがどのように実現されているのを、HandlerContextの内部実装から確かめてみましょう。

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    override val immediate: HandlerContext = if (invokeImmediately) this else
        HandlerContext(handler, name, true)

    override fun isDispatchNeeded(context: CoroutineContext): Boolean {
        return !invokeImmediately || Looper.myLooper() != handler.looper
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        if (!handler.post(block)) {
            cancelOnRejection(context, block)
        }
    }
}

Dispatchers.Main.immediateの場合には、invokeImmediatelyがtrueとなります。
isDispatchNeededメソッドを見ると、Looper.myLooper() != handler.looperにより、現在の実行スレッドがUIスレッドだった場合にはfalseが返され、dispatchがスキップされることが分かります。

Dispatchers.Unconfined

Dispatchers.Unconfinedは、特殊なDispatcherで、Coroutineが実行されるスレッドを指定しません [34]

Coroutineの起動後は、そのCoroutineが起動されたスレッド上で、そのまま処理が実行されます。内部の動作としては、isDispatchNeededがfalseとなるため、dispatchがスキップされます。
その後、suspension pointで処理が再開された場合には、そのsuspend関数が実行されていたスレッド上で、そのまま処理が再開されます。例えば、delayでsuspendされた場合には、delayメソッドが使用していたスレッドでそのまま再開後の処理も継続されます。

以下のサンプルコードで動作を確認してみましょう (Kotlin Playground)

import kotlinx.coroutines.*

suspend fun main() {
    coroutineScope {
        // ID=1のスレッドで実行
        println("0: Current thread ID: ${Thread.currentThread().id}")
        launch(Dispatchers.Unconfined) {
            // dispatchされず、ID=1のスレッドでそのまま実行
            println("1: Current thread ID: ${Thread.currentThread().id}")
            delay(100) // delayはバックグラウンドスレッドの1つ (ID=12のスレッド) で実行
            // dispatchされず、ID=12のスレッドでそのまま実行
            println("2: Current thread ID: ${Thread.currentThread().id}")
            launch(Dispatchers.Unconfined) {
                // dispatchされず、ID=12のスレッドでそのまま実行
                println("3: Current thread ID: ${Thread.currentThread().id}")
            }
        }
    }
})

出力の例:

0: Current thread ID: 1
1: Current thread ID: 1
2: Current thread ID: 12
3: Current thread ID: 12

Dispatchers.Unconfinedの利用場面としては、不要なdispatchを無くすことでパフォーマンス上のオーバーヘッドを削減したいケース、あるいはdispatchが望ましくない副作用を引き起こすことを避けたいケースなどが考えられます。
ただ、一般的な利用の範囲内では、あえてDispatchers.Unconfinedを利用すべきケースはほとんどありません。そのため、Dispatchers.Unconfinedに関しては、内部実装の説明はスキップさせていただきます。

limitedParallelismの動作・仕組み

CoroutineDispatcherのインタフェースには、limitedParallelismというメソッドが定義されています。これは、Dispatchers.DefaultDispatchers.IOなどのスレッドプールに対して処理を送信するCoroutineDispatcherにおいて、同時並行で実行されるスレッド数を制限するための仕組みです。

limitedParallelismの動作を、サンプルコードを用いて実際に確認してみましょう (Kotlin Playground)。
Dispatchers.IOを使用して、同時並行に6個のCoroutineを立ち上げてみます。各Coroutine内では、スレッドを1秒間スリープさせます。すると、ID 12からID 17までの6個のスレッドが起動し、同時並行で実行が進むことが分かります。同時並行で処理が進むので、合計所要時間も1秒程度です。

import kotlinx.coroutines.*

suspend fun main() {
    val start = System.currentTimeMillis()
    val dispatcher = Dispatchers.IO
    coroutineScope {
        for (i in 0..5) {
            launch(dispatcher) {
                println("i=$i, Current thread ID: ${Thread.currentThread().id}")
                Thread.sleep(1000)
            }
        }
    }
    val end = System.currentTimeMillis()
    println("Duration milliseconds=${end - start}")
}

出力:

i=2, Current thread ID: 14
i=0, Current thread ID: 12
i=4, Current thread ID: 16
i=1, Current thread ID: 13
i=3, Current thread ID: 15
i=5, Current thread ID: 17
Duration milliseconds=1117

続いて、Dispatchers.IO.limitedParallelism(2)に変えてみましょう (Kotlin Playground)。
サンプルコードを実行すると、そのCoroutineDispatcherを利用しているスコープにおいて、同時に実行されるコルーチンの個数が2つに制限されることが分かります。並列数が2になるため、合計実行時間は3秒程度になっています。

import kotlinx.coroutines.*

suspend fun main() {
    val start = System.currentTimeMillis()
    val dispatcher = Dispatchers.IO.limitedParallelism(2)
    coroutineScope {
        for (i in 0..5) {
            launch(dispatcher) {
                println("i=$i, Current thread ID: ${Thread.currentThread().id}")
                Thread.sleep(1000)
            }
        }
    }
    val end = System.currentTimeMillis()
    println("Duration milliseconds=${end - start}")
}

出力の例:

i=0, Current thread ID: 12
i=1, Current thread ID: 13
i=2, Current thread ID: 13
i=3, Current thread ID: 12
i=5, Current thread ID: 12
i=4, Current thread ID: 13
Duration milliseconds=3103

ただし、誤解を与えないよう補足すると、limitedParallelismは専用のスレッドプールを作るわけではありません。Dispatchers.DefaultDispatchers.IOと共通のスレッドプールを利用します。
また、あくまで同時に実行されるスレッドの数が指定されるのみで、どのスレッドで実行されるかが固定されることもありません。上の例では、ID 12, ID 13の2つのスレッドで実行されましたが、あるタイムスパンでは別の組み合わせ (例えば、ID 12, ID 14の組み合わせ) で実行されることも考えられます。

limitedParallelismの内部実装

では、limitedParallelismがどのように機能を実現しているか、内部実装を見ながら解説します。
limitedParallelismは、基底クラスのCoroutineDispatcherに対して定義されています [CoroutineDispatcher.limitedParallelismのソースコード: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt#L176]。

public abstract class CoroutineDispatcher {
    public open fun limitedParallelism(parallelism: Int, name: String? = null): CoroutineDispatcher {
        parallelism.checkParallelism()
        return LimitedDispatcher(this, parallelism, name)
    }
}

LimitedDispatcherのインスタンスが返されます。これは先述したDispatchers.IOの解説でも登場しました。再びLimitedDispatcherの主要部分を抜粋して示します。

internal class LimitedDispatcher(
    private val dispatcher: CoroutineDispatcher,
    private val parallelism: Int,
    private val name: String?
) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay) {
    private val runningWorkers = atomic(0)
    private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        dispatchInternal(block) { worker ->
            dispatcher.safeDispatch(this, worker)
        }
    }

    // 渡された`block`を`dispatch`する。Workerが不足する場合には新たに起動する。
    private inline fun dispatchInternal(block: Runnable, startWorker: (Worker) -> Unit) {
        // タスクキューにタスクを追加する。
        queue.addLast(block)
        // 実行中のWorker数が`parallelism`に達している場合にはreturnする。
        if (runningWorkers.value >= parallelism) return
        // Worker数を増やす。
        if (!tryAllocateWorker()) return
        val task = obtainTaskOrDeallocateWorker() ?: return
        try {
            // 増やしたWorkerでタスクを実行開始する。
            startWorker(Worker(task))
        } catch (e: Throwable) {
            runningWorkers.decrementAndGet()
            throw e
        }
    }

    private inner class Worker(private var currentTask: Runnable) : Runnable {
        override fun run() {
            try {
                while (true) {
                    try {
                        // 初期タスクを実行する。
                        currentTask.run()
                    } catch (e: Throwable) {
                        handleCoroutineException(EmptyCoroutineContext, e)
                    }
                    // キューにまだタスクが残っている場合には、拾って実行する。
                    currentTask = obtainTaskOrDeallocateWorker() ?: return
                }
            } catch (e: Throwable) {
                synchronized(workerAllocationLock) {
                    runningWorkers.decrementAndGet()
                }
                throw e
            }
        }
    }
}

LimitedDispatcherは、主に以下の3つの構成要素から成ります。

  1. タスクを実行する (CoroutineDispatcherdispatcherを呼ぶ) ためのWorker
  2. 並列実行中のWorker数を管理・調整するためのAtomic Integerの変数 (runningWorkers)
  3. タスクを保持・バッファリングするためのタスクキュー (queue)

LimitedDispatcherdispatchメソッドが呼び出されると、まずはタスクキューにタスクが追加されます。その後、実行中のWorkerの数が上限に達していない場合 (runningWorkers < parallelismの場合) には、新たなWorkerを追加し、起動します。この時runningWorkersがインクリメントされます。
実行中のWorkerは、タスクキューが空になるまで、タスクを拾って実行し続けます。Workerがidle状態になると解放され、runningWorkersがデクリメントされます。

このようにして、並列実行中のタスク数を制限する仕組みを実現しています。スレッドプールのサイズ設定に関わらず、インメモリの変数で並列度を制御していることが分かります。

脚注
  1. Coroutines guide: https://kotlinlang.org/docs/coroutines-guide.html ↩︎

  2. "Kotlin Coroutinesの核心:Builder・CoroutineScope・Job・CoroutineContextの関係" https://zenn.dev/kaseken/articles/99d92a128cbc9a ↩︎

  3. launchのソースコード: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/common/src/Builders.common.kt#L44 ↩︎

  4. asyncのソースコード: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/common/src/Builders.common.kt#L79 ↩︎

  5. AbstractCoroutine.start: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt#L133 ↩︎

  6. CoroutineStart.invoke: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/common/src/CoroutineStart.kt#L356 ↩︎

  7. 内部実装から理解するKotlin Coroutines:suspend関数・Continuation編: https://zenn.dev/kaseken/articles/a50fd3f5e6e2ba ↩︎

  8. Continuation.context: https://github.com/JetBrains/kotlin/blob/6ef387e3e9ee07d24821c276ddf11a984a38c5d8/libraries/stdlib/src/kotlin/coroutines/Continuation.kt#L20 ↩︎

  9. intercepted: https://github.com/JetBrains/kotlin/blob/15b95af041cb7068f1a721278f7a5c0057fbfa4e/libraries/stdlib/jvm/src/kotlin/coroutines/jvm/internal/ContinuationImpl.kt#L111 ↩︎

  10. interceptContinuation: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt#L240 ↩︎

  11. DispatchedContinuation: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt#L12 ↩︎

  12. Coroutine.resume: https://github.com/JetBrains/kotlin/blob/6ef387e3e9ee07d24821c276ddf11a984a38c5d8/libraries/stdlib/src/kotlin/coroutines/Continuation.kt#L44 ↩︎

  13. DispatchedContinuation.resumeWith: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt#L188 ↩︎

  14. CoroutineDispatcher.safeDispatch: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt#L252 ↩︎

  15. CoroutineDispatcher.dispatch: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt#L216 ↩︎

  16. Dispatchers.Default: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html ↩︎

  17. Dispatchers.Defaultのソースコード: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/jvm/src/Dispatchers.kt#L16 ↩︎

  18. DefaultSchedulerのソースコード: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt#L9 ↩︎

  19. SchedulerCoroutineDispatcherのソースコード: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt#L114 ↩︎

  20. CoroutineScheduler.dispatch: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt#L394 ↩︎

  21. CoroutineScheduler.tryTerminateWorker: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt#L867 ↩︎

  22. LimitedDispatcher.dispatch: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt#L45 ↩︎

  23. UnlimitedIoScheduler.dispatch: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt#L43 ↩︎

  24. SchedulerCoroutineDispatcher.dispatchWithContext: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt#L129 ↩︎

  25. SchedulerCoroutineDispatcher.dispatch: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt#L393 ↩︎

  26. SchedulerCoroutineDispatcher.signalBlockingWork: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt#L430 ↩︎

  27. SchedulerCoroutineDispatcher.tryCreateWorker: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt#L443 ↩︎

  28. Dispatchers.Main: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/jvm/src/Dispatchers.kt#L19 ↩︎

  29. MainDispatcherLoader: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt#L13 ↩︎

  30. AndroidDispatcherFactory: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt#L48 ↩︎

  31. Looper: https://developer.android.com/reference/android/os/Looper ↩︎

  32. Handler: https://developer.android.com/reference/android/os/Handler ↩︎

  33. HandlerContext: https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt#L110 ↩︎

  34. Unconfined: https://kotlinlang.org/docs/coroutine-context-and-dispatchers.html#unconfined-vs-confined-dispatcher ↩︎

Discussion