Kotlin coroutines Flowのオペレータを作る

公開:2020/12/24
更新:2020/12/24
4 min読了の目安(約4300字TECH技術記事 1

はじめに

この記事はKotlin Advent Calendar 2020の19日目の記事になりました。

Kotlin corutinesのFlowはリリース当初はオペレータの種類も少なく、Rxで行っているような複雑な処理を実行したい場合には自身でオペレータの代わりとなる実装をする必要がありました。
現在では、オペレータの種類も増えてきたため自身でオペレータを実装したいケースは少なくなってきましたが、それでも既存のオペレータの組み合わせだけでは難しいor煩雑になってしまうケースはどうしても出てきてしまいます。

本記事では自作オペレータを作るいくつかの方法を紹介していきたいと思います。

簡単なオペレータ

単純な変換処理等を行うようなオペレータは transform を使うことで実装できます。
以下は奇数であればスキップし、偶数であれば2回値を流すオペレータの実装です。

fun Flow<Int>.skipOddAndDuplicateEven() = transform { value ->
    if (value % 2 == 0) {
        emit(value)
	emit(value)
    }
}

val numberFlow = flowOf(0, 1, 2, 3)
numberFlow
    .skipOddAndDuplicateEven()
    .collect {
        println(it)
	// 0
	// 0
	// 2
	// 2
    }

少し複雑なオペレータ

それでは、もう少し複雑なオペレータを考えてみましょう。
今回はFlowから流れてくるパラメータを2つのAPIに投げて先に結果が返ってきた方のみを結果として流すFlowオペレータを実装してみます。

オペレータの実装は先ほどと同じくtransformを起点に行っています。
transformの中で使われているselect関数は引数として渡されたブロックの中で最初に実行されたSelectクロージャのみを結果として扱うことができる関数です。
ここではasyncで各APIを呼び出し、DeferredのSelectクロージャであるonAwaitを指定することで、最初に結果が返ってきたasyncをこのオペレータの結果としてemitで流しています。

SelectクロージャはDeferredだけでなくChannelJobなどに対してもそれぞれ用意されているので、詳しくはドキュメントを参照してください。

また、selectではタイムアウトを指定することも可能なので、今回はタイムアウトも合わせて指定しています。

/**
 * paramがfoo以外だと遅いAPI
 * paramがfoobarのときはさらに遅い
 */
suspend fun fooApi(param: String): String {
    if (param != "foo") delay(500)
    if (param == "foobar") delay(1000)
    return "foo api result for $param"
}

/**
 * paramがbar以外だと遅いAPI
 * paramがfoobarのときはさらに遅い
 */
suspend fun barApi(param: String): String {
    if (param != "bar") delay(500)
    if (param == "foobar") delay(1000)
    return "bar api result for $param"
}

/**
 * fooApiとbarApiの2つに同時にリクエストして先に結果が返ってきたほうを流すオペレータ
 * ただし、リクエストが1000ms以内に返ってこないとタイムアウトする
 */
fun Flow<String>.searchFromTwoApi() = transform { param ->
    coroutineScope {
        val result = select<String> {
            async { fooApi(param) }.onAwait { it }
            async { barApi(param) }.onAwait { it }
            onTimeout(1000) { "Timeout!" }
        }
        emit(result)
    }
}

val paramFlow = flowOf("foo", "bar", "foobar")
paramFlow
    .searchFromTwoApi()
    .collect {
        println(it)
        // foo api result for foo
        // bar api result for bar
        // Timeout!
    }

さらに複雑なオペレータ

これまでのオペレータはtansformを起点として実装を行ってきました。
transformを使ったオペレータでは、単純に流れてきた値だけを見て処理を行うことはできましたが、「前回までに流れて来た値を使いたい」といったケースや「流れてきた値に応じてなにかしらのフラグを設定して以降の処理に使いたい」といったケースには対応できません。

こういったオペレータを実装したい場合はflow関数で新しくFlowを作り、内部で元のFlowproduceIn関数を使ってChannelに変換、値の取得・監視を行いながら処理した結果を下流に流します。

例として、以下のような動作をするオペレータを実装してみます

  • あるInt値が流れてきたときにその値を流す
  • 値を流したあと、上流から流れてきた値の秒数だけ次の値を待ち受ける
  • 待受時間内に次の値が流れてきたときには待受していた値との合計値を流す
  • 待受時間内に次の値が流れてこなかったときはその値だけを流す
  • 流れてきた値が負のときは待受時間0とする
fun Flow<Int>.waitAndSum() = flow {
    coroutineScope {
        val values = produceIn(this)

        var lastValue: Int? = null
        while (isActive) {
            select<Unit> {
                // onReceiveOrClosedがInternalCoroutinesApiでなくなったら
                // onReceiveをonReceiveOrClosedに置き換えてcatchを削除する
                values.onReceive {
                    emit(it + (lastValue ?: 0))
                    lastValue = it
                }
                lastValue?.let {
                    onTimeout(it.coerceAtLeast(0).toLong() * 1000) {
                        lastValue = null
                    }
                }
            }
        }
    }
}.catch {
    // channelがクローズしていたときのエラーは無視する
    if (it !is ClosedReceiveChannelException) {
        throw it
    }
}

val valueFlow = flow<Int> {
    emit(5) // 5
    delay(1000)
    emit(3) // 3 + 5 = 8
    delay(1000)
    emit(4) // 4 + 3 = 7
    delay(5000)
    emit(6) // 6
    delay(2000)
    emit(-1) // -1 + 6 = 5
    delay(10)
    emit(9) // 9
}

valueFlow
    .waitAndSum()
    .collect {
        println("$it")
        // 5
        // 8
        // 7
        // 6
        // 5
        // 9
    }

おわり

Kotlin coroutines Flowでオペレータを自作したい場合にいくつかの方法があることを示しました。
特に、複雑なオペレーターを作りたい場合には、FlowChannel化やselect関数は使わなくても可能ではありますが、これらを有効に使うことで簡潔に書くことができるかもしれません。