🕌

Open TelemetryのトレースをMQTT越しに送りたい

2024/12/05に公開

こんにちは。しろうです。

この記事は OpenTelemetry Advent Calendar 2024の5日目の記事です。

はじめに

OpenTelemetryではTrace Contextというトレースするために必要な情報を埋め込み、これを伝搬(Propagation)していくことで、分散した環境においても一貫したトレースとなるようにしています。

例えばHTTPでは、HTTPヘッダにこのTrace Contextを埋め込んでいます。以下みたいな感じです。

traceparent: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
tracestate: congo=t61rcWkgMzE

詳細は Propagators APIや、 W3C Trace Contextのあたりをご確認ください。

Trace Context over MQTT

ではMQTTを使っている場合、どのようにこのTrace Contextを送ればいいでしょうか。

以下のURLにW3Cの仕様があります。しかし、2022年に最初のWorking Draftになったまま全く動きがないようです。しようがないですね。

といっても、内容自体は簡単です。

  • MQTT v3の場合: PayloadをJSONにし、その中に埋め込むこと
  • MQTT v5の場合: User Propertiesに埋め込むこと

というだけです。

User Propertiesって?

MQTT v3のみを知っている方には馴染みがないかも知れませんが、MQTT v5では User Properties という仕様が追加されました。これは、MQTTのメッセージの中にメタデータとして任意の情報を埋め込める、というものです。

MQTT v3?知らない子ですね…

実装

サンプル実装を以下のレポジトリに上げておきます。

https://github.com/shirou/mqttotel

といっても、ほとんどの内容はすでにライブラリ内に実装されているので難しいことはありません。Trace Contextを運ぶ TextMapCarrier Interface を実装するのみです。短いので全部載っけます。

package propagation

import (
	"github.com/eclipse/paho.golang/packets"
	"github.com/eclipse/paho.golang/paho"
)

// MQTTCarrier adapts http.Header to satisfy the TextMapCarrier interface.
type MQTTCarrier struct {
	*paho.Publish
}

func NewMQTTCarrier(p *paho.Publish) *MQTTCarrier {
	if p.Properties == nil {
		p.InitProperties(&packets.Properties{
			User: make([]packets.User, 0),
		})
	}
	return &MQTTCarrier{p}
}

// Get returns the value associated with the passed key.
func (mc MQTTCarrier) Get(key string) string {
	return mc.Properties.User.Get(key)
}

// Set stores the key-value pair.
func (mc MQTTCarrier) Set(key string, value string) {
	mc.Properties.User.Add(key, value)
}

// Keys lists the keys stored in this carrier.
func (mc MQTTCarrier) Keys() []string {
	keys := make([]string, 0, len(mc.Properties.User))
	for _, p := range mc.Properties.User {
		keys = append(keys, p.Key)
	}
	return keys
}

簡単ですね。あとは、このCarrierを送信側でcontextに入れ込む inject と受信側で取り出すextractを実装し、MQTTにpublishするとき、あるいはメッセージを受け取った後に呼び出すのみです。

// inject injects the span context into the MQTT message.
func inject(ctx context.Context, p *paho.Publish) {
	carrier := mqttPropagation.NewMQTTCarrier(p)
	otel.GetTextMapPropagator().Inject(ctx, carrier)
}

// extract extracts the span context from the MQTT message.
func extract(ctx context.Context, pr *paho.PublishReceived) context.Context {
	carrier := mqttPropagation.NewMQTTCarrier(pr.Packet)
	return otel.GetTextMapPropagator().Extract(ctx, carrier)
}

以下はmqttotel-pubから2つのメッセージを送った例です。mqttotel-pub から mqttotel-subにトレースがつながっていることが確認できました。

トレースがつながっている様子

おわりに

User Propertyに情報を入れると当然のことながらメッセージサイズが大きくなるので、特に通信量に敏感なアプリケーションでは用法用量を守ってお使いください。

ご参考

Discussion