💾

Akka gRPC(Scala)+CloudTraceで分散トレーシング

17 min read

はじめに

CloudTraceとは

概要はこちら

GCP曰く「誰でも使用できる分散トレース システム」とのことで、GKE上で動かしているAkka gRPCで書かれたサービスのトレース結果を下のイメージ(クイックスタートより拝借)のように見たいなぁというのが今回の動機。
トレース画面

Scala環境ではOpenTelemetry(Java)を利用できそうです。

OpenTelemetryとは

概要はこちら

Google翻訳によると「OpenTelemetryは、トレース、メトリック、ログなどのテレメトリデータの作成と管理用に設計された、API、SDK、ツール、および統合のセットです。」らしい。

OpenTracingとOpenCensusの合併した後継プロジェクトであり、CNCFのインキュベーターになっています。

Java以外にもGoとか.Netとか用意されているみたいですね。

Akka gRPCで利用する

分散しないトレーシング

早速GCPのドキュメント「Java と OpenTelemetry」辺りを参考に実装してみました。

めっちゃ簡単じゃん。

と思いきや全然分散トレーシングになっておらず、個々のトレース結果が別々に記録されているだけの寂しいものに・・・。

リモート間でコンテキストを伝搬せよ

仕方ないのでOpenTelemetry側のドキュメントを見てみるとどうやらTraceIDやらSpanIDやらのコンテキストをキャリア(Httpを利用したサービスだったらHttpヘッダとか)に埋め込んでプロパゲーション(伝搬)しろと・・・。

まぁ当たり前か。

でも親切なことにW3C準拠のコンテキストのプロパゲーター(伝搬屋)と呼ばれるコンテキストのパーサーが標準で用意してあるのでそれを使えば良いと書いてありますね。

JavaのgRPCだったら・・・

gRPCではHttpヘッダのように外部データを埋め込みたい場合は、Metadataを利用すれば良さそうです。

これまた親切なことにgithubのexamples/grpcにInterceptorを利用した実装例が載っています。

Javaの標準gRPCだったら苦も無く実装できそうですね・・・。

Akka gRPC故に

前提として、Akka gRPCのMetadataへのデータ埋め込み方法は、SingleResponseRequestBuilder(もしくはStreamResponseRequestBuilder)のaddHeader(key: String, value: String)メソッドを呼ぶことでMetadataの埋め込まれたSingleResponseRequestBuilder返ってきます参考)。

そしてOpenTelemetryに標準で用意されているTextMapPropergetorTextMapSetter<T>インターフェースを受け取るinjectメソッドと、TextMapGetter<T>インターフェースを受け取るextractメソッドにより文字列←→コンテキストのパース処理を行います(T型はキャリアの型)。

TextMapSetter<T>で実装を必要とするset(carrier: T, key: String, value: String)メソッドの戻り値はUnitであり、更新されたT返ってきません

Akka gRPCの場合、TextMapSetter<T>のT型はSingleResponseRequestBuilderになるわけなんですが・・・。

シグネチャが噛み合いません。ResponseRequestBuilderのラッパーを用意してゴニョゴニョすることもできそうですが、副作用くさくなるのは許容できません。

あ、あとInterceptorも無いです(コンセプトが違うから用意する気ないよと言っておられる)。メソッド毎にトレーサーの埋め込み・取り出し処理が必要です。←これが地味にしんどかった。

プロパゲーターを実装する

しようがないでAkka gRPC用にプロパゲーターを自力で実装することにしました。

とりあえずコンテキストはW3C準拠でここら辺を参考に実装します。

長いので畳みます
package com.example

import java.util.regex.Pattern

import scala.jdk.CollectionConverters.*
import scala.util.Failure
import scala.util.Success
import scala.util.Try

import cats.implicits.*
import io.opentelemetry.api.internal.OtelEncodingUtils
import io.opentelemetry.api.internal.TemporaryBuffers
import io.opentelemetry.api.internal.Utils.checkArgument
import io.opentelemetry.api.trace.*
import io.opentelemetry.context.Context
import net.logstash.logback.argument.StructuredArguments.keyValue

/** W3C Trace context propergator */
object W3CTraceContextPropergator {

  private val TRACE_PARENT = "traceparent"

  private val TRACE_STATE = "tracestate"

  private val VERSION = "00"

  private val VERSION_SIZE = 2

  private val TRACEPARENT_DELIMITER = '-'

  private val TRACEPARENT_DELIMITER_SIZE = 1

  private val TRACE_ID_HEX_SIZE = TraceId.getLength

  private val SPAN_ID_HEX_SIZE = SpanId.getLength

