Apache Kafka を使った非同期処理で分散トレーシングをする with OpenTelemetry
■ この記事は何か
こんにちは 👋
今回は、Apache Kafka を使ったサービス間でも分散トレーシングしましょう!について記事にまとめました。トレーシングの計装には OpenTelemetry(以下、Otel)を使っています。Kafka や Otel については他の記事に任せて、本記事ではトレーシングの仕組み / 計装にフォーカスを当てようと思います。
Kafka Producer(Msg を送信するひと)と Kafka Consumer(Msg を受信するひと) の間のやりとりを、一気通貫でトレースできることがゴールとなります👶
producer 側 と consumer 側のトレース。Jaeger で可視化。
■ Propagator について
分散トレーシングをする際には、サービス間をトレース情報を伝播させる必要があります。
Otel では Propagators API(以下、Propagator)がその役割を担います。Propagator の仕様は以下に定義されています。
Propagator には以下の Type があり、現在は TextMapPropagator
のみ定義されています。
-
TextMapPropagator
: String key/value データを伝播する -
Binary Propagator
: バイナリデータを伝播する(こちらは将来的に追加されるようです)
情報の伝搬は Carrier(キャリア)という媒体を通して行われます。
Propagator には Inject
と Extract
という操作が定義され、この Operation により Carrier に対しデータの読み書きを行います。
これらを実現する Propagator には W3C TraceContext や B3 などの実装があり、Otel のパッケージとして配布されています。Otel セットアップ時に以下のように指定をします。
# W3C TraceContext を使う場合
tc := propagation.TraceContext{}
otel.SetTextMapPropagator(tc)
# B3 を使う場合
p := b3.New()
otel.SetTextMapPropagator(p)
W3C TraceContext の場合は、以下のようにトレース情報が Carrier に Inject
されます。
- key:
traceparent
- value:
{version}-{trace-id}-{parent-id}-{trace-flags}
(ex: 00-0af7651916cd43dd8448eb211c80319c-b9c7c989f97918e1-01)
実際のコードは以下で、確かにトレース情報を carrier にセットしていることが分かります。
ここまでがトレース情報を伝播する方法で、イメージで表すと以下のようになります!!
Carrier を使ってトレース情報を伝播しているイメージ図
■ Producer / Consumer への計装
では、どのように Kafka の Producer / Consumer に計装していくか見ていきましょう!
サンプルアプリは Go を用い、Kafka のクライアントライブラリには sarama を使いました。
コードは GitHub にアップしています。本記事では Otel のセットアップなどにはほとんど触れないので、興味のある方はご確認ください。
Producer 側
トレース情報を TextMapPropagator
を使って Carrier に Inject
します。
便利なことに OpenTelemetry の contrib リポジトリにある otelsarama に Carrier の定義があるため、これを使えばよさそうです。
// Span 生成
ctx, span := tracer.Start(context.Background(), "produce message")
defer span.End()
// produce する message の定義
msg := sarama.ProducerMessage{Topic: "otel-topic"}
// carrier を定義
carrier := otelsarama.NewProducerMessageCarrier(&msg)
tc := otel.GetTextMapPropagator() // otel.SetTextMapPropagator で定義された propagator が使われます
// carrier に "traceparent" を Key にトレース情報を Inject
tc.Inject(ctx, carrier)
Consumer 側
Consumer 側も簡単です。こちらでは逆に、トレース情報を Carrier から Extract
し、新しい Span を生成すれば良いだけです。
// consume した message からトレース情報を取り出す
carrier := otelsarama.NewConsumerMessageCarrier(msg)
tc := otel.GetTextMapPropagator() // "SetTextMapPropagator" で定義された propagator が使われます
// producer 側から伝搬された情報がセットされた context を生成
ctx := tc.Extract(context.Background(), carrier)
// この context を使って Consumer 側で Span を生成すれば良い
ctx, span := tracer.Start(ctx, "consume message")
defer span.End()
トレーシングする
以上の計装により、冒頭にお見せしたトレースを取得することができます 🎉
"produce message" と "consume message" という Span が生成されていますね。
producer 側 と consumer 側のトレース。Jaeger で可視化。(再掲)
アプリ内で、Carrier からトレース情報を取得し出力してあげると以下のようになり、W3C TraceContext
の仕様に基づいていることも確認できます。
val := carrier.Get("traceparent")
fmt.Println(val)
// 00-dcd5d1f8dd6b2744e74ead276cde7fee-a7dc127ad934144f-01
■ ちょっと補足というか発展
今回、トレース情報の伝搬を追いやすいように Span 生成と TextMapPropagator
を使った Carrier へのトレース情報の Inject
/ Extract
を自分で実施しました。
しかし、otelsarama の関数を使うことで更に簡単に計装することができます。WrapAyncProducer などを使って producer をラッパーすると、Kafka Msg を produce する際に Carrier へのトレース情報の Inject
や、Span の生成を自動でおこなってくれるためとても便利です。同様に、Consume 側も計装できます。詳しい実装が気になる方は、otelsarama を確認してください。[1]
producer, err := sarama.NewAsyncProducer(brokerList, config)
if err != nil {
return nil, fmt.Errorf("starting Sarama producer: %w", err)
}
// Wrap instrumentation
producer = otelsarama.WrapAsyncProducer(config, producer)
Producer と Consumer に Wrap Instrumentation を使用
"topic-otel publish" と "topic-otel receive" という Span が追加されていることがわかります。Span の Attribute も充実しており、簡単に計装できました。
■ 最後に
Kafka を使った非同期処理でも Otel を使って簡単にトレーシングできることを書きました!
P.S. 今回の検証環境について
今回の検証環境の環境構築手順。長いので展開してください。
Kafka 構築
$ k create ns kafka
$ helm repo add bitnami https://charts.bitnami.com/bitnami
$ helm install kafka-cluster bitnami/kafka -n kafka --version 23.0.7 -f https://raw.githubusercontent.com/keisukesakasai/work-kafka-otel/main/manifest/values.yaml
# 確認
$ k get po -nkafka
NAME READY STATUS RESTARTS AGE
kafka-cluster-0 1/1 Running 0 26m
# topic の作成
$ k -n kafka run kafka-client --restart='Never' --image docker.io/bitnami/kafka --command -- sleep infinity
$ k -n kafka exec -i kafka-client -- bash << 'EOS'
kafka-topics.sh --create --bootstrap-server=kafka-cluster-0.kafka-cluster-headless.kafka.svc.cluster.local:9092 --replication-factor 1 --partitions 2 --topic topic-otel
EOS
# topic 確認
$ k -n kafka exec -i kafka-client -- bash << 'EOS'
kafka-topics.sh --list --bootstrap-server=kafka-cluster-0.kafka-cluster-headless.kafka.svc.cluster.local:9092
EOS
topic-otel
Otel Collector とか Jaeger 構築
$ k create ns cert-manager
$ k create ns observability
$ helm repo add jetstack https://charts.jetstack.io
$ helm repo add jaegertracing https://jaegertracing.github.io/helm-charts
$ helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts
$ helm repo update
# cert manager インストール(Otel Collector で必要)
$ helm install \
cert-manager jetstack/cert-manager \
--namespace cert-manager \
--version v1.12.2 \
--set installCRDs=true \
--wait
# 確認
$ k get po -ncert-manager
NAME READY STATUS RESTARTS AGE
cert-manager-869f7b446-bnz5t 1/1 Running 0 22m
cert-manager-cainjector-869d958c8-v9qbg 1/1 Running 0 22m
cert-manager-webhook-67cf5854d-mdw8t 1/1 Running 0 22m
# jaeger のインストール
$ helm install \
jaeger-operator jaegertracing/jaeger-operator \
--namespace observability \
--version 2.46.2 \
--set rbac.clusterRole=true \
--wait
$ k apply -f - <<EOF
apiVersion: jaegertracing.io/v1
kind: Jaeger
metadata:
name: jaeger
namespace: observability
spec:
strategy: allInOne # default
storage:
type: memory
EOF
# Otel Collector のインストール
$ helm install \
opentelemetry-operator open-telemetry/opentelemetry-operator \
--namespace observability \
--version 0.32.0 \
--set installCRDs=true \
--wait
$ k apply -f - <<EOF
apiVersion: opentelemetry.io/v1alpha1
kind: OpenTelemetryCollector
metadata:
name: otelcol
namespace: observability
spec:
mode: deployment
config: |
receivers:
otlp:
protocols:
grpc:
endpoint: "0.0.0.0:4317"
processors:
exporters:
otlp:
endpoint: jaeger-collector.observability.svc.cluster.local:4317
tls:
insecure: true
logging:
loglevel: debug
service:
pipelines:
traces:
receivers: [otlp]
processors: []
exporters: [logging, otlp]
EOF
# 確認
$ k get po -nobservability
NAME READY STATUS RESTARTS AGE
jaeger-7fd6d9549f-wck4m 1/1 Running 0 22m
jaeger-operator-bc6d84785-gfcvz 1/1 Running 0 22m
opentelemetry-operator-85478549c9-zcbsj 2/2 Running 0 22m
otelcol-collector-9864f896b-4h8xj 1/1 Running 0 22m
サンプルアプリ
$ k apply -f https://raw.githubusercontent.com/keisukesakasai/work-kafka-otel/main/app/client/deployments/pod.yaml
$ k apply -f https://raw.githubusercontent.com/keisukesakasai/work-kafka-otel/main/app/server/deployments/pod.yaml
# 確認
$ k get po -nkafka
NAME READY STATUS RESTARTS AGE
kafka-client-545f79f5c4-gls86 1/1 Running 0 30s
kafka-cluster-0 1/1 Running 0 26m
kafka-server-576d4bf5fd-zc84r 1/1 Running 0 11s
動作
# サンプルアプリケーション(Producer 側)にアクセス
$ k port-forward svc/kafka-client-service 8080:8080 -nkafka
$ curl http://localhost:8080
# Jaeger UI
$ k port-forward svc/jaeger-query 8090:16686 -nobservability
ここまでの環境構築が"成功"していれば、Jaeger(http://localhost:8090
)にブラウザで接続してトレースを確認することができます。
-
トレーシングの収集において、言語によってはアプリケーションに手動で変更が不要な自動計装 (Auto-Instrumentation) が可能です(Java や Python、Node.js など)。Kafka のトレーシングでは試せていないので、実施した方はぜひ記事執筆をお待ちします。 ↩︎
Discussion