🌉

Apache Kafka を使った非同期処理で分散トレーシングをする with OpenTelemetry

2023/07/27に公開

■ この記事は何か

こんにちは 👋
今回は、Apache Kafka を使ったサービス間でも分散トレーシングしましょう!について記事にまとめました。トレーシングの計装には OpenTelemetry(以下、Otel)を使っています。Kafka や Otel については他の記事に任せて、本記事ではトレーシングの仕組み / 計装にフォーカスを当てようと思います。

Kafka Producer(Msg を送信するひと)と Kafka Consumer(Msg を受信するひと) の間のやりとりを、一気通貫でトレースできることがゴールとなります👶

producer 側 と consumer 側のトレース。Jaeger で可視化。

■ Propagator について

分散トレーシングをする際には、サービス間をトレース情報を伝播させる必要があります。
Otel では Propagators API(以下、Propagator)がその役割を担います。Propagator の仕様は以下に定義されています。
https://opentelemetry.io/docs/specs/otel/context/api-propagators/

Propagator には以下の Type があり、現在は TextMapPropagator のみ定義されています。

  • TextMapPropagator: String key/value データを伝播する
  • Binary Propagator: バイナリデータを伝播する(こちらは将来的に追加されるようです)

情報の伝搬は Carrier(キャリア)という媒体を通して行われます。
Propagator には InjectExtractという操作が定義され、この Operation により Carrier に対しデータの読み書きを行います。

これらを実現する Propagator には W3C TraceContextB3 などの実装があり、Otel のパッケージとして配布されています。Otel セットアップ時に以下のように指定をします。

propagater の設定
# W3C TraceContext を使う場合
tc := propagation.TraceContext{}
otel.SetTextMapPropagator(tc)

# B3 を使う場合
p := b3.New()
otel.SetTextMapPropagator(p)

W3C TraceContext の場合は、以下のようにトレース情報が Carrier に Inject されます。

  • key: traceparent
  • value: {version}-{trace-id}-{parent-id}-{trace-flags}
    (ex: 00-0af7651916cd43dd8448eb211c80319c-b9c7c989f97918e1-01)

実際のコードは以下で、確かにトレース情報を carrier にセットしていることが分かります。
https://github.com/open-telemetry/opentelemetry-go/blob/main/propagation/trace_context.go#L60-L65

ここまでがトレース情報を伝播する方法で、イメージで表すと以下のようになります!!


Carrier を使ってトレース情報を伝播しているイメージ図

■ Producer / Consumer への計装

では、どのように Kafka の Producer / Consumer に計装していくか見ていきましょう!
サンプルアプリは Go を用い、Kafka のクライアントライブラリには sarama を使いました。
コードは GitHub にアップしています。本記事では Otel のセットアップなどにはほとんど触れないので、興味のある方はご確認ください。

Producer 側

トレース情報を TextMapPropagator を使って Carrier に Inject します。
便利なことに OpenTelemetry の contrib リポジトリにある otelsarama に Carrier の定義があるため、これを使えばよさそうです。

Producer 側
// Span 生成
ctx, span := tracer.Start(context.Background(), "produce message")
defer span.End()

// produce する message の定義
msg := sarama.ProducerMessage{Topic: "otel-topic"}

// carrier を定義
carrier := otelsarama.NewProducerMessageCarrier(&msg)
tc := otel.GetTextMapPropagator() // otel.SetTextMapPropagator で定義された propagator が使われます

// carrier に "traceparent" を Key にトレース情報を Inject
tc.Inject(ctx, carrier)

Consumer 側

Consumer 側も簡単です。こちらでは逆に、トレース情報を Carrier から Extract し、新しい Span を生成すれば良いだけです。

Consumer 側
// consume した message からトレース情報を取り出す
carrier := otelsarama.NewConsumerMessageCarrier(msg)
tc := otel.GetTextMapPropagator() // "SetTextMapPropagator" で定義された propagator が使われます
// producer 側から伝搬された情報がセットされた context を生成
ctx := tc.Extract(context.Background(), carrier)

// この context を使って Consumer 側で Span を生成すれば良い
ctx, span := tracer.Start(ctx, "consume message")
defer span.End()

トレーシングする

以上の計装により、冒頭にお見せしたトレースを取得することができます 🎉
"produce message" と "consume message" という Span が生成されていますね。


producer 側 と consumer 側のトレース。Jaeger で可視化。(再掲)

アプリ内で、Carrier からトレース情報を取得し出力してあげると以下のようになり、W3C TraceContext の仕様に基づいていることも確認できます。

carrier のトレース情報取得/出力
val := carrier.Get("traceparent")
fmt.Println(val)
// 00-dcd5d1f8dd6b2744e74ead276cde7fee-a7dc127ad934144f-01

■ ちょっと補足というか発展

今回、トレース情報の伝搬を追いやすいように Span 生成と TextMapPropagator を使った Carrier へのトレース情報の Inject / Extract を自分で実施しました。
しかし、otelsarama の関数を使うことで更に簡単に計装することができます。WrapAyncProducer などを使って producer をラッパーすると、Kafka Msg を produce する際に Carrier へのトレース情報の Inject や、Span の生成を自動でおこなってくれるためとても便利です。同様に、Consume 側も計装できます。詳しい実装が気になる方は、otelsarama を確認してください。[1]
https://github.com/open-telemetry/opentelemetry-go-contrib/tree/main/instrumentation/github.com/Shopify/sarama/otelsarama

Producer 側の Wrap instrumentation
producer, err := sarama.NewAsyncProducer(brokerList, config)
if err != nil {
	return nil, fmt.Errorf("starting Sarama producer: %w", err)
}

