Open8

Ktorのプラグインはどのように実現されているのか

arashiyamaarashiyama
HttpClient(CIO) {
    install(ContentNegotiation) {
        json(Json.Default)
    }
}

のようなpluginのinstallはどのように実現されているのかを調べつつ雑記。気が向いたら清書するかも

arashiyamaarashiyama

以下コードは https://github.com/ktorio/ktor から適宜省略しつつ引用しています
ktor-client/ktor-client-core/common/src/io/ktor/client/HttpClient.kt

public fun HttpClient(
    engine: HttpClientEngine,
    block: HttpClientConfig<*>.() -> Unit
): HttpClient = HttpClient(engine, HttpClientConfig<HttpClientEngineConfig>().apply(block), manageEngine = false)

よく使うHttpClientのシグネチャは多分これ。一旦enginは無視する

public class HttpClientConfig<T : HttpClientEngineConfig> {
    private val plugins: MutableMap<AttributeKey<*>, (HttpClient) -> Unit> = mutableMapOf()
    private val pluginConfigurations: MutableMap<AttributeKey<*>, Any.() -> Unit> = mutableMapOf()
    private val customInterceptors: MutableMap<String, (HttpClient) -> Unit> = mutableMapOf()
    // 中略
   public fun <TBuilder : Any, TPlugin : Any> install(
        plugin: HttpClientPlugin<TBuilder, TPlugin>,
        configure: TBuilder.() -> Unit = {}
    ) {
        val previousConfigBlock = pluginConfigurations[plugin.key]
        pluginConfigurations[plugin.key] = {
            previousConfigBlock?.invoke(this)

            @Suppress("UNCHECKED_CAST")
            (this as TBuilder).configure()
        }

        if (plugins.containsKey(plugin.key)) return

        plugins[plugin.key] = { scope ->
            val attributes = scope.attributes.computeIfAbsent(PLUGIN_INSTALLED_LIST) { Attributes(concurrent = true) }
            val config = scope.config.pluginConfigurations[plugin.key]!!
            val pluginData = plugin.prepare(config)

            plugin.install(pluginData, scope)
            attributes.put(plugin.key, pluginData)
        }
    }

pluginはmutableMapで管理されていて、plugin:HttpClientPlugin<TBuilder, TPlugin>はkeyを持っていてそれに対してpluginConfigurationspluginsに追加している。書き方からして多重installも行けるっぽい。

arashiyamaarashiyama

HttpClientPluginを見ていく。

interface HttpClientPlugin<out TConfig : Any, TPlugin : Any> {
    /**
     * The [AttributeKey] for this plugin.
     *
     * [Report a problem](https://ktor.io/feedback/?fqname=io.ktor.client.plugins.HttpClientPlugin.key)
     */
    public val key: AttributeKey<TPlugin>

    /**
     * Builds a [TPlugin] by calling the [block] with a [TConfig] config instance as receiver.
     *
     * [Report a problem](https://ktor.io/feedback/?fqname=io.ktor.client.plugins.HttpClientPlugin.prepare)
     */
    public fun prepare(block: TConfig.() -> Unit = {}): TPlugin

