🚰

Ktor Pipelineをナマで使う

2023/01/27に公開

Ktor v2.2.2 の実装に基づいて書いています。

Ktorの Pipeline およびその関連クラスは、WebフレームワークとしてのKtor本体とは無関係に、単独で使うことができます。単独で使ったときに何ができるのかを知っておくと、Ktor本体におけるPipelineの仕組みが非常に理解しやすくなります。この記事では、Ktor本体には基本的に触れずに、 Pipeline クラスをライブラリ的に使いながら解説します。

基本的な使い方

Pipeline オブジェクトの構築

Pipelineは、順序をもつ0個以上のフェーズ( PipelinePhase )と、そのそれぞれに0個以上登録されたやはり順序をもつインターセプタからなる、基本的には静的なオブジェクトです。フェーズやインターセプタの追加はいつでも可能で、つまり動的に変化させることはできるのですが、後述するパイプライン実行が処理の度に新しい値を受け取りそれを書き換えていくのに対して、 Pipeline オブジェクト自体を初期設定後に頻繁に変化させる使い方はあまりしない、程度の意味合いで静的と表現しています。

Pipeline を作成してフェーズとインターセプタを登録する基本的なコードは次のようになります。 Pipeline クラスについている2つの型パラメータ <StringBuilder, MutableMap<String, Any>> については次項で説明します。

// import io.ktor.util.pipeline.Pipeline など
// 以下、import文はすべて省略します。

suspend fun main() {
    val phaseA = PipelinePhase("A")
    val phaseB = PipelinePhase("B")
    val phaseC = PipelinePhase("C")

    val stringPipeline = Pipeline<StringBuilder, MutableMap<String, Any>>(phaseA, phaseB, phaseC).apply {
        intercept(phaseA) { it.append("->phaseA") }
        intercept(phaseB) { it.append("->phaseB") }
    }
}

パイプライン実行と TSubjectTContext

作成したPipelineに値を流し込むと、その値はPipeline内の各インターセプタによって書き換えなどが行われ、流し込んだのと同じ型の値が出力されます。この処理をここではパイプライン実行と呼びます。また、パイプライン実行時には、Pipelineによって処理される値の他に、処理に必要な諸々の情報を集めたオブジェクトを一つ用いることができます。これをコンテキストと呼びます。

処理される値とコンテキスト、この2つのオブジェクトの型が、先ほど Pipeline のインスタンス化時に指定した2つです。 Pipeline クラスの定義(下記)で使われている型パラメータ名を取って、これらをそれぞれ TSubjectTContext と呼ぶことにします。定義を見ての通り、nullableでない任意の型を指定できます。

public open class Pipeline<TSubject : Any, TContext : Any>(
    vararg phases: PipelinePhase
)

Pipeline オブジェクトを作成し、そこに値を流し込んでパイプライン実行するコードは次のようになります。ここでは、入出力値の型 TSubjectStringBuilder、コンテキストの型は MutableMap<String, Any> です。

suspend fun main() {
    val phaseA = PipelinePhase("A")
    val phaseB = PipelinePhase("B")
    val phaseC = PipelinePhase("C")

    val stringPipeline = Pipeline<StringBuilder, MutableMap<String, Any>>(phaseA, phaseB, phaseC)

    stringPipeline.intercept(phaseA) { str ->
        this.context["isEmpty"] = str.isEmpty()
        str.append("->phaseA")
    }
    stringPipeline.intercept(phaseB) { str ->
        // この例では意味を感じられないが、
	// 先行するインターセプタがコンテキストに設定した値を利用できる
        val isEmpty = (this.context["isEmpty"] as? Boolean) ?: false
        if (!isEmpty) {
            str.append("->phaseB")
        }
    }

    val context = mutableMapOf<String, Any>()
    val result = stringPipeline.execute(context, StringBuilder("init"))
    
    println(context)
    // {isEmpty=false}
    println(result)
    // init->phaseA->phaseB
}

用途がわかりやすいよう、下記のように型パラメータを指定した Pipeline 型に名前をつけるとよいかもしれません。この記事でもこれ以降このクラスを使います。

StringPipeline.kt
class StringPipeline : Pipeline<StringBuilder, MutableMap<String, Any>>(Initialize, Execute, Send) {
    companion object {
        val Initialize = PipelinePhase("Initialize")
        val Execute = PipelinePhase("Execute")
        val Send = PipelinePhase("Send")
    }
}

// typealiasという手もあるけれど、PipelinePhaseを定義する場所に悩みそう
// typealias StringPipeline = Pipeline<StringBuilder, MutableMap<String, Any>>