// Wrap instrumentation
producer = otelsarama.WrapAsyncProducer(config, producer)


Producer と Consumer に Wrap Instrumentation を使用

"topic-otel publish" と "topic-otel receive" という Span が追加されていることがわかります。Span の Attribute も充実しており、簡単に計装できました。

■ 最後に

Kafka を使った非同期処理でも Otel を使って簡単にトレーシングできることを書きました!

P.S. 今回の検証環境について

今回の検証環境の環境構築手順。長いので展開してください。

Kafka 構築

Kafka 構築
$ k create ns kafka
$ helm repo add bitnami https://charts.bitnami.com/bitnami
$ helm install kafka-cluster bitnami/kafka -n kafka --version 23.0.7 -f https://raw.githubusercontent.com/keisukesakasai/work-kafka-otel/main/manifest/values.yaml 

# 確認
$ k get po -nkafka
NAME                            READY   STATUS    RESTARTS   AGE
kafka-cluster-0                 1/1     Running   0          26m

# topic の作成
$ k -n kafka run kafka-client --restart='Never' --image docker.io/bitnami/kafka --command -- sleep infinity
$ k -n kafka exec -i kafka-client -- bash << 'EOS'
  kafka-topics.sh --create --bootstrap-server=kafka-cluster-0.kafka-cluster-headless.kafka.svc.cluster.local:9092 --replication-factor 1 --partitions 2 --topic topic-otel
EOS

# topic 確認
$ k -n kafka exec -i kafka-client -- bash << 'EOS'
  kafka-topics.sh --list --bootstrap-server=kafka-cluster-0.kafka-cluster-headless.kafka.svc.cluster.local:9092
EOS
topic-otel

Otel Collector とか Jaeger 構築

Otel Collector とか Jaeger 構築
$ k create ns cert-manager
$ k create ns observability

$ helm repo add jetstack https://charts.jetstack.io
$ helm repo add jaegertracing https://jaegertracing.github.io/helm-charts
$ helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts
$ helm repo update

# cert manager インストール(Otel Collector で必要)
$ helm install \
  cert-manager jetstack/cert-manager \
  --namespace cert-manager \
  --version v1.12.2 \
  --set installCRDs=true \
  --wait

# 確認
$ k get po -ncert-manager
NAME                                      READY   STATUS    RESTARTS   AGE
cert-manager-869f7b446-bnz5t              1/1     Running   0          22m
cert-manager-cainjector-869d958c8-v9qbg   1/1     Running   0          22m
cert-manager-webhook-67cf5854d-mdw8t      1/1     Running   0          22m

# jaeger のインストール
$ helm install \
  jaeger-operator jaegertracing/jaeger-operator \
  --namespace observability \
  --version 2.46.2 \
  --set rbac.clusterRole=true \
  --wait
  
$ k apply -f - <<EOF
apiVersion: jaegertracing.io/v1
kind: Jaeger
metadata:
  name: jaeger
  namespace: observability
spec:
  strategy: allInOne # default
  storage:
    type: memory
EOF

# Otel Collector のインストール
$ helm install \
  opentelemetry-operator open-telemetry/opentelemetry-operator \
  --namespace observability \
  --version 0.32.0 \
  --set installCRDs=true \
  --wait

$ k apply -f - <<EOF
apiVersion: opentelemetry.io/v1alpha1
kind: OpenTelemetryCollector
metadata:
  name: otelcol
  namespace: observability
spec:
  mode: deployment
  config: |
    receivers:
      otlp:
        protocols:
          grpc:
            endpoint: "0.0.0.0:4317"
    processors:
    exporters:
      otlp:
        endpoint: jaeger-collector.observability.svc.cluster.local:4317
        tls:
          insecure: true
      logging:
        loglevel: debug
    service:
      pipelines:
        traces:
          receivers: [otlp]
          processors: []
          exporters: [logging, otlp]
EOF

# 確認
$ k get po -nobservability
NAME                                      READY   STATUS    RESTARTS   AGE
jaeger-7fd6d9549f-wck4m                   1/1     Running   0          22m
jaeger-operator-bc6d84785-gfcvz           1/1     Running   0          22m
opentelemetry-operator-85478549c9-zcbsj   2/2     Running   0          22m
otelcol-collector-9864f896b-4h8xj         1/1     Running   0          22m

サンプルアプリ

$ k apply -f https://raw.githubusercontent.com/keisukesakasai/work-kafka-otel/main/app/client/deployments/pod.yaml
$ k apply -f https://raw.githubusercontent.com/keisukesakasai/work-kafka-otel/main/app/server/deployments/pod.yaml

# 確認
$ k get po -nkafka
NAME                            READY   STATUS    RESTARTS   AGE
kafka-client-545f79f5c4-gls86   1/1     Running   0          30s
kafka-cluster-0                 1/1     Running   0          26m
kafka-server-576d4bf5fd-zc84r   1/1     Running   0          11s

動作

# サンプルアプリケーション(Producer 側)にアクセス
$ k port-forward svc/kafka-client-service 8080:8080 -nkafka
$ curl http://localhost:8080

# Jaeger UI
$ k port-forward svc/jaeger-query 8090:16686 -nobservability

ここまでの環境構築が"成功"していれば、Jaeger(http://localhost:8090)にブラウザで接続してトレースを確認することができます。

脚注
  1. トレーシングの収集において、言語によってはアプリケーションに手動で変更が不要な自動計装 (Auto-Instrumentation) が可能です(Java や Python、Node.js など)。Kafka のトレーシングでは試せていないので、実施した方はぜひ記事執筆をお待ちします。 ↩︎

Discussion