👽

Micrometer Context Propagation について

2024/08/14に公開

はじめに

Micrometer Context PropagationというライブラリがSpring BootおよびReactorに導入されました。 (Spring Boot3から)

その名の通り、スレッド間でContext Propagationを容易にするためのライブラリです。

この記事ではこのMicrometer Context Propagationについて解説をしようと思います。ニッチが故にあまり解説している記事が見当たらないため、誰かの役に立てば幸いです。

なぜMicrometer Context Propagationが作られたのか

元々、Java界隈ではMetrics用のライブラリとしてMicrometerが流行していました。このMicrometerが守備範囲を広げ、TracingやLoggingももカバーするようになっていきました。(要するに、守備範囲をObservabilityまで広げた)

一般的に(Javaに限った話ではなく)、Tracingを行うためにはContext Propagationと呼ばれるテクニックが必要になってきます。このテクニックを誰でも利用しやすく切り出したのがMicrometer Context Propagationです。

Micrometer専用というわけではなく、Kotlin Coroutine ContextやReactor Contextといった様々なライブラリとも連携しやすいように作られています。

なお、Spring BootにおいてはSpring Cloud Sleuth(以下、Sleuth)というSpring WebMVC/WebFluxでTracingを行うためのライブラリがありました。
https://spring.io/projects/spring-cloud-sleuth

Spring Boot3にてSleuthは廃止され、Sleuthが担っていた機能はMicrometerとSpring Boot側に組み込まれるようになりました。

Context Propagationってなに?

その名の通り、Contextを正しく伝搬させるテクニックのことです。すると今度はContextってなんだよ...となると思いますが、MDCだったりTraceContext(traceId, spandId等の集合)のことだと思ってください。

従来、JavaにおいてはこういったContextを伝搬させるにはThread Localに格納するだけでOKでした。実際、MDCも(braveの)TraceContextもThread Localに格納されています。

...文章だと伝わりにくいと思うので、具体例でみましょう。一番馴染みのあるMDCで確認してみましょう。

fun main() {
    MDC.put("hoge", "hoge-value")
 
    someFunc1()
    someFunc2()
}
 