フェーズ

フェーズは、パイプラインを時間軸で区切って名前をつけたものです。パイプラインにフェーズを並べ、必要に応じてフェーズごとにインターセプタ(後述)を登録する、そしてパイプラインを実行すると、フェーズ順にインターセプタが実行される、という仕組みです。

ただし、パイプライン実行の段階になるとフェーズという概念は実はなくなり、ただインターセプタが順番に実行されるだけです。なので、フェーズはインターセプタを登録するとき(そして後述するパイプラインのマージのとき)にインターセプタの順序を適切に取り扱うためだけに存在していると言ってもよいです。

フェーズを表す PipelinePhase は、文字列をラップしただけのシンプルなクラスです。このクラス自体について説明することはあまりありませんが、 equals がオーバーライドされていない点には要注意です[1]。単一のオブジェクトをコンパニオンオブジェクトなどに格納して使い回す必要があります。

val phaseA1 = PipelinePhase("A")
val phaseA2 = PipelinePhase("A")

println("equal?: ${phaseA1 == phaseA2}")
// equal?: false

フェーズの追加

フェーズは、前述のサンプルコードのとおり、 Pipeline のコンストラクタ引数として指定することができますが、インスタンス生成後に追加することも可能です。

val pipeline = StringPipeline()

// Initializeの後に追加
val validate = PipelinePhase("Validate")
pipeline.insertPhaseAfter(StringPipeline.Initialize, validate)

// Sendの前に追加
val transform = PipelinePhase("Transform")
pipeline.insertPhaseBefore(StringPipeline.Send, transform)

// 一番うしろに追加
val finalize = PipelinePhase("Finalize")
pipeline.addPhase(finalize)

println(pipeline.items.map { it.name })
// [Initialize, Validate, Execute, Transform, Send, Finalize]

insertPhaseAfterinsertPhaseBefore には、 「同じフェーズを基準として挿入を繰り返した場合、後から挿入したほうが必ず後ろにくる」 という、気持ちはわかるけれど直感に反していると思わなくもない、微妙な仕様があるので注意が必要です。

val pipeline = StringPipeline()

val validate1 = PipelinePhase("Validate1")
pipeline.insertPhaseAfter(StringPipeline.Initialize, validate1)

// validate2が挿入される位置はInitializeの直後ではない!
val validate2 = PipelinePhase("Validate2")
pipeline.insertPhaseAfter(StringPipeline.Initialize, validate2)

println(pipeline.items.map { it.name })
// [Initialize, Validate1, Validate2, Execute, Send]

インターセプタ

インターセプタの追加

見ての通り、インターセプタの追加時にはフェーズを指定することで実行順序を定義します。追加先フェーズが同じである場合は追加した順に実行されます。

val pipeline = StringPipeline()

pipeline.intercept(StringPipeline.Execute) {
    this.context["executed"] = true
    it.append("->executed!")
}

ちなみに、フェーズの項で話したとおり、パイプライン実行時にはフェーズという概念はなくなりますので、インターセプタ関数ボディから「自分自身がどのフェーズに登録されたインターセプタであるか」を知る方法はありません。

PipelineContext

インターセプタ関数のレシーバ thisPipelineContext<TSubject, TContext> 型です。名前のとおり、このパイプライン実行で必要な情報を集めたコンテキストを表します。少しややこしいのですが、自分で型指定をした TContext 型のコンテキストは PipelineContext の一部として this.context に入っています。 PipelineContext は公開API上はシンプルなクラスです。用途はおおまかに次の3つ。

  • context フィールドに収められている TContext 型コンテキストにアクセスする
  • PipelineContextCoroutineScope でもあり、インターセプタ内でのコルーチン起動に使う
  • メソッド proceed() および proceedWith() を呼び出す

CoroutineScope としての PipelineContext

PipelineContextCoroutineScope でもあることから、インターセプタ内でコルーチンを起動することができます。

suspend fun main() {
    val pipeline = StringPipeline()

    pipeline.intercept(StringPipeline.Execute) {
        launch {
            delay(2000)
            println("launched")
        }
        delay(1000)
        println("executed")
    }

    withContext(Dispatchers.IO) {
        val result = pipeline.execute(mutableMapOf(), StringBuilder("init"))
        println(result)
    }
}

// (delay 1000 ms)
// executed
// init
// (delay 1000 ms)
// launched

ここで注意しておきたいのは、 "launched" の出力が result の出力後になっている点、パイプライン実行における並行性の構造[structured concurrency]です。