    /**
     * Installs the [plugin] class for a [HttpClient] defined at [scope].
     *
     * [Report a problem](https://ktor.io/feedback/?fqname=io.ktor.client.plugins.HttpClientPlugin.install)
     */
    public fun install(plugin: TPlugin, scope: HttpClient)
}

AttributeKeyはStringのラップに少し毛が生えたもの。TypeInfoは後で調べてみる。

public data class AttributeKey<T : Any> @JvmOverloads constructor(
    public val name: String,
    private val type: TypeInfo = typeInfo<Any>(),
) {
    init {
        require(name.isNotBlank()) { "Name can't be blank" }
    }

    override fun toString(): String = "AttributeKey: $name"
}

<out TConfig : Any, TPlugin : Any>とどちらも上限型がないのが意外

実際にプラグインが継承するまでにもっと中間の型があって、ContentNegotiationの作成に関連するやつを集めると

public val ContentNegotiation: ClientPlugin<ContentNegotiationConfig> = createClientPlugin(
    "ContentNegotiation",
    ::ContentNegotiationConfig
) {/*省略*/}
public interface ClientPlugin<PluginConfig : Any> : HttpClientPlugin<PluginConfig, ClientPluginInstance<PluginConfig>>

public fun <PluginConfigT : Any> createClientPlugin(
    name: String,
    createConfiguration: () -> PluginConfigT,
    body: ClientPluginBuilder<PluginConfigT>.() -> Unit
): ClientPlugin<PluginConfigT> = ClientPluginImpl(name, createConfiguration, body)


private class ClientPluginImpl<PluginConfigT : Any>(
    name: String,
    private val createConfiguration: () -> PluginConfigT,
    private val body: ClientPluginBuilder<PluginConfigT>.() -> Unit
) : ClientPlugin<PluginConfigT> {

    override val key: AttributeKey<ClientPluginInstance<PluginConfigT>> = AttributeKey(name)

    override fun prepare(block: PluginConfigT.() -> Unit): ClientPluginInstance<PluginConfigT> {
        val config = createConfiguration().apply(block)
        return ClientPluginInstance(key, config, body)
    }

    @OptIn(InternalAPI::class)
    override fun install(plugin: ClientPluginInstance<PluginConfigT>, scope: HttpClient) {
        plugin.install(scope)
    }
}
arashiyamaarashiyama

Pluginが実際に処理をするための窓口っぽいやつがこいつ

public class ClientPluginBuilder<PluginConfig : Any> internal constructor(
    internal val key: AttributeKey<ClientPluginInstance<PluginConfig>>,
    public val client: HttpClient,
    public val pluginConfig: PluginConfig
) {

    internal val hooks: MutableList<HookHandler<*>> = mutableListOf()
    internal var onClose: () -> Unit = {}

 
    public fun onRequest(
        block: suspend OnRequestContext.(request: HttpRequestBuilder, content: Any) -> Unit
    ) {
        on(RequestHook, block)
    }

 
    public fun onResponse(
        block: suspend OnResponseContext.(response: HttpResponse) -> Unit
    ) {
        on(ResponseHook, block)
    }

  
    public fun transformRequestBody(
        block: suspend TransformRequestBodyContext.(
            request: HttpRequestBuilder,
            content: Any,
            bodyType: TypeInfo?
        ) -> OutgoingContent?
    ) {
        on(TransformRequestBodyHook, block)
    }

  
    public fun transformResponseBody(
        block: suspend TransformResponseBodyContext.(
            response: HttpResponse,
            content: ByteReadChannel,
            requestedType: TypeInfo
        ) -> Any?
    ) {
        on(TransformResponseBodyHook, block)
    }


    public fun onClose(block: () -> Unit) {
        onClose = block
    }
    public fun <HookHandler> on(
        hook: ClientHook<HookHandler>,
        handler: HookHandler
    ) {
        hooks.add(HookHandler(hook, handler))
    }
}

arashiyamaarashiyama

Hookはそれなりにありそうだけど、ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/api下で定義されてるのは
CommonHooks.ktに以下の4つ

  • SetupRequest
  • Send
  • SendingRequest
  • MonitoringEvent

KtorCallContexts.ktに以下の4つ

  • RequestHook
  • ResponseHook
  • TransformRequestBodyHook
  • TransformResponseBodyHook

Hookは基本的にHttpClientが持つPipelineに割り込む。HttpClientが持つPipelineはこの4つ。例えばRequestHookはclient.requestPipelineに処理を追加している

    public val requestPipeline: HttpRequestPipeline = HttpRequestPipeline()
    public val responsePipeline: HttpResponsePipeline = HttpResponsePipeline()
    public val sendPipeline: HttpSendPipeline = HttpSendPipeline()
    public val receivePipeline: HttpReceivePipeline = HttpReceivePipeline()

蛇足だがHttpClient内ではpipeline同士を繋いだりもしている