  private val TRACE_OPTION_HEX_SIZE = TraceFlags.getLength

  private val TRACE_ID_OFFSET = VERSION_SIZE + TRACEPARENT_DELIMITER_SIZE

  private val SPAN_ID_OFFSET =
    TRACE_ID_OFFSET + TRACE_ID_HEX_SIZE + TRACEPARENT_DELIMITER_SIZE

  private val TRACE_OPTION_OFFSET =
    SPAN_ID_OFFSET + SPAN_ID_HEX_SIZE + TRACEPARENT_DELIMITER_SIZE

  private val TRACEPARENT_HEADER_SIZE =
    TRACE_OPTION_OFFSET + TRACE_OPTION_HEX_SIZE

  private val TRACESTATE_MAX_SIZE = 512

  private val TRACESTATE_MAX_MEMBERS = 32

  private val TRACESTATE_KEY_VALUE_DELIMITER = '='

  private val TRACESTATE_ENTRY_DELIMITER = ','

  private val TRACESTATE_ENTRY_DELIMITER_SPLIT_PATTERN =
    Pattern.compile("[ \t]*" + TRACESTATE_ENTRY_DELIMITER + "[ \t]*")

  private def stringFromTraceParent(traceParent: SpanContext) = Try {
    val chars = TemporaryBuffers.chars(TRACEPARENT_HEADER_SIZE)

    chars.update(0, VERSION.charAt(0))
    chars.update(1, VERSION.charAt(1))
    chars.update(2, TRACEPARENT_DELIMITER)

    val traceId = traceParent.getTraceId

    traceId.toCharArray.zipWithIndex.foreach { case (c, i) =>
      chars.update(TRACE_ID_OFFSET + i, c)
    }

    chars.update(SPAN_ID_OFFSET - 1, TRACEPARENT_DELIMITER)

    val spanId = traceParent.getSpanId

    spanId.toCharArray.zipWithIndex.foreach { case (c, i) =>
      chars.update(SPAN_ID_OFFSET + i, c)
    }

    chars.update(TRACE_OPTION_OFFSET - 1, TRACEPARENT_DELIMITER)

    val traceFlagsHex = traceParent.getTraceFlags.asHex()
    chars.update(TRACE_OPTION_OFFSET, traceFlagsHex.charAt(0))
    chars.update(TRACE_OPTION_OFFSET + 1, traceFlagsHex.charAt(1))

    new String(chars, 0, TRACEPARENT_HEADER_SIZE)
  } match {
    case Success(v) =>
      v

    case Failure(ex) =>
      println(s"Error: $ex")

      ""
  }

  private def stringFromTraceState(traceState: TraceState) = Try {
    val stringBuilder = traceState
      .asMap()
      .asScala
      .foldLeft(new StringBuilder(TRACESTATE_MAX_SIZE)) {
        case (acc, (key, value)) =>
          if (acc.nonEmpty)
            acc.apply(TRACESTATE_ENTRY_DELIMITER)

          acc
            .append(key)
            .append(TRACESTATE_KEY_VALUE_DELIMITER)
            .append(value)
      }

    stringBuilder.toString()
  } match {
    case Success(v) =>
      v

    case Failure(ex) =>
      println(s"Error: $ex")

      ""
  }

  private def traceParentFromString(traceParentHeader: String) =
    Try {
      if (
        !((traceParentHeader.length == TRACEPARENT_HEADER_SIZE ||
          traceParentHeader.length > TRACEPARENT_HEADER_SIZE &&
          traceParentHeader
            .charAt(TRACEPARENT_HEADER_SIZE) == TRACEPARENT_DELIMITER) &&
          traceParentHeader.charAt(
            TRACE_ID_OFFSET - 1
          ) == TRACEPARENT_DELIMITER &&
          traceParentHeader.charAt(
            SPAN_ID_OFFSET - 1
          ) == TRACEPARENT_DELIMITER &&
          traceParentHeader.charAt(
            TRACE_OPTION_OFFSET - 1
          ) == TRACEPARENT_DELIMITER)
      )
        throw new IllegalArgumentException("Invalid trace parent header.")
      else {
        val version = traceParentHeader.substring(0, VERSION_SIZE)

        if (version != VERSION)
          throw new IllegalArgumentException("Invalid trace version.")
        else {
          val traceIdHex = traceParentHeader.substring(
            TRACE_ID_OFFSET,
            TRACE_ID_OFFSET + TRACE_ID_HEX_SIZE
          )

          val spanIdHex = traceParentHeader.substring(
            SPAN_ID_OFFSET,
            SPAN_ID_OFFSET + SPAN_ID_HEX_SIZE
          )

          val firstTraceFlagsChar =
            traceParentHeader.charAt(TRACE_OPTION_OFFSET)

          val secondTraceFlagsChar =
            traceParentHeader.charAt(TRACE_OPTION_OFFSET + 1)

          if (
            !OtelEncodingUtils.isValidBase16Character(firstTraceFlagsChar)
            || !OtelEncodingUtils.isValidBase16Character(secondTraceFlagsChar)
          )
            throw new IllegalArgumentException("Invalid trace flags.")
          else {
            val traceFlags = TraceFlags.fromByte(
              OtelEncodingUtils
                .byteFromBase16(firstTraceFlagsChar, secondTraceFlagsChar)
            )

            SpanContext
              .createFromRemoteParent(
                traceIdHex,
                spanIdHex,
                traceFlags,
                TraceState.getDefault
              )
          }
        }
      }
    } match {
      case Success(v) =>
        v.some

      case Failure(ex) =>
        println(s"Error: $ex")

        none[SpanContext]
    }

