Datadog dd-trace-goが何をしているのか追ってみよう
Datadogって?
インフラ監視やログ、アプリケーションのパフォーマンス分析を行うためのクラウドサービスを提供している企業とそのサービス名です。
また、このようなツールを使うことで効率的なパフォーマンス改善や不具合修正を行うことができます。
競合的なサービスはNew RelicやDynatraceなどでしょうか。
かつては私もAWS環境にSSH接続をしてファイルをgrepしたりlessしたりtailしたりでログを追っていましたが、Voicyのようにk8sを使っていて、アプリケーションがたくさんのpodにいるようなサービスでそのようなことを行うことは困難ですので、いつもdatadogには助けられています。お高いですが。
APMって?
Application Performance Managementの略
Datadogに限定された言葉ではなく、ソフトウェアのパフォーマンスを監視して管理すること(情報増えてない)。
Datadogが顧客に提供しているのは、このAPMのために便利なダッシュボードであったり検索機能であったり、パフォーマンス低下や不具合の原因特定のために便利な諸々。
今回はその中でもトレースについて、どのような動きをしているのか気になったのでコードを追ってみました。
dd-trace-goって?
go言語で作られたアプリケーションに、datadogによる監視を組み込むためのライブラリです。
traceって?
トレースは、アプリケーションがリクエストを処理するのにかかった時間とこのリクエストのステータスを追跡するために使用されます。各トレースは、1 つ以上のスパンで構成されます。
とのことです。
traceをどんなふうに使うか
あるAPIにリクエストが来たときにtraceを見ると以下のような事がわかります
全体でどれくらいの時間がかかっているのか
全体のうち最も時間がかかっている処理はどこなのか
そういった処理では例えばどんなSQLが発行されているのか
他にもあるかと思いますが、これらをヒントにパフォーマンス改善や不具合の特定が行えます
こんなふうにきれいな図で見せてくれます。
dd-trace-goはどのようにtraceを集めてdatadogに送信しているのか
traceの開始
アプリケーションからは、tracer.Start()
関数を呼びます。
tracer構造体
t := newTracer(opts...)
というようにtracer構造体を初期化していて、これが重要な役割を担っていそうです。
この構造体には以下のようなコメントがついていました。
// tracer creates, buffers and submits Spans which are used to time blocks of
// computation. They are accumulated and streamed into an internal payload,
// which is flushed to the agent whenever its size exceeds a specific threshold
// or when a certain interval of time has passed, whichever happens first.
//
// tracer operates based on a worker loop which responds to various request
// channels. It additionally holds two buffers which accumulates error and trace
// queues to be processed by the payload encoder.
テクノロジーの力を使って機械翻訳すると以下のようなことが書いてあります。
// トレーサーは、計算ブロックの時間計測に使用されるスパンを作成、バッファリング、送信します。
// スパンは蓄積され、内部ペイロードにストリームされます。
// ペイロードのサイズが特定の閾値を超えるか、一定の時間間隔が経過した場合、そのどちらか早い方でエージェントにフラッシュされます。
// トレーサーは、さまざまなリクエストチャネルに応答するワーカーループに基づいて動作します。
// さらに、ペイロードエンコーダーによって処理されるエラーキューとトレースキューを蓄積する2つのバッファを保持しています。
スパンはバッファに貯められて、バッファがいっぱいになるか、特定の時間が経過したときに、トレースがdatadogに送られる?ってコト!?
newTracer()
go func() {
defer t.wg.Done()
tick := t.config.tickChan
if tick == nil {
ticker := time.NewTicker(flushInterval)
defer ticker.Stop()
tick = ticker.C
}
t.worker(tick)
}()
何かを働かせていそうなgoroutineが作られていそうです。
worker()
// worker receives finished traces to be added into the payload, as well
// as periodically flushes traces to the transport.
func (t *tracer) worker(tick <-chan time.Time) {
ワーカーは完了したトレースを受け取ってペイロードに追加し、定期的にトレースを送信します。
tracer構造体のフィールド
// traceWriter is responsible for sending finished traces to their
// destination, such as the Trace Agent or Datadog Forwarder.
traceWriter traceWriter
// 中略
// out receives chunk with spans to be added to the payload.
out chan *chunk
// flush receives a channel onto which it will confirm after a flush has been
// triggered and completed.
flush chan chan<- struct{}
// stop causes the tracer to shut down when closed.
stop chan struct{}
// traceWriterは完了したトレースをTrace AgentやDatadog Forwarderなどの
// 送信先に送る役割を担っています。
traceWriter traceWriter
// outはペイロードに追加されるスパンを含むチャンクを受け取ります。
out chan *chunk
// flushはフラッシュがトリガーされ、完了した後に確認するチャネルを受け取ります。
flush chan chan<- struct{}
// stopはクローズされたときにトレーサーをシャットダウンします。
stop chan struct{}
workerの無限ループの中では
func (t *tracer) worker(tick <-chan time.Time) {
for {
select {
case trace := <-t.out: // spanを受信し、traceWriterに追加している
t.sampleChunk(trace)
if len(trace.spans) != 0 {
t.traceWriter.add(trace.spans)
}
case <-tick: // 時間が来たらデータをflushしている(tickはworkerの引数で、定時で送られるっぽい)
t.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:scheduled"}, 1)
t.traceWriter.flush()
case done := <-t.flush: // flushしなさいという命令を受信したらflush()
t.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:invoked"}, 1)
t.traceWriter.flush()
t.statsd.Flush()
t.stats.flushAndSend(time.Now(), withCurrentBucket)
// TODO(x): In reality, the traceWriter.flush() call is not synchronous
// when using the agent traceWriter. However, this functionality is used
// in Lambda so for that purpose this mechanism should suffice.
done <- struct{}{}
case <-t.stop: // シャットダウン
loop:
// the loop ensures that the payload channel is fully drained
// before the final flush to ensure no traces are lost (see #526)
for {
select {
case trace := <-t.out:
t.sampleChunk(trace)
if len(trace.spans) != 0 {
t.traceWriter.add(trace.spans)
}
default:
break loop
}
}
return
}
}
}
traceWriter flush()
for attempt := 0; attempt <= h.config.sendRetries; attempt++ {
size, count = p.size(), p.itemCount()
log.Debug("Sending payload: size: %d traces: %d\n", size, count)
var rc io.ReadCloser
rc, err = h.config.transport.send(p) // どこかになにかを送っていそう!
if err == nil {
log.Debug("sent traces after %d attempts", attempt+1)
h.statsd.Count("datadog.tracer.flush_bytes", int64(size), nil, 1)
h.statsd.Count("datadog.tracer.flush_traces", int64(count), nil, 1)
if err := h.prioritySampling.readRatesJSON(rc); err != nil {
h.statsd.Incr("datadog.tracer.decode_error", nil, 1)
}
return
}
log.Error("failure sending traces (attempt %d), will retry: %v", attempt+1, err)
p.reset()
time.Sleep(time.Millisecond)
}
transport send()
func (t *httpTransport) send(p *payload) (body io.ReadCloser, err error) {
req, err := http.NewRequest("POST", t.traceURL, p)
if err != nil {
return nil, fmt.Errorf("cannot create http request: %v", err)
}
POSTしていました。
まとめると
アプリケーションのなかでdatadog agentは、spanのチャネルを受信し続け、一定程度溜まったら、あるいは時間が経過するかでdatadogにデータを送信していることが実装を追うことでわかりました。
しかし、spanがいつどうやって作られるかまではちょっと時間がなく追えなかったので次回に期待。
最後に
間違っている箇所もあろうかと思いますので気になった方はやさしくご指摘いただけますと幸いです。
Discussion