Micrometer Context Propagation について
はじめに
Micrometer Context PropagationというライブラリがSpring BootおよびReactorに導入されました。 (Spring Boot3から)
- https://github.com/micrometer-metrics/context-propagation
- https://docs.micrometer.io/context-propagation/reference/
その名の通り、スレッド間でContext Propagationを容易にするためのライブラリです。
この記事ではこのMicrometer Context Propagationについて解説をしようと思います。ニッチが故にあまり解説している記事が見当たらないため、誰かの役に立てば幸いです。
なぜMicrometer Context Propagationが作られたのか
元々、Java界隈ではMetrics用のライブラリとしてMicrometerが流行していました。このMicrometerが守備範囲を広げ、TracingやLoggingももカバーするようになっていきました。(要するに、守備範囲をObservabilityまで広げた)
一般的に(Javaに限った話ではなく)、Tracingを行うためにはContext Propagationと呼ばれるテクニックが必要になってきます。このテクニックを誰でも利用しやすく切り出したのがMicrometer Context Propagationです。
Micrometer専用というわけではなく、Kotlin Coroutine ContextやReactor Contextといった様々なライブラリとも連携しやすいように作られています。
なお、Spring BootにおいてはSpring Cloud Sleuth(以下、Sleuth)というSpring WebMVC/WebFluxでTracingを行うためのライブラリがありました。
Spring Boot3にてSleuthは廃止され、Sleuthが担っていた機能はMicrometerとSpring Boot側に組み込まれるようになりました。
Context Propagationってなに?
その名の通り、Contextを正しく伝搬させるテクニックのことです。すると今度はContextってなんだよ...となると思いますが、MDCだったりTraceContext(traceId, spandId等の集合)のことだと思ってください。
従来、JavaにおいてはこういったContextを伝搬させるにはThread Localに格納するだけでOKでした。実際、MDCも(braveの)TraceContextもThread Localに格納されています。
...文章だと伝わりにくいと思うので、具体例でみましょう。一番馴染みのあるMDCで確認してみましょう。
fun main() {
MDC.put("hoge", "hoge-value")
someFunc1()
someFunc2()
}
fun someFunc1() {
println("Called someFunc1 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
fun someFunc2() {
println("Called someFunc2 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
結果はこうなります。この結果については特に違和感はないでしょう。一度設定したMDCが、ちゃんと伝わっていることが確認できます。
Called someFunc1 func. thread=main, mdc={hoge=hoge-value}
Called someFunc2 func. thread=main, mdc={hoge=hoge-value}
それではこの例だとどうでしょうか?someFunc2だけ別のスレッド上で実行させます。
fun main() {
MDC.put("hoge", "hoge-value")
someFunc1()
thread { someFunc2() }.join()
}
fun someFunc1() {
println("Called someFunc1 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
fun someFunc2() {
println("Called someFunc2 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
結果はこうなります。someFunc2だとMDCの中身がnullになっています。MDCがThread Localベースで作られているため、threadが切り替えると正しく伝搬することができません。
Called someFunc1 func. thread=main, mdc={hoge=hoge-value}
Called someFunc2 func. thread=Thread-0, mdc=null
正しく伝搬させるにはどうしたらいいかというと、以下のようなテクニック(このテクニックがContext Propagationです)が必要になってきます。今回はmdcPropagatingThreadという自前の関数を用意しています。
fun main() {
MDC.put("hoge", "hoge-value")
someFunc1()
mdcPropagatingThread { someFunc2() }.join()
}
fun someFunc1() {
println("Called someFunc1 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
fun someFunc2() {
println("Called someFunc2 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
fun mdcPropagatingThread(block: () -> Unit): Thread {
val map = MDC.getCopyOfContextMap()
return thread {
MDC.setContextMap(map)
try {
block()
} finally {
MDC.clear() // この例だと使い捨てのthreadなので、clearしなくてもいいのだが...
}
}
}
結果はこうなります。someFunc2にも元のMDCが引き継がれていますね。
Called someFunc1 func. thread=main, mdc={hoge=hoge-value}
Called someFunc2 func. thread=Thread-0, mdc={hoge=hoge-value}
こういったテクニックを抽象化してOSSのライブラリとして提供しているのがMicrometer Context Propagationです。
先程の例は、Micrometer Context Propagationを使うとこんなふうに書くことができます。
これだけみるとMDCAccessorクラスのぶんコードが長くなってて大変そうやん...とツッコミたくなるかもしれませんが、色々な場所で再利用しやすくなるためトータルでは楽になります。
val snapshotFactory = ContextSnapshotFactory.builder().build()
fun main() {
ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
MDC.put("hoge", "hoge-value")
someFunc1()
contextPropagatingThread { someFunc2() }.join()
}
fun someFunc1() {
println("Called someFunc1 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
fun someFunc2() {
println("Called someFunc2 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
fun contextPropagatingThread(block: () -> Unit): Thread {
val snapshot = snapshotFactory.captureAll()
return thread {
snapshot.setThreadLocals().use {
block()
}
}
}
class MDCAccessor : ThreadLocalAccessor<Map<String, String?>> {
override fun key(): Any {
return MDCAccessor::class.java
}
override fun getValue(): Map<String, String?>? {
return MDC.getCopyOfContextMap()
}
override fun setValue(value: Map<String, String?>) {
MDC.setContextMap(value)
}
override fun setValue() {
MDC.clear()
}
}
Micrometer Context Propagationの使い方
前置きが長くなりすぎてしまいましたが、ここからが本題です。Micrometer Context Propagationの使い方を説明していきます。
基本的に覚えるクラス(概念)は以下の4つ。
- ThreadLocalAccessor
- ContextAccessor
- ContextRegistry
- ContextSnapshot
ThreadLocalAccessor
Thread Localの値を伝搬させたいときは、ThreadLocalAccessorインタフェースに沿って実装しましょう。
以下のメソッドをオーバーライドする必要があります。
-
Object key()
- 任意のKeyを返してください。
-
Class<?>
を使用しておけばいい。(e.g.SomeThreadLocalAccessor::class.java
)
-
@Nullable V getValue()
- 伝搬させたい値を返す
- Thread Localから取得するケースが多いだろう
- 伝搬させたい値を返す
-
void setValue(V value)
- 伝搬させたい値をセットする
- Thread Localにセットするケースが多いだろう
- Vはnon-null
- https://github.com/micrometer-metrics/context-propagation/blob/f6026d7e8ac833a822bc2032ac44d462b04c0775/context-propagation/src/main/java/io/micrometer/context/DefaultContextSnapshot.java#L86
- 伝搬させたい値をセットする
-
void setValue()
-
restore()
をオーバーライドする場合は実装は必須ではない。ただしclearMissing設定が有効化されているときは必須。 - 伝搬させたい値がnullだったときに呼ばれる
- https://github.com/micrometer-metrics/context-propagation/blob/f6026d7e8ac833a822bc2032ac44d462b04c0775/context-propagation/src/main/java/io/micrometer/context/DefaultContextSnapshot.java#L89
-
-
void restore(V previousValue)
- 実装は必須ではない。実装しない場合は、
setValue(V value)
を使う - 元の値に戻す時に呼ばれる
- https://github.com/micrometer-metrics/context-propagation/blob/f6026d7e8ac833a822bc2032ac44d462b04c0775/context-propagation/src/main/java/io/micrometer/context/DefaultContextSnapshot.java#L149
- Vはnon-null
- 実装は必須ではない。実装しない場合は、
-
void restore()
- 実装は必須ではない。 実装しない場合は、
setValue()
を使う - 元の値に戻すときに、元の値がnullだったときに呼ばれる
- https://github.com/micrometer-metrics/context-propagation/blob/f6026d7e8ac833a822bc2032ac44d462b04c0775/context-propagation/src/main/java/io/micrometer/context/DefaultContextSnapshot.java#L152
- 実装は必須ではない。 実装しない場合は、
実際の例を見たほうがイメージがつかめると思うので、いくつか例を載せます。
例: MDC
再掲:
class MDCAccessor : ThreadLocalAccessor<Map<String, String?>> {
override fun key(): Any {
return MDCAccessor::class.java
}
override fun getValue(): Map<String, String?>? {
return MDC.getCopyOfContextMap()
}
override fun setValue(value: Map<String, String?>) {
MDC.setContextMap(value)
}
override fun setValue() {
MDC.clear()
}
}
ただし、MDCに関してはbraveが更新することもあるため、上の書き方だと伝搬漏れするケースがある。(上の例で問題がないなら、上の例でもいい)
なので、場合によっては以下のように書いたほうがいい。
class MDCAccessor : ThreadLocalAccessor<Map<String, String?>> {
override fun key(): Any {
return MDCAccessor::class.java
}
override fun getValue(): Map<String, String?>? {
return MDC.getCopyOfContextMap()
}
override fun setValue(value: Map<String, String?>) {
// braveのMDC decorationを併用するとうまく動作しないケースが発生するので、put方式で。
// see: https://github.com/openzipkin/brave/blob/69003dfc811418f0dbc42e9e17ff880ebe1f4b02/brave/src/main/java/brave/propagation/CurrentTraceContext.java#L130
value.forEach { (k, v) -> MDC.put(k, v) }
}
override fun setValue() {
// NOOP
}
override fun restore(previousValue: Map<String, String?>) {
MDC.setContextMap(previousValue)
}
override fun restore() {
MDC.clear()
}
}
restoreメソッドもあるので、こんなふうにある程度融通がききます。
例: Brave Trace Context
動作確認していないですが、多分こんな感じでしょう。
あくまでサンプルとして書いてみましたが、micrometer-tracingを使っているなら、micrometer組み込みのObservationThreadLocalAccessorを使えばTrace Contextも一緒に伝搬されます。
class BraveTracingContextAccessor : ThreadLocalAccessor<TraceContext> {
override fun key(): Any {
return BraveTracingContextAccessor::class.java
}
override fun getValue(): TraceContext? {
return Tracing.current().currentTraceContext().get()
}
override fun setValue(value: TraceContext) {
Tracing.current().currentTraceContext().maybeScope(value)
}
override fun setValue() {
Tracing.current().currentTraceContext().maybeScope(null)
}
}
ContextAccessor
Reactor ContextのようなMap-likeなオブジェクトを伝搬させたいときは、ContextAccessorインタフェースに沿って実装しましょう。
ただしこれはあんま自分で実装するケースはなさそうなので、そんなに覚えておかなくていいです。
例: Reactor Context
Reactor側でReactorContextAccessorとして実装されています。
/**
* A {@code ContextAccessor} to enable reading values from a Reactor
* {@link ContextView} and writing values to {@link Context}.
* <p>
* Please note that this public class implements the {@code libs.micrometer.contextPropagation}
* SPI library, which is an optional dependency.
*
* @author Rossen Stoyanchev
* @author Simon Baslé
* @since 3.5.0
*/
public final class ReactorContextAccessor implements ContextAccessor<ContextView, Context> {
@Override
public Class<? extends ContextView> readableType() {
return ContextView.class;
}
@Override
public void readValues(ContextView source, Predicate<Object> keyPredicate, Map<Object, Object> target) {
source.forEach((k, v) -> {
if (keyPredicate.test(k)) {
target.put(k, v);
}
});
}
@Override
@Nullable
public <T> T readValue(ContextView sourceContext, Object key) {
return sourceContext.getOrDefault(key, null);
}
@Override
public Class<? extends Context> writeableType() {
return Context.class;
}
@Override
public Context writeValues(Map<Object, Object> source, Context target) {
return target.putAllMap(source);
}
}
ContextRegistry
ContextRegistryは前述したThreadLocalAccessorとContextAccesorを保持するインスタンスのことです。
こんな感じで登録します。mainメソッドでアプリケーションを起動するまえに登録することを期待しているようです。
ContextRegistry.getInstance()
.registerThreadLocalAccessor(...) // ThreadLocalAccessor
.registerContextAccessor(...) // ContextAccesor
ContextRegistry自体は何個でもインスタンスを作れるのですが、ContextRegistry.getInstance()
で取得できるインスタンスを共有して使用すれば良さそう。少なくともReactorはContextRegistry.getInstance()
で取得できるインスタンスを使用しています。
また、Service Loader経由で自動で登録することもできます。
ContextRegistry()
.loadThreadLocalAccessors()
.loadContextAccessors()
ContextRegistry.getInstance()
で取得できるインスタンスに関しては、Service Loader経由の登録が予めされています。
例えば、Micrometer ObservationのためのObservationThreadLocalAccessorは、Service Loader経由で自動で登録されるようになっています。
ReactorContextAccessorもService Loader経由で自動で登録されるようになっています。
ContextSnapshot
その名の通り、ContextのSnapshotです。このクラスに実装されているsetThreadLocalsやupdateContextを呼び出すことで、Contextを任意の場所(別のスレッドやReactor Context等)に伝搬させることができます。
ContextSnapshotはContextSnapshotFactory経由で作成することができます。
// 登録してあるThreadLocalAccessorから取得
val snapshot = ContextSnapshotFactory.builder().build().captureAll()
// 登録してあるThreadLocalAccessorとReactor Contextから取得。ReactorContextAccessorが登録されている必要があります。
val snapshot = ContextSnapshotFactory.builder().build().captureAll(reactorContext)
ContextSnapshotFactoryを作るときに、builderで以下の設定をすることができます。(上の例はなにも設定していない)
-
contextRegistry
- デフォだと
ContextRegistry.getInstance()
で取得できるインスタンスを使用します。 - 基本的にデフォのままでいいはず。困ったら指定しよう
- デフォだと
-
captureKeyPredicate
-
ThreadLocalAccesor#key()
の値を使って、captureするThreadLocalAccesorを指定することができます。指定しないと登録してあるThreadLocalAccesorを全部使用します。 - 基本的にデフォのままでいいはず。困ったら指定しよう
-
-
clearMissing
- 伝搬させたい値がなかったときに、
ThreadLocalAccessor#setValue
を呼ぶかどうか - https://github.com/micrometer-metrics/context-propagation/blob/f6026d7e8ac833a822bc2032ac44d462b04c0775/context-propagation/src/main/java/io/micrometer/context/DefaultContextSnapshot.java#L89
- 基本的にデフォのままでいいはず。困ったら指定しよう
- 伝搬させたい値がなかったときに、
ContextSnapshotFactoryを毎回作るのはダルいので、static変数として持つなり、beanにいれるなりしておくといいでしょう。
ContextSnapshotを使って、別のthreadに伝搬させるときはこんな感じ。
fun main() {
ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
MDC.put("hoge", "hoge-value")
val snapshot = ContextSnapshotFactory.builder().build().captureAll()
thread {
snapshot.setThreadLocals().use {
someFunc1()
}
}
}
fun someFunc1() {
println("Called someFunc1 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
setThreadLocalsを呼び出すタイミングでkeyPredicateを指定すると、動作するThreadLocalAccessorを絞ることができます。
例えばMDCAccesorのみ動かしたいならこう。
fun main() {
ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
MDC.put("hoge", "hoge-value")
val snapshot = ContextSnapshotFactory.builder().build().captureAll()
thread {
snapshot.setThreadLocals { key -> key == MDCAccessor::class.java }.use {
someFunc1()
}
}
}
Reactor Contextへの伝搬
ContextSnapshotを使って、例えばReactor Contextに伝搬させたいときはこう。ReactorContextAccessorが登録されている必要はあります。
fun main() {
ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
val snapshotFactory = ContextSnapshotFactory.builder().build()
MDC.put("hoge", "hoge-value")
val snapshot = snapshotFactory.captureAll()
Mono.delay(Duration.ofMillis(100))
.flatMap { value ->
Mono.deferContextual {
// Reactor Contextに伝搬されている
println("thread=${Thread.currentThread().name}, mdcMap=${it.get<Map<String, String?>>(MDCAccessor::class.java)}")
// thread localには伝搬しない
println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
// thread localに伝搬したい場合は、再度こう書く
snapshotFactory.captureFrom(it).setThreadLocals().use {
println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
Mono.just(value)
}
}
// updateContextメソッドでReactor Contextに伝搬させる
.contextWrite { snapshot.updateContext(it) }
.block()
}
// 結果
thread=parallel-1, mdcMap={hoge=hoge-value}
thread=parallel-1, mdc=null
thread=parallel-1, mdc={hoge=hoge-value}
ただし、Reactorの場合contextCaptureという便利なメソッドが用意されていて、こう書くこともできます。
fun main() {
ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
val snapshotFactory = ContextSnapshotFactory.builder().build()
MDC.put("hoge", "hoge-value")
Mono.delay(Duration.ofMillis(100))
.flatMap { value ->
Mono.deferContextual {
// Reactor Contextに伝搬されている
println("thread=${Thread.currentThread().name}, mdcMap=${it.get<Map<String, String?>>(MDCAccessor::class.java)}")
// thread localには伝搬しない
println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
// thread localに伝搬したい場合は、再度こう書く
snapshotFactory.captureFrom(it).setThreadLocals().use {
println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
Mono.just(value)
}
}
.contextCapture() // これ
.block()
}
...書くことが多くてまだまだ不便です。Hooks.enableAutomaticContextPropagation()
を実行しておくと、先程の以下のコードが自動で行ってくれるようになります。これでだいぶ便利になります。
-
block()
の前のcontextCapture()
(thread local → reactor contextの処理) -
Mono.deferContextual
で書いてるsnapshotFactory.captureFrom(it).setThreadLocals()
(reactor context → thread localの処理)
fun main() {
ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
Hooks.enableAutomaticContextPropagation()
MDC.put("hoge", "hoge-value")
Mono.delay(Duration.ofMillis(100))
.flatMap { value ->
Mono.deferContextual {
// Reactor Contextに伝搬されている
println("thread=${Thread.currentThread().name}, mdcMap=${it.get<Map<String, String?>>(MDCAccessor::class.java)}")
// thread localにも伝搬されている
println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
Mono.just(value)
}
}
.block()
}
(おまけ) Kotlin Coroutines と Hooks.enableAutomaticContextPropagation
前述した通り、Hooks.enableAutomaticContextPropagation
を利用するとMono.block
等のメソッドでは自動でcontextCapture()
を実行してくれるようになりますが、CoroutinesのawaitSingle
等のメソッドでは実行されません。
ffun main() {
ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
Hooks.enableAutomaticContextPropagation()
val job = GlobalScope.launch {
MDC.put("hoge", "hoge-value")
Mono.delay(Duration.ofMillis(100))
.flatMap { value ->
Mono.deferContextual {
// Reactor Contextに伝搬されていない
println("thread=${Thread.currentThread().name}, mdcMap=${it.getOrDefault<Map<String, String?>>(MDCAccessor::class.java, null)}")
// thread localには伝搬されている
println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
Mono.just(value)
}
}
.awaitSingle()
}
runBlocking { job.join() }
}
// 結果
thread=parallel-1, mdcMap=null
thread=parallel-1, mdc={hoge=hoge-value}
以下のようなクラスを用意し、サービスローダーに登録するとblockと同様に自動でcontextCaptureをすることができますが、これでいいのかどうかはまだあまり自信はありません。
( issue をたてているので興味ある人はキャッチアップしてみてください → https://github.com/reactor/reactor-core/issues/3563)
package observation101.context.coroutines
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.reactive.ContextInjector
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import kotlin.coroutines.CoroutineContext
@OptIn(InternalCoroutinesApi::class)
class ContextCaptureInjector : ContextInjector {
override fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T> {
return when (publisher) {
is Mono -> publisher.contextCapture()
is Flux -> publisher.contextCapture()
else -> publisher
}
}
}
// META-INF/services/kotlinx.coroutines.reactive.ContextInjectorに以下を記述
observation101.context.coroutines.ContextCaptureInjector
応用
Thread LocalをRunnable/Callable等に伝搬させたい
ContextSnapshotにwrapというメソッドが実装されています。これを使うといいでしょう.
val snapshot = ContextSnapshotFactory.builder().build().captureAll()
val wrapped = snapshot.wrap {
// ...
}
// ↑と同等のコード
val snapshot = ContextSnapshotFactory.builder().build().captureAll()
val wrapped = Runnable {
snapshot.setThreadLocals().use {
// ...
}
}
Thread LocalをExecutorService / ScheduledExecutorServiceへ伝搬させたい
ContextExecutorService と ContextScheduledExecutorService が実装されています。これを使うといいでしょう。
- https://github.com/micrometer-metrics/context-propagation/blob/f6026d7e8ac833a822bc2032ac44d462b04c0775/context-propagation/src/main/java/io/micrometer/context/ContextExecutorService.java
- https://github.com/micrometer-metrics/context-propagation/blob/f6026d7e8ac833a822bc2032ac44d462b04c0775/context-propagation/src/main/java/io/micrometer/context/ContextScheduledExecutorService.java
Thread LocalをKotlin CoroutineContextへ伝搬させたい
Kotlin CoroutineではThead LocalのためにThreadContextElementという仕組みが用意されています。これとMicrometer Context Propagationを繋いであげればOKです。
class ContextPropagationThreadLocalElement : ThreadContextElement<AutoCloseable>, AbstractCoroutineContextElement(KEY) {
private val snapshot = factory.captureAll()
override fun updateThreadContext(context: CoroutineContext): AutoCloseable {
return snapshot.setThreadLocals()
}
override fun restoreThreadContext(context: CoroutineContext, oldState: AutoCloseable) {
oldState.close()
}
companion object {
private val KEY = object : CoroutineContext.Key<ContextPropagationThreadLocalElement> {}
private val factory = ContextSnapshotFactory.builder().build()
}
}
おわりに
JavaではThread Localをベースに色々なエコシステムができているため、ReactorやKotlin Coroutineを使っているとしばしば困りがちです。
元々自分は仕事にてこれと似たようなライブラリを書いて使っていたのですが、OSSとしてMicrometerが提供してくれたことにより、だいぶ楽になりました。
また、記事中にも書いたように、Micrometer Context Propagationに沿って実装しておくと、ReactorやKotlin Coroutineにも応用ができるので便利です。
今まではReactorやKotlin Coroutine向けに似たようなコードを書いていたのですが、Micrometer Context Propagationがいい感じに抽象化してくれるので使い回せることができます。
というわけでだいぶニッチな内容ではありましたが、どなたかの役に立てば幸いです。
Discussion