  private def traceStateFromString(traceStateHeader: String) = Try {
    val listMembers =
      TRACESTATE_ENTRY_DELIMITER_SPLIT_PATTERN.split(traceStateHeader)

    checkArgument(
      listMembers.length <= TRACESTATE_MAX_MEMBERS,
      "TraceState has too many elements."
    )

    val traceStateBuilder = listMembers.reverse.foldLeft(TraceState.builder()) {
      case (acc, v) =>
        val index = v.indexOf(TRACESTATE_KEY_VALUE_DELIMITER)

        checkArgument(index != -1, "Invalid TraceState list-member format.")

        acc.put(v.substring(0, index), v.substring(index + 1))
    }

    val traceState = traceStateBuilder.build()

    if (traceState.size() != listMembers.length)
      throw new IllegalArgumentException("Invalid trace state size.")
    else
      traceState.some
  } match {
    case Success(v) =>
      v

    case Failure(ex) =>
      println(s"Error: $ex")

      none[TraceState]
  }

  /** Inject
    *
    * @param carrier
    *   carrier
    * @param tracer
    *   tracer
    * @param setter
    *   setter
    * @tparam T
    *   type of carrier
    */
  def inject[T](
    carrier: T,
    tracer: OpenTelemetryTracer
  )(implicit setter: W3CTraceContextSetter[T]): T = {
    val spanContext = tracer.getSpanContext

    val traceParentSetCarrier =
      if (!spanContext.isValid)
        carrier
      else {
        val traceParentHeader = stringFromTraceParent(spanContext)

        setter
          .set(carrier, TRACE_PARENT, traceParentHeader)
      }

    val traceState = spanContext.getTraceState

    if (traceState.isEmpty)
      traceParentSetCarrier
    else {
      val traceStateHeader = stringFromTraceState(traceState)

      setter
        .set(traceParentSetCarrier, TRACE_STATE, traceStateHeader)
    }
  }

  /** Extract
    *
    * @param carrier
    *   carrier
    * @param getter
    *   getter
    * @tparam T
    *   type of carrier
    * @return
    *   context
    */
  def extract[T](
    carrier: T
  )(implicit getter: W3CTraceContextGetter[T]): Context = {
    val maybeTraceParent = for {
      traceParentHeader <- if (getter.get(carrier, TRACE_PARENT).isEmpty) none[String] else getter.get(carrier, TRACE_PARENT)
      traceParent       <- traceParentFromString(traceParentHeader)
    } yield traceParent

    val maybeSpanContext = maybeTraceParent.map { traceParent =>
      val maybeTraceState = for {
        traceStateHeader <- if (getter.get(carrier, TRACE_STATE).isEmpty) none[String] else getter.get(carrier, TRACE_STATE)
        traceState       <- traceStateFromString(traceStateHeader)
      } yield traceState

      maybeTraceState match {
        case Some(v) =>
          SpanContext.createFromRemoteParent(
            traceParent.getTraceId,
            traceParent.getSpanId,
            traceParent.getTraceFlags,
            v
          )

        case None =>
          SpanContext.createFromRemoteParent(
            traceParent.getTraceId,
            traceParent.getSpanId,
            traceParent.getTraceFlags,
            TraceState.getDefault
          )
      }
    }

    maybeSpanContext match {
      case Some(v) =>
        Context.current().`with`(Span.wrap(v))

      case None =>
        Context.current()
    }
  }
}
/** Setter */
trait W3CTraceContextSetter[T] {

  /** Set
    *
    * @param carrier
    *   carrier
    * @param key
    *   key
    * @param value
    *   value
    * @return
    *   carrier
    */
  def set(carrier: T, key: String, value: String): T
}

