Ktor Pipelineをナマで使う
Ktor v2.2.2 の実装に基づいて書いています。
Ktorの Pipeline
およびその関連クラスは、WebフレームワークとしてのKtor本体とは無関係に、単独で使うことができます。単独で使ったときに何ができるのかを知っておくと、Ktor本体におけるPipelineの仕組みが非常に理解しやすくなります。この記事では、Ktor本体には基本的に触れずに、 Pipeline
クラスをライブラリ的に使いながら解説します。
基本的な使い方
Pipeline
オブジェクトの構築
Pipelineは、順序をもつ0個以上のフェーズ( PipelinePhase
)と、そのそれぞれに0個以上登録されたやはり順序をもつインターセプタからなる、基本的には静的なオブジェクトです。フェーズやインターセプタの追加はいつでも可能で、つまり動的に変化させることはできるのですが、後述するパイプライン実行が処理の度に新しい値を受け取りそれを書き換えていくのに対して、 Pipeline
オブジェクト自体を初期設定後に頻繁に変化させる使い方はあまりしない、程度の意味合いで静的と表現しています。
Pipeline
を作成してフェーズとインターセプタを登録する基本的なコードは次のようになります。 Pipeline
クラスについている2つの型パラメータ <StringBuilder, MutableMap<String, Any>>
については次項で説明します。
// import io.ktor.util.pipeline.Pipeline など
// 以下、import文はすべて省略します。
suspend fun main() {
val phaseA = PipelinePhase("A")
val phaseB = PipelinePhase("B")
val phaseC = PipelinePhase("C")
val stringPipeline = Pipeline<StringBuilder, MutableMap<String, Any>>(phaseA, phaseB, phaseC).apply {
intercept(phaseA) { it.append("->phaseA") }
intercept(phaseB) { it.append("->phaseB") }
}
}
TSubject
・ TContext
パイプライン実行と 作成したPipelineに値を流し込むと、その値はPipeline内の各インターセプタによって書き換えなどが行われ、流し込んだのと同じ型の値が出力されます。この処理をここではパイプライン実行と呼びます。また、パイプライン実行時には、Pipelineによって処理される値の他に、処理に必要な諸々の情報を集めたオブジェクトを一つ用いることができます。これをコンテキストと呼びます。
処理される値とコンテキスト、この2つのオブジェクトの型が、先ほど Pipeline
のインスタンス化時に指定した2つです。 Pipeline
クラスの定義(下記)で使われている型パラメータ名を取って、これらをそれぞれ TSubject
・ TContext
と呼ぶことにします。定義を見ての通り、nullableでない任意の型を指定できます。
public open class Pipeline<TSubject : Any, TContext : Any>(
vararg phases: PipelinePhase
)
Pipeline
オブジェクトを作成し、そこに値を流し込んでパイプライン実行するコードは次のようになります。ここでは、入出力値の型 TSubject
が StringBuilder
、コンテキストの型は MutableMap<String, Any>
です。
suspend fun main() {
val phaseA = PipelinePhase("A")
val phaseB = PipelinePhase("B")
val phaseC = PipelinePhase("C")
val stringPipeline = Pipeline<StringBuilder, MutableMap<String, Any>>(phaseA, phaseB, phaseC)
stringPipeline.intercept(phaseA) { str ->
this.context["isEmpty"] = str.isEmpty()
str.append("->phaseA")
}
stringPipeline.intercept(phaseB) { str ->
// この例では意味を感じられないが、
// 先行するインターセプタがコンテキストに設定した値を利用できる
val isEmpty = (this.context["isEmpty"] as? Boolean) ?: false
if (!isEmpty) {
str.append("->phaseB")
}
}
val context = mutableMapOf<String, Any>()
val result = stringPipeline.execute(context, StringBuilder("init"))
println(context)
// {isEmpty=false}
println(result)
// init->phaseA->phaseB
}
用途がわかりやすいよう、下記のように型パラメータを指定した Pipeline
型に名前をつけるとよいかもしれません。この記事でもこれ以降このクラスを使います。
class StringPipeline : Pipeline<StringBuilder, MutableMap<String, Any>>(Initialize, Execute, Send) {
companion object {
val Initialize = PipelinePhase("Initialize")
val Execute = PipelinePhase("Execute")
val Send = PipelinePhase("Send")
}
}
// typealiasという手もあるけれど、PipelinePhaseを定義する場所に悩みそう
// typealias StringPipeline = Pipeline<StringBuilder, MutableMap<String, Any>>
フェーズ
フェーズは、パイプラインを時間軸で区切って名前をつけたものです。パイプラインにフェーズを並べ、必要に応じてフェーズごとにインターセプタ(後述)を登録する、そしてパイプラインを実行すると、フェーズ順にインターセプタが実行される、という仕組みです。
ただし、パイプライン実行の段階になるとフェーズという概念は実はなくなり、ただインターセプタが順番に実行されるだけです。なので、フェーズはインターセプタを登録するとき(そして後述するパイプラインのマージのとき)にインターセプタの順序を適切に取り扱うためだけに存在していると言ってもよいです。
フェーズを表す PipelinePhase
は、文字列をラップしただけのシンプルなクラスです。このクラス自体について説明することはあまりありませんが、 equals
がオーバーライドされていない点には要注意です[1]。単一のオブジェクトをコンパニオンオブジェクトなどに格納して使い回す必要があります。
val phaseA1 = PipelinePhase("A")
val phaseA2 = PipelinePhase("A")
println("equal?: ${phaseA1 == phaseA2}")
// equal?: false
フェーズの追加
フェーズは、前述のサンプルコードのとおり、 Pipeline
のコンストラクタ引数として指定することができますが、インスタンス生成後に追加することも可能です。
val pipeline = StringPipeline()
// Initializeの後に追加
val validate = PipelinePhase("Validate")
pipeline.insertPhaseAfter(StringPipeline.Initialize, validate)
// Sendの前に追加
val transform = PipelinePhase("Transform")
pipeline.insertPhaseBefore(StringPipeline.Send, transform)
// 一番うしろに追加
val finalize = PipelinePhase("Finalize")
pipeline.addPhase(finalize)
println(pipeline.items.map { it.name })
// [Initialize, Validate, Execute, Transform, Send, Finalize]
insertPhaseAfter
と insertPhaseBefore
には、 「同じフェーズを基準として挿入を繰り返した場合、後から挿入したほうが必ず後ろにくる」 という、気持ちはわかるけれど直感に反していると思わなくもない、微妙な仕様があるので注意が必要です。
val pipeline = StringPipeline()
val validate1 = PipelinePhase("Validate1")
pipeline.insertPhaseAfter(StringPipeline.Initialize, validate1)
// validate2が挿入される位置はInitializeの直後ではない!
val validate2 = PipelinePhase("Validate2")
pipeline.insertPhaseAfter(StringPipeline.Initialize, validate2)
println(pipeline.items.map { it.name })
// [Initialize, Validate1, Validate2, Execute, Send]
インターセプタ
インターセプタの追加
見ての通り、インターセプタの追加時にはフェーズを指定することで実行順序を定義します。追加先フェーズが同じである場合は追加した順に実行されます。
val pipeline = StringPipeline()
pipeline.intercept(StringPipeline.Execute) {
this.context["executed"] = true
it.append("->executed!")
}
ちなみに、フェーズの項で話したとおり、パイプライン実行時にはフェーズという概念はなくなりますので、インターセプタ関数ボディから「自分自身がどのフェーズに登録されたインターセプタであるか」を知る方法はありません。
PipelineContext
インターセプタ関数のレシーバ this
は PipelineContext<TSubject, TContext>
型です。名前のとおり、このパイプライン実行で必要な情報を集めたコンテキストを表します。少しややこしいのですが、自分で型指定をした TContext
型のコンテキストは PipelineContext
の一部として this.context
に入っています。 PipelineContext
は公開API上はシンプルなクラスです。用途はおおまかに次の3つ。
-
context
フィールドに収められているTContext
型コンテキストにアクセスする -
PipelineContext
はCoroutineScope
でもあり、インターセプタ内でのコルーチン起動に使う - メソッド
proceed()
およびproceedWith()
を呼び出す
CoroutineScope
としての PipelineContext
PipelineContext
が CoroutineScope
でもあることから、インターセプタ内でコルーチンを起動することができます。
suspend fun main() {
val pipeline = StringPipeline()
pipeline.intercept(StringPipeline.Execute) {
launch {
delay(2000)
println("launched")
}
delay(1000)
println("executed")
}
withContext(Dispatchers.IO) {
val result = pipeline.execute(mutableMapOf(), StringBuilder("init"))
println(result)
}
}
// (delay 1000 ms)
// executed
// init
// (delay 1000 ms)
// launched
ここで注意しておきたいのは、 "launched"
の出力が result
の出力後になっている点、パイプライン実行における並行性の構造[structured concurrency]です。
パイプラインは PipelineContext
という CoroutineScope
オブジェクトを作りはしますが、コルーチンを起動するわけでもなく、 PipelineContext
の親となっているのは pipeline.execute
を囲っている withContext
によって作られた CoroutineScope
です。つまり pipeline.execute
自体は PipelineContext
が起動したコルーチンの実行完了を待ってくれません。
withContext
は launch
の完了を待ちますから、最後の部分を次のように変えれば期待する出力を得られはしますが、まともなコードとはとても言えません。
val result = withContext(Dispatchers.IO) {
pipeline.execute(mutableMapOf(), StringBuilder("init"))
}
println(result)
// (delay 1000 ms)
// executed
// (delay 1000 ms)
// launched
// init
launch
の実行が終わるまで execute
を待機させたい場合は、インターセプタ内でジョブの完了待ち( join
や await
)と次項で説明する proceed()
を組み合わせることで実現可能です。
なお、新規起動したコルーチン側で TSubject
や TContext
にアクセス、特に書き込みを行うのは極力控えたほうがよいでしょう。パイプラインの仕組み上 TSubject
も TContext
もミュータブルなオブジェクトになるケースが多いので、スレッドセーフに配慮する必要が生じます。
suspend fun main() {
val pipeline = StringPipeline()
pipeline.intercept(StringPipeline.Execute) {
launch {
delay(2000)
// 控えたい
it.append("->launched")
}
delay(1000)
it.append("->executed")
}
withContext(Dispatchers.IO) {
val result = pipeline.execute(mutableMapOf(), StringBuilder("init"))
println(result)
}
}
// (delay 1000 ms)
// init->executed
// (この例だとそもそもexecuteが待ってくれず ->launched は表示されない)
proceed()
この関数は、実行順に並んだインターセプタのうち、まだ実行開始していないインターセプタおよびその後続すべてを実行した上でリターンします。なので、これをある特定のインターセプタ内で呼ぶと、そのインターセプタだけは未完了のまま、先に後続のインターセプタがすべて実行される、という挙動になります。
時間のかかる処理を別コルーチンで実行しながら同時にパイプライン実行を進めることができます。下記の例は単純に同期実行すると7,000msかかるところを5,000msに短縮できています。
val pipeline = StringPipeline()
pipeline.intercept(StringPipeline.Initialize) {
println("interceptor 1")
}
pipeline.intercept(StringPipeline.Initialize) {
val deferred = async {
delay(5000)
"async result"
}
println("interceptor 2 before proceed()")
proceed()
println("interceptor 2 after proceed()")
val result = deferred.await()
println("interceptor 2 after await: $result")
}
pipeline.intercept(StringPipeline.Initialize) {
delay(1000)
println("interceptor 3")
}
pipeline.intercept(StringPipeline.Initialize) {
delay(1000)
println("interceptor 4")
}
pipeline.execute(mutableMapOf(), StringBuilder("init"))
// interceptor 1
// interceptor 2 before proceed()
// (delay 1000 ms)
// interceptor 3
// (delay 1000 ms)
// interceptor 4
// interceptor 2 after proceed()
// (delay 3000 ms)
// interceptor 2 after await: async result
proceedWith(subject: TSubject)
この関数は、Pipelineを流れている TSubject
値を指定したものに置き換えた上で proceed()
します。とはいえ、インターセプタの最後の処理で proceed()
をしても実質的な挙動は何も変わりませんから、この関数をインターセプタの最後で呼び出すのなら、やっていることは実質 TSubject
値の置き換えでしかないことになります。そして、Ktor内でもそのようにしか使われていないようです。
これまで、 TSubject
を StringBuilder
としていましたが、オブジェクト内部の変更だけでなくオブジェクトそのものの置換ができるのであれば、イミュータブルなクラスである String
を使うのでもよいのかもしれません。
val phase = PipelinePhase("SinglePhase")
val pipeline = Pipeline<String, Unit>(phase)
pipeline.intercept(phase) {
proceedWith("replaced")
}
val result = pipeline.execute(Unit, "init")
println(result)
// replaced
ちなみに、 PipelineContext
のプロパティ subject
はセッターを公開しているので、上のコードは、proceedWith
呼出をやめて次のように書けなくもありません。が、ちょっと気持ち悪い感じはします。
pipeline.intercept(phase) {
this.subject = "replaced"
}
Pipelineのマージ
(執筆中)
-
Ktor Serverでの使い方を見るに、これは意図的だと思う ↩︎
Discussion