🧊

内部実装から理解するKotlin Coroutines Flow ー Flow Builder・emit・collectの動作原理

に公開

Kotlin Coroutines Flowは、AndroidアプリあるいはサーバーサイドKotlinの開発において、今や広く利用されている。その一方で、Flowがどのような原理で動いているのかという内部の仕組みを理解している人は多くはないと思われる。

この記事では、ソースコードからFlowの内部実装を解読していく。
本記事のメインターゲットは、「Flowの基本的な仕様は知っており、開発に使っているが、ブラックボックスのまま使っている」という中級者の方々である。読んでくださった方々が、Flowの裏側の動きをイメージできるようになり、Flowを使ったコードの実装・デバッグ・レビューをより効率的に、かつ自信を持って行えるようになることを目指している。

なお、Kotlin Coroutinesの内部実装に関しては、Kotlin Fest 2025での発表で解説しているため、ぜひそちらもご参照いただきたい。

https://youtu.be/oIaL8X8q2Gk

Flowのサンプルコードを見る

Kotlinの公式ドキュメントを見ると、Flowのサンプルコードとしてはじめに登場するのが、以下のようなコード[2]である。

fun simple(): Flow<Int> = flow { // flow builder
    println("Flow started")
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    println("Calling collect...")
    simple().collect { value -> println(value) } 
}

▶️ Playgroundで実行

このコードを実行すると、以下のような出力となり、100msごとに値が流されること、ならびに実行スレッドがブロックされないことが分かる。
また、「Calling collect...」の後に「Flow started」が出力されていることから、collectが呼ばれた後に、flow内の処理が開始することも分かる。このように、collectをはじめとするTerminal Operatorが呼ばれるまでは処理が開始しないFlowのことをCold Flowと呼ぶ。

Calling collect...
Flow started
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

上述した仕様は、多くのKotlinエンジニアが知っているであろう基本中の基本である。
一方で、なぜこのような挙動が実現されるのかという裏側の仕組みを説明できるだろうか?
ここからは、flowemit、ならびにcollectという3つの主要な関数の内部実装を解読することで、その仕組みを明らかにしていく。

Flow Builder (flow関数) の内部実装

Flowを作成するための関数は、Flow Builderと呼ばれる。
先ほど示したサンプルコードに含まれるflow {}も、Flow Builderの一種である。

flow関数の実装を以下に示す。

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

GitHubで見る

flow関数自体は、引数として受け取ったblockを使用してSafeFlowクラスのインスタンスを作成して返す、単純な関数である (SafeFlowに関しては後ほど解説する)。
ここで着目したいのは、@BuilderInference block: suspend FlowCollector<T>.() -> Unitというblock引数のシグネチャである。

suspend FlowCollector<T>.() -> Unitの部分:

Kotlinには、レシーバ付き関数型 (Function Type with Receiver) [3]と呼ばれる関数型が存在する。Receiver.(T1) -> ReturnValueのように表される。このラムダ関数内では、レシーバとして渡されるオブジェクトがthisとして使用される。以下に一例を示す。

data class User(val name: String)

// Userをレシーバとする、レシーバ付き関数型。Userオブジェクトが`this`となる。
val greet: User.(Int) -> String = { times -> "${this.name}, ${"Hello!".repeat(times)}" }

fun main() {
    println(User("Kotlin").greet(3)) // Kotlin, Hello!Hello!Hello!
}

▶️ Playgroundで実行

flow関数の説明に戻ると、blockFlowCollector<T>をレシーバとするsuspend FlowCollector<T>.() -> Unitという型のラムダ関数である。blocksuspend関数であるため、内部でemitdelayのようなsuspend関数を呼び出すことができる。

@BuilderInferenceの部分:

続いて、@BuilderInferenceの部分について説明する。これは、ビルダー型推論[4]と呼ばれるコンパイラの機能を有効化するためのアノテーションである。

ビルダー関数とは、レシーバ付き関数型のラムダ関数を受け取り、オブジェクトを組み立てて返す関数である。
以下に一例を示す。buildBoxBox<T>を生成するためのビルダー関数である。ラムダ関数としてBox<T>.() -> Unitを受け取っている。ここで、ビルダー型推論とは、ビルダー関数の型定義 (= Tの型) を、ラムダ関数内の関数呼び出しから推論することである。以下の例では、ラムダ関数内のadd("hello")からTがStringとして型推論される。

import kotlin.experimental.ExperimentalTypeInference

class Box<T> {
    val items = mutableListOf<T>()
    fun add(item: T) { items.add(item) }
}

// NOTE: Kotlin 2.0未満では @BuilderInference がないと T を推論できなかった。
@OptIn(ExperimentalTypeInference::class)
fun <T> buildBox(@BuilderInference block: Box<T>.() -> Unit): Box<T> = Box<T>().apply(block)

fun main() {
    val stringBox = buildBox {
        add("hello") // ラムダ関数内の add("hello") から、Box<String>と型推論
        add("world")
    }
    println(stringBox.items) // [hello, world]
    val intBox = buildBox {
        add(1) // ラムダ関数内の add(1) から、Box<Int>と型推論
        add(2)
    }
    println(intBox.items) // [1, 2]
}

▶️ Playgroundで実行

flow関数の解説に戻ると、ビルダー型推論により、flow {}内のemitに渡された値の型によって、生成されるFlow<T>のTを型推論することが可能となる。

  val f = flow {
      emit(42) // ここから T = Int と型推論 → f: Flow<Int>
  } 