/** Getter */
trait W3CTraceContextGetter[T] {

  /** Get
    *
    * @param carrier
    *   carrier
    * @param key
    *   key
    * @return
    *   value of string
    */
  def get(carrier: T, key: String): String
}

やっていることは愚直にcharを詰め込んだり、その逆をしているだけです。

settergetterimplicitにしてますが、この辺は好みの問題で特に深い理由は無いです。

Akka gRPCで利用する

出来上がったプロパゲーターをAkka gRPCに組み込んでみます。

使用するprotocolはこちら

とりあえずコンテキストの開始終了の管理にscala.util.Usingを使おうと思うのでヘルパーを用意します。

package com.example

import akka.grpc.scaladsl.Metadata
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.api.trace.Tracer
import io.opentelemetry.context.Context
import io.opentelemetry.context.Scope

class ContextManager(spanName: String, spanKind: SpanKind, context: Context = Context.root()): AutoCloeable {
  val tracer: Tracer = GlobalOpenTelemetry.getTracer("com.example")
  
  val span: Span = tracer
    .spanBuilder(spanName)
    .setSpanKind(spanKind)
    .setParent(context)
    .startSpan()
    
  val scope: Scope = span.makeCurrent()
  
  override def close(): Unit = {
    scope.close()
    span.end()
  }
}

単純に親コンテキストからSpanScopeを生成し、最終的にcloseします。親コンテキストが指定されなければルートコンテキストが親となります。

まずサーバーから。

package com.example

// importは省略

class GreeterServiceImpl(implicit system: ActorSystem) extends GreeterServicePowerApi {
  import system.dispatcher

  implicit val metaDataW3CTraceContextGetter: W3CTraceContextGetter[Metadata] =
    (carrier: Metadata, key: String) => carrier.getText(key).getOrElse("")
  
  override def sayHello(in: HelloRequest, metadata: Metadata): Future[HelloReply] = {
    Future.fromTry {
      Using(new ContextManager("SayHello", SpanKind.SERVER, W3CTraceContextPropergator.extract(metadata)) { _ =>
        HelloReply(s"Hello, ${in.name}")
      }
    }
  }
}

サーバー側はMetadataへアクセスする場合、通常コンパイルされたサービスではMetadataへのアクセス方法が無く、sbtにakkaGrpcCodeGeneratorSettings += "server_power_apis"というオプションを設定し、末尾にPowerApiとついたサービスを実装する必要があります。

サーバーをAkka Httpに乗せたりする部分は割愛するのでドキュメント等を参考にしてください。

続いてクライアント。

package com.example

// importは省略

object Main extends App {
  implicit def singleResponseRequestBuilderW3CTraceContextSetter[Req, Res]
    : W3CTraceContextSetter[SingleResponseRequestBuilder[Req, Res]] =
    (
      carrier: SingleResponseRequestBuilder[Req, Res],
      key: String,
      value: String
    ) => carrier.addHeader(key, value)

  implicit val sys = ActorSystem("HelloWorldClient")
  implicit val ec = sys.dispatcher
  
  val clientSettings = GrpcClientSettings.connectToServiceAt("127.0.0.1", 8080).withTls(false)
  
  val client: GreeterService = GreeterServiceClient(clientSettings)
  
  Using(new ContextManager("call SayHello", SpanKind.CLIENT) {ctx =>
    W3CTraceContextPropergator
      .inject(client.SayHello(), ctx.tracer)
      .invoke(HelloRequest("Alice"))
      .onComplete {
        case Success(msg) =>
	  msg
	  
	case Failure(e) =>
	  throw e
      }
  }.onComplete {
    case Success(msg) =>
      println(msg)
      
    case Failure(e) =>
      println(s"Error: $e")
  }
}

Usingの戻り値がTryに固定されているので助長になっているのは勘弁してください。

クライアントでは引数無しのメソッドを呼ぶことでSingleResponseRequestBuilderを抽出することができます(参考)。

検証画面

ごめんなさい。テスト環境まで用意するのがしんどくて検証画面がありません。

試したかったら各自試してみてね(無責任)。

終わりに

本当は・・・

本当はCatsのResoureを使ったりしてもっと流れるようなコードを提示したいのですが、コードや解説が輪をかけて長ったらしくなるので辞めておきました。

まとめ

  • ただトレースするだけならCloudTraceのドキュメント通りで簡単に実装可能。
  • 分散トレーシングにはコンテキストの伝搬が必要。
  • プロパゲーターの副作用が嫌だったから自作。
  • Akka gRPCにはInterceptorは無いから度々コンテキストの子スパンを作って伝搬してね。

Discussion

ログインするとコメントできます