パイプラインは PipelineContext という CoroutineScope オブジェクトを作りはしますが、コルーチンを起動するわけでもなく、 PipelineContext の親となっているのは pipeline.execute を囲っている withContext によって作られた CoroutineScopeです。つまり pipeline.execute 自体は PipelineContext が起動したコルーチンの実行完了を待ってくれません。

withContextlaunch の完了を待ちますから、最後の部分を次のように変えれば期待する出力を得られはしますが、まともなコードとはとても言えません。

val result = withContext(Dispatchers.IO) {
    pipeline.execute(mutableMapOf(), StringBuilder("init"))
}
println(result)

// (delay 1000 ms)
// executed
// (delay 1000 ms)
// launched
// init

launch の実行が終わるまで execute を待機させたい場合は、インターセプタ内でジョブの完了待ち( joinawait )と次項で説明する proceed() を組み合わせることで実現可能です。

なお、新規起動したコルーチン側で TSubjectTContext にアクセス、特に書き込みを行うのは極力控えたほうがよいでしょう。パイプラインの仕組み上 TSubjectTContext もミュータブルなオブジェクトになるケースが多いので、スレッドセーフに配慮する必要が生じます。

suspend fun main() {
    val pipeline = StringPipeline()

    pipeline.intercept(StringPipeline.Execute) {
        launch {
            delay(2000)
	    // 控えたい
            it.append("->launched")
        }
        delay(1000)
        it.append("->executed")
    }

    withContext(Dispatchers.IO) {
        val result = pipeline.execute(mutableMapOf(), StringBuilder("init"))
        println(result)
    }
}

// (delay 1000 ms)
// init->executed
// (この例だとそもそもexecuteが待ってくれず ->launched は表示されない)

proceed()

この関数は、実行順に並んだインターセプタのうち、まだ実行開始していないインターセプタおよびその後続すべてを実行した上でリターンします。なので、これをある特定のインターセプタ内で呼ぶと、そのインターセプタだけは未完了のまま、先に後続のインターセプタがすべて実行される、という挙動になります。

時間のかかる処理を別コルーチンで実行しながら同時にパイプライン実行を進めることができます。下記の例は単純に同期実行すると7,000msかかるところを5,000msに短縮できています。

val pipeline = StringPipeline()

pipeline.intercept(StringPipeline.Initialize) {
    println("interceptor 1")
}
pipeline.intercept(StringPipeline.Initialize) {
    val deferred = async {
        delay(5000)
        "async result"
    }
    println("interceptor 2 before proceed()")
    proceed()
    println("interceptor 2 after proceed()")
    val result = deferred.await()
    println("interceptor 2 after await: $result")
}
pipeline.intercept(StringPipeline.Initialize) {
    delay(1000)
    println("interceptor 3")
}
pipeline.intercept(StringPipeline.Initialize) {
    delay(1000)
    println("interceptor 4")
}

pipeline.execute(mutableMapOf(), StringBuilder("init"))

// interceptor 1
// interceptor 2 before proceed()
// (delay 1000 ms)
// interceptor 3
// (delay 1000 ms)
// interceptor 4
// interceptor 2 after proceed()
// (delay 3000 ms)
// interceptor 2 after await: async result

proceedWith(subject: TSubject)

この関数は、Pipelineを流れている TSubject 値を指定したものに置き換えた上で proceed() します。とはいえ、インターセプタの最後の処理で proceed() をしても実質的な挙動は何も変わりませんから、この関数をインターセプタの最後で呼び出すのなら、やっていることは実質 TSubject 値の置き換えでしかないことになります。そして、Ktor内でもそのようにしか使われていないようです。

これまで、 TSubjectStringBuilder としていましたが、オブジェクト内部の変更だけでなくオブジェクトそのものの置換ができるのであれば、イミュータブルなクラスである String を使うのでもよいのかもしれません。

val phase = PipelinePhase("SinglePhase")
val pipeline = Pipeline<String, Unit>(phase)
pipeline.intercept(phase) {
    proceedWith("replaced")
}

val result = pipeline.execute(Unit, "init")
println(result)
// replaced

ちなみに、 PipelineContext のプロパティ subject はセッターを公開しているので、上のコードは、proceedWith 呼出をやめて次のように書けなくもありません。が、ちょっと気持ち悪い感じはします。

pipeline.intercept(phase) {
    this.subject = "replaced"
}

Pipelineのマージ

(執筆中)

脚注
  1. Ktor Serverでの使い方を見るに、これは意図的だと思う ↩︎

Discussion