Kotlin coroutines Flowのオペレータを作る
はじめに
この記事はKotlin Advent Calendar 2020の19日目の記事になりました。
Kotlin corutinesのFlowはリリース当初はオペレータの種類も少なく、Rxで行っているような複雑な処理を実行したい場合には自身でオペレータの代わりとなる実装をする必要がありました。
現在では、オペレータの種類も増えてきたため自身でオペレータを実装したいケースは少なくなってきましたが、それでも既存のオペレータの組み合わせだけでは難しいor煩雑になってしまうケースはどうしても出てきてしまいます。
本記事では自作オペレータを作るいくつかの方法を紹介していきたいと思います。
簡単なオペレータ
単純な変換処理等を行うようなオペレータは transform
を使うことで実装できます。
以下は奇数であればスキップし、偶数であれば2回値を流すオペレータの実装です。
fun Flow<Int>.skipOddAndDuplicateEven() = transform { value ->
if (value % 2 == 0) {
emit(value)
emit(value)
}
}
val numberFlow = flowOf(0, 1, 2, 3)
numberFlow
.skipOddAndDuplicateEven()
.collect {
println(it)
// 0
// 0
// 2
// 2
}
少し複雑なオペレータ
それでは、もう少し複雑なオペレータを考えてみましょう。
今回はFlowから流れてくるパラメータを2つのAPIに投げて先に結果が返ってきた方のみを結果として流すFlowオペレータを実装してみます。
オペレータの実装は先ほどと同じくtransform
を起点に行っています。
transform
の中で使われているselect
関数は引数として渡されたブロックの中で最初に実行されたSelectクロージャのみを結果として扱うことができる関数です。
ここではasync
で各APIを呼び出し、Deferred
のSelectクロージャであるonAwait
を指定することで、最初に結果が返ってきたasync
をこのオペレータの結果としてemit
で流しています。
SelectクロージャはDeferred
だけでなくChannel
やJob
などに対してもそれぞれ用意されているので、詳しくはドキュメントを参照してください。
また、select
ではタイムアウトを指定することも可能なので、今回はタイムアウトも合わせて指定しています。
/**
* paramがfoo以外だと遅いAPI
* paramがfoobarのときはさらに遅い
*/
suspend fun fooApi(param: String): String {
if (param != "foo") delay(500)
if (param == "foobar") delay(1000)
return "foo api result for $param"
}
/**
* paramがbar以外だと遅いAPI
* paramがfoobarのときはさらに遅い
*/
suspend fun barApi(param: String): String {
if (param != "bar") delay(500)
if (param == "foobar") delay(1000)
return "bar api result for $param"
}
/**
* fooApiとbarApiの2つに同時にリクエストして先に結果が返ってきたほうを流すオペレータ
* ただし、リクエストが1000ms以内に返ってこないとタイムアウトする
*/
fun Flow<String>.searchFromTwoApi() = transform { param ->
coroutineScope {
val result = select<String> {
async { fooApi(param) }.onAwait { it }
async { barApi(param) }.onAwait { it }
onTimeout(1000) { "Timeout!" }
}
emit(result)
}
}
val paramFlow = flowOf("foo", "bar", "foobar")
paramFlow
.searchFromTwoApi()
.collect {
println(it)
// foo api result for foo
// bar api result for bar
// Timeout!
}
さらに複雑なオペレータ
これまでのオペレータはtansform
を起点として実装を行ってきました。
transform
を使ったオペレータでは、単純に流れてきた値だけを見て処理を行うことはできましたが、「前回までに流れて来た値を使いたい」といったケースや「流れてきた値に応じてなにかしらのフラグを設定して以降の処理に使いたい」といったケースには対応できません。
こういったオペレータを実装したい場合はflow
関数で新しくFlow
を作り、内部で元のFlow
をproduceIn
関数を使ってChannel
に変換、値の取得・監視を行いながら処理した結果を下流に流します。
例として、以下のような動作をするオペレータを実装してみます
- ある
Int
値が流れてきたときにその値を流す - 値を流したあと、上流から流れてきた値の秒数だけ次の値を待ち受ける
- 待受時間内に次の値が流れてきたときには待受していた値との合計値を流す
- 待受時間内に次の値が流れてこなかったときはその値だけを流す
- 流れてきた値が負のときは待受時間0とする
fun Flow<Int>.waitAndSum() = flow {
coroutineScope {
val values = produceIn(this)
var lastValue: Int? = null
while (isActive) {
select<Unit> {
// onReceiveOrClosedがInternalCoroutinesApiでなくなったら
// onReceiveをonReceiveOrClosedに置き換えてcatchを削除する
values.onReceive {
emit(it + (lastValue ?: 0))
lastValue = it
}
lastValue?.let {
onTimeout(it.coerceAtLeast(0).toLong() * 1000) {
lastValue = null
}
}
}
}
}
}.catch {
// channelがクローズしていたときのエラーは無視する
if (it !is ClosedReceiveChannelException) {
throw it
}
}
val valueFlow = flow<Int> {
emit(5) // 5
delay(1000)
emit(3) // 3 + 5 = 8
delay(1000)
emit(4) // 4 + 3 = 7
delay(5000)
emit(6) // 6
delay(2000)
emit(-1) // -1 + 6 = 5
delay(10)
emit(9) // 9
}
valueFlow
.waitAndSum()
.collect {
println("$it")
// 5
// 8
// 7
// 6
// 5
// 9
}
おわり
Kotlin coroutines Flowでオペレータを自作したい場合にいくつかの方法があることを示しました。
特に、複雑なオペレーターを作りたい場合には、Flow
のChannel
化やselect
関数は使わなくても可能ではありますが、これらを有効に使うことで簡潔に書くことができるかもしれません。
Discussion
冷静に考えたら複雑なオペレータのケースでも、
select
使う必要がないパターンだとflow
ブロック内部で上流の値をcollect
して処理するだけでよかったです。