😽

【Kotlin】MutableStateFlowの更新を安心して見届けたい

2023/08/17に公開

MutableStateFlowの更新処理について

MutableStateFlowには以下に挙げるように様々な更新手段が存在します。

  • valueのセッタ
    • mutableStateFlow.value = 1
  • emit関数
    • mutableStateFlow.emit(1)
  • update関数
    • mutableStateFlow.update { 1 }
  • updateAndGet関数
    • val updated = mutableStateFlow.updateAndGet { 1 }
  • etc...

公式ドキュメントによると、以下のようにStateFlowにおける 全ての関数 はスレッドセーフであると記されています。(ver. 1.7.1, 2023/08/17アクセス)

Concurrency

All methods of state flow are thread-safe and can be safely invoked from concurrent coroutines without external synchronization.

この記述における 全ての関数 はどこまでの関数を対象にしているのでしょうか?
"valueのセッタ""MutableSharedFlowインタフェースに定義されているemitメソッド" などもスレッドセーフな実装になっているのでしょうか?

また、MutableStateFlowがスレッドセーフになっていることで、どう嬉しいのでしょうか?

スレッドセーフとは?

そもそもスレッドセーフとはどのような状態でしょうか?
スレッドセーフとは "マルチスレッドで処理をしても不具合が生じない" ということを意味します。

具体的に見てみます。
スレッドセーフではない処理として、以下のプログラムを見てみます。

var num = 0
val repeatNum = 1000000
    
runBlocking(Dispatchers.Default) {
    launch {
        repeat(repeatNum) {
            num = num + 1
        }
    }
    launch {
        repeat(repeatNum) {
            num = num + 1
        }
    }
}

println("result: $num") // result: 1172497

このプログラムでは、2つのスレッドがそれぞれ変数numを1000000回インクリメントしています。
最終的な結果は 1000000 * 2 = 2000000 となりそうです。

しかし、本プログラムではスレッドセーフではなく、変数numへのアクセスで2つのスレッドが競合してしまっています。
その結果、出力として1172497が得られています。

一方、Mutexなどで排他制御を行い、スレッドセーフな実装になっていると、最終的な出力は2000000が得られます。

このように、スレッドセーフなプログラムでは、 "シングルスレッドでは正常に動いていたはずが、マルチスレッドで動作させるとたまにバグが生じる" という非常に厄介な問題を回避してくれます。

スレッドによる競合の裏側
num = num + 1

このKotlinのコードをコンパイルすると以下のKotlin Bytecodeが得られます。

ILOAD 0   # numの値を読み込み、スタックにpush
ICONST_1  # 定数1をスタックにpush
IADD      # スタックに積まれたnumと1をpopして加算処理を行い、結果をスタックにpush
ISTORE 0  # スタックに積まれた結果をpopして、numに書き込み

Kotlin上では "値を加算する" という1種類の命令です。
しかし、実際には上記で示したように "値のロード", "計算", "結果の書き込み" という3種類の命令が実行されています。
そのため、2つのスレッドの実行タイミングによっては、以下のような実行順序になり得ます。

Thread1: 値のロード (num = 0)
Thread2: 値のロード (num = 0)
Thread1: 計算 (0 + 1)
Thread1: 結果の書き込み (num = 1)
Thread2: 計算 (0 + 1)
Thread2: 結果の書き込み (num = 1)

こうして "2つのスレッドがそれぞれインクリメントしたはずなのに結果的には1しか増えていない" という状況が生まれてしまいます。

このような状況を避けるために、 "実行中のスレッドがある時は、その処理が終わるまで待つ" というような排他制御が必要になります。

複数のスレッドから MutableStateFlow を更新してみる

スレッドセーフに関して理解できたところで、先ほどのコードを MutableStateFlow で実行してみます。

MutableStateFlow を更新するための関数は複数ありますが、まずは value のセッタを使って更新してみます。
冒頭で "MutableStateFlow はスレッドセーフに更新ができる" と述べましたが、先ほど得られなかった2000000が得られるのでしょうか?

val mutableStateFlow = MutableStateFlow(0)
val repeatNum = 1000000