fun someFunc1() {
    println("Called someFunc1 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
 
fun someFunc2() {
    println("Called someFunc2 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}

結果はこうなります。この結果については特に違和感はないでしょう。一度設定したMDCが、ちゃんと伝わっていることが確認できます。

Called someFunc1 func. thread=main, mdc={hoge=hoge-value}
Called someFunc2 func. thread=main, mdc={hoge=hoge-value}

それではこの例だとどうでしょうか?someFunc2だけ別のスレッド上で実行させます。

fun main() {
    MDC.put("hoge", "hoge-value")
 
    someFunc1()
    thread { someFunc2() }.join()
}
 
fun someFunc1() {
    println("Called someFunc1 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
 
fun someFunc2() {
    println("Called someFunc2 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}

結果はこうなります。someFunc2だとMDCの中身がnullになっています。MDCがThread Localベースで作られているため、threadが切り替えると正しく伝搬することができません。

Called someFunc1 func. thread=main, mdc={hoge=hoge-value}
Called someFunc2 func. thread=Thread-0, mdc=null

正しく伝搬させるにはどうしたらいいかというと、以下のようなテクニック(このテクニックがContext Propagationです)が必要になってきます。今回はmdcPropagatingThreadという自前の関数を用意しています。

fun main() {
    MDC.put("hoge", "hoge-value")
 
    someFunc1()
    mdcPropagatingThread { someFunc2() }.join()
}
 
fun someFunc1() {
    println("Called someFunc1 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
 
fun someFunc2() {
    println("Called someFunc2 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
 
fun mdcPropagatingThread(block: () -> Unit): Thread {
    val map = MDC.getCopyOfContextMap()
    return thread {
        MDC.setContextMap(map)
        try {
            block()
        } finally {
            MDC.clear() // この例だと使い捨てのthreadなので、clearしなくてもいいのだが...
        }
    }
}

結果はこうなります。someFunc2にも元のMDCが引き継がれていますね。

Called someFunc1 func. thread=main, mdc={hoge=hoge-value}
Called someFunc2 func. thread=Thread-0, mdc={hoge=hoge-value}

こういったテクニックを抽象化してOSSのライブラリとして提供しているのがMicrometer Context Propagationです。

先程の例は、Micrometer Context Propagationを使うとこんなふうに書くことができます。
これだけみるとMDCAccessorクラスのぶんコードが長くなってて大変そうやん...とツッコミたくなるかもしれませんが、色々な場所で再利用しやすくなるためトータルでは楽になります。

val snapshotFactory = ContextSnapshotFactory.builder().build()
 
fun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
 
    MDC.put("hoge", "hoge-value")
 
    someFunc1()
    contextPropagatingThread { someFunc2() }.join()
}
 
fun someFunc1() {
    println("Called someFunc1 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
 
fun someFunc2() {
    println("Called someFunc2 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
 
fun contextPropagatingThread(block: () -> Unit): Thread {
    val snapshot = snapshotFactory.captureAll()
    return thread {
        snapshot.setThreadLocals().use {
            block()
        }
    }
}  
 
class MDCAccessor : ThreadLocalAccessor<Map<String, String?>> {
    override fun key(): Any {
        return MDCAccessor::class.java
    }
 
    override fun getValue(): Map<String, String?>? {
        return MDC.getCopyOfContextMap()
    }
 
    override fun setValue(value: Map<String, String?>) {
        MDC.setContextMap(value)
    }
 
    override fun setValue() {
        MDC.clear()
    }
}

Micrometer Context Propagationの使い方

前置きが長くなりすぎてしまいましたが、ここからが本題です。Micrometer Context Propagationの使い方を説明していきます。

基本的に覚えるクラス(概念)は以下の4つ。

  • ThreadLocalAccessor
  • ContextAccessor
  • ContextRegistry
  • ContextSnapshot

ThreadLocalAccessor

Thread Localの値を伝搬させたいときは、ThreadLocalAccessorインタフェースに沿って実装しましょう。

以下のメソッドをオーバーライドする必要があります。

実際の例を見たほうがイメージがつかめると思うので、いくつか例を載せます。

例: MDC

再掲:

class MDCAccessor : ThreadLocalAccessor<Map<String, String?>> {
    override fun key(): Any {
        return MDCAccessor::class.java
    }
  
    override fun getValue(): Map<String, String?>? {
        return MDC.getCopyOfContextMap()
    }
  
    override fun setValue(value: Map<String, String?>) {
        MDC.setContextMap(value)
    }
  
    override fun setValue() {
        MDC.clear()
    }
}

ただし、MDCに関してはbraveが更新することもあるため、上の書き方だと伝搬漏れするケースがある。(上の例で問題がないなら、上の例でもいい)
なので、場合によっては以下のように書いたほうがいい。

class MDCAccessor : ThreadLocalAccessor<Map<String, String?>> {
    override fun key(): Any {
        return MDCAccessor::class.java
    }
 
    override fun getValue(): Map<String, String?>? {
        return MDC.getCopyOfContextMap()
    }
 
    override fun setValue(value: Map<String, String?>) {
        // braveのMDC decorationを併用するとうまく動作しないケースが発生するので、put方式で。
        // see: https://github.com/openzipkin/brave/blob/69003dfc811418f0dbc42e9e17ff880ebe1f4b02/brave/src/main/java/brave/propagation/CurrentTraceContext.java#L130
        value.forEach { (k, v) -> MDC.put(k, v) }
    }
 
    override fun setValue() {
        // NOOP
    }
 
    override fun restore(previousValue: Map<String, String?>) {
        MDC.setContextMap(previousValue)
    }
 
    override fun restore() {
        MDC.clear()
    }
}

restoreメソッドもあるので、こんなふうにある程度融通がききます。

例: Brave Trace Context

動作確認していないですが、多分こんな感じでしょう。

あくまでサンプルとして書いてみましたが、micrometer-tracingを使っているなら、micrometer組み込みのObservationThreadLocalAccessorを使えばTrace Contextも一緒に伝搬されます。

class BraveTracingContextAccessor : ThreadLocalAccessor<TraceContext> {
    override fun key(): Any {
        return BraveTracingContextAccessor::class.java
    }
 
    override fun getValue(): TraceContext? {
        return Tracing.current().currentTraceContext().get()
    }
 
    override fun setValue(value: TraceContext) {
        Tracing.current().currentTraceContext().maybeScope(value)
    }
 
    override fun setValue() {
        Tracing.current().currentTraceContext().maybeScope(null)
    }
}

ContextAccessor

Reactor ContextのようなMap-likeなオブジェクトを伝搬させたいときは、ContextAccessorインタフェースに沿って実装しましょう。

ただしこれはあんま自分で実装するケースはなさそうなので、そんなに覚えておかなくていいです。

例: Reactor Context

Reactor側でReactorContextAccessorとして実装されています。

/**
 * A {@code ContextAccessor} to enable reading values from a Reactor
 * {@link ContextView} and writing values to {@link Context}.
 * <p>
 * Please note that this public class implements the {@code libs.micrometer.contextPropagation}
 * SPI library, which is an optional dependency.
 *
 * @author Rossen Stoyanchev
 * @author Simon Baslé
 * @since 3.5.0
 */
public final class ReactorContextAccessor implements ContextAccessor<ContextView, Context> {
 
    @Override
    public Class<? extends ContextView> readableType() {
        return ContextView.class;
    }
 
    @Override
    public void readValues(ContextView source, Predicate<Object> keyPredicate, Map<Object, Object> target) {
        source.forEach((k, v) -> {
            if (keyPredicate.test(k)) {
                target.put(k, v);
            }
        });
    }
 
    @Override
    @Nullable
    public <T> T readValue(ContextView sourceContext, Object key) {
        return sourceContext.getOrDefault(key, null);
    }
 
    @Override
    public Class<? extends Context> writeableType() {
        return Context.class;
    }
 
    @Override
    public Context writeValues(Map<Object, Object> source, Context target) {
        return target.putAllMap(source);
    }
}

ContextRegistry

ContextRegistryは前述したThreadLocalAccessorとContextAccesorを保持するインスタンスのことです。

こんな感じで登録します。mainメソッドでアプリケーションを起動するまえに登録することを期待しているようです。

ContextRegistry.getInstance()
    .registerThreadLocalAccessor(...) // ThreadLocalAccessor
    .registerContextAccessor(...) // ContextAccesor

ContextRegistry自体は何個でもインスタンスを作れるのですが、ContextRegistry.getInstance()で取得できるインスタンスを共有して使用すれば良さそう。少なくともReactorはContextRegistry.getInstance()で取得できるインスタンスを使用しています

また、Service Loader経由で自動で登録することもできます。

ContextRegistry()
    .loadThreadLocalAccessors()
    .loadContextAccessors()

ContextRegistry.getInstance()で取得できるインスタンスに関しては、Service Loader経由の登録が予めされています。
https://github.com/micrometer-metrics/context-propagation/blob/75a243f3427d0941e09302c3fc29f5b2a0297583/context-propagation/src/main/java/io/micrometer/context/ContextRegistry.java#L40-L41

例えば、Micrometer ObservationのためのObservationThreadLocalAccessorは、Service Loader経由で自動で登録されるようになっています。
https://github.com/micrometer-metrics/micrometer/blob/a56b968ba5b3db9b5e4a4feac813080783a16f5f/micrometer-observation/src/main/resources/META-INF/services/io.micrometer.context.ThreadLocalAccessor

ReactorContextAccessorもService Loader経由で自動で登録されるようになっています。
https://github.com/reactor/reactor-core/blob/5553aa80137482ec26acb960f4d4f42b8a44da94/reactor-core/src/main/resources/META-INF/services/io.micrometer.context.ContextAccessor

ContextSnapshot

その名の通り、ContextのSnapshotです。このクラスに実装されているsetThreadLocalsやupdateContextを呼び出すことで、Contextを任意の場所(別のスレッドやReactor Context等)に伝搬させることができます。

ContextSnapshotはContextSnapshotFactory経由で作成することができます。

// 登録してあるThreadLocalAccessorから取得
val snapshot = ContextSnapshotFactory.builder().build().captureAll()
 
// 登録してあるThreadLocalAccessorとReactor Contextから取得。ReactorContextAccessorが登録されている必要があります。
val snapshot = ContextSnapshotFactory.builder().build().captureAll(reactorContext)

ContextSnapshotFactoryを作るときに、builderで以下の設定をすることができます。(上の例はなにも設定していない)

ContextSnapshotFactoryを毎回作るのはダルいので、static変数として持つなり、beanにいれるなりしておくといいでしょう。

ContextSnapshotを使って、別のthreadに伝搬させるときはこんな感じ。

fun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
 
    MDC.put("hoge", "hoge-value")
 
    val snapshot = ContextSnapshotFactory.builder().build().captureAll()
    thread {
        snapshot.setThreadLocals().use {
            someFunc1()
        }
    }
}
 
fun someFunc1() {
    println("Called someFunc1 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}

setThreadLocalsを呼び出すタイミングでkeyPredicateを指定すると、動作するThreadLocalAccessorを絞ることができます。
例えばMDCAccesorのみ動かしたいならこう。

fun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
 
    MDC.put("hoge", "hoge-value")
 
    val snapshot = ContextSnapshotFactory.builder().build().captureAll()
    thread {
        snapshot.setThreadLocals { key -> key == MDCAccessor::class.java }.use {
            someFunc1()
        }
    }
}

Reactor Contextへの伝搬

ContextSnapshotを使って、例えばReactor Contextに伝搬させたいときはこう。ReactorContextAccessorが登録されている必要はあります。

fun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
    val snapshotFactory = ContextSnapshotFactory.builder().build()
 
    MDC.put("hoge", "hoge-value")
 
    val snapshot = snapshotFactory.captureAll()
    Mono.delay(Duration.ofMillis(100))
        .flatMap { value ->
            Mono.deferContextual {
                // Reactor Contextに伝搬されている
                println("thread=${Thread.currentThread().name}, mdcMap=${it.get<Map<String, String?>>(MDCAccessor::class.java)}")
 
                // thread localには伝搬しない
                println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
 
                // thread localに伝搬したい場合は、再度こう書く
                snapshotFactory.captureFrom(it).setThreadLocals().use {
                    println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
                }
 
                Mono.just(value)
            }
        }
        // updateContextメソッドでReactor Contextに伝搬させる
        .contextWrite { snapshot.updateContext(it) }
        .block()
}
 
// 結果
thread=parallel-1, mdcMap={hoge=hoge-value}
thread=parallel-1, mdc=null
thread=parallel-1, mdc={hoge=hoge-value}

ただし、Reactorの場合contextCaptureという便利なメソッドが用意されていて、こう書くこともできます。

fun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
    val snapshotFactory = ContextSnapshotFactory.builder().build()
 
    MDC.put("hoge", "hoge-value")
 
    Mono.delay(Duration.ofMillis(100))
        .flatMap { value ->
            Mono.deferContextual {
                // Reactor Contextに伝搬されている
                println("thread=${Thread.currentThread().name}, mdcMap=${it.get<Map<String, String?>>(MDCAccessor::class.java)}")
 
                // thread localには伝搬しない
                println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
 
                // thread localに伝搬したい場合は、再度こう書く
                snapshotFactory.captureFrom(it).setThreadLocals().use {
                    println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
                }
 
                Mono.just(value)
            }
        }
        .contextCapture() // これ
        .block()
}

...書くことが多くてまだまだ不便です。Hooks.enableAutomaticContextPropagation()を実行しておくと、先程の以下のコードが自動で行ってくれるようになります。これでだいぶ便利になります。

  • block()の前のcontextCapture() (thread local → reactor contextの処理)
  • Mono.deferContextualで書いてる snapshotFactory.captureFrom(it).setThreadLocals() (reactor context → thread localの処理)
fun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
    Hooks.enableAutomaticContextPropagation()
 
    MDC.put("hoge", "hoge-value")
 
    Mono.delay(Duration.ofMillis(100))
        .flatMap { value ->
            Mono.deferContextual {
                // Reactor Contextに伝搬されている
                println("thread=${Thread.currentThread().name}, mdcMap=${it.get<Map<String, String?>>(MDCAccessor::class.java)}")
 
                // thread localにも伝搬されている
                println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
 
                Mono.just(value)
            }
        }
        .block()
}
(おまけ) Kotlin Coroutines と Hooks.enableAutomaticContextPropagation

前述した通り、Hooks.enableAutomaticContextPropagationを利用するとMono.block等のメソッドでは自動でcontextCapture()を実行してくれるようになりますが、CoroutinesのawaitSingle等のメソッドでは実行されません。

ffun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
    Hooks.enableAutomaticContextPropagation()
 
    val job = GlobalScope.launch {
        MDC.put("hoge", "hoge-value")
 
        Mono.delay(Duration.ofMillis(100))
            .flatMap { value ->
                Mono.deferContextual {
                    // Reactor Contextに伝搬されていない
                    println("thread=${Thread.currentThread().name}, mdcMap=${it.getOrDefault<Map<String, String?>>(MDCAccessor::class.java, null)}")
 
                    // thread localには伝搬されている
                    println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
 
                    Mono.just(value)
                }
            }
            .awaitSingle()
    }
    runBlocking { job.join() }
}
 
// 結果
thread=parallel-1, mdcMap=null
thread=parallel-1, mdc={hoge=hoge-value}

以下のようなクラスを用意し、サービスローダーに登録するとblockと同様に自動でcontextCaptureをすることができますが、これでいいのかどうかはまだあまり自信はありません。
( issue をたてているので興味ある人はキャッチアップしてみてください → https://github.com/reactor/reactor-core/issues/3563)

package observation101.context.coroutines
 
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.reactive.ContextInjector
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import kotlin.coroutines.CoroutineContext
 
@OptIn(InternalCoroutinesApi::class)
class ContextCaptureInjector : ContextInjector {
    override fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T> {
        return when (publisher) {
            is Mono -> publisher.contextCapture()
            is Flux -> publisher.contextCapture()
            else -> publisher
        }
    }
}
// META-INF/services/kotlinx.coroutines.reactive.ContextInjectorに以下を記述
 
observation101.context.coroutines.ContextCaptureInjector

応用

Thread LocalをRunnable/Callable等に伝搬させたい

ContextSnapshotにwrapというメソッドが実装されています。これを使うといいでしょう.

val snapshot = ContextSnapshotFactory.builder().build().captureAll()
val wrapped = snapshot.wrap {
    // ...
}
 
// ↑と同等のコード
val snapshot = ContextSnapshotFactory.builder().build().captureAll()
val wrapped = Runnable {
    snapshot.setThreadLocals().use {
        // ...
    }
}

Thread LocalをExecutorService / ScheduledExecutorServiceへ伝搬させたい

ContextExecutorService と ContextScheduledExecutorService が実装されています。これを使うといいでしょう。

Thread LocalをKotlin CoroutineContextへ伝搬させたい

Kotlin CoroutineではThead LocalのためにThreadContextElementという仕組みが用意されています。これとMicrometer Context Propagationを繋いであげればOKです。

class ContextPropagationThreadLocalElement : ThreadContextElement<AutoCloseable>, AbstractCoroutineContextElement(KEY) {
    private val snapshot = factory.captureAll()
 
    override fun updateThreadContext(context: CoroutineContext): AutoCloseable {
        return snapshot.setThreadLocals()
    }
 
    override fun restoreThreadContext(context: CoroutineContext, oldState: AutoCloseable) {
        oldState.close()
    }
 
    companion object {
        private val KEY = object : CoroutineContext.Key<ContextPropagationThreadLocalElement> {}
        private val factory = ContextSnapshotFactory.builder().build()
    }
}

おわりに

JavaではThread Localをベースに色々なエコシステムができているため、ReactorやKotlin Coroutineを使っているとしばしば困りがちです。

元々自分は仕事にてこれと似たようなライブラリを書いて使っていたのですが、OSSとしてMicrometerが提供してくれたことにより、だいぶ楽になりました。

また、記事中にも書いたように、Micrometer Context Propagationに沿って実装しておくと、ReactorやKotlin Coroutineにも応用ができるので便利です。

今まではReactorやKotlin Coroutine向けに似たようなコードを書いていたのですが、Micrometer Context Propagationがいい感じに抽象化してくれるので使い回せることができます。

というわけでだいぶニッチな内容ではありましたが、どなたかの役に立てば幸いです。

Discussion