Open8

opentelemetry-go/opentelemetry-goの実装を読み解く

あかりあかり

前提として、以下のドキュメントで説明されているOpenTelemetryクライアントライブラリの構成(APIとSDKの分離)を理解しておく必要がある。
https://opentelemetry.io/docs/specs/otel/library-guidelines/


では、open-telemetry/opentelemetry-goは上記の構成をどのように実装しているのかを見ていく。

OpenTelemetryには、signals(Trace、Metrics、Logsなど)と呼ばれる独立したobservabilityのツールがあり、それらはContext Propagationという共通の仕組みを共有している。

そして、それぞれのシグナルは同様の構成で作られている。

つまり、シグナル一つに着目して実装を読み解くことで大体の実装を理解することができる。
https://opentelemetry.io/docs/specs/otel/overview/

ここでは、stableであるTraceに着目して実装を読み解いていく。

あかりあかり

まず、プロジェクトの構成は(本記事で扱う部分に絞って簡略化すると)以下のような感じ。

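.
├── internal
│   └── global
│       ├── state.go # グローバルなTracerProviderの保持と差し替え
│       └── trace.go # SDK未設定時に使われるデフォルト(委譲型)のTracerProvider
├── sdk # SDK(APIの具体的な実装。例: sdk/trace)
│   ├── doc.go
│   └── optional_function.go
├── trace # Trace API(インターフェース定義)
│   ├── doc.go
│   └── optional_function.go
├── trace.go # otelパッケージのグローバルAPI(otel.SetTracerProvider / otel.Tracer など)
├── go.mod
└── go.sum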
あかりあかり

The API package is a self-sufficient dependency, in the sense that if the end-user application or a third-party library depends only on it and does not plug a full SDK implementation then the application will still build and run without failing, although no telemetry data will be actually delivered to a telemetry backend.

つまり、アプリケーションや3rd-partyライブラリがAPIパッケージのみに依存し、SDKの実装を組み込んでいなかったとしても、ビルドでき、失敗せずに動作する必要がある(ただし、テレメトリデータは実際にはバックエンドへ送信されない。仕様書の図を参照)。これをどう実現しているのか?

opentelemetry-go/internal/global/state.go
package global

// Excerpt from opentelemetry-go/internal/global/state.go (trimmed; imports omitted).

type (
	// tracerProviderHolder wraps the global trace.TracerProvider so it can be
	// kept in an atomic.Value. NOTE(review): presumably the wrapper exists
	// because atomic.Value requires every Store to use the same concrete
	// type — confirm.
	tracerProviderHolder struct {
		tp trace.TracerProvider
	}
)

var (
	// globalTracer holds the process-wide TracerProvider. It is initialized
	// with the package's default *tracerProvider. TracerProvider() therefore
	// returns a usable provider even before any SDK has been configured.
	globalTracer        = defaultTracerValue()
)

// TracerProvider returns the TracerProvider currently stored in globalTracer.
func TracerProvider() trace.TracerProvider {
	return globalTracer.Load().(tracerProviderHolder).tp
}

// defaultTracerValue builds the initial atomic.Value for globalTracer. The
// value is pre-loaded with an empty default *tracerProvider (defined in
// internal/global/trace.go).
func defaultTracerValue() *atomic.Value {
	v := &atomic.Value{}
	v.Store(tracerProviderHolder{tp: &tracerProvider{}})
	return v
}
opentelemetry-go/internal/global/trace.go
package global

// tracerProvider is the default TracerProvider stored in globalTracer until
// an SDK provider is registered. Once setDelegate is called it forwards to
// the real provider held in delegate.
type tracerProvider struct {
	embedded.TracerProvider

	// mtx guards tracers and delegate.
	mtx      sync.Mutex
	// tracers tracks tracers that must be re-pointed when a delegate is set.
	// NOTE(review): keyed by il, presumably the instrumentation-library
	// identity (name/version) — confirm.
	tracers  map[il]*tracer
	// delegate is the real provider; nil until setDelegate runs.
	delegate trace.TracerProvider
}

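変数globalTracerの初期値としてdefaultTracerValue()を入れているのがポイント。これにより、SDKが一度も設定されていなくても、globalTracerには必ずinternal/globalのtracerProvider(後から委譲先のdelegateを設定できるプロバイダ)が入っている。そのため、TracerProvider()の呼び出し側がnilで失敗することはない。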

あかりあかり

じゃあ、SDKがちゃんと利用されるようになっている場合はどうか?

SDKを利用するというのは、以下のようにTracerProviderをセットアップした上でTracerを取得するということ。

// Import the SDK's trace package under the alias sdktrace.
sdktrace "go.opentelemetry.io/otel/sdk/trace"

// Build an SDK TracerProvider using the AlwaysSample sampler.
// (tp is assumed to be declared earlier, hence `=` rather than `:=`.)
tp = sdktrace.NewTracerProvider(
    sdktrace.WithSampler(sdktrace.AlwaysSample()),
)
// Register it as the global provider, then obtain a Tracer through the
// otel package.
otel.SetTracerProvider(tp)
tr := otel.Tracer(moduleName)

つまり、otel.SetTracerProvider(tp)が何をしているのかを見れば良い。

opentelemetry-go/trace.go
package otel

import (
	"go.opentelemetry.io/otel/internal/global"
	"go.opentelemetry.io/otel/trace"
)

// SetTracerProvider registers tp as the global TracerProvider.
// It is a thin public wrapper that forwards directly to the internal global
// package, where the actual state lives.
func SetTracerProvider(tp trace.TracerProvider) {
	global.SetTracerProvider(tp)
}
opentelemetry-go/internal/global/state.go
package global

type (
	// tracerProviderHolder wraps the TracerProvider stored in globalTracer.
	tracerProviderHolder struct {
		tp trace.TracerProvider
	}
)

var (
	// globalTracer holds the current global TracerProvider. It starts out as
	// the default delegating *tracerProvider.
	globalTracer        = defaultTracerValue()

	// delegateTraceOnce ensures the default provider's delegate is wired at
	// most once. That happens on the first SetTracerProvider call that gets
	// past the self-assignment check.
	delegateTraceOnce             sync.Once
)

// TracerProvider returns the current global TracerProvider.
func TracerProvider() trace.TracerProvider {
	return globalTracer.Load().(tracerProviderHolder).tp
}

// SetTracerProvider replaces the global TracerProvider with tp.
//
// While the global provider is still the default *tracerProvider, the first
// call also tells that default to delegate to tp (via setDelegate). Tracers
// already obtained from the default provider then forward to tp.
//
// Passing the current default provider back in is rejected, because it
// would make the default delegate to itself. The problem is reported
// through the package's Error helper and the call returns without changing
// anything.
func SetTracerProvider(tp trace.TracerProvider) {
	current := TracerProvider()

	if _, cOk := current.(*tracerProvider); cOk {
		if _, tpOk := tp.(*tracerProvider); tpOk && current == tp {
			// Do not assign the default delegating TracerProvider to delegate
			// to itself.
			Error(
				errors.New("no delegate configured in tracer provider"),
				"Setting tracer provider to its current value. No delegate will be configured",
			)
			return
		}
	}

	// Only the first call wires the default provider to a delegate. Later
	// calls just swap the stored value below.
	delegateTraceOnce.Do(func() {
		if def, ok := current.(*tracerProvider); ok {
			def.setDelegate(tp)
		}
	})
	globalTracer.Store(tracerProviderHolder{tp: tp})
}
opentelemetry-go/internal/global/trace.go
package global

// tracerProvider is the default, delegating TracerProvider used until an SDK
// provider is registered.
type tracerProvider struct {
	embedded.TracerProvider

	mtx      sync.Mutex
	tracers  map[il]*tracer
	delegate trace.TracerProvider
}

// setDelegate points this default provider at the real provider.
//
// While holding mtx, it does three things:
//   - records provider as the delegate;
//   - forwards provider to every tracer recorded in p.tracers (presumably
//     the tracers handed out before delegation — confirm against Tracer());
//   - drops the map, since those tracers now delegate on their own.
func (p *tracerProvider) setDelegate(provider trace.TracerProvider) {
	p.mtx.Lock()
	defer p.mtx.Unlock()

	p.delegate = provider

	if len(p.tracers) == 0 {
		return
	}

	for _, t := range p.tracers {
		t.setDelegate(provider)
	}

	// Every tracer has been re-pointed; release the bookkeeping map.
	p.tracers = nil
}
あかりあかり
import (
    "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
    "google.golang.org/grpc"
)

// NOTE(review): serverHandler is otelgrpc's own unexported type (see
// otelgrpc/stats_handler.go, where NewServerHandler constructs it). It is
// shown here only for reference; user code does not declare it.
type serverHandler struct {
	*config
}

// Install otelgrpc's server stats handler on the gRPC server, passing the
// TracerProvider and propagator explicitly (tp and p are assumed to be
// defined earlier).
// Perhaps tracing only unary RPCs would be possible even without setting
// grpc.StatsHandler — unverified.
s := grpc.NewServer(grpc.StatsHandler(otelgrpc.NewServerHandler(
    otelgrpc.WithTracerProvider(tp),
    otelgrpc.WithPropagators(p),
)))
opentelemetry-go-contrib/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
package otelgrpc

// NewServerHandler returns a gRPC stats.Handler for the server side.
// The options are resolved by newConfig with role "server". Among other
// things, newConfig creates the tracer and the "rpc.server.*" metric
// instruments.
func NewServerHandler(opts ...Option) stats.Handler {
	h := &serverHandler{
		config: newConfig(opts, "server"),
	}

	return h
}
opentelemetry-go-contrib/instrumentation/google.golang.org/grpc/otelgrpc/config.go
package otelgrpc

// config holds the resolved settings shared by otelgrpc's handlers. It also
// holds the tracer and metric instruments that newConfig creates from them.
type config struct {
	Filter            Filter
	InterceptorFilter InterceptorFilter
	Propagators       propagation.TextMapPropagator
	TracerProvider    trace.TracerProvider
	MeterProvider     metric.MeterProvider
	SpanStartOptions  []trace.SpanStartOption

	ReceivedEvent bool
	SentEvent     bool

	tracer trace.Tracer
	meter  metric.Meter

	rpcDuration        metric.Float64Histogram
	rpcRequestSize     metric.Int64Histogram
	rpcResponseSize    metric.Int64Histogram
	rpcRequestsPerRPC  metric.Int64Histogram
	rpcResponsesPerRPC metric.Int64Histogram
}

// Option applies an option value for a config.
type Option interface {
	apply(*config)
}

// newConfig builds a config for the given role. NewServerHandler passes
// "server"; the value is spliced into the metric names as "rpc.<role>.*".
//
// Steps:
//   - defaults come from the otel globals (text-map propagator,
//     TracerProvider, MeterProvider);
//   - opts are then applied in order and may override those defaults;
//   - the tracer, the meter and the five RPC histograms are created.
//
// Instrument-creation errors are reported through otel.Handle rather than
// returned. If an instrument comes back nil, it is replaced by a no-op one,
// so later recording calls stay safe.
func newConfig(opts []Option, role string) *config {
	c := &config{
		Propagators:    otel.GetTextMapPropagator(),
		TracerProvider: otel.GetTracerProvider(),
		MeterProvider:  otel.GetMeterProvider(),
	}
	for _, o := range opts {
		o.apply(c)
	}

	c.tracer = c.TracerProvider.Tracer(
		ScopeName,
		trace.WithInstrumentationVersion(SemVersion()),
	)

	// NOTE(review): the meter uses Version() while the tracer above uses
	// SemVersion(); check whether the two are meant to differ.
	c.meter = c.MeterProvider.Meter(
		ScopeName,
		metric.WithInstrumentationVersion(Version()),
		metric.WithSchemaURL(semconv.SchemaURL),
	)

	var err error
	// NOTE(review): the description says "inbound" even when role is not
	// "server".
	c.rpcDuration, err = c.meter.Float64Histogram("rpc."+role+".duration",
		metric.WithDescription("Measures the duration of inbound RPC."),
		metric.WithUnit("ms"))
	if err != nil {
		otel.Handle(err)
		if c.rpcDuration == nil {
			c.rpcDuration = noop.Float64Histogram{}
		}
	}

	c.rpcRequestSize, err = c.meter.Int64Histogram("rpc."+role+".request.size",
		metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
		metric.WithUnit("By"))
	if err != nil {
		otel.Handle(err)
		if c.rpcRequestSize == nil {
			c.rpcRequestSize = noop.Int64Histogram{}
		}
	}

	c.rpcResponseSize, err = c.meter.Int64Histogram("rpc."+role+".response.size",
		metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
		metric.WithUnit("By"))
	if err != nil {
		otel.Handle(err)
		if c.rpcResponseSize == nil {
			c.rpcResponseSize = noop.Int64Histogram{}
		}
	}

	c.rpcRequestsPerRPC, err = c.meter.Int64Histogram("rpc."+role+".requests_per_rpc",
		metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
		metric.WithUnit("{count}"))
	if err != nil {
		otel.Handle(err)
		if c.rpcRequestsPerRPC == nil {
			c.rpcRequestsPerRPC = noop.Int64Histogram{}
		}
	}

	// NOTE(review): this description is identical to requests_per_rpc's
	// ("messages received"), although this instrument counts response
	// messages; confirm against the current upstream source.
	c.rpcResponsesPerRPC, err = c.meter.Int64Histogram("rpc."+role+".responses_per_rpc",
		metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
		metric.WithUnit("{count}"))
	if err != nil {
		otel.Handle(err)
		if c.rpcResponsesPerRPC == nil {
			c.rpcResponsesPerRPC = noop.Int64Histogram{}
		}
	}

	return c
}

// tracerProviderOption is the Option returned by WithTracerProvider.
type tracerProviderOption struct{ tp trace.TracerProvider }

// apply overrides c.TracerProvider. A nil provider is ignored, so the global
// default set by newConfig stays in place.
func (o tracerProviderOption) apply(c *config) {
	if o.tp != nil {
		c.TracerProvider = o.tp
	}
}

// WithTracerProvider returns an Option that makes newConfig create the
// tracer from tp instead of the global TracerProvider.
func WithTracerProvider(tp trace.TracerProvider) Option {
	return tracerProviderOption{tp: tp}
}
あかりあかり
grpc-go/stats/handlers.go
package stats

// Handler is gRPC's stats hook interface (excerpt from grpc-go's stats
// package). otelgrpc's serverHandler implements it.
type Handler interface {
	// TagRPC can attach some information to the given context.
	// The context used for the rest lifetime of the RPC will be derived from
	// the returned context.
	TagRPC(context.Context, *RPCTagInfo) context.Context // the key method here: otelgrpc starts the server span in it
	// HandleRPC receives per-RPC stats (RPCStats) together with a context.
	HandleRPC(context.Context, RPCStats)
	// TagConn is the connection-level counterpart of TagRPC.
	TagConn(context.Context, *ConnTagInfo) context.Context
	// HandleConn receives connection-level stats (ConnStats).
	HandleConn(context.Context, ConnStats)
}
opentelemetry-go-contrib/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
// TagConn returns ctx unchanged; the server handler attaches nothing per
// connection.
func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
	return ctx
}

// HandleConn is a no-op: connection-level stats are ignored.
func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {
}

// TagRPC starts the server span for an incoming RPC. It performs these steps:
//   - extracts the propagated trace context from ctx using the configured
//     propagators;
//   - derives the span name and attributes from the full method name and
//     adds RPCSystemGRPC;
//   - starts a SpanKindServer span whose parent is the extracted span
//     context, re-wrapped as a remote span context;
//   - stores a gRPCContext (the metric attributes plus a record flag) in the
//     returned context under gRPCContextKey{}.
//
// The span is started before Filter is consulted, so Filter only affects the
// record flag here.
func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
	ctx = extract(ctx, h.config.Propagators)

	name, attrs := internal.ParseFullMethod(info.FullMethodName)
	attrs = append(attrs, RPCSystemGRPC)
	// The returned span is discarded; it travels inside ctx. NOTE(review):
	// presumably handleRPC retrieves and ends it later — confirm.
	ctx, _ = h.tracer.Start(
		trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
		name,
		trace.WithSpanKind(trace.SpanKindServer),
		trace.WithAttributes(attrs...),
	)

	gctx := gRPCContext{
		metricAttrs: attrs,
		record:      true,
	}
	if h.config.Filter != nil {
		gctx.record = h.config.Filter(info)
	}
	return context.WithValue(ctx, gRPCContextKey{}, &gctx)
}

// HandleRPC forwards per-RPC stats to the shared handleRPC with
// isServer=true.
func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
	isServer := true
	h.handleRPC(ctx, rs, isServer)
}
あかりあかり

https://opentelemetry.io/docs/specs/otel/logs/

Of all telemetry signals logs have probably the biggest legacy. Most programming languages have built-in logging capabilities or well-known, widely used logging libraries.

For metrics and traces OpenTelemetry takes the approach of a clean-sheet design, specifies a new API and provides full implementations of this API in multiple languages.

Our approach with logs is somewhat different. For OpenTelemetry to be successful in logging space we need to support existing legacy of logs and logging libraries, while offering improvements and better integration with the rest of observability world where possible.

This is in essence the philosophy behind OpenTelemetry’s logs support. We embrace existing logging solutions and make sure OpenTelemetry works nicely with existing logging libraries, log collection and processing solutions.

なるほど。Logは他のシグナル(TraceやMetrics)とは違って、言語ごとにすでに組み込みのロギング機能や広く使われているロギングライブラリが存在している。そのため、新しいAPIを一から設計するのではなく、それらを上手くOpenTelemetryに統合する必要があるのだと。