gRPC-Go+CloudTraceで分散トレーシング
はじめに
以前
以前にAkka gRPC(Scala)+CloudTraceで分散トレーシングなんて記事を書きましたが、Akka gRPCだけじゃなくてGo言語(gRPC-Go)でもサービスが動いてたんで追加してみました。
※CloudTraceおよびOpenTelemetryについての説明は割愛(上記記事または各ドキュメントを参照してください)。
※GCPドキュメント上ではGoでのOpenTelemetryの利用はβ扱いになっています。使用ライブラリであるOpenTelemetry-Goを見ると「Traces」はStableになっているのでトレースのみなら問題なさそうです。
Goで実装
導入は楽だよ
Akka gRPCの時もそうでしたが分散トレーシングを考えない場合、GoとOpenTelemetryを参考にすれば特に苦も無く実装できます。
ただ、Akka gRPC同様に分散トレーシングしたい場合はコンテキストをプロパゲーション(伝搬)する必要があります。
Akka gRPCではJavaとの兼ね合いでコンテキストのパーサーであるプロパーゲーター(伝搬屋)を自力実装したりInterceptorが無い等、結構面倒なことになってましたね。
それに比べてgRPC-Goは、W3C準拠のプロパゲーターがそのまま使えるし、Interceptorもあるしで楽勝です。
前準備
GoとOpenTelemetryの通り、必要なライブラリをインポートし、トレーサーを初期化します。
そこにプロパゲーターを指定する設定を追加します。←必須
// https://cloud.google.com/trace/docs/setup/go-otを一部改変
package main
import (
"context"
"log"
"os"
texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
"go.opentelemetry.io/otel"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
func main() {
ctx := context.Background()
projectID := os.Getenv("GOOGLE_CLOUD_PROJECT")
exporter, err := texporter.New(texporter.WithProjectID(projectID))
if err != nil {
log.Fatalf("texporter.NewExporter: %v", err)
}
tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter))
defer tp.ForceFlush(ctx) // flushes any pending spans
otel.SetTracerProvider(tp)
// ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
otel.SetTextMapPropagator(propagation.TraceContext{})
// ↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
tracer := otel.GetTracerProvider().Tracer("example.com/trace")
err = func(ctx context.Context) error {
ctx, span := tracer.Start(ctx, "foo")
defer span.End()
// Do some work.
return nil
}(ctx)
}
Interceptorを実装
gRPC-GoにはInterceptorがあるのでそこにトレーサーを組み込みましょう。
その前にキャリアを更新・削除用にプロパゲーターに引き渡すTextMapCarrier
インタフェースに従った定義が必要になります。今回はgRPC-GoなのでMetadata(google.golang.org/grpc/metadata.MD
)を拡張します。
package tracing
// importは省略
type MetadataCarrier metadata.MD
func (mc MetadataCarrier) Get(key string) string {
md := metadata.MD(mc)
v := md.Get(key)
if len(v) == 0 {
return ""
}
return v[0]
}
func (mc MetadataCarrier) Set(key string, value string) {
md := metadata.MD(mc)
md.Append(key, value)
}
func (mc MetadataCarrier) Keys() []string {
md := metadata.MD(mc)
keys := make([]string, 0, len(md))
for k := range md {
keys = append(keys, k)
}
return keys
}
上記を利用してInterceptorを定義します。
Interceptorは全サービスが通ってしまうので素朴なフィルタも考えてみました。
package tracing
// importは省略
var (
IGNORE_METHOD = map[string]interface{}{
"/grpc.health.v1.Health/Check": nil,
"/grpc.health.v1.Health/Watch": nil,
}
)
func UnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
if _, ignore := IGNORE_METHOD[info.FullMethod]; ignore {
return handler(ctx, req)
}
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return handler(ctx, req)
}
copyMD := md.Copy()
ctx = otel.GetTextMapPropagator().Extract(ctx, MetadataCarrier(copyMD))
ctx, span := otel.GetTracerProvider().Tracer(TRACER_NAME).Start(
ctx,
info.FullMethod,
trace.WithSpanKind(trace.SpanKindServer),
)
defer span.End()
res, err := handler(ctx, req)
if err != nil {
span.SetStatus(codes.Error, err.Error())
}
return res, err
}
}
func StreamServerInterceptor() grpc.StreamServerInterceptor {
return func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
if _, ignore := IGNORE_METHOD[info.FullMethod]; ignore {
return handler(srv, ss)
}
ctx := ss.Context()
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return handler(srv, ss)
}
copyMD := md.Copy()
ctx = otel.GetTextMapPropagator().Extract(ctx, MetadataCarrier(copyMD))
tracer := otel.GetTracerProvider().Tracer(TRACER_NAME)
_, span := tracer.Start(
ctx,
info.FullMethod,
trace.WithSpanKind(trace.SpanKindServer),
)
defer span.End()
err := handler(srv, ss)
if err != nil {
span.SetStatus(codes.Error, err.Error())
}
return err
}
}
あとはこれらをサーバーの初期化時に噛ませるだけです。
// 初期化部分のみ抜粋
grpcServer := grpc.NewServer(
grpc.StreamInterceptor(tracing.StreamServerInterceptor()),
grpc.UnaryInterceptor(tracing.UnaryServerInterceptor()),
)
終わりに
まとめ
- gRPC-Go+CloudTraceは楽。
Discussion