      sendPipeline.intercept(HttpSendPipeline.Receive) { call ->
            check(call is HttpClientCall) { "Error: HttpClientCall expected, but found $call(${call::class})." }
            val response = receivePipeline.execute(Unit, call.response)
            call.setResponse(response)
            proceedWith(call)
        }

pipeline.intercept(phase){ /* do something*/} って感じで処理を差し込んでいる

arashiyamaarashiyama

Pipelineクラス自体はフェーズの管理に集中している感じがする。フェーズと言ってもフェーズの間に何かがあるわけではなく処理順のための境界みたいなもの。多分。PipelineがやりたそうなことはList<List<PipelineInterceptor>>の管理的なもので、実際にパイプラインが実行されるときはflatして実行している。キャッシュだとかの最適化が入ってるから難しい。

Pipelineクラスが作ったinterceptors: List<PipelineInterceptor<TSubject, TContext>>を実際に実行するのはPipelineContext<TSubject, TContext> クラスで実装は二つある。

internal fun <TSubject : Any, TContext : Any> pipelineContextFor(
    context: TContext,
    interceptors: List<PipelineInterceptor<TSubject, TContext>>,
    subject: TSubject,
    coroutineContext: CoroutineContext,
    debugMode: Boolean = false
): PipelineContext<TSubject, TContext> = if (DISABLE_SFG || debugMode) {
    DebugPipelineContext(context, interceptors, subject, coroutineContext)
} else {
    SuspendFunctionGun(subject, context, interceptors)
}

そもそもPipelineContextってなんやねんって話ではあるが、こいつは実はすでに出ている。interceptのラムダのレシーバがこいつ

pipeline.intercept(phase: PipelinePhase, block: PipelineInterceptor<TSubject, TContext>)
typealias PipelineInterceptor<TSubject, TContext> = suspend PipelineContext<TSubject, TContext>.(TSubject) -> Unit

実際に何をするのかというと、パイプラインに運ばれているデータをプラグインに渡したり、差し込んだプラグインの結果を後続に伝えたりする。

public abstract class PipelineContext<TSubject : Any, TContext : Any>(
    public val context: TContext
) : CoroutineScope {
    public abstract var subject: TSubject // パイプラインを流れているデータ
    public abstract fun finish() // 後続を含めてパイプラインを終了
    public abstract suspend fun proceedWith(subject: TSubject): TSubject // データを更新して続ける
    public abstract suspend fun proceed(): TSubject // ただ続ける

    internal abstract suspend fun execute(initial: TSubject): TSubject //パイプラインを開始(内部API)
}

それでDebugPipelineContextは愚直な実装でSuspendFunctionGunはcoroutine最適化ガチガチの実装という感じ。Jetbrainsは命名にGunを入れる選択肢を持っている

arashiyamaarashiyama

DebugPipelineContextは割と簡単なので全掲。ただinterceptorを順番に実行しているだけで、実行時にphaseに関する情報は残ってないことがわかる

@KtorDsl
internal class DebugPipelineContext<TSubject : Any, TContext : Any>(
    context: TContext,
    private val interceptors: List<PipelineInterceptor<TSubject, TContext>>,
    subject: TSubject,
    override val coroutineContext: CoroutineContext
) : PipelineContext<TSubject, TContext>(context) {
    /**
     * Subject of this pipeline execution
     */
    override var subject: TSubject = subject

    private var index = 0

    /**
     * Finishes current pipeline execution
     */
    override fun finish() {
        index = -1
    }

    /**
     * Continues execution of the pipeline with the given subject
     */
    override suspend fun proceedWith(subject: TSubject): TSubject {
        this.subject = subject
        return proceed()
    }

    /**
     * Continues execution of the pipeline with the same subject
     */
    override suspend fun proceed(): TSubject {
        val index = index
        if (index < 0) return subject

        if (index >= interceptors.size) {
            finish()
            return subject
        }

        return proceedLoop()
    }

    override suspend fun execute(initial: TSubject): TSubject {
        index = 0
        subject = initial
        return proceed()
    }

    private suspend fun proceedLoop(): TSubject {
        do {
            val index = index
            if (index == -1) {
                break
            }
            val interceptors = interceptors
            if (index >= interceptors.size) {
                finish()
                break
            }
            val executeInterceptor = interceptors[index]
            this.index = index + 1
            executeInterceptor.invoke(this, subject)
        } while (true)

        return subject
    }
}

arashiyamaarashiyama

以上がPluginがどのような道筋でinstallされるかでした。具体的にContentNegotiationがinstallされていく様子を追っていくと

HttpClient(CIO) {
    install(ContentNegotiation) {
        json(Json.Default)
    }
}
  1. json(Json.Default)でContentNegotiationにシリアライザが登録される
  2. install(ContentNegotiation) {...} によってHttpClientConfigにプラグインと設定がMutableMapに保存される
  3. HttpClientのinitブロックでHttpClientConfigに保存されたプラグインがまとめてinstallされる
  4. ContentNegotiationではinstall時にclient.requestPipelineのTransformフェーズとclient.responsePipelineのTransformフェーズにinterceptorが追加する

install自体はこれで完了で、リクエスト実行時は

  1. requestPipelineが発火される。パイプラインを通るのはAny型で初期値はsetBodyでユーザーが決めたものか決めなければEmptyContent
  2. requestPipelineのそれまでの処理が完了してTransformフェーズに到達する
  3. 登録されたシリアライザを用いて上流から送られてきたbody: Anyのシリアライズを試みて、成功したらproceedWithでそれを下流に伝える

と言った感じ