opentelemetry-go/opentelemetry-goの実装を読み解く
前提として、この構成を理解しておく必要がある。
では、open-telemetry/opentelemetry-go
は上記の構成をどのように実装しているのかを見ていく。
OpenTelemetryには、signalsと呼ばれる独立したobservability toolsがあり、Context Propagationという仕組みを共有している。
そして、それぞれのシグナルは同様の構成で作られている。
つまり、シグナル一つに着目して実装を読み解くことで大体の実装を理解することができる。
ここでは、stableであるTraceに着目して実装を読み解いていく。
まず、プロジェクトの構成は以下のような感じ。
.
├── internal
│ └── global
│ └── state.go
├── sdk # コアロジック
│ ├── doc.go
│ └── optional_function.go
├── trace # APIの具体的な実装
│ ├── doc.go
│ └── optional_function.go
├── trace.go # API
├── go.mod
└── go.sum
The API package is a self-sufficient dependency, in the sense that if the end-user application or a third-party library depends only on it and does not plug a full SDK implementation then the application will still build and run without failing, although no telemetry data will be actually delivered to a telemetry backend.
アプリケーションや3rd-partyライブラリがSDKの実装を利用していなかったとしてもビルドできて、失敗せずに動作する必要がある(下図のように)。これをどう実現しているのか?
package global
type (
tracerProviderHolder struct {
tp trace.TracerProvider
}
)
var (
globalTracer = defaultTracerValue()
)
func TracerProvider() trace.TracerProvider {
return globalTracer.Load().(tracerProviderHolder).tp
}
func defaultTracerValue() *atomic.Value {
v := &atomic.Value{}
v.Store(tracerProviderHolder{tp: &tracerProvider{}})
return v
}
package global
type tracerProvider struct {
embedded.TracerProvider
mtx sync.Mutex
tracers map[il]*tracer
delegate trace.TracerProvider
}
変数globalTracer
にデフォルト値としてdefaultTracerValue()
を入れているのポイント。
じゃあ、SDKがちゃんと利用されるようになっている場合はどうか?
SDKを利用するというのは、以下のようにTraceProviderをセットアップした上でTraceを取得するということ。
sdktrace "go.opentelemetry.io/otel/sdk/trace"
tp = sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
)
otel.SetTracerProvider(tp)
tr := otel.Tracer(moduleName)
つまり、otel.SetTracerProvider(tp)
が何をしているのかを見れば良い。
package otel
import (
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/trace"
)
func SetTracerProvider(tp trace.TracerProvider) {
global.SetTracerProvider(tp)
}
package global
type (
tracerProviderHolder struct {
tp trace.TracerProvider
}
)
var (
//
globalTracer = defaultTracerValue()
//
delegateTraceOnce sync.Once
)
func TracerProvider() trace.TracerProvider {
return globalTracer.Load().(tracerProviderHolder).tp
}
func SetTracerProvider(tp trace.TracerProvider) {
current := TracerProvider()
if _, cOk := current.(*tracerProvider); cOk {
if _, tpOk := tp.(*tracerProvider); tpOk && current == tp {
// Do not assign the default delegating TracerProvider to delegate
// to itself.
Error(
errors.New("no delegate configured in tracer provider"),
"Setting tracer provider to its current value. No delegate will be configured",
)
return
}
}
delegateTraceOnce.Do(func() {
if def, ok := current.(*tracerProvider); ok {
def.setDelegate(tp)
}
})
globalTracer.Store(tracerProviderHolder{tp: tp})
}
package global
type tracerProvider struct {
embedded.TracerProvider
mtx sync.Mutex
tracers map[il]*tracer
delegate trace.TracerProvider
}
func (p *tracerProvider) setDelegate(provider trace.TracerProvider) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.delegate = provider
if len(p.tracers) == 0 {
return
}
for _, t := range p.tracers {
t.setDelegate(provider)
}
p.tracers = nil
}
import (
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
)
type serverHandler struct {
*config
}
//もしかしたらgrpc.StatsHandlerをセットしなくても、Unaryのトレースをとるだけなら可能かも
s := grpc.NewServer(grpc.StatsHandler(otelgrpc.NewServerHandler(
otelgrpc.WithTracerProvider(tp),
otelgrpc.WithPropagators(p),
)))
package otelgrpc
func NewServerHandler(opts ...Option) stats.Handler {
h := &serverHandler{
config: newConfig(opts, "server"),
}
return h
}
package otelgrpc
type config struct {
Filter Filter
InterceptorFilter InterceptorFilter
Propagators propagation.TextMapPropagator
TracerProvider trace.TracerProvider
MeterProvider metric.MeterProvider
SpanStartOptions []trace.SpanStartOption
ReceivedEvent bool
SentEvent bool
tracer trace.Tracer
meter metric.Meter
rpcDuration metric.Float64Histogram
rpcRequestSize metric.Int64Histogram
rpcResponseSize metric.Int64Histogram
rpcRequestsPerRPC metric.Int64Histogram
rpcResponsesPerRPC metric.Int64Histogram
}
// Option applies an option value for a config.
type Option interface {
apply(*config)
}
func newConfig(opts []Option, role string) *config {
c := &config{
Propagators: otel.GetTextMapPropagator(),
TracerProvider: otel.GetTracerProvider(),
MeterProvider: otel.GetMeterProvider(),
}
for _, o := range opts {
o.apply(c)
}
c.tracer = c.TracerProvider.Tracer(
ScopeName,
trace.WithInstrumentationVersion(SemVersion()),
)
c.meter = c.MeterProvider.Meter(
ScopeName,
metric.WithInstrumentationVersion(Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)
var err error
c.rpcDuration, err = c.meter.Float64Histogram("rpc."+role+".duration",
metric.WithDescription("Measures the duration of inbound RPC."),
metric.WithUnit("ms"))
if err != nil {
otel.Handle(err)
if c.rpcDuration == nil {
c.rpcDuration = noop.Float64Histogram{}
}
}
c.rpcRequestSize, err = c.meter.Int64Histogram("rpc."+role+".request.size",
metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
if c.rpcRequestSize == nil {
c.rpcRequestSize = noop.Int64Histogram{}
}
}
c.rpcResponseSize, err = c.meter.Int64Histogram("rpc."+role+".response.size",
metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
if c.rpcResponseSize == nil {
c.rpcResponseSize = noop.Int64Histogram{}
}
}
c.rpcRequestsPerRPC, err = c.meter.Int64Histogram("rpc."+role+".requests_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
if c.rpcRequestsPerRPC == nil {
c.rpcRequestsPerRPC = noop.Int64Histogram{}
}
}
c.rpcResponsesPerRPC, err = c.meter.Int64Histogram("rpc."+role+".responses_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
if c.rpcResponsesPerRPC == nil {
c.rpcResponsesPerRPC = noop.Int64Histogram{}
}
}
return c
}
type tracerProviderOption struct{ tp trace.TracerProvider }
func (o tracerProviderOption) apply(c *config) {
if o.tp != nil {
c.TracerProvider = o.tp
}
}
func WithTracerProvider(tp trace.TracerProvider) Option {
return tracerProviderOption{tp: tp}
}
package stats
type Handler interface {
// TagRPC can attach some information to the given context.
// The context used for the rest lifetime of the RPC will be derived from
// the returned context.
TagRPC(context.Context, *RPCTagInfo) context.Context // 重要なのはこれ
HandleRPC(context.Context, RPCStats)
TagConn(context.Context, *ConnTagInfo) context.Context
HandleConn(context.Context, ConnStats)
}
func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
return ctx
}
func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {
}
func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
ctx = extract(ctx, h.config.Propagators)
name, attrs := internal.ParseFullMethod(info.FullMethodName)
attrs = append(attrs, RPCSystemGRPC)
ctx, _ = h.tracer.Start(
trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attrs...),
)
gctx := gRPCContext{
metricAttrs: attrs,
record: true,
}
if h.config.Filter != nil {
gctx.record = h.config.Filter(info)
}
return context.WithValue(ctx, gRPCContextKey{}, &gctx)
}
func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
isServer := true
h.handleRPC(ctx, rs, isServer)
}
ハンズオン
Getting started with otelsql, the OpenTelemetry instrumentation for Go SQL
Of all telemetry signals logs have probably the biggest legacy. Most programming languages have built-in logging capabilities or well-known, widely used logging libraries.
For metrics and traces OpenTelemetry takes the approach of a clean-sheet design, specifies a new API and provides full implementations of this API in multiple languages.
Our approach with logs is somewhat different. For OpenTelemetry to be successful in logging space we need to support existing legacy of logs and logging libraries, while offering improvements and better integration with the rest of observability world where possible.
This is in essence the philosophy behind OpenTelemetry’s logs support. We embrace existing logging solutions and make sure OpenTelemetry works nicely with existing logging libraries, log collection and processing solutions.
なるほど、Logは他のTelemetryとは違って、言語ごとにすでに実装が存在しているので、それを上手にOpenTelemetryに統合する必要があるのだと。