ここまでの話をまとめると、flow関数とはFlow<T>のビルダー関数であり、suspend FlowCollector<T>.() -> Unitというレシーバ付き関数型を利用してFlow<T>の組み立てを行う
ここで登場するFlowCollectorとは何かを次に見ていく。

FlowCollectorの内部実装

FlowCollectorの実装を以下に示す。

https://github.com/Kotlin/kotlinx.coroutines/blob/5f8900478a8e20c073145b1608fbc71fe3d7378b/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt#L25-L32

FlowCollectorは、関数型インタフェース (Functional Interface) あるいはSingle Abstract Method (SAM) インタフェース[5]と呼ばれるものの一種である。
関数型インタフェースは、抽象メソッドを一つだけ含むインタフェースで、fun interfaceを使って宣言される。

関数型インタフェースの特徴として、SAM変換と呼ばれる仕組みによって、ラムダ関数を使って簡潔に定義できる。
以下のように、FlowCollector<T>のインスタンスを渡す際に、ラムダ関数を渡すだけで、それがemit関数の実装とみなされ、FlowCollector<T>を簡潔に初期化できる。

fun <T>f(collector: FlowCollector<T>) {}

fun main() {
    // 通常の定義方法
    f(object: FlowCollector<Int> {
        override suspend fun emit(value: Int) {
            println(value)
        }
    })
    // SAM変換による定義方法
    f { value: Int -> println(value) } // ラムダ関数が emit の実装とみなされる
}

このFlowCollectorが何をするものかを知るには、Flowの実装を見る必要がある。
Flowは、collect関数のみを持つインタフェースである。

public interface Flow<out T> {
    public suspend fun collect(collector: FlowCollector<T>)
}

GitHubで見る

collect関数は、先述したFlowCollectorを引数として受け取る。
すなわち、collectに渡すラムダ関数を、以下のようにFlowCollectorを明示的に定義して初期化しても同義となる。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
               
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    // simple().collect { value -> println(value) } と同義
    simple().collect(object: FlowCollector<Int> {
        override suspend fun emit(value: Int) {
            println(value)
        }
    })
}

▶️ Playgroundで実行

ここまで見ると、ある推測が浮かぶのではないだろうか。
すなわち、Flow Builder (flow関数) 内で呼ばれるemitは、collectに渡したラムダ関数 (= FlowCollectoremit) を呼んでいるのではないかという推測である。
この推測を確かめるために、後回しにしていたSafeFlowの実装を見ていく。


Flow BuilderからのFlowCollectorのemitの呼び出し

flow関数の内部実装への再訪

再掲すると、flow関数は引数として受け取ったblockを使用して、SafeFlowクラスのインスタンスを返す。

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

GitHubで見る

SafeFlowの実装を見ると、collectSafely関数の実装がある。
collectSafelyでは、FlowCollectorを受け取って、block (FlowCollectorをレシーバとするラムダ関数) が呼び出される。

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

GitHubで見る

SafeFlow<T>AbstractFlow<T>を継承する。AbstractFlowのソースコードを以下に示す。

public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }

    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

GitHubで見る

AbstractFlowcollect内で、collectSafelyが呼ばれる。
すなわち、SafeFlowcollectが呼ばれると、collectSafelyが呼ばれ、collectに渡されたFlowCollectorをレシーバとして、flow関数に渡されたblockが実行される

先述したサンプルコードを用いて説明すると、まずFlow Builderであるflow関数により、SafeFlowが作られる。このSafeFlowは、flowに渡されたラムダ関数 (block) を保持する。


SafeFlowが作成されるまで

続いて、collectが呼ばれると、collectに渡されたFlowCollectorをレシーバとして、flowに渡されたラムダ関数 (block) が実行される。


collectが呼ばれた後

最後に、flow {}内でemitが呼ばれると、FlowCollectoremit (SAM変換されている場合にはcollectに渡したラムダ関数) が実行される。これは、以前に推測した通りの挙動となっている。


emitが呼ばれた後

以上で、Flow Builder・emitcollectの関係性と仕組みが明確になったのではないだろうか。
「Cold Flowはcollectが呼ばれるまで実行されない」という仕様も、この内部実装から説明することが可能である。それは、AbstractFlowおよびSafeFlowの実装から分かる通り、collectが呼ばれた際に、flowに渡されたラムダ関数 (block) が実行されるためである。

まとめ

本記事では、Kotlin Coroutines Flowの中心的な関数であるFlow Builder (flow {})・emit・collectの内部実装を明らかにした。これにより、基本的なCold Flowの仕組みを明確にイメージできるようになったのではないだろうか?
ただ、Flowには他にも様々な要素がある。以下については、今後それぞれ別の記事で解説していく予定である。

  • mapをはじめとする、Intermediate Flow Operatorの動作原理
  • Hot Flowの動作原理
  • コンテキスト切り替え (flowOn) の動作原理
  • BufferingおよびConflationの動作原理
  • Flowのキャンセル機構
  • Flowのエラーハンドリング機構

[追記] mapfilterの仕組みに関する記事を公開

https://zenn.dev/kaseken/articles/996ac7395900ec

脚注
  1. Kotlin/kotlinx.coroutines: https://github.com/Kotlin/kotlinx.coroutines ↩︎

  2. Flowのサンプルコード: https://kotlinlang.org/docs/flow.html#flows ↩︎

  3. Function literals with receiver: https://kotlinlang.org/docs/lambdas.html#function-literals-with-receiver ↩︎

  4. Using builders with builder type inference: https://kotlinlang.org/docs/using-builders-with-builder-inference.html ↩︎

  5. Functional (SAM) interfaces: https://kotlinlang.org/docs/fun-interfaces.html ↩︎

Discussion