Akka gRPC(Scala)+CloudTraceで分散トレーシング
はじめに
CloudTraceとは
概要はこちら。
GCP曰く「誰でも使用できる分散トレース システム」とのことで、GKE上で動かしているAkka gRPCで書かれたサービスのトレース結果を下のイメージ(クイックスタートより拝借)のように見たいなぁというのが今回の動機。
Scala環境ではOpenTelemetry(Java)を利用できそうです。
OpenTelemetryとは
概要はこちら。
Google翻訳によると「OpenTelemetryは、トレース、メトリック、ログなどのテレメトリデータの作成と管理用に設計された、API、SDK、ツール、および統合のセットです。」らしい。
OpenTracingとOpenCensusの合併した後継プロジェクトであり、CNCFのインキュベーターになっています。
Java以外にもGoとか.Netとか用意されているみたいですね。
Akka gRPCで利用する
分散しないトレーシング
早速GCPのドキュメント「Java と OpenTelemetry」辺りを参考に実装してみました。
めっちゃ簡単じゃん。
と思いきや全然分散トレーシングになっておらず、個々のトレース結果が別々に記録されているだけの寂しいものに・・・。
リモート間でコンテキストを伝搬せよ
仕方ないのでOpenTelemetry側のドキュメントを見てみるとどうやらTraceIDやらSpanIDやらのコンテキストをキャリア(Httpを利用したサービスだったらHttpヘッダとか)に埋め込んでプロパゲーション(伝搬)しろと・・・。
まぁ当たり前か。
でも親切なことにW3C準拠のコンテキストのプロパゲーター(伝搬屋)と呼ばれるコンテキストのパーサーが標準で用意してあるのでそれを使えば良いと書いてありますね。
JavaのgRPCだったら・・・
gRPCではHttpヘッダのように外部データを埋め込みたい場合は、Metadataを利用すれば良さそうです。
これまた親切なことにgithubのexamples/grpcにInterceptorを利用した実装例が載っています。
Javaの標準gRPCだったら苦も無く実装できそうですね・・・。
Akka gRPC故に
前提として、Akka gRPCのMetadataへのデータ埋め込み方法は、SingleResponseRequestBuilder
(もしくはStreamResponseRequestBuilder
)のaddHeader(key: String, value: String)
メソッドを呼ぶことでMetadataの埋め込まれたSingleResponseRequestBuilder
が返ってきます(参考)。
そしてOpenTelemetryに標準で用意されているTextMapPropergetor
はTextMapSetter<T>
インターフェースを受け取るinject
メソッドと、TextMapGetter<T>
インターフェースを受け取るextract
メソッドにより文字列←→コンテキストのパース処理を行います(T型はキャリアの型)。
TextMapSetter<T>
で実装を必要とするset(carrier: T, key: String, value: String)
メソッドの戻り値はUnit
であり、更新されたT
は返ってきません。
Akka gRPCの場合、TextMapSetter<T>
のT型はSingleResponseRequestBuilder
になるわけなんですが・・・。
シグネチャが噛み合いません。ResponseRequestBuilder
のラッパーを用意してゴニョゴニョすることもできそうですが、副作用くさくなるのは許容できません。
あ、あとInterceptorも無いです(コンセプトが違うから用意する気ないよと言っておられる)。メソッド毎にトレーサーの埋め込み・取り出し処理が必要です。←これが地味にしんどかった。
プロパゲーターを実装する
しようがないでAkka gRPC用にプロパゲーターを自力で実装することにしました。
とりあえずコンテキストはW3C準拠でここら辺を参考に実装します。
長いので畳みます
package com.example
import java.util.regex.Pattern
import scala.jdk.CollectionConverters.*
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import cats.implicits.*
import io.opentelemetry.api.internal.OtelEncodingUtils
import io.opentelemetry.api.internal.TemporaryBuffers
import io.opentelemetry.api.internal.Utils.checkArgument
import io.opentelemetry.api.trace.*
import io.opentelemetry.context.Context
import net.logstash.logback.argument.StructuredArguments.keyValue
/** W3C Trace context propergator */
object W3CTraceContextPropergator {
private val TRACE_PARENT = "traceparent"
private val TRACE_STATE = "tracestate"
private val VERSION = "00"
private val VERSION_SIZE = 2
private val TRACEPARENT_DELIMITER = '-'
private val TRACEPARENT_DELIMITER_SIZE = 1
private val TRACE_ID_HEX_SIZE = TraceId.getLength
private val SPAN_ID_HEX_SIZE = SpanId.getLength
private val TRACE_OPTION_HEX_SIZE = TraceFlags.getLength
private val TRACE_ID_OFFSET = VERSION_SIZE + TRACEPARENT_DELIMITER_SIZE
private val SPAN_ID_OFFSET =
TRACE_ID_OFFSET + TRACE_ID_HEX_SIZE + TRACEPARENT_DELIMITER_SIZE
private val TRACE_OPTION_OFFSET =
SPAN_ID_OFFSET + SPAN_ID_HEX_SIZE + TRACEPARENT_DELIMITER_SIZE
private val TRACEPARENT_HEADER_SIZE =
TRACE_OPTION_OFFSET + TRACE_OPTION_HEX_SIZE
private val TRACESTATE_MAX_SIZE = 512
private val TRACESTATE_MAX_MEMBERS = 32
private val TRACESTATE_KEY_VALUE_DELIMITER = '='
private val TRACESTATE_ENTRY_DELIMITER = ','
private val TRACESTATE_ENTRY_DELIMITER_SPLIT_PATTERN =
Pattern.compile("[ \t]*" + TRACESTATE_ENTRY_DELIMITER + "[ \t]*")
private def stringFromTraceParent(traceParent: SpanContext) = Try {
val chars = TemporaryBuffers.chars(TRACEPARENT_HEADER_SIZE)
chars.update(0, VERSION.charAt(0))
chars.update(1, VERSION.charAt(1))
chars.update(2, TRACEPARENT_DELIMITER)
val traceId = traceParent.getTraceId
traceId.toCharArray.zipWithIndex.foreach { case (c, i) =>
chars.update(TRACE_ID_OFFSET + i, c)
}
chars.update(SPAN_ID_OFFSET - 1, TRACEPARENT_DELIMITER)
val spanId = traceParent.getSpanId
spanId.toCharArray.zipWithIndex.foreach { case (c, i) =>
chars.update(SPAN_ID_OFFSET + i, c)
}
chars.update(TRACE_OPTION_OFFSET - 1, TRACEPARENT_DELIMITER)
val traceFlagsHex = traceParent.getTraceFlags.asHex()
chars.update(TRACE_OPTION_OFFSET, traceFlagsHex.charAt(0))
chars.update(TRACE_OPTION_OFFSET + 1, traceFlagsHex.charAt(1))
new String(chars, 0, TRACEPARENT_HEADER_SIZE)
} match {
case Success(v) =>
v
case Failure(ex) =>
println(s"Error: $ex")
""
}
private def stringFromTraceState(traceState: TraceState) = Try {
val stringBuilder = traceState
.asMap()
.asScala
.foldLeft(new StringBuilder(TRACESTATE_MAX_SIZE)) {
case (acc, (key, value)) =>
if (acc.nonEmpty)
acc.apply(TRACESTATE_ENTRY_DELIMITER)
acc
.append(key)
.append(TRACESTATE_KEY_VALUE_DELIMITER)
.append(value)
}
stringBuilder.toString()
} match {
case Success(v) =>
v
case Failure(ex) =>
println(s"Error: $ex")
""
}
private def traceParentFromString(traceParentHeader: String) =
Try {
if (
!((traceParentHeader.length == TRACEPARENT_HEADER_SIZE ||
traceParentHeader.length > TRACEPARENT_HEADER_SIZE &&
traceParentHeader
.charAt(TRACEPARENT_HEADER_SIZE) == TRACEPARENT_DELIMITER) &&
traceParentHeader.charAt(
TRACE_ID_OFFSET - 1
) == TRACEPARENT_DELIMITER &&
traceParentHeader.charAt(
SPAN_ID_OFFSET - 1
) == TRACEPARENT_DELIMITER &&
traceParentHeader.charAt(
TRACE_OPTION_OFFSET - 1
) == TRACEPARENT_DELIMITER)
)
throw new IllegalArgumentException("Invalid trace parent header.")
else {
val version = traceParentHeader.substring(0, VERSION_SIZE)
if (version != VERSION)
throw new IllegalArgumentException("Invalid trace version.")
else {
val traceIdHex = traceParentHeader.substring(
TRACE_ID_OFFSET,
TRACE_ID_OFFSET + TRACE_ID_HEX_SIZE
)
val spanIdHex = traceParentHeader.substring(
SPAN_ID_OFFSET,
SPAN_ID_OFFSET + SPAN_ID_HEX_SIZE
)
val firstTraceFlagsChar =
traceParentHeader.charAt(TRACE_OPTION_OFFSET)
val secondTraceFlagsChar =
traceParentHeader.charAt(TRACE_OPTION_OFFSET + 1)
if (
!OtelEncodingUtils.isValidBase16Character(firstTraceFlagsChar)
|| !OtelEncodingUtils.isValidBase16Character(secondTraceFlagsChar)
)
throw new IllegalArgumentException("Invalid trace flags.")
else {
val traceFlags = TraceFlags.fromByte(
OtelEncodingUtils
.byteFromBase16(firstTraceFlagsChar, secondTraceFlagsChar)
)
SpanContext
.createFromRemoteParent(
traceIdHex,
spanIdHex,
traceFlags,
TraceState.getDefault
)
}
}
}
} match {
case Success(v) =>
v.some
case Failure(ex) =>
println(s"Error: $ex")
none[SpanContext]
}
private def traceStateFromString(traceStateHeader: String) = Try {
val listMembers =
TRACESTATE_ENTRY_DELIMITER_SPLIT_PATTERN.split(traceStateHeader)
checkArgument(
listMembers.length <= TRACESTATE_MAX_MEMBERS,
"TraceState has too many elements."
)
val traceStateBuilder = listMembers.reverse.foldLeft(TraceState.builder()) {
case (acc, v) =>
val index = v.indexOf(TRACESTATE_KEY_VALUE_DELIMITER)
checkArgument(index != -1, "Invalid TraceState list-member format.")
acc.put(v.substring(0, index), v.substring(index + 1))
}
val traceState = traceStateBuilder.build()
if (traceState.size() != listMembers.length)
throw new IllegalArgumentException("Invalid trace state size.")
else
traceState.some
} match {
case Success(v) =>
v
case Failure(ex) =>
println(s"Error: $ex")
none[TraceState]
}
/** Inject
*
* @param carrier
* carrier
* @param tracer
* tracer
* @param setter
* setter
* @tparam T
* type of carrier
*/
def inject[T](
carrier: T,
tracer: OpenTelemetryTracer
)(implicit setter: W3CTraceContextSetter[T]): T = {
val spanContext = tracer.getSpanContext
val traceParentSetCarrier =
if (!spanContext.isValid)
carrier
else {
val traceParentHeader = stringFromTraceParent(spanContext)
setter
.set(carrier, TRACE_PARENT, traceParentHeader)
}
val traceState = spanContext.getTraceState
if (traceState.isEmpty)
traceParentSetCarrier
else {
val traceStateHeader = stringFromTraceState(traceState)
setter
.set(traceParentSetCarrier, TRACE_STATE, traceStateHeader)
}
}
/** Extract
*
* @param carrier
* carrier
* @param getter
* getter
* @tparam T
* type of carrier
* @return
* context
*/
def extract[T](
carrier: T
)(implicit getter: W3CTraceContextGetter[T]): Context = {
val maybeTraceParent = for {
traceParentHeader <- if (getter.get(carrier, TRACE_PARENT).isEmpty) none[String] else getter.get(carrier, TRACE_PARENT)
traceParent <- traceParentFromString(traceParentHeader)
} yield traceParent
val maybeSpanContext = maybeTraceParent.map { traceParent =>
val maybeTraceState = for {
traceStateHeader <- if (getter.get(carrier, TRACE_STATE).isEmpty) none[String] else getter.get(carrier, TRACE_STATE)
traceState <- traceStateFromString(traceStateHeader)
} yield traceState
maybeTraceState match {
case Some(v) =>
SpanContext.createFromRemoteParent(
traceParent.getTraceId,
traceParent.getSpanId,
traceParent.getTraceFlags,
v
)
case None =>
SpanContext.createFromRemoteParent(
traceParent.getTraceId,
traceParent.getSpanId,
traceParent.getTraceFlags,
TraceState.getDefault
)
}
}
maybeSpanContext match {
case Some(v) =>
Context.current().`with`(Span.wrap(v))
case None =>
Context.current()
}
}
}
/** Setter */
trait W3CTraceContextSetter[T] {
/** Set
*
* @param carrier
* carrier
* @param key
* key
* @param value
* value
* @return
* carrier
*/
def set(carrier: T, key: String, value: String): T
}
/** Getter */
trait W3CTraceContextGetter[T] {
/** Get
*
* @param carrier
* carrier
* @param key
* key
* @return
* value of string
*/
def get(carrier: T, key: String): String
}
やっていることは愚直にcharを詰め込んだり、その逆をしているだけです。
setter
とgetter
をimplicit
にしてますが、この辺は好みの問題で特に深い理由は無いです。
Akka gRPCで利用する
出来上がったプロパゲーターをAkka gRPCに組み込んでみます。
使用するprotocolはこちら。
とりあえずコンテキストの開始終了の管理にscala.util.Using
を使おうと思うのでヘルパーを用意します。
package com.example
import akka.grpc.scaladsl.Metadata
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.api.trace.Tracer
import io.opentelemetry.context.Context
import io.opentelemetry.context.Scope
class ContextManager(spanName: String, spanKind: SpanKind, context: Context = Context.root()): AutoCloeable {
val tracer: Tracer = GlobalOpenTelemetry.getTracer("com.example")
val span: Span = tracer
.spanBuilder(spanName)
.setSpanKind(spanKind)
.setParent(context)
.startSpan()
val scope: Scope = span.makeCurrent()
override def close(): Unit = {
scope.close()
span.end()
}
}
単純に親コンテキストからSpan
とScope
を生成し、最終的にclose
します。親コンテキストが指定されなければルートコンテキストが親となります。
まずサーバーから。
package com.example
// importは省略
class GreeterServiceImpl(implicit system: ActorSystem) extends GreeterServicePowerApi {
import system.dispatcher
implicit val metaDataW3CTraceContextGetter: W3CTraceContextGetter[Metadata] =
(carrier: Metadata, key: String) => carrier.getText(key).getOrElse("")
override def sayHello(in: HelloRequest, metadata: Metadata): Future[HelloReply] = {
Future.fromTry {
Using(new ContextManager("SayHello", SpanKind.SERVER, W3CTraceContextPropergator.extract(metadata)) { _ =>
HelloReply(s"Hello, ${in.name}")
}
}
}
}
サーバー側はMetadataへアクセスする場合、通常コンパイルされたサービスではMetadataへのアクセス方法が無く、sbtにakkaGrpcCodeGeneratorSettings += "server_power_apis"
というオプションを設定し、末尾にPowerApiとついたサービスを実装する必要があります。
サーバーをAkka Httpに乗せたりする部分は割愛するのでドキュメント等を参考にしてください。
続いてクライアント。
package com.example
// importは省略
object Main extends App {
implicit def singleResponseRequestBuilderW3CTraceContextSetter[Req, Res]
: W3CTraceContextSetter[SingleResponseRequestBuilder[Req, Res]] =
(
carrier: SingleResponseRequestBuilder[Req, Res],
key: String,
value: String
) => carrier.addHeader(key, value)
implicit val sys = ActorSystem("HelloWorldClient")
implicit val ec = sys.dispatcher
val clientSettings = GrpcClientSettings.connectToServiceAt("127.0.0.1", 8080).withTls(false)
val client: GreeterService = GreeterServiceClient(clientSettings)
Using(new ContextManager("call SayHello", SpanKind.CLIENT) {ctx =>
W3CTraceContextPropergator
.inject(client.SayHello(), ctx.tracer)
.invoke(HelloRequest("Alice"))
.onComplete {
case Success(msg) =>
msg
case Failure(e) =>
throw e
}
}.onComplete {
case Success(msg) =>
println(msg)
case Failure(e) =>
println(s"Error: $e")
}
}
Usingの戻り値がTry
に固定されているので助長になっているのは勘弁してください。
クライアントでは引数無しのメソッドを呼ぶことでSingleResponseRequestBuilder
を抽出することができます(参考)。
検証画面
ごめんなさい。テスト環境まで用意するのがしんどくて検証画面がありません。
試したかったら各自試してみてね(無責任)。
終わりに
本当は・・・
本当はCatsのResoureを使ったりしてもっと流れるようなコードを提示したいのですが、コードや解説が輪をかけて長ったらしくなるので辞めておきました。
まとめ
- ただトレースするだけならCloudTraceのドキュメント通りで簡単に実装可能。
- 分散トレーシングにはコンテキストの伝搬が必要。
- プロパゲーターの副作用が嫌だったから自作。
- Akka gRPCにはInterceptorは無いから度々コンテキストの子スパンを作って伝搬してね。
Discussion