runBlocking(Dispatchers.Default) {
    launch {
        repeat(repeatNum) {
	    mutableStateFlow.value = mutableStateFlow.value + 1 // valueのセッタの場合
	    // mutableStateFlow.emit(mutableStateFlow.value + 1) // emit関数の場合
	    // mutableStateFlow.update { it + 1 } // update関数の場合
        }
    }
    launch {
        repeat(repeatNum) {
	    mutableStateFlow.value = mutableStateFlow.value + 1 // valueのセッタの場合
	    // mutableStateFlow.emit(mutableStateFlow.value + 1) // emit関数の場合
	    // mutableStateFlow.update { it + 1 } // update関数の場合
        }
    }
}

println("result: ${mutableStateFlow.value}") // result: 1326125

今回もStateFlowへのアクセスで2つのスレッドが競合してしまい、期待される結果(2000000)が得られませんでした。

他の更新関数ではどうでしょうか?
冒頭で挙げた関数ごとの結果を以下に示します。

  • valueのセッタ
    • 1326125
  • emit関数
    • 1198187
  • update関数
    • 2000000
  • updateAndGet関数
    • 2000000

このように、valueのセッタとemit関数はスレッドセーフに実行できていませんでした。

それでは公式ページではなにをもって "すべての関数がスレッドセーフである" と述べていたのでしょうか?

各更新処理の責務

実装を読んでいったところ、"各関数の責務内でスレッドセーフである" という解釈をするのが良さそうでした。
(今回は実装詳細は紹介しませんが、排他制御によるロック時間を短くするための工夫がいくつかあって参考になるので、気になる方は是非見てみてください。)

以下に今回検証した関数の責務と、実際にどこまでスレッドセーフな処理になっているかを解説します。

valueのセッタ/emit関数

"valueのセッタ""emit関数" のシグニチャはどちらも (T) -> Unit となっており、"引数で与えられた値をvalueに設定する" という責務を担っています。
つまり、この二つの関数が保証しているスレッドセーフの責務に関しても、"引数で与えられた値をvalueに反映させるところまで" であると言えます。
故に、複数のスレッドが同時にvalueを更新しようとしたとしても、更新処理は逐次処理的に実行されており、不正な状態にはならないような実装になっています。

今回のサンプルコードでは、引数に mutableStateFlow.value + 1 を与えていました。
"valueのセッタ""emit関数"mutableStateFlow.value + 1 を計算した結果のvalueへの反映はスレッドセーフになっており、逐次処理的に処理されていきます。
しかし、mutableStateFlow.value + 1 の計算自体はスレッドセーフになっておらず、二つのスレッドで同時に処理してしまっていました。

このように、代入する値の算出 (mutableStateFlow.value + 1) が、MutableStateFlow自身に依存する場合は、別途排他制御を行う必要があります。

update関数

"update関数" のシグニチャは ((T) -> T) -> Unit となっており、"引数で与えられたラムダ式を評価し、得られた値をvalueに設定する" ところまでが責務です。
つまり、引数で渡している { it + 1 } の計算からvalueへの反映まで をスレッドセーフで実行してくれます。

今回挙げた単純な計算以外にも、以下のようにMutableStateFlow自身に依存するロジックがある場合はupdate関数を使うと、代入する値の計算からvalueへの反映までスレッドセーフに行えます。

class StateFlowHolder {
    private val mutableStateFlow: MutableStateFlow<List<Int>?> = MutableStateFlow(null)
    
    // mutableStateFlowが保持しているリストに、引数のリストを結合
    // スレッドセーフな実装
    fun threadSafeAppendList(value: List<Int>) {
	mutableStateFlow.update {
            when (it) {
                null -> value
                else -> it + value
            }
        }
    }
    
    // mutableStateFlowが保持しているリストに、引数のリストを結合
    // スレッドセーフでない実装
    fun notThreadSafeAppendList(value: List<Int>) {
        mutableStateFlow.value = if (it == null) {
	    mutableStateFlow.value
	} else {
	    mutableStateFlow.value + value
	}
    }
}

まとめ

MutableStateFlowにおける排他制御に関してまとめました。
公式ドキュメントには "全ての関数はスレッドセーフである" と記されていますが、これは "各関数の責務内でスレッドセーフ" であることを意味してます。

今回挙げた関数の責務は以下の通りです。

  • valueのセッタ/emit関数
    • 引数で与えられた "値" をvalueに反映させる
  • update関数
    • 引数で与えられた "式" を計算し、valueに反映させる

代入する値を算出するための式をスレッドセーフで行う必要がある場合はupdate関数を使うと、非同期処理に起因するバグを防止できるため、安心してMutableStateFlowを更新できます☺️

Discussion